From c618f4d8eda81c877b9ae57f7c49bf95440e06fc Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Sun, 22 Nov 2020 12:08:22 +0100 Subject: [PATCH 1/7] Adds ConcatMap operator --- Sources/Operators/ConcatMap.swift | 27 ++++++++ Tests/ConcatMapTests.swift | 104 ++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 Sources/Operators/ConcatMap.swift create mode 100644 Tests/ConcatMapTests.swift diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift new file mode 100644 index 0000000..9711fe5 --- /dev/null +++ b/Sources/Operators/ConcatMap.swift @@ -0,0 +1,27 @@ +// +// ConcatMap.swift +// CombineExt +// +// Created by Daniel Peter on 22/11/2020. +// Copyright © 2020 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publisher { + /// Transforms an output value into a new publisher, and flattens the stream of events from these multiple upstream publishers to appear as if they were coming from a single stream of events. + /// + /// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed. + /// + /// - parameter transform: A transform to apply to each emitted value, from which you can return a new Publisher + /// + /// - returns: A publisher emitting the values of all emitted publishers in order. + func concatMap( + _ transform: @escaping (Self.Output) -> P + ) -> Publishers.FlatMap where T == P.Output, P: Publisher, Self.Failure == P.Failure { + flatMap(maxPublishers: .max(1), transform) + } +} +#endif diff --git a/Tests/ConcatMapTests.swift b/Tests/ConcatMapTests.swift new file mode 100644 index 0000000..7696fb2 --- /dev/null +++ b/Tests/ConcatMapTests.swift @@ -0,0 +1,104 @@ +// +// ConcatMapTests.swift +// CombineExtTests +// +// Created by Daniel Peter on 22/11/2020. +// Copyright © 2020 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import XCTest +import Combine +import CombineExt + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class ConcatMapTests: XCTestCase { + private enum TestError: Swift.Error { + case failure + } + private typealias P = PassthroughSubject + private var cancellables: Set! + + override func setUp() { + super.setUp() + cancellables = [] + } + + func test_publishes_values_inOrder() { + var receivedValues = [Int]() + let expectedValues = [1, 2, 3, 4] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + + firstPublisher.send(1) + firstPublisher.send(2) + firstPublisher.send(completion: .finished) + + secondPublisher.send(3) + secondPublisher.send(4) + secondPublisher.send(completion: .finished) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_completes_when_last_publisher_completes() { + var receivedCompletion: Subscribers.Completion? + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { receivedCompletion = $0 }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + firstPublisher.send(completion: .finished) + XCTAssertNil(receivedCompletion) + secondPublisher.send(completion: .finished) + XCTAssertNotNil(receivedCompletion) + } + + func test_completes_with_failure_if_publisher_fails() { + let expectedCompletion = Subscribers.Completion.failure(.failure) + var receivedCompletion: Subscribers.Completion? + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { receivedCompletion = $0 }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + firstPublisher.send(completion: .failure(.failure)) + XCTAssertEqual(receivedCompletion, expectedCompletion) + secondPublisher.send(completion: .finished) + XCTAssertEqual(receivedCompletion, expectedCompletion) + } +} +#endif From 20423b2df2299804b59c803f521c3f67e2bdad38 Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Sun, 22 Nov 2020 13:15:22 +0100 Subject: [PATCH 2/7] Changes completion test to check for upstream completion --- Tests/ConcatMapTests.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Tests/ConcatMapTests.swift b/Tests/ConcatMapTests.swift index 7696fb2..9f8adce 100644 --- a/Tests/ConcatMapTests.swift +++ b/Tests/ConcatMapTests.swift @@ -54,7 +54,7 @@ final class ConcatMapTests: XCTestCase { XCTAssertEqual(expectedValues, receivedValues) } - func test_completes_when_last_publisher_completes() { + func test_completes_when_upstream_completes() { var receivedCompletion: Subscribers.Completion? let firstPublisher = P() @@ -74,6 +74,8 @@ final class ConcatMapTests: XCTestCase { firstPublisher.send(completion: .finished) XCTAssertNil(receivedCompletion) secondPublisher.send(completion: .finished) + XCTAssertNil(receivedCompletion) + sut.send(completion: .finished) XCTAssertNotNil(receivedCompletion) } From d46de4317ee352e30f0e68bad091ee27d5aa629b Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Sun, 22 Nov 2020 14:01:56 +0100 Subject: [PATCH 3/7] Adds Publishers.ConcatMap --- Sources/Operators/ConcatMap.swift | 142 +++++++++++++++++++++++++++++- Tests/ConcatMapTests.swift | 10 ++- 2 files changed, 148 insertions(+), 4 deletions(-) diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift index 9711fe5..dcf30e2 100644 --- a/Sources/Operators/ConcatMap.swift +++ b/Sources/Operators/ConcatMap.swift @@ -8,6 +8,7 @@ #if canImport(Combine) import Combine +import class Foundation.NSRecursiveLock @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public extension Publisher { @@ -20,8 +21,145 @@ public extension Publisher { /// - returns: A publisher emitting the values of all emitted publishers in order. func concatMap( _ transform: @escaping (Self.Output) -> P - ) -> Publishers.FlatMap where T == P.Output, P: Publisher, Self.Failure == P.Failure { - flatMap(maxPublishers: .max(1), transform) + ) -> Publishers.ConcatMap where T == P.Output, P: Publisher, Self.Failure == P.Failure { + return Publishers.ConcatMap(upstream: self, transform: transform) + } +} + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publishers { + final class ConcatMap: Publisher where NewPublisher: Publisher, Upstream: Publisher, NewPublisher.Failure == Upstream.Failure { + public typealias Transform = (Upstream.Output) -> NewPublisher + public typealias Output = NewPublisher.Output + public typealias Failure = Upstream.Failure + + public let upstream: Upstream + public let transform: Transform + + public init( + upstream: Upstream, + transform: @escaping Transform + ) { + self.upstream = upstream + self.transform = transform + } + + public func receive(subscriber: S) + where Output == S.Input, Failure == S.Failure { + subscriber.receive( + subscription: Subscription( + upstream: upstream, + transform: transform, + downstream: subscriber + ) + ) + } + } +} + +// MARK: - Subscription +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.ConcatMap { + final class Subscription: Combine.Subscription where Downstream.Input == Output, Downstream.Failure == Failure { + private var sink: Sink? + + init( + upstream: Upstream, + transform: @escaping Transform, + downstream: Downstream + ) { + self.sink = Sink( + upstream: upstream, + downstream: downstream, + transform: { transform($0) } + ) + } + + func request(_ demand: Subscribers.Demand) { + sink?.demand(demand) + } + + func cancel() { + sink = nil + } + } +} + +// MARK: - Sink +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.ConcatMap { + final class Sink: CombineExt.Sink + where Downstream.Input == Output, Downstream.Failure == Failure { + private let lock = NSRecursiveLock() + private let transform: Transform + private var activePublisher: NewPublisher? + private var bufferedPublishers: [NewPublisher] + private var cancellables: Set + + init( + upstream: Upstream, + downstream: Downstream, + transform: @escaping Transform + ) { + self.transform = transform + self.bufferedPublishers = [] + self.cancellables = [] + super.init( + upstream: upstream, + downstream: downstream, + transformFailure: { $0 } + ) + } + + override func receive(_ input: Upstream.Output) -> Subscribers.Demand { + let mapped = transform(input) + + lock.lock() + if activePublisher == nil { + lock.unlock() + setActivePublisher(mapped) + } else { + lock.unlock() + bufferedPublishers.append(mapped) + } + + return .unlimited + } + + private func setActivePublisher(_ publisher: NewPublisher) { + lock.lock() + defer { lock.unlock() } + activePublisher = publisher + + publisher.sink( + receiveCompletion: { completion in + switch completion { + case .finished: + self.lock.lock() + guard let next = self.bufferedPublishers.first else { + self.lock.unlock() + return + } + self.bufferedPublishers.removeFirst() + self.lock.unlock() + self.setActivePublisher(next) + case .failure(let error): + self.receive(completion: .failure(error)) + } + }, + receiveValue: { value in + _ = self.buffer.buffer(value: value) + } + ) + .store(in: &cancellables) + } + } +} + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension Publishers.ConcatMap.Subscription: CustomStringConvertible { + var description: String { + return "ConcatMap.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>" } } #endif diff --git a/Tests/ConcatMapTests.swift b/Tests/ConcatMapTests.swift index 9f8adce..d7c3259 100644 --- a/Tests/ConcatMapTests.swift +++ b/Tests/ConcatMapTests.swift @@ -26,10 +26,11 @@ final class ConcatMapTests: XCTestCase { func test_publishes_values_inOrder() { var receivedValues = [Int]() - let expectedValues = [1, 2, 3, 4] + let expectedValues = [1, 2, 4, 5, 6] let firstPublisher = P() let secondPublisher = P() + let thirdPublisher = P() let sut = PassthroughSubject() @@ -42,15 +43,20 @@ final class ConcatMapTests: XCTestCase { sut.send(firstPublisher) sut.send(secondPublisher) + sut.send(thirdPublisher) firstPublisher.send(1) firstPublisher.send(2) + // values sent onto the second publisher will be ignored as long as the first publisher hasn't completed + secondPublisher.send(3) firstPublisher.send(completion: .finished) - secondPublisher.send(3) secondPublisher.send(4) + secondPublisher.send(5) secondPublisher.send(completion: .finished) + thirdPublisher.send(6) + XCTAssertEqual(expectedValues, receivedValues) } From 15176bca3be1f9f6ac813d9b12a16bf3032ea4ef Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Sun, 22 Nov 2020 14:13:41 +0100 Subject: [PATCH 4/7] fixup! Adds Publishers.ConcatMap --- Sources/Operators/ConcatMap.swift | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift index dcf30e2..c2f31ca 100644 --- a/Sources/Operators/ConcatMap.swift +++ b/Sources/Operators/ConcatMap.swift @@ -28,7 +28,7 @@ public extension Publisher { @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public extension Publishers { - final class ConcatMap: Publisher where NewPublisher: Publisher, Upstream: Publisher, NewPublisher.Failure == Upstream.Failure { + struct ConcatMap: Publisher where NewPublisher: Publisher, Upstream: Publisher, NewPublisher.Failure == Upstream.Failure { public typealias Transform = (Upstream.Output) -> NewPublisher public typealias Output = NewPublisher.Output public typealias Failure = Upstream.Failure @@ -88,8 +88,7 @@ private extension Publishers.ConcatMap { // MARK: - Sink @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) private extension Publishers.ConcatMap { - final class Sink: CombineExt.Sink - where Downstream.Input == Output, Downstream.Failure == Failure { + final class Sink: CombineExt.Sink where Downstream.Input == Output, Downstream.Failure == Failure { private let lock = NSRecursiveLock() private let transform: Transform private var activePublisher: NewPublisher? From eb4cd7fd31aa5949bdc4954240e77036edffdd72 Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Sun, 22 Nov 2020 15:56:41 +0100 Subject: [PATCH 5/7] Publishes values of subsequent publisher after emptying publisher queue --- Sources/Operators/ConcatMap.swift | 11 ++-- Tests/ConcatMapTests.swift | 84 ++++++++++++++++++++++++++++--- 2 files changed, 82 insertions(+), 13 deletions(-) diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift index c2f31ca..196036e 100644 --- a/Sources/Operators/ConcatMap.swift +++ b/Sources/Operators/ConcatMap.swift @@ -112,13 +112,12 @@ private extension Publishers.ConcatMap { override func receive(_ input: Upstream.Output) -> Subscribers.Demand { let mapped = transform(input) - lock.lock() + defer { lock.unlock() } + if activePublisher == nil { - lock.unlock() setActivePublisher(mapped) } else { - lock.unlock() bufferedPublishers.append(mapped) } @@ -132,15 +131,15 @@ private extension Publishers.ConcatMap { publisher.sink( receiveCompletion: { completion in + self.lock.lock() + defer { self.lock.unlock() } switch completion { case .finished: - self.lock.lock() guard let next = self.bufferedPublishers.first else { - self.lock.unlock() + self.activePublisher = nil return } self.bufferedPublishers.removeFirst() - self.lock.unlock() self.setActivePublisher(next) case .failure(let error): self.receive(completion: .failure(error)) diff --git a/Tests/ConcatMapTests.swift b/Tests/ConcatMapTests.swift index d7c3259..6c7bef3 100644 --- a/Tests/ConcatMapTests.swift +++ b/Tests/ConcatMapTests.swift @@ -24,9 +24,9 @@ final class ConcatMapTests: XCTestCase { cancellables = [] } - func test_publishes_values_inOrder() { + func test_publishes_values_in_order() { var receivedValues = [Int]() - let expectedValues = [1, 2, 4, 5, 6] + let expectedValues = [1, 2, 3] let firstPublisher = P() let secondPublisher = P() @@ -43,19 +43,89 @@ final class ConcatMapTests: XCTestCase { sut.send(firstPublisher) sut.send(secondPublisher) + + firstPublisher.send(1) + firstPublisher.send(completion: .finished) + + secondPublisher.send(2) sut.send(thirdPublisher) + secondPublisher.send(completion: .finished) + + thirdPublisher.send(3) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_ignores_values_of_subsequent_while_previous_hasNot_completed() { + var receivedValues = [Int]() + let expectedValues = [1, 3] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) firstPublisher.send(1) - firstPublisher.send(2) - // values sent onto the second publisher will be ignored as long as the first publisher hasn't completed + secondPublisher.send(2) + firstPublisher.send(completion: .finished) + secondPublisher.send(3) + secondPublisher.send(completion: .finished) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_publishes_values_of_subsequent_publisher_after_emptying_publisher_queue() { + var receivedValues = [Int]() + let expectedValues = [1, 2] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + firstPublisher.send(1) firstPublisher.send(completion: .finished) - secondPublisher.send(4) - secondPublisher.send(5) + sut.send(secondPublisher) + secondPublisher.send(2) secondPublisher.send(completion: .finished) - thirdPublisher.send(6) + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_synchronous_completion() { + var receivedValues = [Int]() + let expectedValues = [1, 2] + let firstPublisher = Just(1) + let secondPublisher = Just(2) + + let sut = PassthroughSubject, Never>() + + sut.concatMap { $0 } + .sink { value in receivedValues.append(value) } + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) XCTAssertEqual(expectedValues, receivedValues) } From 844178ef68bcdc2605b9c0a1bde61064de4707c6 Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Mon, 12 Apr 2021 10:44:03 +0200 Subject: [PATCH 6/7] Adds Inner and OuterSink for ConcatMap --- Sources/Common/DemandBuffer.swift | 9 +- Sources/Operators/ConcatMap.swift | 174 +++++++++++++++++++++--------- 2 files changed, 130 insertions(+), 53 deletions(-) diff --git a/Sources/Common/DemandBuffer.swift b/Sources/Common/DemandBuffer.swift index 5df9743..5204262 100644 --- a/Sources/Common/DemandBuffer.swift +++ b/Sources/Common/DemandBuffer.swift @@ -27,6 +27,11 @@ class DemandBuffer { private var completion: Subscribers.Completion? private var demandState = Demand() + /// The remaining demand, i.e. the number of values the subscriber still expects + var remainingDemand: Subscribers.Demand { + demandState.requested - demandState.sent + } + /// Initialize a new demand buffer for a provided downstream subscriber /// /// - parameter subscriber: The downstream subscriber demanding events @@ -70,7 +75,7 @@ class DemandBuffer { /// Signal to the buffer that the downstream requested new demand /// - /// - note: The buffer will attempt to flush as many events rqeuested + /// - note: The buffer will attempt to flush as many events requested /// by the downstream at this point func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand { flush(adding: demand) @@ -110,7 +115,7 @@ class DemandBuffer { return .none } - let sentDemand = demandState.requested - demandState.sent + let sentDemand = remainingDemand demandState.sent += sentDemand return sentDemand } diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift index 196036e..e329962 100644 --- a/Sources/Operators/ConcatMap.swift +++ b/Sources/Operators/ConcatMap.swift @@ -8,7 +8,6 @@ #if canImport(Combine) import Combine -import class Foundation.NSRecursiveLock @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public extension Publisher { @@ -49,8 +48,8 @@ public extension Publishers { subscriber.receive( subscription: Subscription( upstream: upstream, - transform: transform, - downstream: subscriber + downstream: subscriber, + transform: transform ) ) } @@ -60,18 +59,21 @@ public extension Publishers { // MARK: - Subscription @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) private extension Publishers.ConcatMap { - final class Subscription: Combine.Subscription where Downstream.Input == Output, Downstream.Failure == Failure { - private var sink: Sink? + final class Subscription: Combine.Subscription where + Downstream.Input == NewPublisher.Output, + Downstream.Failure == Failure + { + private var sink: OuterSink? init( upstream: Upstream, - transform: @escaping Transform, - downstream: Downstream + downstream: Downstream, + transform: @escaping Transform ) { - self.sink = Sink( + self.sink = OuterSink( upstream: upstream, downstream: downstream, - transform: { transform($0) } + transform: transform ) } @@ -88,68 +90,138 @@ private extension Publishers.ConcatMap { // MARK: - Sink @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) private extension Publishers.ConcatMap { - final class Sink: CombineExt.Sink where Downstream.Input == Output, Downstream.Failure == Failure { + final class OuterSink: Subscriber where + Downstream.Input == NewPublisher.Output, + Downstream.Failure == Upstream.Failure + { private let lock = NSRecursiveLock() + + private let downstream: Downstream private let transform: Transform - private var activePublisher: NewPublisher? - private var bufferedPublishers: [NewPublisher] - private var cancellables: Set + + private var upstreamSubscription: Combine.Subscription? + private var innerSink: InnerSink? + + private var bufferedDemand: Subscribers.Demand = .none init( upstream: Upstream, downstream: Downstream, transform: @escaping Transform ) { + self.downstream = downstream self.transform = transform - self.bufferedPublishers = [] - self.cancellables = [] + upstream.subscribe(self) + } + + func demand(_ demand: Subscribers.Demand) { + lock.lock(); defer { lock.unlock() } + if let innerSink = innerSink { + innerSink.demand(demand) + } else { + bufferedDemand = demand + } + + upstreamSubscription?.requestIfNeeded(.unlimited) + } + + func receive(_ input: Upstream.Output) -> Subscribers.Demand { + lock.lock(); defer { lock.unlock() } + let transformedPublisher = transform(input) + + if let innerSink = innerSink { + innerSink.enqueue(publisher: transformedPublisher) + } else { + innerSink = InnerSink( + outerSink: self, + upstream: transformedPublisher, + downstream: downstream + ) + + innerSink?.demand(bufferedDemand) + } + + return .unlimited + } + + func receive(subscription: Combine.Subscription) { + lock.lock(); defer { lock.unlock() } + upstreamSubscription = subscription + } + + func receive(completion: Subscribers.Completion) { + lock.lock(); defer { lock.unlock() } + innerSink = nil + downstream.receive(completion: completion) + cancelUpstream() + } + + func cancelUpstream() { + lock.lock(); defer { lock.unlock() } + upstreamSubscription.kill() + } + + deinit { cancelUpstream() } + } + + final class InnerSink: CombineExt.Sink where + Downstream.Input == NewPublisher.Output, + Downstream.Failure == Upstream.Failure + { + private let outerSink: OuterSink + private let lock: NSRecursiveLock = NSRecursiveLock() + + private var hasActiveSubscription: Bool + private var publisherQueue: [NewPublisher] + + init( + outerSink: OuterSink, + upstream: NewPublisher, + downstream: Downstream + ) { + self.outerSink = outerSink + self.hasActiveSubscription = false + self.publisherQueue = [] + super.init( upstream: upstream, - downstream: downstream, - transformFailure: { $0 } + downstream: downstream ) } - override func receive(_ input: Upstream.Output) -> Subscribers.Demand { - let mapped = transform(input) - lock.lock() - defer { lock.unlock() } - - if activePublisher == nil { - setActivePublisher(mapped) + func enqueue(publisher: NewPublisher) { + lock.lock(); defer { lock.unlock() } + if hasActiveSubscription { + publisherQueue.append(publisher) } else { - bufferedPublishers.append(mapped) + publisher.subscribe(self) } + } - return .unlimited + override func receive(_ input: NewPublisher.Output) -> Subscribers.Demand { + return buffer.buffer(value: input) } - private func setActivePublisher(_ publisher: NewPublisher) { - lock.lock() - defer { lock.unlock() } - activePublisher = publisher - - publisher.sink( - receiveCompletion: { completion in - self.lock.lock() - defer { self.lock.unlock() } - switch completion { - case .finished: - guard let next = self.bufferedPublishers.first else { - self.activePublisher = nil - return - } - self.bufferedPublishers.removeFirst() - self.setActivePublisher(next) - case .failure(let error): - self.receive(completion: .failure(error)) - } - }, - receiveValue: { value in - _ = self.buffer.buffer(value: value) + override func receive(subscription: Combine.Subscription) { + lock.lock(); defer { lock.unlock() } + hasActiveSubscription = true + + super.receive(subscription: subscription) + subscription.requestIfNeeded(buffer.remainingDemand) + } + + override func receive(completion: Subscribers.Completion) { + lock.lock(); defer { lock.unlock() } + hasActiveSubscription = false + + switch completion { + case .finished: + if !publisherQueue.isEmpty { + publisherQueue.removeFirst().subscribe(self) } - ) - .store(in: &cancellables) + case .failure: + outerSink.receive(completion: completion) + } } } } From 80ba6dd362e01bc59100627ba026572e1846d0fc Mon Sep 17 00:00:00 2001 From: Daniel Peter Date: Mon, 12 Apr 2021 11:01:11 +0200 Subject: [PATCH 7/7] Weakifies outerSink in innerSink --- Sources/Operators/ConcatMap.swift | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift index e329962..43bec21 100644 --- a/Sources/Operators/ConcatMap.swift +++ b/Sources/Operators/ConcatMap.swift @@ -111,6 +111,7 @@ private extension Publishers.ConcatMap { ) { self.downstream = downstream self.transform = transform + upstream.subscribe(self) } @@ -168,7 +169,7 @@ private extension Publishers.ConcatMap { Downstream.Input == NewPublisher.Output, Downstream.Failure == Upstream.Failure { - private let outerSink: OuterSink + private weak var outerSink: OuterSink? private let lock: NSRecursiveLock = NSRecursiveLock() private var hasActiveSubscription: Bool @@ -199,7 +200,7 @@ private extension Publishers.ConcatMap { } override func receive(_ input: NewPublisher.Output) -> Subscribers.Demand { - return buffer.buffer(value: input) + buffer.buffer(value: input) } override func receive(subscription: Combine.Subscription) { @@ -219,8 +220,9 @@ private extension Publishers.ConcatMap { if !publisherQueue.isEmpty { publisherQueue.removeFirst().subscribe(self) } - case .failure: - outerSink.receive(completion: completion) + case let .failure(error): + buffer.complete(completion: .failure(error)) + outerSink?.receive(completion: completion) } } } @@ -229,7 +231,7 @@ private extension Publishers.ConcatMap { @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Publishers.ConcatMap.Subscription: CustomStringConvertible { var description: String { - return "ConcatMap.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>" + "ConcatMap.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>" } } #endif