diff --git a/go.mod b/go.mod index 9777dd1c..dd0f0648 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,6 @@ require ( github.com/nats-io/nats-server/v2 v2.9.14 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect - golang.org/x/crypto v0.5.0 // indirect ) require ( diff --git a/go.sum b/go.sum index dfb20db4..c11c69c8 100644 --- a/go.sum +++ b/go.sum @@ -214,6 +214,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= +github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM= github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0= @@ -224,6 +226,7 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0 github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -231,6 +234,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= @@ -239,11 +244,13 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -284,6 +291,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -315,6 +323,7 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -374,7 +383,9 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -438,6 +449,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/receiver/auto.go b/receiver/auto.go index 27ef9dc8..1363005d 100644 --- a/receiver/auto.go +++ b/receiver/auto.go @@ -115,4 +115,9 @@ func init() { Alloc: func() interface{} { return &Kafka{} }, Help: "Connect to a Kafka topic and consume messages.", }) + Auto.Add(skogul.Module{ + Name: "rabbitmq", + Alloc: func() interface{} { return &Rabbitmq{} }, + Help: "Connect to a Rabbitmq topic and consume messages.", + }) } diff --git a/receiver/rabbitmq.go b/receiver/rabbitmq.go new file mode 100644 index 00000000..2c43d97f --- /dev/null +++ b/receiver/rabbitmq.go @@ -0,0 +1,123 @@ +/* + * skogul, rabbitmq-receiver + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package receiver + +import ( + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/telenornms/skogul" +) + +type Rabbitmq struct { + Username skogul.Secret `doc:"Username for rabbitmq instance"` + Password skogul.Secret `doc:"Password for rabbitmq instance"` + Host string `doc:"Hostname for rabbitmq instance. Fallback is localhost"` + Port string `doc:"Port for rabbitmq instance. Fallback is 5672"` + Queue string `doc:"Queue to read from"` + Handler *skogul.HandlerRef `doc:"Handler used to parse, transform and send data. Default skogul."` +} + +func (r *Rabbitmq) Start() error { + if r.Port == "" { + r.Port = "5672" + } + + if r.Host == "" { + r.Host = "localhost" + } + + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username.Expose(), r.Password.Expose(), r.Host, r.Port)) + if err != nil { + return err + } + + ch, err := conn.Channel() + + if err != nil { + return err + } + + _, err = ch.QueueDeclare( + r.Queue, + false, + false, + false, + false, + nil, + ) + + if err != nil { + return err + } + + msgs, err := ch.Consume( + r.Queue, + "", + true, + false, + false, + false, + nil, + ) + + if err != nil { + return err + } + + for message := range msgs { + container, err := r.Handler.H.Parse(message.Body) + + if err != nil { + return err + } + + err = r.Handler.H.TransformAndSend(container) + if err != nil { + return err + } + } + + return nil +} + +func (r *Rabbitmq) Verify() error { + if r.Handler.Name == "" { + return skogul.MissingArgument("Handler") + } + + if r.Username.Expose() == "" { + return skogul.MissingArgument("Username") + } + + if r.Password.Expose() == "" { + return skogul.MissingArgument("Password") + } + + if r.Queue == "" { + return skogul.MissingArgument("Queue") + } + + return nil +} diff --git a/receiver/rabbitmq_test.go b/receiver/rabbitmq_test.go new file mode 100644 index 00000000..6b3c5e22 --- /dev/null +++ b/receiver/rabbitmq_test.go @@ -0,0 +1,80 @@ +/* + * skogul, rabbitmq-receiver test + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package receiver_test + +import ( + "fmt" + "testing" + + "github.com/telenornms/skogul/config" + "github.com/telenornms/skogul/receiver" +) + +func TestRabbitmq(t *testing.T) { + if testing.Short() { + t.Skip("Short test: Not connecting to a Rabbitmq instance") + } + + sconf := fmt.Sprintf(` + { + "receivers": { + "x": { + "type": "rabbitmq", + "handler": "kek", + "username":"guest", + "password":"guest", + "queue":"test-queue" + } + }, + "handlers": { + "kek": { + "parser": "skogulmetric", + "transformers": [ + "now" + ], + "sender": "test" + } + }, + "senders": { + "test": { + "type": "test" + } + } + }`) + + conf, err := config.Bytes([]byte(sconf)) + + if err != nil { + t.Errorf("Failed to load config: %v", err) + return + } + + rcv := conf.Receivers["x"].Receiver.(*receiver.Rabbitmq) + + err = rcv.Start() + + if err != nil { + t.Error(err) + } +} diff --git a/sender/auto.go b/sender/auto.go index 0c25827e..42f96760 100644 --- a/sender/auto.go +++ b/sender/auto.go @@ -178,6 +178,11 @@ func init() { Help: "EXPERIMENTAL Kafka sender", }) Auto.Add(skogul.Module{ + Name: "rabbitmq", + Alloc: func() interface{} { return &Rabbitmq{} }, + Help: "Rabbitmq sender", + }) + Auto.Add(skogul.Module{ Name: "snmp", Alloc: func() interface{} { return &SNMP{} }, Help: "Encodes and sends an snmp trap. This is an experimental feature, please use with caution.", diff --git a/sender/rabbitmq.go b/sender/rabbitmq.go new file mode 100644 index 00000000..77176704 --- /dev/null +++ b/sender/rabbitmq.go @@ -0,0 +1,149 @@ +/* + * skogul, rabbitmq producer/sender + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package sender + +import ( + "context" + "fmt" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/telenornms/skogul" + "github.com/telenornms/skogul/encoder" +) + +type Rabbitmq struct { + Username skogul.Secret `doc:"Username for rabbitmq instance"` + Password skogul.Secret `doc:"Password for rabbitmq instance"` + Host string `doc:"Hostname for rabbitmq instance. Fallback is localhost"` + Port string `doc:"Port for rabbitmq instance. Fallback is 5672"` + Queue string `doc:"Queue to write to"` + Encoder skogul.EncoderRef `doc:"Encoder to use. Fallback is json"` + Timeout int `doc:"Timeout for rabbitmq instance connection. Fallback is 10 seconds."` + channel *amqp.Channel + once sync.Once +} + +var rabbitmqLog = skogul.Logger("sender", "rabbitmq") + +func (r *Rabbitmq) init() { + if r.Port == "" { + r.Port = "5672" + } + + if r.Host == "" { + r.Host = "localhost" + } + + if r.Timeout == 0 { + r.Timeout = 10 + } + + if r.Encoder.E == nil { + r.Encoder.E = encoder.JSON{} + } + + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", r.Username.Expose(), r.Password.Expose(), r.Host, r.Port)) + if err != nil { + rabbitmqLog.WithError(err).Error("Failed initializing broker connection") + return + } + + ch, err := conn.Channel() + if err != nil { + rabbitmqLog.WithError(err).Error("Failed initializing channel") + return + } + + r.channel = ch + + _, err = ch.QueueDeclare( + r.Queue, + false, + false, + false, + false, + nil, + ) + + if err != nil { + rabbitmqLog.WithError(err).Error("Failed to declare a queue") + return + } +} + +func (r *Rabbitmq) Send(c *skogul.Container) error { + r.once.Do(func() { + r.init() + }) + + if r.channel == nil { + return fmt.Errorf("No active rabbitmq connections") + } + + body, err := r.Encoder.E.Encode(c) + if err != nil { + r.channel.Close() + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)*time.Second) + defer cancel() + + err = r.channel.PublishWithContext( + ctx, + "", + r.Queue, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + }, + ) + + if err != nil { + r.channel.Close() + return err + } + + return nil +} + +func (r *Rabbitmq) Verify() error { + if r.Username.Expose() == "" { + return skogul.MissingArgument("Username") + } + + if r.Password.Expose() == "" { + return skogul.MissingArgument("Password") + } + + if r.Queue == "" { + return skogul.MissingArgument("Queue") + } + + return nil +} diff --git a/sender/rabbitmq_test.go b/sender/rabbitmq_test.go new file mode 100644 index 00000000..ad937d58 --- /dev/null +++ b/sender/rabbitmq_test.go @@ -0,0 +1,94 @@ +/* + * skogul, rabbitmq producer/sender test + * + * Copyright (c) 2023 Telenor Norge AS + * Author(s): + * - Kamil Oracz + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +package sender + +import ( + "testing" + "time" + + "github.com/telenornms/skogul" +) + +func createContainer() *skogul.Container { + meta := make(map[string]interface{}) + meta["foo"] = "bar" + data := make(map[string]interface{}) + data["baz"] = "qux" + + metric := skogul.Metric{ + Time: &time.Time{}, + Metadata: meta, + Data: data, + } + metrics := make([]*skogul.Metric, 0) + metrics = append(metrics, &metric) + + return &skogul.Container{ + Metrics: metrics, + } +} + +func TestRabbitmq(t *testing.T) { + if testing.Short() { + t.Skip("Short test: Not connecting to a Rabbitmq instance") + } + + data := createContainer() + + r := Rabbitmq{ + Username: "guest", + Password: "guest", + Queue: "test-queue", + } + + err := r.Send(data) + + if err != nil { + t.Error(err) + } +} + +func TestRabbitmqTonsOfMessages(t *testing.T) { + if testing.Short() { + t.Skip("Short test: Not connecting to a Rabbitmq instance") + } + + data := createContainer() + + r := Rabbitmq{ + Username: "guest", + Password: "guest", + Queue: "test-queue", + } + + i := 0 + for i < 100000 { + err := r.Send(data) + + if err != nil { + t.Error(err) + } + i++ + } +}