diff --git a/CHANGELOG.md b/CHANGELOG.md index b1f372a00..5731727bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Changed - Target `Propulsion`.* v `2.0.0-rc3` +- `proProjector`: extract `Handler.fs` ### Fixed diff --git a/propulsion-projector/Handler.fs b/propulsion-projector/Handler.fs new file mode 100644 index 000000000..bff8b980f --- /dev/null +++ b/propulsion-projector/Handler.fs @@ -0,0 +1,38 @@ +module ProjectorTemplate.Handler + +open Propulsion.Cosmos + +//let replaceLongDataWithNull (x : FsCodec.ITimelineEvent) : FsCodec.ITimelineEvent<_> = +// if x.Data.Length < 900_000 then x +// else FsCodec.Core.TimelineEvent.Create(x.Index, x.EventType, null, x.Meta, timestamp=x.Timestamp) +// +//let hackDropBigBodies (e : Propulsion.Streams.StreamEvent<_>) : Propulsion.Streams.StreamEvent<_> = +// { stream = e.stream; event = replaceLongDataWithNull e.event } + +let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq = + docs + |> Seq.collect EquinoxCosmosParser.enumStreamEvents + // TODO use Seq.filter and/or Seq.map to adjust what's being sent etc + // |> Seq.map hackDropBigBodies + +#if kafka +#if parallelOnly +type ExampleOutput = { Id : string } + +let render (doc : Microsoft.Azure.Documents.Document) : string * string = + let equinoxPartition, documentId = doc.GetPropertyValue "p", doc.Id + equinoxPartition, FsCodec.NewtonsoftJson.Serdes.Serialize { Id = documentId } +#else +let render (stream: FsCodec.StreamName, span: Propulsion.Streams.StreamSpan<_>) = async { + let value = + span + |> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream + |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize + return FsCodec.StreamName.toString stream, value } +#endif +#else +let handle (_stream, span: Propulsion.Streams.StreamSpan<_>) = async { + let r = System.Random() + let ms = r.Next(1, span.events.Length) + do! Async.Sleep ms } +#endif \ No newline at end of file diff --git a/propulsion-projector/Program.fs b/propulsion-projector/Program.fs index d82d70c24..a70e2748b 100644 --- a/propulsion-projector/Program.fs +++ b/propulsion-projector/Program.fs @@ -168,23 +168,6 @@ module Logging = c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) |> fun c -> c.CreateLogger() -let replaceLongDataWithNull (x : FsCodec.ITimelineEvent) : FsCodec.ITimelineEvent<_> = - if x.Data.Length < 900_000 then x - else FsCodec.Core.TimelineEvent.Create(x.Index, x.EventType, null, x.Meta, timestamp=x.Timestamp) - -let hackDropBigBodies (e : Propulsion.Streams.StreamEvent<_>) : Propulsion.Streams.StreamEvent<_> = - { stream = e.stream; event = replaceLongDataWithNull e.event } - -let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq = - docs - |> Seq.collect EquinoxCosmosParser.enumStreamEvents - // TODO use Seq.filter and/or Seq.map to adjust what's being sent etc - // |> Seq.map hackDropBigBodies - -#if kafka && nostreams -type ExampleOutput = { Id : string } -#endif - let [] AppName = "ProjectorTemplate" let build (args : CmdParser.Arguments) = @@ -192,38 +175,23 @@ let build (args : CmdParser.Arguments) = let aux, leaseId, startFromTail, maxDocuments, lagFrequency, (maxReadAhead, maxConcurrentStreams) = args.BuildChangeFeedParams() #if kafka let (broker, topic) = args.Target.BuildTargetParams() -#if parallelOnly - let render (doc : Microsoft.Azure.Documents.Document) : string * string = - let equinoxPartition, documentId = doc.GetPropertyValue "p", doc.Id - equinoxPartition, FsCodec.NewtonsoftJson.Serdes.Serialize { Id = documentId } let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic) - let projector = - Propulsion.Kafka.ParallelProducerSink.Start(maxReadAhead, maxConcurrentStreams, render, producer, statsInterval=TimeSpan.FromMinutes 1.) - let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, fun x -> upcast x) +#if parallelOnly + let sink = Propulsion.Kafka.ParallelProducerSink.Start(maxReadAhead, maxConcurrentStreams, Handler.render, producer, statsInterval=TimeSpan.FromMinutes 1.) + let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, fun x -> upcast x) #else - let render (stream: FsCodec.StreamName, span: Propulsion.Streams.StreamSpan<_>) = async { - let value = - span - |> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream - |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize - return FsCodec.StreamName.toString stream, value } - let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic) - let projector = + let sink = Propulsion.Kafka.StreamsProducerSink.Start( - Log.Logger, maxReadAhead, maxConcurrentStreams, render, producer, + Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 2.) - let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, mapToStreamItems) + let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems) #endif #else - let project (_stream, span: Propulsion.Streams.StreamSpan<_>) = async { - let r = Random() - let ms = r.Next(1, span.events.Length) - do! Async.Sleep ms } let sink = Propulsion.Streams.StreamsProjector.Start( - Log.Logger, maxReadAhead, maxConcurrentStreams, project, + Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 5.) - let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems) + let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems) #endif let runSourcePipeline = CosmosSource.Run( diff --git a/propulsion-projector/Projector.fsproj b/propulsion-projector/Projector.fsproj index 72f697c70..a26336b89 100644 --- a/propulsion-projector/Projector.fsproj +++ b/propulsion-projector/Projector.fsproj @@ -8,6 +8,7 @@ +