From 690f8b4bf453fbf01792aa9078278c67f6ff18cc Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 30 Apr 2024 09:39:50 -0700 Subject: [PATCH] Provider pattern for EventPipleline and new SyncEventPipeline for synchronous flush (#101) * Provider pattern for EventPipleline and new SyncEventPipeline for synchronous flush * Modifying EventPipelineTest to test multiple types --- .../Segment/Analytics/Configuration.cs | 6 +- .../Analytics/Plugins/SegmentDestination.cs | 10 +- .../Analytics/Utilities/EventPipeline.cs | 4 +- .../Utilities/EventPipelineProvider.cs | 17 ++ .../Analytics/Utilities/IEventPipeline.cs | 13 ++ .../Utilities/IEventPipelineProvider.cs | 9 + .../Analytics/Utilities/SyncEventPipeline.cs | 202 ++++++++++++++++++ .../Utilities/SyncEventPipelineProvider.cs | 28 +++ Tests/Utilities/EventPipelineTest.cs | 106 +++++---- 9 files changed, 337 insertions(+), 58 deletions(-) create mode 100644 Analytics-CSharp/Segment/Analytics/Utilities/EventPipelineProvider.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Utilities/IEventPipelineProvider.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipelineProvider.cs diff --git a/Analytics-CSharp/Segment/Analytics/Configuration.cs b/Analytics-CSharp/Segment/Analytics/Configuration.cs index c492efe..d8d997b 100644 --- a/Analytics-CSharp/Segment/Analytics/Configuration.cs +++ b/Analytics-CSharp/Segment/Analytics/Configuration.cs @@ -45,6 +45,8 @@ private set public IList FlushPolicies { get; } + public IEventPipelineProvider EventPipelineProvider { get; } + /// /// Configuration that analytics can use /// @@ -82,7 +84,8 @@ public Configuration(string writeKey, IAnalyticsErrorHandler analyticsErrorHandler = null, IStorageProvider storageProvider = default, IHTTPClientProvider httpClientProvider = default, - IList flushPolicies = default) + IList flushPolicies = default, + EventPipelineProvider eventPipelineProvider = default) { WriteKey = writeKey; FlushAt = flushAt; @@ -98,6 +101,7 @@ public Configuration(string writeKey, FlushPolicies = flushPolicies == null ? new ConcurrentList() : new ConcurrentList(flushPolicies); FlushPolicies.Add(new CountFlushPolicy(flushAt)); FlushPolicies.Add(new FrequencyFlushPolicy(flushInterval * 1000L)); + EventPipelineProvider = eventPipelineProvider ?? new EventPipelineProvider(); } public Configuration(string writeKey, diff --git a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs index 0c5db70..9586be0 100644 --- a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs +++ b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs @@ -15,7 +15,7 @@ namespace Segment.Analytics.Plugins /// public class SegmentDestination : DestinationPlugin, ISubscriber { - private EventPipeline _pipeline = null; + private IEventPipeline _pipeline = null; public override string Key => "Segment.io"; @@ -64,13 +64,7 @@ public override void Configure(Analytics analytics) // Add DestinationMetadata enrichment plugin Add(new DestinationMetadataPlugin()); - _pipeline = new EventPipeline( - analytics, - Key, - analytics.Configuration.WriteKey, - analytics.Configuration.FlushPolicies, - analytics.Configuration.ApiHost - ); + _pipeline = analytics.Configuration.EventPipelineProvider.Create(analytics, Key); analytics.AnalyticsScope.Launch(analytics.AnalyticsDispatcher, async () => { diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs index e9dd4fc..21fe7ce 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs @@ -7,7 +7,7 @@ namespace Segment.Analytics.Utilities { - internal class EventPipeline + public class EventPipeline: IEventPipeline { private readonly Analytics _analytics; @@ -23,7 +23,7 @@ internal class EventPipeline private readonly IStorage _storage; - internal string ApiHost { get; set; } + public string ApiHost { get; set; } public bool Running { get; private set; } diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipelineProvider.cs b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipelineProvider.cs new file mode 100644 index 0000000..abd376c --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipelineProvider.cs @@ -0,0 +1,17 @@ +namespace Segment.Analytics.Utilities +{ + public class EventPipelineProvider:IEventPipelineProvider + { + public EventPipelineProvider() + { + } + + public IEventPipeline Create(Analytics analytics, string key) + { + return new EventPipeline(analytics, key, + analytics.Configuration.WriteKey, + analytics.Configuration.FlushPolicies, + analytics.Configuration.ApiHost); + } + } +} \ No newline at end of file diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs new file mode 100644 index 0000000..b9cce6d --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs @@ -0,0 +1,13 @@ +namespace Segment.Analytics.Utilities +{ + public interface IEventPipeline + { + bool Running { get; } + string ApiHost { get; set; } + + void Put(RawEvent @event); + void Flush(); + void Start(); + void Stop(); + } +} \ No newline at end of file diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipelineProvider.cs b/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipelineProvider.cs new file mode 100644 index 0000000..2e39b2b --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipelineProvider.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Segment.Analytics.Utilities +{ + public interface IEventPipelineProvider + { + IEventPipeline Create(Analytics analytics, string key); + } +} \ No newline at end of file diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs new file mode 100644 index 0000000..a52565b --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs @@ -0,0 +1,202 @@ +using System.Collections.Generic; +using System.Threading; +using global::System; +using global::System.Linq; +using Segment.Analytics.Policies; +using Segment.Concurrent; +using Segment.Serialization; + +namespace Segment.Analytics.Utilities +{ + internal sealed class FlushEvent : RawEvent + { + public override string Type => "flush"; + public readonly SemaphoreSlim _semaphore; + + internal FlushEvent(SemaphoreSlim semaphore) + { + _semaphore = semaphore; + } + } + + + public class SyncEventPipeline: IEventPipeline + { + private readonly Analytics _analytics; + + private readonly string _logTag; + + private readonly IList _flushPolicies; + + private Channel _writeChannel; + + private Channel _uploadChannel; + + private readonly HTTPClient _httpClient; + + private readonly IStorage _storage; + + public string ApiHost { get; set; } + + public bool Running { get; private set; } + + internal int _flushTimeout = -1; + internal CancellationToken _flushCancellationToken = CancellationToken.None; + + public SyncEventPipeline( + Analytics analytics, + string logTag, + string apiKey, + IList flushPolicies, + string apiHost = HTTPClient.DefaultAPIHost, + int flushTimeout = -1, + CancellationToken? flushCancellationToken = null) + { + _analytics = analytics; + _logTag = logTag; + _flushPolicies = flushPolicies; + ApiHost = apiHost; + + _writeChannel = new Channel(); + _uploadChannel = new Channel(); + _httpClient = analytics.Configuration.HttpClientProvider.CreateHTTPClient(apiKey, apiHost: apiHost); + _httpClient.AnalyticsRef = analytics; + _storage = analytics.Storage; + Running = false; + _flushTimeout = flushTimeout; + _flushCancellationToken = flushCancellationToken ?? CancellationToken.None; + } + + public void Put(RawEvent @event) => _writeChannel.Send(@event); + + public void Flush() { + FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1,1)); + _writeChannel.Send(flushEvent); + flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken); + } + + public void Start() + { + if (Running) return; + + // avoid to re-establish a channel if the pipeline just gets created + if (_writeChannel.isCancelled) + { + _writeChannel = new Channel(); + _uploadChannel = new Channel(); + } + + Running = true; + Schedule(); + Write(); + Upload(); + } + + public void Stop() + { + if (!Running) return; + Running = false; + + _uploadChannel.Cancel(); + _writeChannel.Cancel(); + Unschedule(); + } + + private void Write() => _analytics.AnalyticsScope.Launch(_analytics.FileIODispatcher, async () => + { + while (!_writeChannel.isCancelled) + { + RawEvent e = await _writeChannel.Receive(); + bool isPoison = e is FlushEvent; + + if (!isPoison) + { + try + { + string str = JsonUtility.ToJson(e); + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " running " + str); + await _storage.Write(StorageConstants.Events, str); + + foreach (IFlushPolicy flushPolicy in _flushPolicies) + { + flushPolicy.UpdateState(e); + } + } + catch (Exception exception) + { + Analytics.Logger.Log(LogLevel.Error, exception, _logTag + ": Error writing events to storage."); + } + } + + if (isPoison || _flushPolicies.Any(o => o.ShouldFlush())) + { + FlushEvent flushEvent = e as FlushEvent ?? new FlushEvent(null); + _uploadChannel.Send(flushEvent); + foreach (IFlushPolicy flushPolicy in _flushPolicies) + { + flushPolicy.Reset(); + } + } + } + }); + + private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODispatcher, async () => + { + while (!_uploadChannel.isCancelled) + { + FlushEvent flushEvent = await _uploadChannel.Receive(); + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " performing flush"); + + await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover()); + + string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(','); + foreach (string url in fileUrlList) + { + if (string.IsNullOrEmpty(url)) + { + continue; + } + + byte[] data = _storage.ReadAsBytes(url); + if (data == null) + { + continue; + } + + bool shouldCleanup = true; + try + { + shouldCleanup = await _httpClient.Upload(data); + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url); + } + catch (Exception e) + { + Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url"); + } + + if (shouldCleanup) + { + _storage.RemoveFile(url); + } + } + flushEvent._semaphore?.Release(); + } + }); + + private void Schedule() + { + foreach (IFlushPolicy flushPolicy in _flushPolicies) + { + flushPolicy.Schedule(_analytics); + } + } + + private void Unschedule() + { + foreach (IFlushPolicy flushPolicy in _flushPolicies) + { + flushPolicy.Unschedule(); + } + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipelineProvider.cs b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipelineProvider.cs new file mode 100644 index 0000000..5794677 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipelineProvider.cs @@ -0,0 +1,28 @@ +using System.Threading; + +namespace Segment.Analytics.Utilities +{ + public class SyncEventPipelineProvider: IEventPipelineProvider + { + internal int _flushTimeout = -1; + internal CancellationToken? _flushCancellationToken = null; + + public SyncEventPipelineProvider( + int flushTimeout = -1, + CancellationToken? flushCancellationToken = null) + { + _flushTimeout = flushTimeout; + _flushCancellationToken = flushCancellationToken; + } + + public IEventPipeline Create(Analytics analytics, string key) + { + return new SyncEventPipeline(analytics, key, + analytics.Configuration.WriteKey, + analytics.Configuration.FlushPolicies, + analytics.Configuration.ApiHost, + _flushTimeout, + _flushCancellationToken); + } + } +} \ No newline at end of file diff --git a/Tests/Utilities/EventPipelineTest.cs b/Tests/Utilities/EventPipelineTest.cs index 2ac8d3d..02d20d2 100644 --- a/Tests/Utilities/EventPipelineTest.cs +++ b/Tests/Utilities/EventPipelineTest.cs @@ -1,4 +1,6 @@ using System; +using System.Collections; +using System.Collections.Generic; using System.Threading.Tasks; using Moq; using Segment.Analytics; @@ -13,8 +15,6 @@ namespace Tests.Utilities { public class EventPipelineTest { - private EventPipeline _eventPipeline; - private readonly Analytics _analytics; private readonly Mock _storage; @@ -25,6 +25,12 @@ public class EventPipelineTest private readonly byte[] _bytes; + public static IEnumerable GetPipelineProvider() + { + yield return new object[] { new EventPipelineProvider() }; + yield return new object[] { new SyncEventPipelineProvider() }; + } + public EventPipelineTest() { Settings? settings = JsonUtility.FromJson( @@ -50,14 +56,6 @@ public EventPipelineTest() storageProvider: new MockStorageProvider(_storage) ); _analytics = new Analytics(config); - _eventPipeline = new EventPipeline( - _analytics, - logTag: "key", - apiKey: _analytics.Configuration.WriteKey, - flushPolicies: _analytics.Configuration.FlushPolicies, - apiHost: _analytics.Configuration.ApiHost - ); - _file = Guid.NewGuid().ToString(); _bytes = _file.GetBytes(); _storage @@ -68,23 +66,27 @@ public EventPipelineTest() .Returns(_bytes); } - [Fact] - public async Task TestPut() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public async Task TestPut(IEventPipelineProvider provider) { - _eventPipeline.Start(); - _eventPipeline.Put(new ScreenEvent("test")); + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); + eventPipeline.Start(); + eventPipeline.Put(new ScreenEvent("test")); await Task.Delay(1000); _storage.Verify(o => o.Write(StorageConstants.Events, It.IsAny()), Times.Exactly(1)); } - [Fact] - public async Task TestFlush() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public async Task TestFlush(IEventPipelineProvider provider) { - _eventPipeline.Start(); - _eventPipeline.Put(new ScreenEvent("test")); - _eventPipeline.Flush(); + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); + eventPipeline.Start(); + eventPipeline.Put(new ScreenEvent("test")); + eventPipeline.Flush(); await Task.Delay(1000); @@ -94,28 +96,32 @@ public async Task TestFlush() _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(1)); } - [Fact] - public void TestStart() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public void TestStart(IEventPipelineProvider provider) { - _eventPipeline.Start(); - Assert.True(_eventPipeline.Running); + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); + eventPipeline.Start(); + Assert.True(eventPipeline.Running); } - [Fact] - public async void TestStop() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public async void TestStop(IEventPipelineProvider provider) { - _eventPipeline.Start(); - Assert.True(_eventPipeline.Running); - _eventPipeline.Stop(); - Assert.False(_eventPipeline.Running); + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); + eventPipeline.Start(); + Assert.True(eventPipeline.Running); + eventPipeline.Stop(); + Assert.False(eventPipeline.Running); // make sure writeChannel is stopped - _eventPipeline.Put(new ScreenEvent("test")); + eventPipeline.Put(new ScreenEvent("test")); await Task.Delay(1000); _storage.Verify(o => o.Write(StorageConstants.Events, It.IsAny()), Times.Never); // make sure uploadChannel is stopped - _eventPipeline.Flush(); + eventPipeline.Flush(); await Task.Delay(1000); _storage.Verify(o => o.Rollover(), Times.Never); _storage.Verify(o => o.Read(StorageConstants.Events), Times.Never); @@ -123,12 +129,14 @@ public async void TestStop() _storage.Verify(o => o.RemoveFile(_file), Times.Never); } - [Fact] - public async Task TestFlushCausedByOverflow() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public async Task TestFlushCausedByOverflow(IEventPipelineProvider provider) { - _eventPipeline.Start(); - _eventPipeline.Put(new ScreenEvent("event 1")); - _eventPipeline.Put(new ScreenEvent("event 2")); + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); + eventPipeline.Start(); + eventPipeline.Put(new ScreenEvent("event 1")); + eventPipeline.Put(new ScreenEvent("event 2")); await Task.Delay(1000); @@ -138,9 +146,11 @@ public async Task TestFlushCausedByOverflow() _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(1)); } - [Fact] - public async Task TestPeriodicalFlush() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public async Task TestPeriodicalFlush(IEventPipelineProvider provider) { + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); foreach (IFlushPolicy policy in _analytics.Configuration.FlushPolicies) { if (policy is FrequencyFlushPolicy) @@ -151,21 +161,21 @@ public async Task TestPeriodicalFlush() _analytics.AddFlushPolicy(new FrequencyFlushPolicy(1000L)); // since we set autoAddSegmentDestination = false, we need to manually add it to analytics. - // we need a mocked SegmentDestination so we can redirect Flush call to this _eventPipeline. + // we need a mocked SegmentDestination so we can redirect Flush call to this eventPipeline. var segmentDestination = new Mock(); - segmentDestination.Setup(o => o.Flush()).Callback(() => _eventPipeline.Flush()); + segmentDestination.Setup(o => o.Flush()).Callback(() => eventPipeline.Flush()); segmentDestination.Setup(o => o.Analytics).Returns(_analytics); _analytics.Add(segmentDestination.Object); - _eventPipeline = new EventPipeline( + eventPipeline = new EventPipeline( _analytics, logTag: "key", apiKey: _analytics.Configuration.WriteKey, flushPolicies: _analytics.Configuration.FlushPolicies, apiHost: _analytics.Configuration.ApiHost ); - _eventPipeline.Start(); - _eventPipeline.Put(new ScreenEvent("test")); + eventPipeline.Start(); + eventPipeline.Put(new ScreenEvent("test")); await Task.Delay(2050); @@ -175,16 +185,18 @@ public async Task TestPeriodicalFlush() _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(2)); } - [Fact] - public async Task TestFlushInterruptedWhenNoFileExist() + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public async Task TestFlushInterruptedWhenNoFileExist(IEventPipelineProvider provider) { + IEventPipeline eventPipeline = provider.Create(_analytics, "key"); // make sure the file does not exist _storage .Setup(o => o.ReadAsBytes(It.IsAny())) .Returns((byte[])null); - _eventPipeline.Start(); - _eventPipeline.Flush(); + eventPipeline.Start(); + eventPipeline.Flush(); await Task.Delay(1000);