Skip to content

Commit

Permalink
Add method to backend to update flow session timeouts by msg ID
Browse files Browse the repository at this point in the history
Adds a single query that will update a message's flowsession if it is in waiting status, with the correct timeout.
  • Loading branch information
jeremyist committed Sep 14, 2023
1 parent 198e154 commit 05e32f5
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
3 changes: 3 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type Backend interface {
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, Msg, MsgStatus)

// SetFlowSessionTimeoutByMsgId If the flow session for a given msg ID is waiting, set its timeout
SetFlowSessionTimeoutByMsgId(context.Context, MsgID) error

// Check if external ID has been seen in a period
CheckExternalIDSeen(Msg) Msg

Expand Down
16 changes: 16 additions & 0 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg,
}
}

// Joins the flow session and gets the last event of the last run and checks if it is a msg_wait event
const updateFlowSessionTimeoutByMsgID = `
update flows_flowsession sess_orig
set timeout_on=(NOW() + (sess.output::json->'runs'->-1->'events'->-1->>'timeout_seconds')::int * interval '1 second')
from flows_flowsession as sess
left join msgs_msg msg on sess.current_flow_id=msg.flow_id
where msg.id=$1 and sess.status='W'
and sess.output::json->'runs'->-1->'events'->-1->>'type' = 'msg_wait'
and sess_orig.id=sess.id;
`

func (b *backend) SetFlowSessionTimeoutByMsgId(ctx context.Context, id courier.MsgID) error {
_, err := b.db.ExecContext(ctx, updateFlowSessionTimeoutByMsgID, id)
return err
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(ctx context.Context, m courier.Msg) error {
timeout, cancel := context.WithTimeout(ctx, backendTimeout)
Expand Down
18 changes: 12 additions & 6 deletions handlers/postmaster/postmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ import (
"encoding/json"
"fmt"
"github.com/go-chi/chi"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/backends/rapidpro"
"github.com/nyaruka/courier/handlers"
"github.com/nyaruka/courier/utils"
"github.com/nyaruka/gocommon/urns"
"github.com/sirupsen/logrus"
"net/http"
"os"
"regexp"
"strconv"
"strings"

"github.com/nyaruka/courier"
"github.com/nyaruka/courier/handlers"
"github.com/nyaruka/courier/utils"
"github.com/nyaruka/gocommon/urns"

validator "gopkg.in/go-playground/validator.v9"
)

Expand Down Expand Up @@ -173,7 +172,14 @@ func (h *handler) receiveStatus(ctx context.Context, channel courier.Channel, w

cid, err := strconv.ParseInt(payload.MessageID, 10, 64)

status := h.Backend().NewMsgStatusForID(channel, courier.NewMsgID(cid), statusMapping[payload.Status])
courierStatus := statusMapping[payload.Status]
id := courier.NewMsgID(cid)

if courierStatus == courier.MsgSent {
_ = h.Backend().SetFlowSessionTimeoutByMsgId(ctx, id)
}

status := h.Backend().NewMsgStatusForID(channel, id, courierStatus)

return handlers.WriteMsgStatusAndResponse(ctx, h, channel, status, w, r)
}
Expand Down
4 changes: 4 additions & 0 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg Msg, s M
mb.sentMsgs[msg.ID()] = true
}

func (mb *MockBackend) SetFlowSessionTimeoutByMsgId(ctx context.Context, id MsgID) error {
return nil
}

// WriteChannelLogs writes the passed in channel logs to the DB
func (mb *MockBackend) WriteChannelLogs(ctx context.Context, logs []*ChannelLog) error {
mb.mutex.Lock()
Expand Down

0 comments on commit 05e32f5

Please sign in to comment.