Skip to content

Commit

Permalink
Merge pull request #67 from Alkorin/async
Browse files Browse the repository at this point in the history
Send activities asynchronously via another goroutine
  • Loading branch information
Alkorin authored Feb 10, 2019
2 parents 114db20 + 6dc24e3 commit ae65ad2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
37 changes: 31 additions & 6 deletions conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"sync"
Expand All @@ -25,6 +26,8 @@ type Conversation struct {
newSpaceEventHandlers []func(*Space)
newActivityEventHandlers []func(*Space, *Activity)

activityQueue chan io.Reader

logger *log.Entry
}

Expand All @@ -35,18 +38,20 @@ type Team struct {

func NewConversation(device *Device, mercury *Mercury, kms *KMS) *Conversation {
c := &Conversation{
device: device,
mercury: mercury,
kms: kms,
spaces: make(map[string]*Space),
teams: make(map[string]*Team),
logger: log.WithField("type", "Conversation"),
device: device,
mercury: mercury,
kms: kms,
spaces: make(map[string]*Space),
teams: make(map[string]*Team),
logger: log.WithField("type", "Conversation"),
activityQueue: make(chan io.Reader, 64),
}

mercury.RegisterHandler("conversation.activity", c.ParseActivity)

// Fetch current spaces
go c.FetchAllSpaces()
go c.HandleActivityQueue()

return c
}
Expand Down Expand Up @@ -279,6 +284,26 @@ func (c *Conversation) AddNewActivityEventHandler(f func(*Space, *Activity)) {
c.newActivityEventHandlers = append(c.newActivityEventHandlers, f)
}

func (c *Conversation) HandleActivityQueue() {
for data := range c.activityQueue {
response, err := c.device.RequestService("POST", "conversationServiceUrl", "/activities", data)
if err != nil {
log.WithError(err).Error("Failed to create request")
continue
}
defer response.Body.Close()

if response.StatusCode != http.StatusOK {
responseError, err := ioutil.ReadAll(response.Body)
if err != nil {
c.logger.WithError(err).Error("Failed to read error response")
} else {
c.logger.WithError(errors.New(string(responseError))).Error("Failed to send message")
}
}
}
}

func (c *Conversation) CreateSpace(name string) {
logger := c.logger.WithField("func", "CreateSpace").WithField("spaceName", name)

Expand Down
16 changes: 1 addition & 15 deletions space.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package main
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"sync"

Expand Down Expand Up @@ -157,19 +155,7 @@ func (s *Space) SendMessage(msg string) error {
}

logger.Trace("Send message")
response, err := s.conversation.device.RequestService("POST", "conversationServiceUrl", "/activities", bytes.NewReader(data))
if err != nil {
return errors.Wrap(err, "failed to create request")
}

if response.StatusCode != http.StatusOK {
responseError, err := ioutil.ReadAll(response.Body)
if err != nil {
return errors.Wrap(err, "failed to read error response")
}
return errors.Errorf("failed to send message: %s", responseError)
}

s.conversation.activityQueue <- bytes.NewReader(data)
return nil
}

Expand Down

0 comments on commit ae65ad2

Please sign in to comment.