diff --git a/broker/main.go b/broker/main.go index 5e69402..ecbd293 100644 --- a/broker/main.go +++ b/broker/main.go @@ -6,6 +6,7 @@ import ( "context" "time" + "github.com/gofrs/uuid" "github.com/inconshreveable/log15" ) @@ -157,6 +158,7 @@ func (handler *RequestProcessor) startApiCall(ctx context.Context, request *requ // emit event that a request was started handler.store.Insert( ctx, + uuid.Nil, APIRequestEvent{ Attempt: attempt, }, @@ -171,6 +173,7 @@ func (handler *RequestProcessor) startApiCall(ctx context.Context, request *requ func() { _, err := handler.store.Insert( ctx, + uuid.Nil, APITimeoutEvent{ Attempt: attempt, }, @@ -192,6 +195,7 @@ func (handler *RequestProcessor) startApiCall(ctx context.Context, request *requ if response != nil { handler.store.Insert( ctx, + uuid.Nil, APIResponseEvent{ Attempt: attempt, Response: *response, @@ -201,6 +205,7 @@ func (handler *RequestProcessor) startApiCall(ctx context.Context, request *requ } else if err != nil { handler.store.Insert( ctx, + uuid.Nil, APIFailureEvent{ Attempt: attempt, Failure: err.Error(), diff --git a/broker/main_test.go b/broker/main_test.go index d53dfab..13b4794 100644 --- a/broker/main_test.go +++ b/broker/main_test.go @@ -4,6 +4,8 @@ import ( "api-broker-prototype/events" "testing" "time" + + "github.com/gofrs/uuid" ) // mock for the events.Envelope interface @@ -13,6 +15,10 @@ func (envelope envelopeMock) ID() int32 { return 42 } +func (envelope envelopeMock) ExternalUUID() uuid.UUID { + return uuid.FromStringOrNil("22428f46-a2d8-4d51-b6b5-bc8551bd0921") +} + func (envelope envelopeMock) Created() time.Time { return time.Now() } diff --git a/events/errors.go b/events/errors.go new file mode 100644 index 0000000..96c1811 --- /dev/null +++ b/events/errors.go @@ -0,0 +1,8 @@ +package events + +// This file collects error types. + +import "errors" + +// DuplicateEventUUID is used to signal that the UUID identifying an event is already in use +var DuplicateEventUUID = errors.New("duplicate event identifier UUID") diff --git a/events/errors_test.go b/events/errors_test.go new file mode 100644 index 0000000..5e3731f --- /dev/null +++ b/events/errors_test.go @@ -0,0 +1,15 @@ +package events + +import ( + "errors" + "testing" +) + +func TestDuplicateEventUUID(t *testing.T) { + // make sure the type implements the `error` interface + var err error = DuplicateEventUUID + + if !errors.Is(err, DuplicateEventUUID) { + t.Error("error type is not recognized") + } +} diff --git a/events/store.go b/events/store.go index b8834b5..25ba179 100644 --- a/events/store.go +++ b/events/store.go @@ -14,6 +14,8 @@ package events import ( "context" "time" + + "github.com/gofrs/uuid" ) // The Envelope is a container for the actual event, which it carries as @@ -24,6 +26,8 @@ type Envelope interface { ID() int32 // Created returns the time the event was persisted. Created() time.Time + // ExternalUUID returns the (optional) UUID assigned to the event + ExternalUUID() uuid.UUID // CausationID returns the ID of the event that caused this event. // This can be zero if this event was not caused by another event. CausationID() int32 @@ -56,9 +60,15 @@ type EventStore interface { // Insert an event as payload into the store. // The event is wrapped in an envelope and returned. + // The external UUID can be used to attach a client-supplied identifier + // to the event. If the UUID is not the nil UUID, it must be unique. + // If the UUID is already used, DuplicateEventUUID is returned as error. // The causation ID is that of the preceding event that caused this new // event. It can be zero when its cause is not a preceding event. - Insert(ctx context.Context, event Event, causationID int32) (Envelope, error) + Insert(ctx context.Context, externalUUID uuid.UUID, event Event, causationID int32) (Envelope, error) + + // Resolve an external UUID to the according internal ID + ResolveUUID(ctx context.Context, externalUUID uuid.UUID) (int32, error) // Retrieve just the event with the given ID. RetrieveOne(ctx context.Context, id int32) (Envelope, error) diff --git a/go.mod b/go.mod index beac516..39415c2 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module api-broker-prototype go 1.21 require ( + github.com/gofrs/uuid v4.0.0+incompatible github.com/inconshreveable/log15 v2.16.0+incompatible github.com/jackc/pgtype v1.14.0 github.com/jackc/pgx/v5 v5.5.1 diff --git a/go.sum b/go.sum index 91f21fb..379f2ff 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,7 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= diff --git a/logging/logging_decorator.go b/logging/logging_decorator.go index e4c099a..9f18b14 100644 --- a/logging/logging_decorator.go +++ b/logging/logging_decorator.go @@ -9,6 +9,8 @@ import ( "api-broker-prototype/events" "context" "errors" + + "github.com/gofrs/uuid" "github.com/inconshreveable/log15" ) @@ -44,9 +46,9 @@ func (s *LoggingDecoratorEventStore) Close() error { return s.eventstore.Close() } -func (s *LoggingDecoratorEventStore) Insert(ctx context.Context, event events.Event, causationID int32) (events.Envelope, error) { - s.logger.Debug("Inserting event.", "class", event.Class(), "causation_id", causationID) - env, err := s.eventstore.Insert(ctx, event, causationID) +func (s *LoggingDecoratorEventStore) Insert(ctx context.Context, externalUUID uuid.UUID, event events.Event, causationID int32) (events.Envelope, error) { + s.logger.Debug("Inserting event.", "external_id", externalUUID, "class", event.Class(), "causation_id", causationID) + env, err := s.eventstore.Insert(ctx, externalUUID, event, causationID) if err == nil { s.logger.Debug("Inserted event.", "id", env.ID()) } else { @@ -55,11 +57,15 @@ func (s *LoggingDecoratorEventStore) Insert(ctx context.Context, event events.Ev return env, err } +func (s *LoggingDecoratorEventStore) ResolveUUID(ctx context.Context, externalUUID uuid.UUID) (int32, error) { + return s.eventstore.ResolveUUID(ctx, externalUUID) +} + func (s *LoggingDecoratorEventStore) RetrieveOne(ctx context.Context, id int32) (events.Envelope, error) { s.logger.Debug("Loading event.", "id", id) env, err := s.eventstore.RetrieveOne(ctx, id) if err == nil { - s.logger.Debug("Loaded event.", "class", env.Event().Class(), "causation_id", env.CausationID(), "created", env.Created()) + s.logger.Debug("Loaded event.", "external_id", env.ExternalUUID(), "class", env.Event().Class(), "causation_id", env.CausationID(), "created", env.Created()) } else { s.logger.Debug("Failed to load event.", "error", err) } @@ -142,6 +148,7 @@ func (s *LoggingDecoratorEventStore) FollowEvents(ctx context.Context, startAfte s.logger.Debug( "Loaded event.", "id", env.ID(), + "external_id", env.ExternalUUID(), "class", env.Event().Class(), "causation_id", env.CausationID(), "created", env.Created(), diff --git a/logging/logging_decorator_test.go b/logging/logging_decorator_test.go index 99f0141..41faf7c 100644 --- a/logging/logging_decorator_test.go +++ b/logging/logging_decorator_test.go @@ -4,8 +4,10 @@ import ( "api-broker-prototype/events" "context" "errors" - "github.com/inconshreveable/log15" "testing" + + "github.com/gofrs/uuid" + "github.com/inconshreveable/log15" ) var notImplemented error = errors.New("not implemented") @@ -25,10 +27,14 @@ func (store *eventstoreMock) Close() error { return notImplemented } -func (store *eventstoreMock) Insert(ctx context.Context, event events.Event, causationID int32) (events.Envelope, error) { +func (store *eventstoreMock) Insert(ctx context.Context, externalUUID uuid.UUID, event events.Event, causationID int32) (events.Envelope, error) { return nil, notImplemented } +func (store *eventstoreMock) ResolveUUID(ctx context.Context, externalUUID uuid.UUID) (int32, error) { + return 0, notImplemented +} + func (store *eventstoreMock) RetrieveOne(ctx context.Context, id int32) (events.Envelope, error) { return nil, notImplemented } @@ -140,9 +146,14 @@ func TestInsert(t *testing.T) { ctx := context.Background() + externalUUID, err := uuid.NewV4() + if err != nil { + t.Errorf("failed to generate UUID") + } + event := eventMock{} - res, err := decorator.Insert(ctx, &event, 0) + res, err := decorator.Insert(ctx, externalUUID, &event, 0) if res != nil { t.Errorf("expected nil as result") @@ -152,6 +163,25 @@ func TestInsert(t *testing.T) { } } +func TestResolveUUID(t *testing.T) { + decorator := createMock() + + ctx := context.Background() + + externalUUID, err := uuid.NewV4() + if err != nil { + t.Errorf("failed to generate UUID") + } + + res, err := decorator.ResolveUUID(ctx, externalUUID) + if res != 0 { + t.Errorf("expected zero as result") + } + if err != notImplemented { + t.Errorf("unexpected error") + } +} + func TestRetrieveOne(t *testing.T) { decorator := createMock() diff --git a/main.go b/main.go index 68452e3..ecac595 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "os/signal" "time" + "github.com/gofrs/uuid" "github.com/inconshreveable/log15" "github.com/urfave/cli/v2" // imports as package "cli" ) @@ -62,6 +63,11 @@ func main() { Usage: "Insert a configuration event into the store.", ArgsUsage: " ", Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "external-uuid", + Value: "", + Usage: "external identifier for the request", + }, &cli.IntFlag{ Name: "retries", Value: -1, @@ -77,7 +83,11 @@ func main() { if c.NArg() > 0 { return errors.New("no arguments expected") } - return configureMain(c.Context, c.Int("retries"), c.Float64("timeout")) + externalUUID, err := parseUUID(c.String("external-uuid")) + if err != nil { + return err + } + return configureMain(c.Context, externalUUID, c.Int("retries"), c.Float64("timeout")) }, }, { @@ -85,6 +95,11 @@ func main() { Usage: "Insert an event into the store.", ArgsUsage: "", Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "external-uuid", + Value: "", + Usage: "external identifier for the request", + }, &cli.StringFlag{ Name: "causation", Value: "0", @@ -96,7 +111,11 @@ func main() { if args.Len() != 2 { return errors.New("exactly two arguments expected") } - return insertMain(c.Context, args.Get(0), args.Get(1), c.String("causation")) + externalUUID, err := parseUUID(c.String("external-uuid")) + if err != nil { + return err + } + return insertMain(c.Context, args.Get(0), args.Get(1), externalUUID, c.String("causation")) }, }, { @@ -203,6 +222,17 @@ func main() { } } +// parse a string to a UUID +// Only a malformed string creates an error, an empty one will return a +// nil UUID value. +func parseUUID(arg string) (uuid.UUID, error) { + if arg == "" { + return uuid.Nil, nil + } + + return uuid.FromString(arg) +} + // configure mock API from optional flags passed on the commandline func configureAPIStub(c *cli.Context) { mock_api.Configure( @@ -254,7 +284,7 @@ func finalizeEventStore(store events.EventStore) { } // insert a configuration event -func configureMain(ctx context.Context, retries int, timeout float64) error { +func configureMain(ctx context.Context, externalUUID uuid.UUID, retries int, timeout float64) error { store, err := initEventStore() if err != nil { return err @@ -266,7 +296,7 @@ func configureMain(ctx context.Context, retries int, timeout float64) error { Timeout: timeout, } - envelope, err := store.Insert(ctx, event, 0) + envelope, err := store.Insert(ctx, externalUUID, event, 0) if err != nil { return err } @@ -276,7 +306,7 @@ func configureMain(ctx context.Context, retries int, timeout float64) error { } // insert a new event -func insertMain(ctx context.Context, class string, data string, causation string) error { +func insertMain(ctx context.Context, class string, data string, externalUUID uuid.UUID, causation string) error { store, err := initEventStore() if err != nil { return err @@ -305,7 +335,7 @@ func insertMain(ctx context.Context, class string, data string, causation string } // insert a document - envelope, err := store.Insert(ctx, event, causationID) + envelope, err := store.Insert(ctx, externalUUID, event, causationID) if err != nil { return err } @@ -342,6 +372,7 @@ func listMain(ctx context.Context, lastProcessed string) error { logger.Info( "event", "id", envelope.ID(), + "external_uuid", envelope.ExternalUUID(), "class", envelope.Event().Class(), "created", envelope.Created().Format(time.RFC3339), "causation_id", envelope.CausationID(), diff --git a/mongodb/eventstore.go b/mongodb/eventstore.go index 69de8ad..9a70ae9 100644 --- a/mongodb/eventstore.go +++ b/mongodb/eventstore.go @@ -12,6 +12,8 @@ import ( "api-broker-prototype/events" "context" "errors" + + "github.com/gofrs/uuid" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -69,6 +71,12 @@ func (env *mongoDBEnvelope) Created() time.Time { return env.CreatedVal.Time() } +// ExternalUUID implements the Envelope interface. +func (env *mongoDBEnvelope) ExternalUUID() uuid.UUID { + // TODO: implement UUID support + return uuid.Nil +} + // CausationID implements the Envelope interface. func (env *mongoDBEnvelope) CausationID() int32 { return env.CausationIDVal @@ -192,7 +200,12 @@ func (s *MongoDBEventStore) Close() error { } // Insert implements the EventStore interface. -func (s *MongoDBEventStore) Insert(ctx context.Context, event events.Event, causationID int32) (events.Envelope, error) { +func (s *MongoDBEventStore) Insert(ctx context.Context, externalUUID uuid.UUID, event events.Event, causationID int32) (events.Envelope, error) { + // TODO: implement UUID support + if externalUUID != uuid.Nil { + return nil, errors.New("UUID support not implemented") + } + // don't do anything if the error state of the store is set already s.connect(ctx) if s.err != nil { @@ -275,6 +288,12 @@ func (s *MongoDBEventStore) Insert(ctx context.Context, event events.Event, caus return res, nil } +// ResolveUUID implements the EventStore interface. +func (s *MongoDBEventStore) ResolveUUID(ctx context.Context, externalUUID uuid.UUID) (int32, error) { + // TODO: implement UUID support + return 0, errors.New("UUID support not implemented") +} + // find next free ID to use for an insert // This returns zero and sets the error state if an error occurs. func (s *MongoDBEventStore) findNextID(ctx context.Context) int32 { diff --git a/postgresql/eventstore.go b/postgresql/eventstore.go index f2679dc..5ae8ab9 100644 --- a/postgresql/eventstore.go +++ b/postgresql/eventstore.go @@ -12,6 +12,7 @@ import ( "strconv" "time" + "github.com/gofrs/uuid" "github.com/jackc/pgtype" "github.com/jackc/pgx/v5" ) @@ -54,6 +55,12 @@ func (env *postgreSQLEnvelope) Created() time.Time { return env.CreatedVal } +// ExternalUUID implements the Envelope interface. +func (env *postgreSQLEnvelope) ExternalUUID() uuid.UUID { + // TODO: implement UUID support + return uuid.Nil +} + // CausationID implements the EventStore interface. func (env *postgreSQLEnvelope) CausationID() int32 { return env.CausationIDVal @@ -174,7 +181,12 @@ func (s *PostgreSQLEventStore) Close() error { } // Insert implements the EventStore interface. -func (s *PostgreSQLEventStore) Insert(ctx context.Context, event events.Event, causationID int32) (events.Envelope, error) { +func (s *PostgreSQLEventStore) Insert(ctx context.Context, externalUUID uuid.UUID, event events.Event, causationID int32) (events.Envelope, error) { + // TODO: implement UUID support + if externalUUID != uuid.Nil { + return nil, errors.New("UUID support not implemented") + } + // locate codec for the event class class := event.Class() codec := s.codecs[class] @@ -222,6 +234,12 @@ func (s *PostgreSQLEventStore) Insert(ctx context.Context, event events.Event, c return &res, nil } +// ResolveUUID implements the EventStore interface. +func (s *PostgreSQLEventStore) ResolveUUID(ctx context.Context, externalUUID uuid.UUID) (int32, error) { + // TODO: implement UUID support + return 0, errors.New("UUID support not implemented") +} + // RetrieveOne implements the EventStore interface. func (s *PostgreSQLEventStore) RetrieveOne(ctx context.Context, id int32) (events.Envelope, error) { // The ID must be valid.