diff --git a/avro/marshaller.go b/avro/marshaller.go deleted file mode 100644 index b0b720b..0000000 --- a/avro/marshaller.go +++ /dev/null @@ -1,17 +0,0 @@ -package avro - -import "github.com/hamba/avro" - -type Marshaller struct { - Schema avro.Schema -} - -func NewMarshaller(schema avro.Schema) *Marshaller { - return &Marshaller{Schema: schema} -} -func (c *Marshaller) Unmarshal(data []byte, v interface{}) error { - return avro.Unmarshal(c.Schema, data, v) -} -func (c *Marshaller) Marshal(v interface{}) ([]byte, error) { - return avro.Marshal(c.Schema, v) -} diff --git a/sqs/func.go b/sqs/func.go deleted file mode 100644 index 78f730a..0000000 --- a/sqs/func.go +++ /dev/null @@ -1,30 +0,0 @@ -package sqs - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" -) - -func GetQueueUrl(client *sqs.SQS, queueName string) (string, error) { - result, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: &queueName, - }) - if err != nil { - return "", err - } - return *result.QueueUrl, err -} - -func MapToAttributes(attributes map[string]string) map[string]*sqs.MessageAttributeValue { - attrs := make(map[string]*sqs.MessageAttributeValue) - if attributes != nil { - for k, v := range attributes { - x := sqs.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(v), - } - attrs[k] = &x - } - } - return attrs -} diff --git a/sqs/health_checker.go b/sqs/health_checker.go deleted file mode 100644 index 4c113c2..0000000 --- a/sqs/health_checker.go +++ /dev/null @@ -1,57 +0,0 @@ -package sqs - -import ( - "context" - "github.com/aws/aws-sdk-go/service/sqs" - "time" -) - -type HealthChecker struct { - Client *sqs.SQS - QueueName *string - Service string - Timeout time.Duration -} - -func NewHealthChecker(client *sqs.SQS, queueName string, options ...string) *HealthChecker { - var name string - if len(options) > 0 && len(options[0]) > 0 { - name = options[0] - } else { - name = "sqs" - } - return NewSQSHealthChecker(client, name, queueName) -} -func NewSQSHealthChecker(client *sqs.SQS, name string, queueName string, options ...time.Duration) *HealthChecker { - var timeout time.Duration - if len(options) >= 1 && options[0] > 0 { - timeout = options[0] - } else { - timeout = 4 * time.Second - } - return &HealthChecker{Client: client, QueueName: &queueName, Service: name, Timeout: timeout} -} - -func (h *HealthChecker) Name() string { - return h.Service -} - -func (h *HealthChecker) Check(ctx context.Context) (map[string]interface{}, error) { - res := make(map[string]interface{}) - h.Client.Config.HTTPClient.Timeout = h.Timeout - _, err := h.Client.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: h.QueueName, - }) - return res, err -} - -func (h *HealthChecker) Build(ctx context.Context, data map[string]interface{}, err error) map[string]interface{} { - if err == nil { - return data - } - if data == nil { - data = make(map[string]interface{}, 0) - } - data["error"] = err.Error() - return data -} diff --git a/sqs/queue_sender.go b/sqs/queue_sender.go deleted file mode 100644 index 5d8f867..0000000 --- a/sqs/queue_sender.go +++ /dev/null @@ -1,44 +0,0 @@ -package sqs - -import ( - "context" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" -) - -type QueueSender struct { - Client *sqs.SQS - DelaySeconds *int64 //could be 10 -} - -func NewQueueSender(client *sqs.SQS, delaySeconds int64) *QueueSender { - return &QueueSender{Client: client, DelaySeconds: &delaySeconds} -} -func (p *QueueSender) Send(ctx context.Context, queueName string, data []byte, attributes map[string]string) error { - queueUrl, er0 := GetQueueUrl(p.Client, queueName) - if er0 != nil { - return er0 - } - attrs := MapToAttributes(attributes) - s := string(data) - _, err := p.Client.SendMessage(&sqs.SendMessageInput{ - DelaySeconds: p.DelaySeconds, - MessageAttributes: attrs, - MessageBody: aws.String(s), - QueueUrl: &queueUrl, - }) - return err -} -func (p *QueueSender) SendBody(ctx context.Context, queueName string, data []byte) error { - queueUrl, er0 := GetQueueUrl(p.Client, queueName) - if er0 != nil { - return er0 - } - s := string(data) - _, err := p.Client.SendMessage(&sqs.SendMessageInput{ - DelaySeconds: p.DelaySeconds, - MessageBody: aws.String(s), - QueueUrl: &queueUrl, - }) - return err -} diff --git a/sqs/receiver.go b/sqs/receiver.go deleted file mode 100644 index be9fc66..0000000 --- a/sqs/receiver.go +++ /dev/null @@ -1,155 +0,0 @@ -package sqs - -import ( - "context" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" -) - -type Receiver struct { - Client *sqs.SQS - QueueURL *string - AckOnConsume bool - VisibilityTimeout int64 // should be 20 (seconds) - WaitTimeSeconds int64 // should be 0 - LogError func(ctx context.Context, msg string) -} - -func NewReceiverByQueueName(client *sqs.SQS, queueName string, ackOnConsume bool, visibilityTimeout int64, waitTimeSeconds int64) (*Receiver, error) { - queueUrl, err := GetQueueUrl(client, queueName) - if err != nil { - return nil, err - } - return NewReceiver(client, queueUrl, ackOnConsume, visibilityTimeout, waitTimeSeconds), nil -} - -func NewReceiver(client *sqs.SQS, queueURL string, ackOnConsume bool, visibilityTimeout int64, waitTimeSeconds int64) *Receiver { - return &Receiver{Client: client, QueueURL: &queueURL, AckOnConsume: ackOnConsume, VisibilityTimeout: visibilityTimeout, WaitTimeSeconds: waitTimeSeconds} -} - -func (c *Receiver) Receive(ctx context.Context, handle func(context.Context, []byte, map[string]string)) { - var result *sqs.ReceiveMessageOutput - var er1 error -loop: - result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{ - AttributeNames: []*string{ - aws.String(sqs.MessageSystemAttributeNameSentTimestamp), - }, - MessageAttributeNames: []*string{ - aws.String(sqs.QueueAttributeNameAll), - }, - QueueUrl: c.QueueURL, - MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds - WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds), - }) - if er1 != nil { - c.LogError(ctx, "Error when subscribe: "+er1.Error()) - } else { - if len(result.Messages) > 0 { - m := result.Messages[0] - data := []byte(*m.Body) - attributes := PtrToMap(m.Attributes) - if c.AckOnConsume { - _, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: c.QueueURL, - ReceiptHandle: result.Messages[0].ReceiptHandle, - }) - if er2 != nil { - c.LogError(ctx, "Error when delete message: "+er2.Error()) - } else { - handle(ctx, data, attributes) - } - } else { - handle(ctx, data, attributes) - } - } - } - goto loop -} -func (c *Receiver) ReceiveBody(ctx context.Context, handle func(context.Context, []byte)) { - var result *sqs.ReceiveMessageOutput - var er1 error -loop: - result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{ - AttributeNames: []*string{ - aws.String(sqs.MessageSystemAttributeNameSentTimestamp), - }, - MessageAttributeNames: []*string{ - aws.String(sqs.QueueAttributeNameAll), - }, - QueueUrl: c.QueueURL, - MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds - WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds), - }) - if er1 != nil { - c.LogError(ctx, "Error when subscribe: "+er1.Error()) - } else { - if len(result.Messages) > 0 { - m := result.Messages[0] - data := []byte(*m.Body) - if c.AckOnConsume { - _, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: c.QueueURL, - ReceiptHandle: result.Messages[0].ReceiptHandle, - }) - if er2 != nil { - c.LogError(ctx, "Error when delete message: "+er2.Error()) - } else { - handle(ctx, data) - } - } else { - handle(ctx, data) - } - } - } - goto loop -} -func (c *Receiver) ReceiveMessage(ctx context.Context, handle func(context.Context, *sqs.Message)) { - var result *sqs.ReceiveMessageOutput - var er1 error -loop: - result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{ - AttributeNames: []*string{ - aws.String(sqs.MessageSystemAttributeNameSentTimestamp), - }, - MessageAttributeNames: []*string{ - aws.String(sqs.QueueAttributeNameAll), - }, - QueueUrl: c.QueueURL, - MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds - WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds), - }) - if er1 != nil { - c.LogError(ctx, "Error when subscribe: "+er1.Error()) - } else { - if len(result.Messages) > 0 { - m := result.Messages[0] - if c.AckOnConsume { - _, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: c.QueueURL, - ReceiptHandle: result.Messages[0].ReceiptHandle, - }) - if er2 != nil { - c.LogError(ctx, "Error when delete message: "+er2.Error()) - } else { - handle(ctx, m) - } - } else { - handle(ctx, m) - } - } - } - goto loop -} -func PtrToMap(m map[string]*string) map[string]string { - attributes := make(map[string]string) - for k, v := range m { - if v != nil { - attributes[k] = *v - } - } - return attributes -} diff --git a/sqs/sender.go b/sqs/sender.go deleted file mode 100644 index 49bb66d..0000000 --- a/sqs/sender.go +++ /dev/null @@ -1,56 +0,0 @@ -package sqs - -import ( - "context" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" -) - -type Sender struct { - Client *sqs.SQS - QueueURL *string - DelaySeconds *int64 //could be 10 -} - -func NewSenderByQueueName(client *sqs.SQS, queueName string, delaySeconds int64) (*Sender, error) { - queueUrl, err := GetQueueUrl(client, queueName) - if err != nil { - return nil, err - } - return NewSender(client, queueUrl, delaySeconds), nil -} - -func NewSender(client *sqs.SQS, queueURL string, delaySeconds int64) *Sender { - return &Sender{Client: client, QueueURL: &queueURL, DelaySeconds: &delaySeconds} -} -func (p *Sender) Send(ctx context.Context, data []byte, attributes map[string]string) error { - attrs := MapToAttributes(attributes) - s := string(data) - _, err := p.Client.SendMessage(&sqs.SendMessageInput{ - DelaySeconds: p.DelaySeconds, - MessageAttributes: attrs, - MessageBody: aws.String(s), - QueueUrl: p.QueueURL, - }) - return err -} -func (p *Sender) SendBody(ctx context.Context, data []byte) error { - s := string(data) - _, err := p.Client.SendMessage(&sqs.SendMessageInput{ - DelaySeconds: p.DelaySeconds, - MessageBody: aws.String(s), - QueueUrl: p.QueueURL, - }) - return err -} -func (p *Sender) SendMessage(msg *sqs.SendMessageInput) (string, error) { - if msg == nil { - return "", nil - } - result, err := p.Client.SendMessage(msg) - if result != nil && result.MessageId != nil { - return *result.MessageId, err - } else { - return "", err - } -} diff --git a/sqs/sqs.go b/sqs/sqs.go deleted file mode 100644 index a2956dc..0000000 --- a/sqs/sqs.go +++ /dev/null @@ -1,38 +0,0 @@ -package sqs - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" -) - -type ( - Config struct { - Region string `yaml:"region" mapstructure:"region" json:"region,omitempty" gorm:"column:region" bson:"region,omitempty" dynamodbav:"region,omitempty" firestore:"region,omitempty"` - AccessKeyID string `yaml:"access_key_id" mapstructure:"access_key_id" json:"accessKeyID,omitempty" gorm:"column:accessKeyID" bson:"accessKeyID,omitempty" dynamodbav:"accessKeyID,omitempty" firestore:"accessKeyID,omitempty"` - SecretAccessKey string `yaml:"secret_access_key" mapstructure:"secret_access_key" json:"secretAccessKey,omitempty" gorm:"column:secretaccesskey" bson:"secretAccessKey,omitempty" dynamodbav:"secretAccessKey,omitempty" firestore:"secretAccessKey,omitempty"` - QueueName string `yaml:"a" mapstructure:"queue_name" json:"queueName,omitempty" gorm:"column:token" bson:"queueName,omitempty" dynamodbav:"queueName,omitempty" firestore:"queueName,omitempty"` - } -) - -func NewSession(config Config) (*session.Session, error) { - c := &aws.Config{ - Region: aws.String(config.Region), - Credentials: credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, ""), - } - return session.NewSession(c) -} - -func Connect(config Config) (*sqs.SQS, error) { - sess, err := NewSession(config) - if err != nil { - return nil, err - } - mySQS := sqs.New(sess) - return mySQS, nil -} - -func ConnectWithSession(session *session.Session) *sqs.SQS { - return sqs.New(session) -}