forked from blaines/tasque-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
sqs_handler.go
98 lines (83 loc) · 2.43 KB
/
sqs_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"io/ioutil"
"log"
"net/http"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/blaines/tasque-go/result"
)
// SQSHandler holds the SQS client, the queue settings and the details of the
// message most recently fetched by receive.
type SQSHandler struct {
	// client is the SQS API implementation that receive and success call; it
	// is assigned through newClient.
	client sqsiface.SQSAPI

	// Details of the message most recently stored by receive. id and body
	// expose messageID and messageBody; success uses receiptHandle to delete
	// the message.
	messageID, messageBody, receiptHandle string

	// queueURL is the queue passed to ReceiveMessage and DeleteMessage;
	// awsRegion is the region handed to the client built by initialize.
	queueURL, awsRegion string
}
// SQSClient groups a queue URL and an AWS region with an SQS API client.
// It is not referenced by any of the code visible in this file.
type SQSClient struct {
	// Queue location settings.
	queueURL, awsRegion string

	// sqsClient is the SQS API implementation associated with the queue.
	sqsClient sqsiface.SQSAPI
}
// id returns a pointer to the handler's messageID field, which receive fills
// in with the ID of the message it fetched. The pointer aliases the field
// itself, so later writes to the field are visible through it.
func (handler *SQSHandler) id() *string {
	messageID := &handler.messageID
	return messageID
}
// body returns a pointer to the handler's messageBody field, which receive
// fills in with the body of the message it fetched. The pointer aliases the
// field itself, so later writes to the field are visible through it.
func (handler *SQSHandler) body() *string {
	messageBody := &handler.messageBody
	return messageBody
}
// initialize builds an SQS client for handler.awsRegion and installs it on the
// handler via newClient. The client is configured with up to 30 retries and an
// HTTP client whose timeout is 30 seconds.
func (handler *SQSHandler) initialize() {
	// The session is created first, matching the original argument
	// evaluation order.
	awsSession := session.New()

	sqsConfig := &aws.Config{
		Region:     &handler.awsRegion,
		MaxRetries: aws.Int(30),
		HTTPClient: &http.Client{Timeout: 30 * time.Second},
	}

	sqsClient := sqs.New(awsSession, sqsConfig)
	handler.newClient(sqsClient)
}
// newClient stores client as the SQS API implementation that receive and
// success call. initialize uses it to install the client it builds; any other
// sqsiface.SQSAPI implementation can be installed the same way.
func (handler *SQSHandler) newClient(client sqsiface.SQSAPI) {
handler.client = client
}
// receive asks the queue for at most one message, waiting up to 20 seconds
// (WaitTimeSeconds) for one to arrive.
//
// When a message is returned, its body, ID and receipt handle are stored on
// the handler, the body is written to "payload.json" (mode 0644, relative to
// the working directory) and receive returns true.
//
// It returns false, after logging, when the request fails, when no message is
// available, or when the returned message lacks a body, ID or receipt handle.
// It panics if payload.json cannot be written.
func (handler *SQSHandler) receive() bool {
	receiveMessageParams := &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(handler.queueURL),
		MaxNumberOfMessages: aws.Int64(1),
		WaitTimeSeconds:     aws.Int64(20),
	}
	receiveMessageResponse, receiveMessageError := handler.client.ReceiveMessage(receiveMessageParams)
	if receiveMessageError != nil {
		log.Println("E: ", receiveMessageError.Error())
		return false
	}
	// A nil response (possible with a stubbed client) is treated the same as
	// an empty one instead of being dereferenced.
	if receiveMessageResponse == nil || len(receiveMessageResponse.Messages) == 0 {
		log.Println("I: ", "No messages retrieved from queue")
		return false
	}

	// Every field read below is a pointer; refuse a partially populated
	// message rather than panic on a nil dereference. The handler state is
	// left untouched in that case.
	message := receiveMessageResponse.Messages[0]
	if message == nil || message.Body == nil || message.MessageId == nil || message.ReceiptHandle == nil {
		log.Println("E: ", "Received message is missing its body, ID or receipt handle")
		return false
	}

	handler.messageBody = *message.Body
	handler.messageID = *message.MessageId
	handler.receiptHandle = *message.ReceiptHandle

	// The payload file is required downstream; failing to write it is treated
	// as fatal, as in the original implementation.
	writeFileError := ioutil.WriteFile("payload.json", []byte(handler.messageBody), 0644)
	if writeFileError != nil {
		panic(writeFileError)
	}
	return true
}
// success deletes the message most recently stored by receive from the queue,
// identifying it by the saved receipt handle.
//
// The method has no error result, so a failed deletion is logged instead of
// being dropped silently.
func (handler *SQSHandler) success() {
	deleteMessageParams := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(handler.queueURL),
		ReceiptHandle: aws.String(handler.receiptHandle),
	}
	if _, deleteMessageError := handler.client.DeleteMessage(deleteMessageParams); deleteMessageError != nil {
		// Uses the same "E: " log convention as receive.
		log.Println("E: ", deleteMessageError.Error())
	}
}
// failure is a no-op: err is ignored and the received message is neither
// deleted nor otherwise modified.
// NOTE(review): presumably the message is expected to become visible again
// once its SQS visibility timeout expires — confirm this is intentional.
func (handler *SQSHandler) failure(err result.Result) {}
// heartbeat is a no-op for the SQS handler.
// NOTE(review): presumably present to satisfy a handler interface declared
// elsewhere in the package — confirm.
func (handler *SQSHandler) heartbeat() {}