Commit: Provider pattern for EventPipeline and new SyncEventPipeline for synchronous flush (#101)

* Provider pattern for EventPipeline and new SyncEventPipeline for synchronous flush
* Modifying EventPipelineTest to test multiple types
MichaelGHSeg authored Apr 30, 2024
1 parent 9963e25 commit 690f8b4
Showing 9 changed files with 337 additions and 58 deletions.
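In short: Configuration now carries an IEventPipelineProvider, and SegmentDestination asks that provider for its pipeline instead of constructing an EventPipeline itself, so the flush behavior can be swapped without touching the plugin. A minimal sketch of the two configurations, assuming the usual Analytics-CSharp Configuration constructor (only the eventPipelineProvider argument is new in this commit; the write key is a placeholder):

using Segment.Analytics;
using Segment.Analytics.Utilities;

// Default: omit the provider and EventPipelineProvider is used, i.e. the
// existing asynchronous EventPipeline behavior is unchanged.
var asyncConfig = new Configuration(writeKey: "<YOUR_WRITE_KEY>");

// Opt in to blocking flushes by supplying the new SyncEventPipelineProvider.
var syncConfig = new Configuration(
    writeKey: "<YOUR_WRITE_KEY>",
    eventPipelineProvider: new SyncEventPipelineProvider());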
6 changes: 5 additions & 1 deletion Analytics-CSharp/Segment/Analytics/Configuration.cs
@@ -45,6 +45,8 @@ private set

public IList<IFlushPolicy> FlushPolicies { get; }

public IEventPipelineProvider EventPipelineProvider { get; }

/// <summary>
/// Configuration that analytics can use
/// </summary>
@@ -82,7 +84,8 @@ public Configuration(string writeKey,
IAnalyticsErrorHandler analyticsErrorHandler = null,
IStorageProvider storageProvider = default,
IHTTPClientProvider httpClientProvider = default,
IList<IFlushPolicy> flushPolicies = default)
IList<IFlushPolicy> flushPolicies = default,
IEventPipelineProvider eventPipelineProvider = default)
{
WriteKey = writeKey;
FlushAt = flushAt;
@@ -98,6 +101,7 @@ public Configuration(string writeKey,
FlushPolicies = flushPolicies == null ? new ConcurrentList<IFlushPolicy>() : new ConcurrentList<IFlushPolicy>(flushPolicies);
FlushPolicies.Add(new CountFlushPolicy(flushAt));
FlushPolicies.Add(new FrequencyFlushPolicy(flushInterval * 1000L));
EventPipelineProvider = eventPipelineProvider ?? new EventPipelineProvider();
}

public Configuration(string writeKey,
10 changes: 2 additions & 8 deletions Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs
@@ -15,7 +15,7 @@ namespace Segment.Analytics.Plugins
/// </summary>
public class SegmentDestination : DestinationPlugin, ISubscriber
{
private EventPipeline _pipeline = null;
private IEventPipeline _pipeline = null;

public override string Key => "Segment.io";

@@ -64,13 +64,7 @@ public override void Configure(Analytics analytics)
// Add DestinationMetadata enrichment plugin
Add(new DestinationMetadataPlugin());

_pipeline = new EventPipeline(
analytics,
Key,
analytics.Configuration.WriteKey,
analytics.Configuration.FlushPolicies,
analytics.Configuration.ApiHost
);
_pipeline = analytics.Configuration.EventPipelineProvider.Create(analytics, Key);

analytics.AnalyticsScope.Launch(analytics.AnalyticsDispatcher, async () =>
{
4 changes: 2 additions & 2 deletions Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs
@@ -7,7 +7,7 @@

namespace Segment.Analytics.Utilities
{
internal class EventPipeline
public class EventPipeline: IEventPipeline
{
private readonly Analytics _analytics;

@@ -23,7 +23,7 @@ internal class EventPipeline

private readonly IStorage _storage;

internal string ApiHost { get; set; }
public string ApiHost { get; set; }

public bool Running { get; private set; }

17 changes: 17 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/EventPipelineProvider.cs
@@ -0,0 +1,17 @@
namespace Segment.Analytics.Utilities
{
public class EventPipelineProvider:IEventPipelineProvider
{
public EventPipelineProvider()
{
}

public IEventPipeline Create(Analytics analytics, string key)
{
return new EventPipeline(analytics, key,
analytics.Configuration.WriteKey,
analytics.Configuration.FlushPolicies,
analytics.Configuration.ApiHost);
}
}
}
13 changes: 13 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs
@@ -0,0 +1,13 @@
namespace Segment.Analytics.Utilities
{
public interface IEventPipeline
{
bool Running { get; }
string ApiHost { get; set; }

void Put(RawEvent @event);
void Flush();
void Start();
void Stop();
}
}
9 changes: 9 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/IEventPipelineProvider.cs
@@ -0,0 +1,9 @@
using System.Collections.Generic;

namespace Segment.Analytics.Utilities
{
public interface IEventPipelineProvider
{
IEventPipeline Create(Analytics analytics, string key);
}
}
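The provider interface is the seam that makes pipelines swappable: anything that can hand back an IEventPipeline for a given Analytics instance and key can be passed through Configuration. As an illustration only (not part of this commit), a test could supply an in-memory pipeline; the InMemory* names below are hypothetical:

using System.Collections.Generic;
using Segment.Analytics;
using Segment.Analytics.Utilities;

// Hypothetical pipeline that records events instead of uploading them.
public class InMemoryEventPipeline : IEventPipeline
{
    public readonly List<RawEvent> Events = new List<RawEvent>();

    public bool Running { get; private set; }
    public string ApiHost { get; set; }

    public void Put(RawEvent @event) => Events.Add(@event);
    public void Flush() { /* nothing queued for upload */ }
    public void Start() => Running = true;
    public void Stop() => Running = false;
}

// Hypothetical provider that plugs the pipeline above into Configuration.
public class InMemoryEventPipelineProvider : IEventPipelineProvider
{
    public IEventPipeline Create(Analytics analytics, string key) => new InMemoryEventPipeline();
}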
202 changes: 202 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs
@@ -0,0 +1,202 @@
using System.Collections.Generic;
using System.Threading;
using global::System;
using global::System.Linq;
using Segment.Analytics.Policies;
using Segment.Concurrent;
using Segment.Serialization;

namespace Segment.Analytics.Utilities
{
internal sealed class FlushEvent : RawEvent
{
public override string Type => "flush";
public readonly SemaphoreSlim _semaphore;

internal FlushEvent(SemaphoreSlim semaphore)
{
_semaphore = semaphore;
}
}


public class SyncEventPipeline: IEventPipeline
{
private readonly Analytics _analytics;

private readonly string _logTag;

private readonly IList<IFlushPolicy> _flushPolicies;

private Channel<RawEvent> _writeChannel;

private Channel<FlushEvent> _uploadChannel;

private readonly HTTPClient _httpClient;

private readonly IStorage _storage;

public string ApiHost { get; set; }

public bool Running { get; private set; }

internal int _flushTimeout = -1;
internal CancellationToken _flushCancellationToken = CancellationToken.None;

public SyncEventPipeline(
Analytics analytics,
string logTag,
string apiKey,
IList<IFlushPolicy> flushPolicies,
string apiHost = HTTPClient.DefaultAPIHost,
int flushTimeout = -1,
CancellationToken? flushCancellationToken = null)
{
_analytics = analytics;
_logTag = logTag;
_flushPolicies = flushPolicies;
ApiHost = apiHost;

_writeChannel = new Channel<RawEvent>();
_uploadChannel = new Channel<FlushEvent>();
_httpClient = analytics.Configuration.HttpClientProvider.CreateHTTPClient(apiKey, apiHost: apiHost);
_httpClient.AnalyticsRef = analytics;
_storage = analytics.Storage;
Running = false;
_flushTimeout = flushTimeout;
_flushCancellationToken = flushCancellationToken ?? CancellationToken.None;
}

public void Put(RawEvent @event) => _writeChannel.Send(@event);

public void Flush() {
// Start the semaphore at 0 so Flush() blocks until Upload() releases it.
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(0, 1));
_writeChannel.Send(flushEvent);
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
}

public void Start()
{
if (Running) return;

// avoid re-establishing the channels if the pipeline was just created
if (_writeChannel.isCancelled)
{
_writeChannel = new Channel<RawEvent>();
_uploadChannel = new Channel<FlushEvent>();
}

Running = true;
Schedule();
Write();
Upload();
}

public void Stop()
{
if (!Running) return;
Running = false;

_uploadChannel.Cancel();
_writeChannel.Cancel();
Unschedule();
}

private void Write() => _analytics.AnalyticsScope.Launch(_analytics.FileIODispatcher, async () =>
{
while (!_writeChannel.isCancelled)
{
RawEvent e = await _writeChannel.Receive();
bool isPoison = e is FlushEvent;
if (!isPoison)
{
try
{
string str = JsonUtility.ToJson(e);
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " running " + str);
await _storage.Write(StorageConstants.Events, str);
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.UpdateState(e);
}
}
catch (Exception exception)
{
Analytics.Logger.Log(LogLevel.Error, exception, _logTag + ": Error writing events to storage.");
}
}
if (isPoison || _flushPolicies.Any(o => o.ShouldFlush()))
{
FlushEvent flushEvent = e as FlushEvent ?? new FlushEvent(null);
_uploadChannel.Send(flushEvent);
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.Reset();
}
}
}
});

private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODispatcher, async () =>
{
while (!_uploadChannel.isCancelled)
{
FlushEvent flushEvent = await _uploadChannel.Receive();
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " performing flush");
await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover());
string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(',');
foreach (string url in fileUrlList)
{
if (string.IsNullOrEmpty(url))
{
continue;
}
byte[] data = _storage.ReadAsBytes(url);
if (data == null)
{
continue;
}
bool shouldCleanup = true;
try
{
shouldCleanup = await _httpClient.Upload(data);
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
}
catch (Exception e)
{
Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url");
}
if (shouldCleanup)
{
_storage.RemoveFile(url);
}
}
flushEvent._semaphore?.Release();
}
});

private void Schedule()
{
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.Schedule(_analytics);
}
}

private void Unschedule()
{
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.Unschedule();
}
}
}
}
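The synchronous behavior hinges on the SemaphoreSlim carried by each FlushEvent: Flush() enqueues the event and then waits on the semaphore, and the Upload() loop releases it once the batch triggered by that event has been sent. A stripped-down sketch of the same handshake, with purely illustrative names:

using System.Threading;
using System.Threading.Tasks;

// Caller/worker handshake: the caller blocks on a semaphore that the worker
// releases once the requested flush has been processed.
var flushDone = new SemaphoreSlim(0, 1);

var worker = Task.Run(async () =>
{
    await Task.Delay(100);  // stand-in for writing and uploading the batch
    flushDone.Release();    // mirrors flushEvent._semaphore?.Release() in Upload()
});

// Mirrors SyncEventPipeline.Flush(): block until the worker signals or the
// timeout (here 5 seconds) elapses.
bool uploaded = flushDone.Wait(5000);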
28 changes: 28 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipelineProvider.cs
@@ -0,0 +1,28 @@
using System.Threading;

namespace Segment.Analytics.Utilities
{
public class SyncEventPipelineProvider: IEventPipelineProvider
{
internal int _flushTimeout = -1;
internal CancellationToken? _flushCancellationToken = null;

public SyncEventPipelineProvider(
int flushTimeout = -1,
CancellationToken? flushCancellationToken = null)
{
_flushTimeout = flushTimeout;
_flushCancellationToken = flushCancellationToken;
}

public IEventPipeline Create(Analytics analytics, string key)
{
return new SyncEventPipeline(analytics, key,
analytics.Configuration.WriteKey,
analytics.Configuration.FlushPolicies,
analytics.Configuration.ApiHost,
_flushTimeout,
_flushCancellationToken);
}
}
}
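End to end, opting into synchronous flushing is purely a configuration change. A usage sketch, assuming the standard Analytics-CSharp entry points (new Analytics(configuration), Track, Flush) and that the Configuration constructor accepts any IEventPipelineProvider:

using System.Threading;
using Segment.Analytics;
using Segment.Analytics.Utilities;

var configuration = new Configuration(
    writeKey: "<YOUR_WRITE_KEY>",
    // Let Flush() block for at most five seconds; pass a real token to allow cancellation.
    eventPipelineProvider: new SyncEventPipelineProvider(
        flushTimeout: 5000,
        flushCancellationToken: CancellationToken.None));

var analytics = new Analytics(configuration);
analytics.Track("Order Completed");

// Unlike the default asynchronous pipeline, this returns only after the queued
// events have been uploaded, the timeout elapses, or the token is cancelled.
analytics.Flush();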
