Skip to content

Commit

Permalink
Merge pull request #164 from mattfont/add-KDS-pub-channel
Browse files Browse the repository at this point in the history
Add Kinesis publisher channel type
  • Loading branch information
jsccast authored Mar 14, 2023
2 parents b63e462 + 17599a8 commit 788364c
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 1 deletion.
Binary file added .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ dist/
vendor/
*~
**/__debug_bin
.DS_Store
25 changes: 25 additions & 0 deletions chans/kdspub/kds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2023 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package kdspub

import "testing"

func TestDocs(t *testing.T) {
(&KDSPubChan{}).DocSpec().Write("kdspub")
}
147 changes: 147 additions & 0 deletions chans/kdspub/kdspub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2023 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package kdspub

import (
"context"
"encoding/json"
"fmt"

"github.com/Comcast/plax/dsl"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
)

func init() {
dsl.TheChanRegistry.Register(dsl.NewCtx(nil), "kdspub", NewKDSPubChan)
}

// KDSOpts is a configuration for a Kinesis consumer for a given
// stream.
type KDSOpts struct {
// StreamName is of course the name of the KDS.
StreamName string

// BufferSize is the size of the underlying channel buffer.
// Defaults to DefaultChanBufferSize.
BufferSize int
}

// KDSPubChan is a basic Kinesis stream consumer.
//
// This channel consumes messages from a Kinesis stream.
type KDSPubChan struct {
c chan dsl.Msg
ctl chan bool
svc *kinesis.Client

opts *KDSOpts
}

func (c *KDSPubChan) DocSpec() *dsl.DocSpec {
return &dsl.DocSpec{
Chan: &KDSPubChan{},
Opts: &KDSOpts{},
}
}

func NewKDSPubChan(ctx *dsl.Ctx, o interface{}) (dsl.Chan, error) {
js, err := json.Marshal(&o)
if err != nil {
return nil, dsl.NewBroken(err)
}

opts := KDSOpts{
BufferSize: dsl.DefaultChanBufferSize,
}

if err = json.Unmarshal(js, &opts); err != nil {
return nil, dsl.NewBroken(err)
}

return &KDSPubChan{
c: make(chan dsl.Msg, opts.BufferSize),
ctl: make(chan bool),
opts: &opts,
}, nil
}

func (c *KDSPubChan) Kind() dsl.ChanKind {
return "KDSPUB"
}

func (c *KDSPubChan) Open(ctx *dsl.Ctx) error {

cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
ctx.Logf("Error opening KDS stream %s", c.opts.StreamName)
}

c.svc = kinesis.NewFromConfig(cfg)

go c.Pub(ctx, dsl.Msg{})
return nil
}

func (c *KDSPubChan) Close(ctx *dsl.Ctx) error {
return nil
}

func (c *KDSPubChan) Sub(ctx *dsl.Ctx, topic string) error {
return dsl.Brokenf("Can't Sub on a KDS (%s)", c.opts.StreamName)
}

func (c *KDSPubChan) Pub(ctx *dsl.Ctx, m dsl.Msg) error {

ctx.Logf("Publishing to KDS %s", c.opts.StreamName)

// Load the Shared AWS Configuration (~/.aws/config)

input := &kinesis.PutRecordInput{
Data: []byte(m.Payload),
StreamName: aws.String(c.opts.StreamName),
PartitionKey: aws.String("test"),
}

_, err := c.svc.PutRecord(context.TODO(), input)

if err != nil {
ctx.Warnf("warning: KDSPubChan.PUB %s", err)
}
return nil
}
func (c *KDSPubChan) Recv(ctx *dsl.Ctx) chan dsl.Msg {
ctx.Logf("KDSPubChan Recv()")
return c.c
}

func (c *KDSPubChan) Kill(ctx *dsl.Ctx) error {
return fmt.Errorf("Kill is not supported by a %T", c)
}

func (c *KDSPubChan) To(ctx *dsl.Ctx, m dsl.Msg) error {
ctx.Logf("KDSPubChan To %s", m.Topic)
select {
case <-ctx.Done():
case c.c <- m:
}
return nil
}
3 changes: 2 additions & 1 deletion chans/std/std.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Comcast Cable Communications Management, LLC
* Copyright 2023 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@ import (
_ "github.com/Comcast/plax/chans/httpclient"
_ "github.com/Comcast/plax/chans/httpserver"
_ "github.com/Comcast/plax/chans/kds"
_ "github.com/Comcast/plax/chans/kdspub"
_ "github.com/Comcast/plax/chans/mqtt"
_ "github.com/Comcast/plax/chans/shell"
_ "github.com/Comcast/plax/chans/sqlc"
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ require (
github.com/alecthomas/jsonschema v0.0.0-20210526225647-edb03dcab7bc
github.com/avarabyeu/goRP/v5 v5.0.1 // indirect
github.com/aws/aws-sdk-go v1.40.4
github.com/aws/aws-sdk-go-v2 v1.17.5
github.com/aws/aws-sdk-go-v2/config v1.18.15
github.com/aws/aws-sdk-go-v2/service/kinesis v1.17.6
github.com/dop251/goja v0.0.0-20210720190508-a7a3a1366b2e
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/go-resty/resty/v2 v2.7.0 // indirect
Expand Down
29 changes: 29 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,34 @@ github.com/avarabyeu/goRP/v5 v5.0.1/go.mod h1:6rg0WJHFysx5jdEdUk8NbFxkEKwVuypxeZ
github.com/aws/aws-sdk-go v1.15.0/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.40.4 h1:kxTX1kVjuXN1vuq6JgZvWI/Lt9zCfUFuAAxFoq0dHYI=
github.com/aws/aws-sdk-go v1.40.4/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go-v2 v1.17.5 h1:TzCUW1Nq4H8Xscph5M/skINUitxM5UBAyvm2s7XBzL4=
github.com/aws/aws-sdk-go-v2 v1.17.5/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10/go.mod h1:VeTZetY5KRJLuD/7fkQXMU6Mw7H5m/KP2J5Iy9osMno=
github.com/aws/aws-sdk-go-v2/config v1.18.15 h1:509yMO0pJUGUugBP2H9FOFyV+7Mz7sRR+snfDN5W4NY=
github.com/aws/aws-sdk-go-v2/config v1.18.15/go.mod h1:vS0tddZqpE8cD9CyW0/kITHF5Bq2QasW9Y1DFHD//O0=
github.com/aws/aws-sdk-go-v2/credentials v1.13.15 h1:0rZQIi6deJFjOEgHI9HI2eZcLPPEGQPictX66oRFLL8=
github.com/aws/aws-sdk-go-v2/credentials v1.13.15/go.mod h1:vRMLMD3/rXU+o6j2MW5YefrGMBmdTvkLLGqFwMLBHQc=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23 h1:Kbiv9PGnQfG/imNI4L/heyUXvzKmcWSBeDvkrQz5pFc=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.23/go.mod h1:mOtmAg65GT1HIL/HT/PynwPbS+UG0BgCZ6vhkPqnxWo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29 h1:9/aKwwus0TQxppPXFmf010DFrE+ssSbzroLVYINA+xE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.29/go.mod h1:Dip3sIGv485+xerzVv24emnjX5Sg88utCL8fwGmCeWg=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.23 h1:b/Vn141DBuLVgXbhRWIrl9g+ww7G+ScV5SzniWR13jQ=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.23/go.mod h1:mr6c4cHC+S/MMkrjtSlG4QA36kOznDep+0fga5L/fGQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.30 h1:IVx9L7YFhpPq0tTnGo8u8TpluFu7nAn9X3sUDMb11c0=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.30/go.mod h1:vsbq62AOBwQ1LJ/GWKFxX8beUEYeRp/Agitrxee2/qM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.23 h1:QoOybhwRfciWUBbZ0gp9S7XaDnCuSTeK/fySB99V1ls=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.23/go.mod h1:9uPh+Hrz2Vn6oMnQYiUi/zbh3ovbnQk19YKINkQny44=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.17.6 h1:SrZJRJ8uupEztO2zT40JbtYziEZqRv3T/obvAYGZ+cI=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.17.6/go.mod h1:CHlutPX3XBKs83BLG5WVROg9Ob97F7THaoIuTbPJVAg=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.4 h1:qJdM48OOLl1FBSzI7ZrA1ZfLwOyCYqkXV5lko1hYDBw=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.4/go.mod h1:jtLIhd+V+lft6ktxpItycqHqiVXrPIRjWIsFIlzMriw=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.4 h1:YRkWXQveFb0tFC0TLktmmhGsOcCgLwvq88MC2al47AA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.4/go.mod h1:zVwRrfdSmbRZWkUkWjOItY7SOalnFnq/Yg2LVPqDjwc=
github.com/aws/aws-sdk-go-v2/service/sts v1.18.5 h1:L1600eLr0YvTT7gNh3Ni24yGI7NSHkq9Gp62vijPRCs=
github.com/aws/aws-sdk-go-v2/service/sts v1.18.5/go.mod h1:1mKZHLLpDMHTNSYPJ7qrcnCQdHCWsNQaT0xRvq2u80s=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down Expand Up @@ -65,6 +93,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down

0 comments on commit 788364c

Please sign in to comment.