From 7741aa5836ee69a4b44d4e9514d75baa10d538f4 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Fri, 17 Nov 2023 10:48:37 -0800 Subject: [PATCH] Added operating mode config option & behavior. (#269) * Added operatingMode configuration option and logic. * Changed unit-test settings setup to respect operating mode. * Made some startup plugins not auto-load when operating in server mode. * Fixed logging output to be correct. * Updated testPurgeStorage to reflect new flush behavior. * Added test for server operation mode. * Improved flush policy interval test. * Modified equality check to specify pointer value. * Fixed issue on linux w/ recursive sync * Logic update to simplify queueing and protection * updated commentary --- Sources/Segment/Analytics.swift | 74 ++++++++++++++++++- Sources/Segment/Configuration.swift | 30 ++++++++ Sources/Segment/Plugins.swift | 4 + .../Segment/Plugins/SegmentDestination.swift | 14 +++- Sources/Segment/Settings.swift | 4 +- Sources/Segment/Startup.swift | 2 +- Sources/Segment/Utilities/Logging.swift | 7 +- Tests/Segment-Tests/Analytics_Tests.swift | 43 ++++++++++- Tests/Segment-Tests/FlushPolicy_Tests.swift | 14 +++- 9 files changed, 175 insertions(+), 17 deletions(-) diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index 10d06afd..0667f03f 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -146,6 +146,11 @@ extension Analytics { return nil } + /// Returns the current operating mode this instance was given. + public var operatingMode: OperatingMode { + return configuration.values.operatingMode + } + /// Adjusts the flush interval post configuration. public var flushInterval: TimeInterval { get { @@ -196,16 +201,52 @@ extension Analytics { } /// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also - /// be sent to each plugin present in the system. - public func flush() { + /// be sent to each plugin present in the system. A completion handler can be optionally given and will be + /// called when flush has completed. + public func flush(completion: (() -> Void)? = nil) { // only flush if we're enabled. guard enabled == true else { return } + let flushGroup = DispatchGroup() + // gotta call enter at least once before we ask to be notified. + flushGroup.enter() + apply { plugin in - if let p = plugin as? EventPlugin { - p.flush() + operatingMode.run(queue: configuration.values.flushQueue) { + if let p = plugin as? FlushCompletion { + // this is async + // flush(group:completion:) handles the enter/leave. + p.flush(group: flushGroup) { plugin in + // we don't really care about the plugin value .. yet. + } + } else if let p = plugin as? EventPlugin { + // we have no idea if this will be async or not, assume it's sync. + flushGroup.enter() + p.flush() + flushGroup.leave() + } } } + + // if we're not in server mode, we need to be notified when it's done. + if let completion, operatingMode != .synchronous { + // set up our callback to know when the group has completed, if we're not + // in .server operating mode. + flushGroup.notify(queue: configuration.values.flushQueue) { + DispatchQueue.main.async { completion() } + } + } + + flushGroup.leave() // matches our initial enter(). + + // if we ARE in server mode, we need to wait on the group. + // This effectively ends up being a `sync` operation. + if operatingMode == .synchronous { + flushGroup.wait() + // we need to call completion on our own since + // we skipped setting up notify. + if let completion { DispatchQueue.main.async { completion() }} + } } /// Resets this instance of Analytics to a clean slate. Traits, UserID's, anonymousId, etc are all cleared or reset. This @@ -384,3 +425,28 @@ extension Analytics { return configuration.values.writeKey == Self.deadInstance } } + +// MARK: Operating mode based scheduling + +extension OperatingMode { + func run(queue: DispatchQueue, task: @escaping () -> Void) { + // + switch self { + case .asynchronous: + queue.async { + task() + } + case .synchronous: + // if for some reason, we're told to do all this stuff on + // main, ignore it, and use the default queue. this prevents + // a possible deadlock. + if queue === DispatchQueue.main { + OperatingMode.defaultQueue.asyncAndWait { + task() + } + } else { + queue.asyncAndWait { task() } + } + } + } +} diff --git a/Sources/Segment/Configuration.swift b/Sources/Segment/Configuration.swift index bbeba21e..8de631e8 100644 --- a/Sources/Segment/Configuration.swift +++ b/Sources/Segment/Configuration.swift @@ -10,6 +10,17 @@ import Foundation import FoundationNetworking #endif +// MARK: - Operating Mode +/// Specifies the operating mode/context +public enum OperatingMode { + /// The operation of the Analytics client are synchronous. + case synchronous + /// The operation of the Analytics client are asynchronous. + case asynchronous + + static internal let defaultQueue = DispatchQueue(label: "com.segment.operatingModeQueue", qos: .utility) +} + // MARK: - Internal Configuration public class Configuration { @@ -26,6 +37,9 @@ public class Configuration { var requestFactory: ((URLRequest) -> URLRequest)? = nil var errorHandler: ((Error) -> Void)? = nil var flushPolicies: [FlushPolicy] = [CountBasedFlushPolicy(), IntervalBasedFlushPolicy()] + + var operatingMode: OperatingMode = .asynchronous + var flushQueue: DispatchQueue = OperatingMode.defaultQueue var userAgent: String? = nil } @@ -184,6 +198,22 @@ public extension Configuration { return self } + /// Informs the Analytics instance of its operating mode/context. + /// Use `.server` when operating in a web service, or when synchronous operation + /// is desired. Use `.client` when operating in a long lived process, + /// desktop/mobile application. + @discardableResult + func operatingMode(_ mode: OperatingMode) -> Configuration { + values.operatingMode = mode + return self + } + + /// Specify a custom queue to use when performing a flush operation. The default + /// value is a Segment owned background queue. + @discardableResult + func flushQueue(_ queue: DispatchQueue) -> Configuration { + values.flushQueue = queue + @discardableResult func userAgent(_ userAgent: String) -> Configuration { values.userAgent = userAgent diff --git a/Sources/Segment/Plugins.swift b/Sources/Segment/Plugins.swift index 7890e77a..cf0086f3 100644 --- a/Sources/Segment/Plugins.swift +++ b/Sources/Segment/Plugins.swift @@ -62,6 +62,10 @@ public protocol VersionedPlugin { static func version() -> String } +public protocol FlushCompletion { + func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) +} + // For internal platform-specific bits internal protocol PlatformPlugin: Plugin { } diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index 2b1c76ba..91257438 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -16,7 +16,7 @@ import Sovran import FoundationNetworking #endif -public class SegmentDestination: DestinationPlugin, Subscriber { +public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion { internal enum Constants: String { case integrationName = "Segment.io" case apiHost = "apiHost" @@ -113,6 +113,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber { } public func flush() { + // unused .. see flush(group:completion:) + } + + public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) { guard let storage = self.storage else { return } guard let analytics = self.analytics else { return } guard let httpClient = self.httpClient else { return } @@ -131,7 +135,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber { if pendingUploads == 0 { for url in data { analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)") - + // enter the dispatch group + group.enter() + // set up the task let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in switch result { case .success(_): @@ -146,6 +152,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber { // make sure it gets removed and it's cleanup() called rather // than waiting on the next flush to come around. self.cleanupUploads() + // call the completion + completion(self) + // leave the dispatch group + group.leave() } // we have a legit upload in progress now, so add it to our list. if let upload = uploadTask { diff --git a/Sources/Segment/Settings.swift b/Sources/Segment/Settings.swift index 41a6aeb3..7bd87207 100644 --- a/Sources/Segment/Settings.swift +++ b/Sources/Segment/Settings.swift @@ -131,12 +131,14 @@ extension Analytics { // we don't really wanna wait for this network call during tests... // but we should make it work similarly. store.dispatch(action: System.ToggleRunningAction(running: false)) - DispatchQueue.main.async { + + operatingMode.run(queue: DispatchQueue.main) { if let state: System = self.store.currentState(), let settings = state.settings { self.store.dispatch(action: System.UpdateSettingsAction(settings: settings)) } self.store.dispatch(action: System.ToggleRunningAction(running: true)) } + return } #endif diff --git a/Sources/Segment/Startup.swift b/Sources/Segment/Startup.swift index 8766e1ba..73444203 100644 --- a/Sources/Segment/Startup.swift +++ b/Sources/Segment/Startup.swift @@ -48,7 +48,7 @@ extension Analytics: Subscriber { plugins += VendorSystem.current.requiredPlugins // setup lifecycle if desired - if configuration.values.trackApplicationLifecycleEvents { + if configuration.values.trackApplicationLifecycleEvents, operatingMode != .synchronous { #if os(iOS) || os(tvOS) plugins.append(iOSLifecycleEvents()) #endif diff --git a/Sources/Segment/Utilities/Logging.swift b/Sources/Segment/Utilities/Logging.swift index 52f28408..98868ec0 100644 --- a/Sources/Segment/Utilities/Logging.swift +++ b/Sources/Segment/Utilities/Logging.swift @@ -8,12 +8,15 @@ import Foundation extension Analytics { - internal enum LogKind { + internal enum LogKind: CustomStringConvertible, CustomDebugStringConvertible { case error case warning case debug case none + var description: String { return string } + var debugDescription: String { return string } + var string: String { switch self { case .error: @@ -23,7 +26,7 @@ extension Analytics { case .debug: return "SEG_DEBUG: " case .none: - return "" + return "SEG_INFO: " } } } diff --git a/Tests/Segment-Tests/Analytics_Tests.swift b/Tests/Segment-Tests/Analytics_Tests.swift index de9eb28d..1db8c2d5 100644 --- a/Tests/Segment-Tests/Analytics_Tests.swift +++ b/Tests/Segment-Tests/Analytics_Tests.swift @@ -457,7 +457,10 @@ final class Analytics_Tests: XCTestCase { func testPurgeStorage() { // Use a specific writekey to this test so we do not collide with other cached items. - let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either").flushInterval(9999).flushAt(9999)) + let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either") + .flushInterval(9999) + .flushAt(9999) + .operatingMode(.synchronous)) waitUntilStarted(analytics: analytics) @@ -479,13 +482,13 @@ final class Analytics_Tests: XCTestCase { analytics.track(name: "test") var newPendingCount = analytics.pendingUploads!.count - XCTAssertEqual(newPendingCount, 4) + XCTAssertEqual(newPendingCount, 1) let pending = analytics.pendingUploads! analytics.purgeStorage(fileURL: pending.first!) newPendingCount = analytics.pendingUploads!.count - XCTAssertEqual(newPendingCount, 3) + XCTAssertEqual(newPendingCount, 0) analytics.purgeStorage() newPendingCount = analytics.pendingUploads!.count @@ -688,4 +691,38 @@ final class Analytics_Tests: XCTestCase { XCTAssertTrue(shared2 === shared) } + + func testServerOperatingMode() { + // Use a specific writekey to this test so we do not collide with other cached items. + let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode") + .flushInterval(9999) + .flushAt(9999) + .operatingMode(.synchronous)) + + waitUntilStarted(analytics: analytics) + + analytics.storage.hardReset(doYouKnowHowToUseThis: true) + + @Atomic var completionCalled = false + + // put an event in the pipe ... + analytics.track(name: "completion test1") + // flush it, that'll get us an upload going + analytics.flush { + // verify completion is called. + completionCalled = true + } + + // completion shouldn't be called before flush returned. + XCTAssertTrue(completionCalled) + XCTAssertEqual(analytics.pendingUploads!.count, 0) + + // put another event in the pipe. + analytics.track(name: "completion test2") + analytics.flush() + + // flush shouldn't return until all uploads are done, cuz + // it's running in sync mode. + XCTAssertEqual(analytics.pendingUploads!.count, 0) + } } diff --git a/Tests/Segment-Tests/FlushPolicy_Tests.swift b/Tests/Segment-Tests/FlushPolicy_Tests.swift index f3813b35..0de096c0 100644 --- a/Tests/Segment-Tests/FlushPolicy_Tests.swift +++ b/Tests/Segment-Tests/FlushPolicy_Tests.swift @@ -137,9 +137,15 @@ class FlushPolicyTests: XCTestCase { XCTAssertTrue(analytics.hasUnsentEvents) - // sleep for 4 seconds for 2 second flush policy - RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 4)) - - XCTAssertFalse(analytics.hasUnsentEvents) + @Atomic var flushSent = false + while !flushSent { + RunLoop.main.run(until: Date.distantPast) + if analytics.pendingUploads!.count > 0 { + // flush was triggered + flushSent = true + } + } + + XCTAssertTrue(flushSent) } }