Skip to content

Commit

Permalink
Extract Handler.fs for proProjector
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 10, 2020
1 parent 1c026e0 commit e1e5aa6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed

- Target `Propulsion`.* v `2.0.0-rc3`
- `proProjector`: extract `Handler.fs`

### Fixed

Expand Down
38 changes: 38 additions & 0 deletions propulsion-projector/Handler.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module ProjectorTemplate.Handler

open Propulsion.Cosmos

//let replaceLongDataWithNull (x : FsCodec.ITimelineEvent<byte[]>) : FsCodec.ITimelineEvent<_> =
// if x.Data.Length < 900_000 then x
// else FsCodec.Core.TimelineEvent.Create(x.Index, x.EventType, null, x.Meta, timestamp=x.Timestamp)
//
//let hackDropBigBodies (e : Propulsion.Streams.StreamEvent<_>) : Propulsion.Streams.StreamEvent<_> =
// { stream = e.stream; event = replaceLongDataWithNull e.event }

let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
docs
|> Seq.collect EquinoxCosmosParser.enumStreamEvents
// TODO use Seq.filter and/or Seq.map to adjust what's being sent etc
// |> Seq.map hackDropBigBodies

#if kafka
#if parallelOnly
type ExampleOutput = { Id : string }

let render (doc : Microsoft.Azure.Documents.Document) : string * string =
let equinoxPartition, documentId = doc.GetPropertyValue "p", doc.Id
equinoxPartition, FsCodec.NewtonsoftJson.Serdes.Serialize { Id = documentId }
#else
let render (stream: FsCodec.StreamName, span: Propulsion.Streams.StreamSpan<_>) = async {
let value =
span
|> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
return FsCodec.StreamName.toString stream, value }
#endif
#else
let handle (_stream, span: Propulsion.Streams.StreamSpan<_>) = async {
let r = System.Random()
let ms = r.Next(1, span.events.Length)
do! Async.Sleep ms }
#endif
48 changes: 8 additions & 40 deletions propulsion-projector/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -168,62 +168,30 @@ module Logging =
c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)
|> fun c -> c.CreateLogger()

let replaceLongDataWithNull (x : FsCodec.ITimelineEvent<byte[]>) : FsCodec.ITimelineEvent<_> =
if x.Data.Length < 900_000 then x
else FsCodec.Core.TimelineEvent.Create(x.Index, x.EventType, null, x.Meta, timestamp=x.Timestamp)

let hackDropBigBodies (e : Propulsion.Streams.StreamEvent<_>) : Propulsion.Streams.StreamEvent<_> =
{ stream = e.stream; event = replaceLongDataWithNull e.event }

let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
docs
|> Seq.collect EquinoxCosmosParser.enumStreamEvents
// TODO use Seq.filter and/or Seq.map to adjust what's being sent etc
// |> Seq.map hackDropBigBodies

#if kafka && nostreams
type ExampleOutput = { Id : string }
#endif

let [<Literal>] AppName = "ProjectorTemplate"

let build (args : CmdParser.Arguments) =
let discovery, source, connector = args.Cosmos.BuildConnectionDetails()
let aux, leaseId, startFromTail, maxDocuments, lagFrequency, (maxReadAhead, maxConcurrentStreams) = args.BuildChangeFeedParams()
#if kafka
let (broker, topic) = args.Target.BuildTargetParams()
#if parallelOnly
let render (doc : Microsoft.Azure.Documents.Document) : string * string =
let equinoxPartition, documentId = doc.GetPropertyValue "p", doc.Id
equinoxPartition, FsCodec.NewtonsoftJson.Serdes.Serialize { Id = documentId }
let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic)
let projector =
Propulsion.Kafka.ParallelProducerSink.Start(maxReadAhead, maxConcurrentStreams, render, producer, statsInterval=TimeSpan.FromMinutes 1.)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, fun x -> upcast x)
#if parallelOnly
let sink = Propulsion.Kafka.ParallelProducerSink.Start(maxReadAhead, maxConcurrentStreams, Handler.render, producer, statsInterval=TimeSpan.FromMinutes 1.)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, fun x -> upcast x)
#else
let render (stream: FsCodec.StreamName, span: Propulsion.Streams.StreamSpan<_>) = async {
let value =
span
|> Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
return FsCodec.StreamName.toString stream, value }
let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic)
let projector =
let sink =
Propulsion.Kafka.StreamsProducerSink.Start(
Log.Logger, maxReadAhead, maxConcurrentStreams, render, producer,
Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer,
statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 2.)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, mapToStreamItems)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
#endif
#else
let project (_stream, span: Propulsion.Streams.StreamSpan<_>) = async {
let r = Random()
let ms = r.Next(1, span.events.Length)
do! Async.Sleep ms }
let sink =
Propulsion.Streams.StreamsProjector.Start(
Log.Logger, maxReadAhead, maxConcurrentStreams, project,
Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle,
statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 5.)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
#endif
let runSourcePipeline =
CosmosSource.Run(
Expand Down
1 change: 1 addition & 0 deletions propulsion-projector/Projector.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

<ItemGroup>
<None Include="README.md" />
<Compile Include="Handler.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

Expand Down

0 comments on commit e1e5aa6

Please sign in to comment.