From 26589be413c7fe016b630baeaa11661ebaf2fa3d Mon Sep 17 00:00:00 2001 From: Ohki Nozomu Date: Sat, 18 Nov 2023 10:12:53 +0900 Subject: [PATCH] Use autopaho in agent (#54) --- e2e/basic/manifests/agent.yaml | 2 +- e2e/loadbalancing/manifests/agent1.yaml | 2 +- e2e/loadbalancing/manifests/agent2.yaml | 2 +- e2e/password/manifests/agent.yaml | 2 +- e2e/protobuf/manifests/agent.yaml | 2 +- e2e/split/manifests/agent.yaml | 2 +- e2e/storage_relay/manifests/agent.yaml | 2 +- e2e/tls/manifests/agent.yaml | 2 +- e2e/zstd/manifests/agent.yaml | 2 +- go.mod | 3 +- go.sum | 4 + internal/agent/agent.go | 238 +++++++++++++++--------- internal/agent/router.go | 32 ---- internal/hub/router.go | 3 +- 14 files changed, 165 insertions(+), 133 deletions(-) delete mode 100644 internal/agent/router.go diff --git a/e2e/basic/manifests/agent.yaml b/e2e/basic/manifests/agent.yaml index f596d2b..8e8e32f 100644 --- a/e2e/basic/manifests/agent.yaml +++ b/e2e/basic/manifests/agent.yaml @@ -53,7 +53,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 volumeMounts: diff --git a/e2e/loadbalancing/manifests/agent1.yaml b/e2e/loadbalancing/manifests/agent1.yaml index 06b0562..b7b9a46 100644 --- a/e2e/loadbalancing/manifests/agent1.yaml +++ b/e2e/loadbalancing/manifests/agent1.yaml @@ -22,7 +22,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 command: ["/bin/sh", "-c"] diff --git a/e2e/loadbalancing/manifests/agent2.yaml b/e2e/loadbalancing/manifests/agent2.yaml index 70c6325..ffb7042 100644 --- a/e2e/loadbalancing/manifests/agent2.yaml +++ b/e2e/loadbalancing/manifests/agent2.yaml @@ -22,7 +22,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent02 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent02 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 command: ["/bin/sh", "-c"] diff --git a/e2e/password/manifests/agent.yaml b/e2e/password/manifests/agent.yaml index f43b270..addd3c6 100644 --- a/e2e/password/manifests/agent.yaml +++ b/e2e/password/manifests/agent.yaml @@ -22,7 +22,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug --username admin --password adminpassword + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug --username admin --password adminpassword - name: appserver image: golang:1.21.3 command: ["/bin/sh", "-c"] diff --git a/e2e/protobuf/manifests/agent.yaml b/e2e/protobuf/manifests/agent.yaml index 9ecf410..856a738 100644 --- a/e2e/protobuf/manifests/agent.yaml +++ b/e2e/protobuf/manifests/agent.yaml @@ -53,7 +53,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 volumeMounts: diff --git a/e2e/split/manifests/agent.yaml b/e2e/split/manifests/agent.yaml index e2ea6ee..b439a4e 100644 --- a/e2e/split/manifests/agent.yaml +++ b/e2e/split/manifests/agent.yaml @@ -24,7 +24,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 command: ["/bin/sh", "-c"] diff --git a/e2e/storage_relay/manifests/agent.yaml b/e2e/storage_relay/manifests/agent.yaml index f33b230..1362e77 100644 --- a/e2e/storage_relay/manifests/agent.yaml +++ b/e2e/storage_relay/manifests/agent.yaml @@ -24,7 +24,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 command: ["/bin/sh", "-c"] diff --git a/e2e/tls/manifests/agent.yaml b/e2e/tls/manifests/agent.yaml index de0b646..4d01227 100644 --- a/e2e/tls/manifests/agent.yaml +++ b/e2e/tls/manifests/agent.yaml @@ -19,7 +19,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:8883 --proxy-host 127.0.0.1:8000 --loglevel debug --cafile /etc/certs/cacert.pem --cert /etc/certs/cert.pem --key /etc/certs/key.pem + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtts://mosquitto.mosquitto:8883 --proxy-host 127.0.0.1:8000 --loglevel debug --cafile /etc/certs/cacert.pem --cert /etc/certs/cert.pem --key /etc/certs/key.pem volumeMounts: - name: certs-volume mountPath: /etc/certs diff --git a/e2e/zstd/manifests/agent.yaml b/e2e/zstd/manifests/agent.yaml index 8524167..05af79a 100644 --- a/e2e/zstd/manifests/agent.yaml +++ b/e2e/zstd/manifests/agent.yaml @@ -53,7 +53,7 @@ spec: command: ["/bin/sh", "-c"] args: - | - /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug + /app/fuyuu-router agent -c /app/config/config.toml --id agent01 -b mqtt://mosquitto.mosquitto:1883 --proxy-host 127.0.0.1:8000 --loglevel debug - name: appserver image: golang:1.21.3 volumeMounts: diff --git a/go.mod b/go.mod index 4d1edb2..19fea0c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21.1 require ( github.com/dgraph-io/badger/v4 v4.2.0 - github.com/eclipse/paho.golang v0.12.0 + github.com/eclipse/paho.golang v0.12.1-0.20231115041409-5f4de1dba813 github.com/go-playground/validator/v10 v10.16.0 github.com/google/uuid v1.4.0 github.com/gorilla/mux v1.8.1 @@ -70,6 +70,7 @@ require ( github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/grafana/pyroscope-go v1.0.4 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/go.sum b/go.sum index 1cb8bc4..b278959 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q= github.com/eclipse/paho.golang v0.12.0/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE= +github.com/eclipse/paho.golang v0.12.1-0.20231115041409-5f4de1dba813 h1:yu//BXrsaMc14YJpePcWBEZhwsJ24K9qApseBJzKQcA= +github.com/eclipse/paho.golang v0.12.1-0.20231115041409-5f4de1dba813/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE= github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 h1:rydBwnBoywKQMjWF0z8SriYtQ+uUcaFsxuijMjJr5PI= github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4/go.mod h1:kQa0V74HNYMfuJH6jiPiwNdpWXl4xd/K4tzlrcvYDQI= github.com/efficientgo/e2e v0.13.1-0.20220922081603-45de9fc588a8 h1:UFLc39BcUXahSNCLUrKjNGZABMUZaS4M74EZvTRnq3k= @@ -269,6 +271,8 @@ github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qK github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/pyroscope-go v1.0.4 h1:oyQX0BOkL+iARXzHuCdIF5TQ7/sRSel1YFViMHC7Bm0= github.com/grafana/pyroscope-go v1.0.4/go.mod h1:0d7ftwSMBV/Awm7CCiYmHQEG8Y44Ma3YSjt+nWcWztY= github.com/grafana/pyroscope-go/godeltaprof v0.1.4 h1:mDsJ3ngul7UfrHibGQpV66PbZ3q1T8glz/tK3bQKKEk= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index e87fa3b..6af3818 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -3,11 +3,14 @@ package agent import ( "bytes" "context" + "crypto/tls" + "crypto/x509" "encoding/json" "net/http" + "net/url" "os" - "time" + "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/klauspost/compress/zstd" "github.com/ohkinozomu/fuyuu-router/internal/common" @@ -30,7 +33,7 @@ type mergeChPayload struct { } type server struct { - client *paho.Client + client *autopaho.ConnectionManager id string proxyHost string logger *zap.Logger @@ -45,21 +48,154 @@ type server struct { merger *data.Merger } +func createWillMessage(c AgentConfig) *paho.WillMessage { + teminatePacket := data.TerminatePacket{ + AgentId: c.ID, + Labels: c.Labels, + } + + var terminatePayload []byte + var err error + switch c.CommonConfigV2.Networking.Format { + case "json": + terminatePayload, err = json.Marshal(&teminatePacket) + if err != nil { + c.Logger.Fatal(err.Error()) + } + case "protobuf": + terminatePayload, err = proto.Marshal(&teminatePacket) + if err != nil { + c.Logger.Fatal(err.Error()) + } + default: + c.Logger.Fatal("Unknown format: " + c.CommonConfigV2.Networking.Format) + } + return &paho.WillMessage{ + Retain: false, + QoS: 0, + Topic: topics.TerminateTopic(), + Payload: terminatePayload, + } +} + func newServer(c AgentConfig) server { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - conn, err := common.TCPConnect(ctx, c.CommonConfig) + u, err := url.Parse(c.MQTTBroker) if err != nil { - c.Logger.Fatal("Error: " + err.Error()) + c.Logger.Fatal("Error parsing MQTT broker URL: " + err.Error()) } + payloadCh := make(chan []byte) mergeCh := make(chan mergeChPayload) processCh := make(chan processChPayload, 1000) - clientConfig := paho.ClientConfig{ - Conn: conn, - Router: NewRouter(payloadCh, c, c.Logger), + + var connectUsername string + var connectPassword []byte + if c.Username != "" && c.Password != "" { + connectUsername = c.Username + connectPassword = []byte(c.Password) + } + + var tlsConfig tls.Config + if c.CAFile != "" && c.Cert != "" && c.Key != "" { + caCert, err := os.ReadFile(c.CAFile) + if err != nil { + c.Logger.Sugar().Fatalf("failed to read the CA cert file: %s", err) + } + + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + c.Logger.Sugar().Fatalf("failed to append CA cert to the pool") + } + + cert, err := tls.LoadX509KeyPair(c.Cert, c.Key) + if err != nil { + c.Logger.Sugar().Fatalf("failed to load the cert file: %s", err) + } + tlsConfig = tls.Config{ + RootCAs: caCertPool, + Certificates: []tls.Certificate{cert}, + } + } + + cliCfg := autopaho.ClientConfig{ + TlsCfg: &tlsConfig, + ServerUrls: []*url.URL{u}, + KeepAlive: 20, + CleanStartOnInitialConnection: false, + SessionExpiryInterval: 60, + OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + launchPacket := data.LaunchPacket{ + AgentId: c.ID, + Labels: c.Labels, + } + + var launchPayload []byte + switch c.CommonConfigV2.Networking.Format { + case "json": + launchPayload, err = json.Marshal(&launchPacket) + if err != nil { + c.Logger.Fatal(err.Error()) + } + case "protobuf": + launchPayload, err = proto.Marshal(&launchPacket) + if err != nil { + c.Logger.Fatal(err.Error()) + } + default: + c.Logger.Fatal("Unknown format: " + c.CommonConfigV2.Networking.Format) + } + + _, err = cm.Publish(context.Background(), &paho.Publish{ + Topic: topics.LaunchTopic(), + // Maybe it should be 1. + QoS: 0, + Payload: launchPayload, + }) + if err != nil { + c.Logger.Fatal(err.Error()) + } + + if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + {Topic: topics.RequestTopic(c.ID), QoS: 1}, + }, + }); err != nil { + c.Logger.Fatal("Error subscribing to MQTT topic: " + err.Error()) + } + c.Logger.Info("Subscribed to MQTT topic") + }, + OnConnectError: func(err error) { + c.Logger.Error("Error connecting to MQTT broker: " + err.Error()) + }, + ClientConfig: paho.ClientConfig{ + ClientID: c.ID, + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { + c.Logger.Debug("Received message") + payloadCh <- m.Payload + }), + OnClientError: func(err error) { + c.Logger.Error("Error from MQTT client: " + err.Error()) + }, + OnServerDisconnect: func(d *paho.Disconnect) { + if d.Properties != nil { + c.Logger.Error("server requested disconnect", zap.String("reason", d.Properties.ReasonString)) + } else { + c.Logger.Error("server requested disconnect") + } + }, + }, + WillMessage: createWillMessage(c), + ConnectUsername: connectUsername, + ConnectPassword: connectPassword, + } + + cm, err := autopaho.NewConnection(context.Background(), cliCfg) + if err != nil { + c.Logger.Fatal(err.Error()) + } + if err = cm.AwaitConnection(context.Background()); err != nil { + c.Logger.Fatal(err.Error()) } - client := paho.NewClient(clientConfig) var encoder *zstd.Encoder var decoder *zstd.Decoder @@ -88,7 +224,7 @@ func newServer(c AgentConfig) server { } return server{ - client: client, + client: cm, payloadCh: payloadCh, mergeCh: mergeCh, processCh: processCh, @@ -138,86 +274,7 @@ func Start(c AgentConfig) { } s := newServer(c) - connect := common.MQTTConnect(c.CommonConfig) - teminatePacket := data.TerminatePacket{ - AgentId: c.ID, - Labels: c.Labels, - } - - var terminatePayload []byte - var err error - switch c.CommonConfigV2.Networking.Format { - case "json": - terminatePayload, err = json.Marshal(&teminatePacket) - if err != nil { - c.Logger.Fatal(err.Error()) - } - case "protobuf": - terminatePayload, err = proto.Marshal(&teminatePacket) - if err != nil { - c.Logger.Fatal(err.Error()) - } - default: - c.Logger.Fatal("Unknown format: " + c.CommonConfigV2.Networking.Format) - } - if err != nil { - c.Logger.Fatal(err.Error()) - } - - connect.WillMessage = &paho.WillMessage{ - Retain: false, - QoS: 0, - Topic: topics.TerminateTopic(), - Payload: terminatePayload, - } - _, err = s.client.Connect(context.Background(), connect) - if err != nil { - c.Logger.Fatal("Error connecting to MQTT broker: " + err.Error()) - } - - launchPacket := data.LaunchPacket{ - AgentId: c.ID, - Labels: c.Labels, - } - - var launchPayload []byte - switch c.CommonConfigV2.Networking.Format { - case "json": - launchPayload, err = json.Marshal(&launchPacket) - if err != nil { - c.Logger.Fatal(err.Error()) - } - case "protobuf": - launchPayload, err = proto.Marshal(&launchPacket) - if err != nil { - c.Logger.Fatal(err.Error()) - } - default: - c.Logger.Fatal("Unknown format: " + c.CommonConfigV2.Networking.Format) - } - - _, err = s.client.Publish(context.Background(), &paho.Publish{ - Topic: topics.LaunchTopic(), - // Maybe it should be 1. - QoS: 0, - Payload: launchPayload, - }) - if err != nil { - c.Logger.Fatal(err.Error()) - } - - _, err = s.client.Subscribe(context.Background(), &paho.Subscribe{ - Subscriptions: []paho.SubscribeOptions{ - { - Topic: topics.RequestTopic(c.ID), - QoS: 0, - }, - }, - }) - if err != nil { - c.Logger.Fatal(err.Error()) - } for { select { case payload := <-s.payloadCh: @@ -276,6 +333,7 @@ func Start(c AgentConfig) { return } + var err error if processChPayload.requestPacket.Compress == "zstd" && s.decoder != nil { processChPayload.httpRequestData.Body.Body, err = s.decoder.DecodeAll(processChPayload.httpRequestData.Body.Body, nil) if err != nil { diff --git a/internal/agent/router.go b/internal/agent/router.go deleted file mode 100644 index 85476dc..0000000 --- a/internal/agent/router.go +++ /dev/null @@ -1,32 +0,0 @@ -package agent - -import ( - "github.com/eclipse/paho.golang/packets" - "github.com/eclipse/paho.golang/paho" - "go.uber.org/zap" -) - -type Router struct { - payloadCh chan []byte - logger *zap.Logger -} - -var _ paho.Router = (*Router)(nil) - -func NewRouter(payloadCh chan []byte, c AgentConfig, logger *zap.Logger) *Router { - return &Router{ - payloadCh: payloadCh, - logger: logger, - } -} - -func (r *Router) Route(p *packets.Publish) { - r.logger.Debug("Received message") - r.payloadCh <- p.Payload -} - -func (r *Router) RegisterHandler(string, paho.MessageHandler) {} - -func (r *Router) UnregisterHandler(string) {} - -func (r *Router) SetDebugLogger(paho.Logger) {} diff --git a/internal/hub/router.go b/internal/hub/router.go index 80814b4..47afdf9 100644 --- a/internal/hub/router.go +++ b/internal/hub/router.go @@ -4,6 +4,7 @@ import ( badger "github.com/dgraph-io/badger/v4" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" + "github.com/eclipse/paho.golang/paho/log" "github.com/klauspost/compress/zstd" "github.com/ohkinozomu/fuyuu-router/internal/common" "github.com/ohkinozomu/fuyuu-router/pkg/data" @@ -65,4 +66,4 @@ func (r *Router) RegisterHandler(string, paho.MessageHandler) {} func (r *Router) UnregisterHandler(string) {} -func (r *Router) SetDebugLogger(paho.Logger) {} +func (r *Router) SetDebugLogger(log.Logger) {}