Skip to content

Commit

Permalink
Added operating mode config option & behavior. (#269)
Browse files Browse the repository at this point in the history
* Added operatingMode configuration option and logic.

* Changed unit-test settings setup to respect operating mode.

* Made some startup plugins not auto-load when operating in server mode.

* Fixed logging output to be correct.

* Updated testPurgeStorage to reflect new flush behavior.

* Added test for server operation mode.

* Improved flush policy interval test.

* Modified equality check to specify pointer value.

* Fixed issue on linux w/ recursive sync

* Logic update to simplify queueing and protection

* updated commentary
  • Loading branch information
bsneed authored Nov 17, 2023
1 parent 3fbc48f commit 7741aa5
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 17 deletions.
74 changes: 70 additions & 4 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ extension Analytics {
return nil
}

/// Returns the current operating mode this instance was given.
public var operatingMode: OperatingMode {
return configuration.values.operatingMode
}

/// Adjusts the flush interval post configuration.
public var flushInterval: TimeInterval {
get {
Expand Down Expand Up @@ -196,16 +201,52 @@ extension Analytics {
}

/// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also
/// be sent to each plugin present in the system.
public func flush() {
/// be sent to each plugin present in the system. A completion handler can be optionally given and will be
/// called when flush has completed.
public func flush(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }

let flushGroup = DispatchGroup()
// gotta call enter at least once before we ask to be notified.
flushGroup.enter()

apply { plugin in
if let p = plugin as? EventPlugin {
p.flush()
operatingMode.run(queue: configuration.values.flushQueue) {
if let p = plugin as? FlushCompletion {
// this is async
// flush(group:completion:) handles the enter/leave.
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
} else if let p = plugin as? EventPlugin {
// we have no idea if this will be async or not, assume it's sync.
flushGroup.enter()
p.flush()
flushGroup.leave()
}
}
}

// if we're not in server mode, we need to be notified when it's done.
if let completion, operatingMode != .synchronous {
// set up our callback to know when the group has completed, if we're not
// in .server operating mode.
flushGroup.notify(queue: configuration.values.flushQueue) {
DispatchQueue.main.async { completion() }
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in server mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify.
if let completion { DispatchQueue.main.async { completion() }}
}
}

/// Resets this instance of Analytics to a clean slate. Traits, UserID's, anonymousId, etc are all cleared or reset. This
Expand Down Expand Up @@ -384,3 +425,28 @@ extension Analytics {
return configuration.values.writeKey == Self.deadInstance
}
}

// MARK: Operating mode based scheduling

extension OperatingMode {
func run(queue: DispatchQueue, task: @escaping () -> Void) {
//
switch self {
case .asynchronous:
queue.async {
task()
}
case .synchronous:
// if for some reason, we're told to do all this stuff on
// main, ignore it, and use the default queue. this prevents
// a possible deadlock.
if queue === DispatchQueue.main {
OperatingMode.defaultQueue.asyncAndWait {
task()
}
} else {
queue.asyncAndWait { task() }
}
}
}
}
30 changes: 30 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ import Foundation
import FoundationNetworking
#endif

// MARK: - Operating Mode
/// Specifies the operating mode/context
public enum OperatingMode {
/// The operation of the Analytics client are synchronous.
case synchronous
/// The operation of the Analytics client are asynchronous.
case asynchronous

static internal let defaultQueue = DispatchQueue(label: "com.segment.operatingModeQueue", qos: .utility)
}

// MARK: - Internal Configuration

public class Configuration {
Expand All @@ -26,6 +37,9 @@ public class Configuration {
var requestFactory: ((URLRequest) -> URLRequest)? = nil
var errorHandler: ((Error) -> Void)? = nil
var flushPolicies: [FlushPolicy] = [CountBasedFlushPolicy(), IntervalBasedFlushPolicy()]

var operatingMode: OperatingMode = .asynchronous
var flushQueue: DispatchQueue = OperatingMode.defaultQueue
var userAgent: String? = nil
}

Expand Down Expand Up @@ -184,6 +198,22 @@ public extension Configuration {
return self
}

/// Informs the Analytics instance of its operating mode/context.
/// Use `.server` when operating in a web service, or when synchronous operation
/// is desired. Use `.client` when operating in a long lived process,
/// desktop/mobile application.
@discardableResult
func operatingMode(_ mode: OperatingMode) -> Configuration {
values.operatingMode = mode
return self
}

/// Specify a custom queue to use when performing a flush operation. The default
/// value is a Segment owned background queue.
@discardableResult
func flushQueue(_ queue: DispatchQueue) -> Configuration {
values.flushQueue = queue

@discardableResult
func userAgent(_ userAgent: String) -> Configuration {
values.userAgent = userAgent
Expand Down
4 changes: 4 additions & 0 deletions Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public protocol VersionedPlugin {
static func version() -> String
}

public protocol FlushCompletion {
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
}

// For internal platform-specific bits
internal protocol PlatformPlugin: Plugin { }

Expand Down
14 changes: 12 additions & 2 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Sovran
import FoundationNetworking
#endif

public class SegmentDestination: DestinationPlugin, Subscriber {
public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion {
internal enum Constants: String {
case integrationName = "Segment.io"
case apiHost = "apiHost"
Expand Down Expand Up @@ -113,6 +113,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
}

public func flush() {
// unused .. see flush(group:completion:)
}

public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }
Expand All @@ -131,7 +135,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
if pendingUploads == 0 {
for url in data {
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")

// enter the dispatch group
group.enter()
// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
switch result {
case .success(_):
Expand All @@ -146,6 +152,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
self.cleanupUploads()
// call the completion
completion(self)
// leave the dispatch group
group.leave()
}
// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
Expand Down
4 changes: 3 additions & 1 deletion Sources/Segment/Settings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ extension Analytics {
// we don't really wanna wait for this network call during tests...
// but we should make it work similarly.
store.dispatch(action: System.ToggleRunningAction(running: false))
DispatchQueue.main.async {

operatingMode.run(queue: DispatchQueue.main) {
if let state: System = self.store.currentState(), let settings = state.settings {
self.store.dispatch(action: System.UpdateSettingsAction(settings: settings))
}
self.store.dispatch(action: System.ToggleRunningAction(running: true))
}

return
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Startup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extension Analytics: Subscriber {
plugins += VendorSystem.current.requiredPlugins

// setup lifecycle if desired
if configuration.values.trackApplicationLifecycleEvents {
if configuration.values.trackApplicationLifecycleEvents, operatingMode != .synchronous {
#if os(iOS) || os(tvOS)
plugins.append(iOSLifecycleEvents())
#endif
Expand Down
7 changes: 5 additions & 2 deletions Sources/Segment/Utilities/Logging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import Foundation

extension Analytics {
internal enum LogKind {
internal enum LogKind: CustomStringConvertible, CustomDebugStringConvertible {
case error
case warning
case debug
case none

var description: String { return string }
var debugDescription: String { return string }

var string: String {
switch self {
case .error:
Expand All @@ -23,7 +26,7 @@ extension Analytics {
case .debug:
return "SEG_DEBUG: "
case .none:
return ""
return "SEG_INFO: "
}
}
}
Expand Down
43 changes: 40 additions & 3 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ final class Analytics_Tests: XCTestCase {

func testPurgeStorage() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either").flushInterval(9999).flushAt(9999))
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.synchronous))

waitUntilStarted(analytics: analytics)

Expand All @@ -479,13 +482,13 @@ final class Analytics_Tests: XCTestCase {
analytics.track(name: "test")

var newPendingCount = analytics.pendingUploads!.count
XCTAssertEqual(newPendingCount, 4)
XCTAssertEqual(newPendingCount, 1)

let pending = analytics.pendingUploads!
analytics.purgeStorage(fileURL: pending.first!)

newPendingCount = analytics.pendingUploads!.count
XCTAssertEqual(newPendingCount, 3)
XCTAssertEqual(newPendingCount, 0)

analytics.purgeStorage()
newPendingCount = analytics.pendingUploads!.count
Expand Down Expand Up @@ -688,4 +691,38 @@ final class Analytics_Tests: XCTestCase {
XCTAssertTrue(shared2 === shared)

}

func testServerOperatingMode() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.synchronous))

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

@Atomic var completionCalled = false

// put an event in the pipe ...
analytics.track(name: "completion test1")
// flush it, that'll get us an upload going
analytics.flush {
// verify completion is called.
completionCalled = true
}

// completion shouldn't be called before flush returned.
XCTAssertTrue(completionCalled)
XCTAssertEqual(analytics.pendingUploads!.count, 0)

// put another event in the pipe.
analytics.track(name: "completion test2")
analytics.flush()

// flush shouldn't return until all uploads are done, cuz
// it's running in sync mode.
XCTAssertEqual(analytics.pendingUploads!.count, 0)
}
}
14 changes: 10 additions & 4 deletions Tests/Segment-Tests/FlushPolicy_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,15 @@ class FlushPolicyTests: XCTestCase {

XCTAssertTrue(analytics.hasUnsentEvents)

// sleep for 4 seconds for 2 second flush policy
RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 4))

XCTAssertFalse(analytics.hasUnsentEvents)
@Atomic var flushSent = false
while !flushSent {
RunLoop.main.run(until: Date.distantPast)
if analytics.pendingUploads!.count > 0 {
// flush was triggered
flushSent = true
}
}

XCTAssertTrue(flushSent)
}
}

0 comments on commit 7741aa5

Please sign in to comment.