Skip to content

Commit

Permalink
fix(confmws/confmqtt): fix default KeepAlive and client cache
Browse files Browse the repository at this point in the history
  • Loading branch information
saitofun committed Jul 19, 2024
1 parent 7ebbcbc commit ad62729
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
12 changes: 7 additions & 5 deletions confmws/confmqtt/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Broker struct {
func (b *Broker) SetDefault() {
b.Retry.SetDefault()
if b.Keepalive == 0 {
b.Keepalive = 3 * time.Second
b.Keepalive = time.Minute
}
if b.Timeout == 0 {
b.Timeout = 10 * time.Second
Expand Down Expand Up @@ -86,6 +86,7 @@ func (b *Broker) options(cid string) *mqtt.ClientOptions {
}
}

// reconnecting
options.SetAutoReconnect(true)
options.SetConnectRetryInterval(b.Timeout)

Expand Down Expand Up @@ -135,7 +136,8 @@ func (b *Broker) NewClient(id, topic string) (*Client, error) {
if topic == "" {
return nil, ErrInvalidTopic
}
if c, ok := b.clients.Load(id); ok && c != nil {
key := id + topic
if c, ok := b.clients.Load(key); ok && c != nil {
return c.(*Client), nil
}

Expand All @@ -152,15 +154,15 @@ func (b *Broker) NewClient(id, topic string) (*Client, error) {
if err := c.connect(); err != nil {
return nil, err
}
b.clients.Store(id, c)
b.clients.Store(key, c)
return c, nil
}

func (b *Broker) Close(c *Client) {
b.CloseByClientID(c.id)
b.CloseByClientKey(c.key())
}

func (b *Broker) CloseByClientID(id string) {
func (b *Broker) CloseByClientKey(id string) {
if c, ok := b.clients.LoadAndDelete(id); ok && c != nil {
cc := c.(*Client)
cc.cli.Unsubscribe(cc.topic)
Expand Down
61 changes: 61 additions & 0 deletions confmws/confmqtt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func TestBroker(t *testing.T) {
})
NewWithT(t).Expect(err).To(BeNil())

t.Run("NewClientInClientCache", func(t *testing.T) {
cc, err := broker.NewClient(suber.ID(), suber.Topic())
NewWithT(t).Expect(err).To(BeNil())
NewWithT(t).Expect(cc.ID()).To(Equal(suber.ID()))
NewWithT(t).Expect(cc.Topic()).To(Equal(suber.Topic()))
})

num := 5
for i := 0; i < num; i++ {
err = puber.Publish(must.NoErrorV(json.Marshal(NewMessage("payload"))))
Expand Down Expand Up @@ -104,6 +111,7 @@ func TestBrokerExt(t *testing.T) {
b := &Broker{}
b.Retry.Repeats = -1
b.SetDefault()
b.Server.Port = 9999
NewWithT(t).Expect(b.Init()).NotTo(BeNil())
liveness := b.LivenessCheck()[b.Server.Hostname()]
NewWithT(t).Expect(liveness).NotTo(Equal("ok"))
Expand All @@ -116,7 +124,28 @@ func TestBrokerExt(t *testing.T) {
})
}

func TestClientTimeout(t *testing.T) {
b := &Broker{}
err := broker.Server.UnmarshalText([]byte("tcp://broker.emqx.io:1883"))
NewWithT(t).Expect(err).To(BeNil())

b.SetDefault()
b.Timeout = time.Second
b.Keepalive = time.Second
c, err := b.NewClient("eof_client", "try_eof_client")
NewWithT(t).Expect(err).To(BeNil())
defer b.Close(c)

err = c.Subscribe(func(client mqtt.Client, message mqtt.Message) {
t.Logf(string(message.Payload()))
})
NewWithT(t).Expect(err).To(BeNil())

time.Sleep(time.Minute)
}

func TestClientReconnection(t *testing.T) {
t.Skip("this is a local debug test case")
b := &Broker{}
err := b.Server.UnmarshalText([]byte("tcp://broker.emqx.io:1883"))
NewWithT(t).Expect(err).To(BeNil())
Expand Down Expand Up @@ -194,3 +223,35 @@ func TestClientReconnection(t *testing.T) {
t.Logf("published: %v", pubed)
t.Logf("subscribed: %v", subed)
}

func TestLocalSubscribing(t *testing.T) {
t.Skip("this is a local debug test case")
b := &Broker{}
b.SetDefault()
NewWithT(t).Expect(b.Init()).To(BeNil())

suber, err := b.NewClient("sub_"+uuid.NewString(), "any_topic")
NewWithT(t).Expect(err).To(BeNil())

err = suber.Subscribe(func(client mqtt.Client, message mqtt.Message) {
t.Logf(string(message.Payload()))
})
NewWithT(t).Expect(err).To(BeNil())

puber, err := b.NewClient("pub_"+uuid.NewString(), "any_topic")
NewWithT(t).Expect(err).To(BeNil())
go func() {
seq := 1
for {
err = puber.Publish(strconv.Itoa(seq))
NewWithT(t).Expect(err).To(BeNil())
time.Sleep(time.Second)
seq++
}
}()

time.Sleep(300 * time.Second)
b.Close(suber)
b.Close(puber)
return
}
4 changes: 4 additions & 0 deletions confmws/confmqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (c *Client) Topic() string {
return c.topic
}

func (c *Client) key() string {
return c.id + c.topic
}

func (c *Client) wait(tok mqtt.Token) error {
tok.WaitTimeout(c.timeout)
if err := tok.Error(); err != nil {
Expand Down

0 comments on commit ad62729

Please sign in to comment.