Skip to content

Commit

Permalink
Subs in a composed form of withLatestFrom and adds test cases for lon…
Browse files Browse the repository at this point in the history
…e upstream completions.
  • Loading branch information
jasdev authored and freak4pc committed Sep 21, 2021
1 parent 87cc7f5 commit fc3e405
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 211 deletions.
306 changes: 95 additions & 211 deletions Sources/Operators/WithLatestFrom.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,227 +12,111 @@ import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
/// Merges two publishers into a single publisher by combining each value
/// from self with the latest value from the second publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> Publishers.WithLatestFrom<Self, Other, Result> {
return .init(upstream: self, second: other, resultSelector: resultSelector)
}

/// Merges three publishers into a single publisher by combining each value
/// from self with the latest value from the second and third publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second and third source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second and third publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Other1: Publisher, Result>(_ other: Other,
_ other1: Other1,
resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Self.Failure>, Result>
where Other.Failure == Failure, Other1.Failure == Failure {
let combined = other.combineLatest(other1)
.eraseToAnyPublisher()
return .init(upstream: self, second: combined, resultSelector: resultSelector)
}

/// Merges four publishers into a single publisher by combining each value
/// from self with the latest value from the second, third and fourth publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter other2: A fourth publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second, third and fourth source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second, third and fourth publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher, Result>(_ other: Other,
_ other1: Other1,
_ other2: Other2,
resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Self.Failure>, Result>
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
let combined = other.combineLatest(other1, other2)
.eraseToAnyPublisher()
return .init(upstream: self, second: combined, resultSelector: resultSelector)
}

/// Upon an emission from self, emit the latest value from the
/// second publisher, if any exists.
///
/// - parameter other: A second publisher source.
///
/// - returns: A publisher containing the latest value from the second publisher, if any.
func withLatestFrom<Other: Publisher>(_ other: Other)
-> Publishers.WithLatestFrom<Self, Other, Other.Output> {
return .init(upstream: self, second: other) { $1 }
}

/// Upon an emission from self, emit the latest value from the
/// second and third publisher, if any exists.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
///
/// - returns: A publisher containing the latest value from the second and third publisher, if any.
func withLatestFrom<Other: Publisher, Other1: Publisher>(_ other: Other,
_ other1: Other1)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output), Self.Failure>, (Other.Output, Other1.Output)>
where Other.Failure == Failure, Other1.Failure == Failure {
withLatestFrom(other, other1) { $1 }
}

/// Upon an emission from self, emit the latest value from the
/// second, third and forth publisher, if any exists.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter other2: A forth publisher source.
///
/// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher>(_ other: Other,
_ other1: Other1,
_ other2: Other2)
-> Publishers.WithLatestFrom<Self, AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Self.Failure>, (Other.Output, Other1.Output, Other2.Output)>
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
withLatestFrom(other, other1, other2) { $1 }
}
}

// MARK: - Publisher
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
struct WithLatestFrom<Upstream: Publisher,
Other: Publisher,
Output>: Publisher where Upstream.Failure == Other.Failure {
public typealias Failure = Upstream.Failure
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output

private let upstream: Upstream
private let second: Other
private let resultSelector: ResultSelector
private var latestValue: Other.Output?

init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.resultSelector = resultSelector
}

public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream,
downstream: subscriber,
second: second,
resultSelector: resultSelector))
/// Merges two publishers into a single publisher by combining each value
/// from self with the latest value from the second publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> AnyPublisher<Result, Failure>
where Other.Failure == Failure {
let upstream = share()

return other
.map { second in upstream.map { resultSelector($0, second) } }
.switchToLatest()
.zip(upstream) // `zip`ping and discarding `\.1` allows for
// upstream completions to be projected down immediately.
.map(\.0)
.eraseToAnyPublisher()
}
}
}

// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.WithLatestFrom {
class Subscription<Downstream: Subscriber>: Combine.Subscription, CustomStringConvertible where Downstream.Input == Output, Downstream.Failure == Failure {
private let resultSelector: ResultSelector
private var sink: Sink<Upstream, Downstream>?

private let upstream: Upstream
private let downstream: Downstream
private let second: Other

// Secondary (other) publisher
private var latestValue: Other.Output?
private var otherSubscription: Cancellable?
private var preInitialDemand = Subscribers.Demand.none

init(upstream: Upstream,
downstream: Downstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.downstream = downstream
self.resultSelector = resultSelector

trackLatestFromSecond { [weak self] in
guard let self = self else { return }
self.request(self.preInitialDemand)
self.preInitialDemand = .none
}
/// Merges three publishers into a single publisher by combining each value
/// from self with the latest value from the second and third publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second and third source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second and third publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Other1: Publisher, Result>(_ other: Other,
_ other1: Other1,
resultSelector: @escaping (Output, (Other.Output, Other1.Output)) -> Result)
-> AnyPublisher<Result, Failure>
where Other.Failure == Failure, Other1.Failure == Failure {
withLatestFrom(other.combineLatest(other1), resultSelector: resultSelector)
}

func request(_ demand: Subscribers.Demand) {
guard latestValue != nil else {
preInitialDemand += demand
return
}

self.sink?.demand(demand)
/// Merges four publishers into a single publisher by combining each value
/// from self with the latest value from the second, third and fourth publisher, if any.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter other2: A fourth publisher source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second, third and fourth source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second, third and fourth publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher, Result>(_ other: Other,
_ other1: Other1,
_ other2: Other2,
resultSelector: @escaping (Output, (Other.Output, Other1.Output, Other2.Output)) -> Result)
-> AnyPublisher<Result, Failure>
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
withLatestFrom(other.combineLatest(other1, other2), resultSelector: resultSelector)
}

// Create an internal subscription to the `Other` publisher,
// constantly tracking its latest value
private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
var gotInitialValue = false

let subscriber = AnySubscriber<Other.Output, Other.Failure>(
receiveSubscription: { [weak self] subscription in
self?.otherSubscription = subscription
subscription.request(.unlimited)
},
receiveValue: { [weak self] value in
guard let self = self else { return .none }
self.latestValue = value

if !gotInitialValue {
// When getting initial value, start pulling values
// from upstream in the main sink
self.sink = Sink(upstream: self.upstream,
downstream: self.downstream,
transformOutput: { [weak self] value in
guard let self = self,
let other = self.latestValue else { return nil }

return self.resultSelector(value, other)
},
transformFailure: { $0 })

// Signal initial value to start fulfilling downstream demand
gotInitialValue = true
onInitialValue()
}

return .unlimited
},
receiveCompletion: nil)

self.second.subscribe(subscriber)
/// Upon an emission from self, emit the latest value from the
/// second publisher, if any exists.
///
/// - parameter other: A second publisher source.
///
/// - returns: A publisher containing the latest value from the second publisher, if any.
func withLatestFrom<Other: Publisher>(_ other: Other)
-> AnyPublisher<Other.Output, Failure>
where Other.Failure == Failure {
withLatestFrom(other) { $1 }
}

var description: String {
return "WithLatestFrom.Subscription<\(Output.self), \(Failure.self)>"
/// Upon an emission from self, emit the latest value from the
/// second and third publisher, if any exists.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
///
/// - returns: A publisher containing the latest value from the second and third publisher, if any.
func withLatestFrom<Other: Publisher, Other1: Publisher>(_ other: Other,
_ other1: Other1)
-> AnyPublisher<(Other.Output, Other1.Output), Failure>
where Other.Failure == Failure, Other1.Failure == Failure {
withLatestFrom(other, other1) { $1 }
}

func cancel() {
sink = nil
otherSubscription?.cancel()
/// Upon an emission from self, emit the latest value from the
/// second, third and forth publisher, if any exists.
///
/// - parameter other: A second publisher source.
/// - parameter other1: A third publisher source.
/// - parameter other2: A forth publisher source.
///
/// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
func withLatestFrom<Other: Publisher, Other1: Publisher, Other2: Publisher>(_ other: Other,
_ other1: Other1,
_ other2: Other2)
-> AnyPublisher<(Other.Output, Other1.Output, Other2.Output), Failure>
where Other.Failure == Failure, Other1.Failure == Failure, Other2.Failure == Failure {
withLatestFrom(other, other1, other2) { $1 }
}
}
}
#endif
51 changes: 51 additions & 0 deletions Tests/WithLatestFromTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -386,5 +386,56 @@ class WithLatestFromTests: XCTestCase {
subject1.send(completion: .finished)
XCTAssertTrue(completed)
}

func testWithLatestFromCompletion() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<String, Never>()
var results = [String]()
var completed = false

subscription = subject1
.withLatestFrom(subject2)
.sink(receiveCompletion: { _ in completed = true },
receiveValue: { results.append($0) })

subject1.send(completion: .finished)
XCTAssertTrue(completed)
XCTAssertTrue(results.isEmpty)
}

func testWithLatestFrom2Completion() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<String, Never>()
let subject3 = PassthroughSubject<String, Never>()
var results = [(String, String)]()
var completed = false

subscription = subject1
.withLatestFrom(subject2, subject3)
.sink(receiveCompletion: { _ in completed = true },
receiveValue: { results.append($0) })

subject1.send(completion: .finished)
XCTAssertTrue(completed)
XCTAssertTrue(results.isEmpty)
}

func testWithLatestFrom3Completion() {
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<String, Never>()
let subject3 = PassthroughSubject<String, Never>()
let subject4 = PassthroughSubject<String, Never>()
var results = [(String, String, String)]()
var completed = false

subscription = subject1
.withLatestFrom(subject2, subject3, subject4)
.sink(receiveCompletion: { _ in completed = true },
receiveValue: { results.append($0) })

subject1.send(completion: .finished)
XCTAssertTrue(completed)
XCTAssertTrue(results.isEmpty)
}
}
#endif

0 comments on commit fc3e405

Please sign in to comment.