Skip to content

Commit

Permalink
error check ffor ack send
Browse files Browse the repository at this point in the history
  • Loading branch information
SunilSKamath4s committed Feb 22, 2022
1 parent d3a7f7e commit 4c500f9
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 0 deletions.
25 changes: 25 additions & 0 deletions message/amqp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,28 @@ func (m AmqpMessage) SendAck(ackMessage ...[]byte) {
func (m AmqpMessage) SendNack(ackMessage ...[]byte) {
m.RawMessage.Nack(false, true)
}

func (m AmqpMessage) SendAckWithError(ackMessage ...[]byte) error {
if val, ok := m.GetProperty("protocol_type"); ok && val == "reqrep" {
resp := []byte("OK")
if len(ackMessage) > 0 {
resp = ackMessage[0]
}
m.Responder.Publish(
"",
m.RawMessage.GetReplyTo(),
false,
false,
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: m.RawMessage.GetCorrelationId(),
Body: resp,
},
)
}
return m.RawMessage.Ack(false)
}

func (m AmqpMessage) SendNackWithError(ackMessage ...[]byte) error {
return m.RawMessage.Nack(false, true)
}
2 changes: 2 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Message interface {
SetProperty(string, string)
SendAck(...[]byte)
SendNack(...[]byte)
SendAckWithError(...[]byte) error
SendNackWithError(...[]byte) error
}

type RawMessage interface {
Expand Down
16 changes: 16 additions & 0 deletions message/nano_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,19 @@ func (m NanoMessage) SendNack(ackMessage ...[]byte) {
m.Responder.Send(resp)
return
}

func (m NanoMessage) SendAckWithError(ackMsg ...[]byte) error {
resp := []byte("OK")
if len(ackMsg) > 0 {
resp = ackMsg[0]
}
return m.Responder.Send(resp)
}

func (m NanoMessage) SendNackWithError(ackMessage ...[]byte) error {
resp := []byte("ERR")
if len(ackMessage) > 0 {
resp = ackMessage[0]
}
return m.Responder.Send(resp)
}
16 changes: 16 additions & 0 deletions message/nats_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,19 @@ func (m NatsMessage) SendNack(ackMessage ...[]byte) {
m.Responder.Publish(m.RawMessage.GetReplyTo(), resp)
return
}

func (m NatsMessage) SendAckWithError(ackMsg ...[]byte) error {
resp := []byte("OK")
if len(ackMsg) > 0 {
resp = ackMsg[0]
}
return m.Responder.Publish(m.RawMessage.GetReplyTo(), resp)
}

func (m NatsMessage) SendNackWithError(ackMessage ...[]byte) error {
resp := []byte("NOK")
if len(ackMessage) > 0 {
resp = ackMessage[0]
}
return m.Responder.Publish(m.RawMessage.GetReplyTo(), resp)
}
8 changes: 8 additions & 0 deletions message/nats_streaming_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ func (m NatsStreamMessage) SendAck(ackMsg ...[]byte) {
func (m NatsStreamMessage) SendNack(ackMessage ...[]byte) {
return
}

func (m NatsStreamMessage) SendAckWithError(ackMsg ...[]byte) error {
return m.RawMessage.Ack(false)
}

func (m NatsStreamMessage) SendNackWithError(ackMessage ...[]byte) error {
return nil
}
10 changes: 10 additions & 0 deletions message/pubnub_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ func (m *PubnubMessage) SendNack(ackMessage ...[]byte) {
m.SetProperty("ack", "NOK")
return
}

func (m *PubnubMessage) SendAckWithError(ackMsg ...[]byte) error {
m.SetProperty("ack", "OK")
return nil
}

func (m *PubnubMessage) SendNackWithError(ackMessage ...[]byte) error {
m.SetProperty("ack", "NOK")
return nil
}
10 changes: 10 additions & 0 deletions message/redis_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ func (m *RedisMessage) SendNack(ackMessage ...[]byte) {
m.SetProperty("ack", "NOK")
return
}

func (m *RedisMessage) SendAckWithError(ackMsg ...[]byte) error {
m.SetProperty("ack", "OK")
return nil
}

func (m *RedisMessage) SendNackWithError(ackMessage ...[]byte) error {
m.SetProperty("ack", "NOK")
return nil
}

0 comments on commit 4c500f9

Please sign in to comment.