Skip to content

Commit

Permalink
Add check to ensure each subscription replies only once
Browse files Browse the repository at this point in the history
  • Loading branch information
Alkenso committed Aug 29, 2024
1 parent e4565cb commit e882552
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,13 @@ internal final class ESServiceSubscriptionStore {
return
}

let group = ESMultipleResolution(count: subscribers.count, reply: reply)
subscribers.forEach { entry in
let group = ESMultipleResolution(count: subscribers.count, completion: reply)
for i in 0..<subscribers.count {
let entry = subscribers[i]
entry.subscription.queue.async {
entry.subscription.authMessageHandler(message, group.resolve)
entry.subscription.authMessageHandler(message) {
group.resolve($0, by: i, name: entry.subscription.name)
}
}
}
}
Expand Down Expand Up @@ -171,23 +174,29 @@ internal final class ESServiceSubscriptionStore {

internal final class ESMultipleResolution {
private var lock = UnfairLock()
private var fulfilled = 0
private var resolved = 0
private var resolutions: [ESAuthResolution]
private let reply: (ESAuthResolution) -> Void
private var resolutionsState: [Bool]
private let completion: (ESAuthResolution) -> Void

init(count: Int, reply: @escaping (ESAuthResolution) -> Void) {
init(count: Int, completion: @escaping (ESAuthResolution) -> Void) {
self.resolutions = .init(repeating: .allow, count: count)
self.reply = reply
self.resolutionsState = .init(repeating: false, count: count)
self.completion = completion
}

func resolve(_ resolution: ESAuthResolution) {
func resolve(_ resolution: ESAuthResolution, by subscription: Int, name: String) {
lock.withLock {
resolutions[fulfilled] = resolution
fulfilled += 1
guard !updateSwap(&resolutionsState[subscription], true) else {
log.error("Invalid multiple resolutions provided by subscription \(name)(\(subscription))", assert: true)
return
}
resolved += 1
resolutions[subscription] = resolution

if fulfilled == resolutions.count {
if resolved == resolutions.count {
let combined = ESAuthResolution.combine(resolutions)
reply(combined)
completion(combined)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public struct ESSubscription {

public init() {}

/// Custom name of subscription for log & debug purposes.
public var name = "ESSubscription"

/// Set of events to subscribe on.
public var events: [es_event_type_t] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ESClientTypesTests: XCTestCase {
XCTAssertEqual($0, .allowOnce)
exp.fulfill()
}
(0..<count).forEach { _ in group.resolve(.allowOnce) }
(0..<count).forEach { group.resolve(.allowOnce, by: $0, name: "") }

waitForExpectations()
}
Expand Down

0 comments on commit e882552

Please sign in to comment.