From 05e32f5d1e69eaf80b5fe1c3491015e7834af1c1 Mon Sep 17 00:00:00 2001 From: Jeremy Steele Date: Wed, 13 Sep 2023 23:06:58 -0400 Subject: [PATCH] Add method to backend to update flow session timeouts by msg ID Adds a single query that will update a message's flowsession if it is in waiting status, with the correct timeout. --- backend.go | 3 +++ backends/rapidpro/backend.go | 16 ++++++++++++++++ handlers/postmaster/postmaster.go | 18 ++++++++++++------ test.go | 4 ++++ 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/backend.go b/backend.go index 924cc25..180476a 100644 --- a/backend.go +++ b/backend.go @@ -94,6 +94,9 @@ type Backend interface { // used to determine any sort of deduping of msg sends MarkOutgoingMsgComplete(context.Context, Msg, MsgStatus) + // SetFlowSessionTimeoutByMsgId If the flow session for a given msg ID is waiting, set its timeout + SetFlowSessionTimeoutByMsgId(context.Context, MsgID) error + // Check if external ID has been seen in a period CheckExternalIDSeen(Msg) Msg diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 267793d..23dcdeb 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -250,6 +250,22 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg, } } +// Joins the flow session and gets the last event of the last run and checks if it is a msg_wait event +const updateFlowSessionTimeoutByMsgID = ` +update flows_flowsession sess_orig + set timeout_on=(NOW() + (sess.output::json->'runs'->-1->'events'->-1->>'timeout_seconds')::int * interval '1 second') +from flows_flowsession as sess + left join msgs_msg msg on sess.current_flow_id=msg.flow_id +where msg.id=$1 and sess.status='W' + and sess.output::json->'runs'->-1->'events'->-1->>'type' = 'msg_wait' +and sess_orig.id=sess.id; +` + +func (b *backend) SetFlowSessionTimeoutByMsgId(ctx context.Context, id courier.MsgID) error { + _, err := b.db.ExecContext(ctx, updateFlowSessionTimeoutByMsgID, id) + return err +} + // WriteMsg writes the passed in message to our store func (b *backend) WriteMsg(ctx context.Context, m courier.Msg) error { timeout, cancel := context.WithTimeout(ctx, backendTimeout) diff --git a/handlers/postmaster/postmaster.go b/handlers/postmaster/postmaster.go index 867409d..c27f921 100644 --- a/handlers/postmaster/postmaster.go +++ b/handlers/postmaster/postmaster.go @@ -6,7 +6,11 @@ import ( "encoding/json" "fmt" "github.com/go-chi/chi" + "github.com/nyaruka/courier" "github.com/nyaruka/courier/backends/rapidpro" + "github.com/nyaruka/courier/handlers" + "github.com/nyaruka/courier/utils" + "github.com/nyaruka/gocommon/urns" "github.com/sirupsen/logrus" "net/http" "os" @@ -14,11 +18,6 @@ import ( "strconv" "strings" - "github.com/nyaruka/courier" - "github.com/nyaruka/courier/handlers" - "github.com/nyaruka/courier/utils" - "github.com/nyaruka/gocommon/urns" - validator "gopkg.in/go-playground/validator.v9" ) @@ -173,7 +172,14 @@ func (h *handler) receiveStatus(ctx context.Context, channel courier.Channel, w cid, err := strconv.ParseInt(payload.MessageID, 10, 64) - status := h.Backend().NewMsgStatusForID(channel, courier.NewMsgID(cid), statusMapping[payload.Status]) + courierStatus := statusMapping[payload.Status] + id := courier.NewMsgID(cid) + + if courierStatus == courier.MsgSent { + _ = h.Backend().SetFlowSessionTimeoutByMsgId(ctx, id) + } + + status := h.Backend().NewMsgStatusForID(channel, id, courierStatus) return handlers.WriteMsgStatusAndResponse(ctx, h, channel, status, w, r) } diff --git a/test.go b/test.go index f41dae5..ba23885 100644 --- a/test.go +++ b/test.go @@ -177,6 +177,10 @@ func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg Msg, s M mb.sentMsgs[msg.ID()] = true } +func (mb *MockBackend) SetFlowSessionTimeoutByMsgId(ctx context.Context, id MsgID) error { + return nil +} + // WriteChannelLogs writes the passed in channel logs to the DB func (mb *MockBackend) WriteChannelLogs(ctx context.Context, logs []*ChannelLog) error { mb.mutex.Lock()