This is Understanding Combine, written by Matt Neuburg. It is a work in progress. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current editions are iOS 13 Programming Fundamentals with Swift and Programming iOS 13. Thank you!


Custom Publishers

What would it take to write your own publisher? The Publisher protocol itself is extremely simple; it requires just one method:

func receive<S>(subscriber: S) 
    where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input

The idea here is we that are being asked to let ourselves be subscribed to by a Subscriber (whose types must match our own, as I discussed earlier). And it is easy to write a skeleton publisher that conforms to the protocol requirements:

struct MyCoolPublisher : Publisher {
    typealias Output = String
    typealias Failure = Never
    func receive<S>(subscriber: S) 
        where S:Subscriber, S.Input == Output, S.Failure == Failure {
            // now what?
    }
}

The problem is the implementation of receive(subscriber:). As I say in the comment: Now what? Although the protocol itself has no way of enforcing this, our job here is to make a Subscription object and call the subscriber’s receive(subscription:):

func receive<S>(subscriber: S) 
    where S:Subscriber, S.Input == Output, S.Failure == Failure {
        let subscription = // make a Subscription
        subscriber.receive(subscription:subscription)
}

So we’re going to have to make a Subscription. And that’s not all. The Publisher and the Subscription, together, must go on fulfilling the obligations of a Publisher and a Subscription for as long as they both continue to operate. In sum, it isn’t enough to conform to the Publisher protocol; we must also behave like a publisher! If you look at what a subscriber expects from its publisher, you can get a sense for what our obligations are:

  • After the Subscription is sent the request message asking for values to be delivered, then when a value is produced, the subscriber needs to be sent the receive(_:) method with that value.

  • If there can be a failure, then if there is a failure, the subscriber needs to be sent receive(completion:) with the .failure case.

  • If we limit the number of values we will produce, then if we have produced our last value, the subscriber needs to be sent receive(completion:) with the .finished case.

  • The subscription must conform to Cancellable. If it is sent the cancel method, we need to stop producing values and tear ourselves down in good order.

Clearly these obligations can be quite involved. And equally clearly, the bulk of the work seems to involve, not the Publisher, but the Subscription. So we are compelled, whether we like it or not, to think about how to write a Subscription.

By convention, the Subscription class is usually a nested class called Inner. And we can see immediately at least one thing our Inner class will certainly need to do: it must keep a reference to the Subscriber so that it can send it values! So we can fill out our skeleton a little by giving Inner a Subscriber instance variable, which I like to call downstream, and by conforming to the Subscription protocol requirements:

struct MyCoolPublisher : Publisher {
    typealias Output = String
    typealias Failure = Never
    func receive<S>(subscriber: S) 
        where S:Subscriber, S.Input == Output, S.Failure == Failure {
            subscriber.receive(subscription:Inner(downstream:subscriber))
    }
    class Inner<S> : Subscription 
        where S:Subscriber, S.Input == Output, S.Failure == Failure {
            var downstream : S?
            init(downstream:S) {
                self.downstream = downstream
            }
            func request(_ demand: Subscribers.Demand) {
                // ???
            }
            func cancel() {
                // ???
            }
    }
}

We have now pushed all the real work into Inner, and can proceed to implement the rest of this publisher there. How we do that will depend on what this publisher publishes.

For example, let’s say our publisher’s job is to emit the names of the three Pep Boys, "Manny", "Moe", and "Jack", in order, and then stop. So we’ll keep a list of Pep Boys, and we’ll respond to a demand by sending those values to the subscriber; and when we come to the end of the list, we’ll signal completion:

var boys = Array(["Manny", "Moe", "Jack"].reversed()) // so we can use popLast
func request(_ demand: Subscribers.Demand) {
    guard let downstream = self.downstream else { return }
    while let boy = boys.popLast() {
        _ = downstream.receive(boy)
        if boys.isEmpty {
            downstream.receive(completion: .finished)
            self.downstream = nil
            return
        }
    }
}

As you can see, once we’ve signalled completion, we also release our downstream subscriber. We should do the same thing in our implementation of cancel; that’s really all that cancellation consists of:

func cancel() {
    downstream = nil
}

If we now attach a Sink to our publisher, we’ll see that it does in fact receive "Manny", "Moe", and "Jack", followed by a .finished completion. Our publisher is working! It’s only a “toy” publisher, but it demonstrates clearly the fundamentals of writing a publisher.

Responding to Backpressure

Perhaps you’ve noticed, though, that our publisher is failing to respond to backpressure. In our implementation of request(_:), we are not examining the incoming demand parameter to see how many values we’re being asked for; and when we send a value to the downstream subscriber, we are completely ignoring the demand that is returned. Nevertheless, if we’re writing a publisher for our own use, that might be perfectly acceptable. Some publishers publish just one value; most subscribers don’t exert any backpressure; and in any case not every publisher needs to respond to backpressure.

If we do want to implement a response to backpressure, that turns out to be quite a tricky problem, because a limited demand can arrive while we’re in the middle of fulfilling a limited demand. We need to keep a running total of how much total demand has piled up at every given moment. Fortunately, the Subscribers.Demand struct defines arithmetic operators, comparison operators, and range operators, to work with other Subscriber.Demand instances, as well as with Int. Thus it is easy to manipulate a demand as if it were an Int.

So what exactly do we need to do? We’ll keep a demand instance property called limit that starts out life as .none. When a demand comes in, we’ll add that to our limit. When we vend a value to the subscriber, we’ll subtract 1 from our limit. Any time the limit is .none, we won’t vend any values:

var limit = Subscribers.Demand.none
func request(_ demand: Subscribers.Demand) {
    guard let downstream = self.downstream else { return }
    self.limit += demand
    while let boy = boys.popLast(), limit > .none {
        let newdemand = downstream.receive(boy)
        self.limit -= 1 // because we just vended one
        self.limit += newdemand // because more demand just arrived
        if boys.isEmpty {
            downstream.receive(completion: .finished)
            self.downstream = nil
            return
        }
    }
}

It works! That was an easy implementation, of course, because we are not emitting values asynchronously. Obviously, responding to backpressure asynchronously would be more involved; I’m not going to go into the details here.

A UIControl Publisher

Let’s graduate now from our “toy” example publisher to a publisher that could prove downright useful. There’s a gaping hole in Apple’s provision of built-in publishers: the Combine framework doesn’t define a publisher for UIControl events. But clearly such a publisher is needed; what could be a more common source of asynchronous events than a UIControl? Let’s write that publisher.

I’ll posit a Publisher struct called ControlPublisher. Its job will be to emit a value when the control that vends it fires its .primaryActionTriggered control event (to keep things simple, and because that’s the control event we’re most likely to want to hear about). What sort of value should this publisher emit? I’m going to suggest that it should emit a reference to the control itself; that, after all, is what a control passes to a control event handler function when an event occurs.

In order to make itself the target of control events from the control, the Subscription is going to need to maintain a reference to the control; and so the Publisher itself will need to be initialized with the control, so it can pass it to the Subscription. Here’s an initial sketch, based on the boilerplate we’ve already developed:

struct ControlPublisher : Publisher {
    typealias Output = UIControl
    typealias Failure = Never
    unowned let control : UIControl
    init(control:UIControl) { self.control = control }
    func receive<S>(subscriber: S) 
        where S : Subscriber, S.Input == Output, S.Failure == Failure {
            subscriber.receive(subscription: 
                Inner(downstream: subscriber, sender: self.control))
    }
    class Inner <S:Subscriber>: NSObject, Subscription 
        where S.Input == Output, S.Failure == Failure {
            weak var sender : UIControl?
            var downstream : S?
            init(downstream: S, sender: UIControl) {
                self.downstream = downstream
                self.sender = sender
                super.init()
            }
            func request(_ demand: Subscribers.Demand) {
                // ?
            }
            func cancel() {
                // ?
            }
        }
}

Now let’s start filling in the blanks. First, what should happen when our request(_:) method is called with a demand? I’m not going to bother examining the demand; this isn’t going to be a publisher that responds to backpressure. Instead, I’m going to assume that the arrival of a demand means that the subscriber is ready to hear about the control’s .primaryActionTriggered event firing. That means that we need to hear about the control’s .primaryActionTriggered event firing! So I’ll add self (the Subscription object) as the control’s target:

func request(_ demand: Subscribers.Demand) {
    self.sender?.addTarget(self, 
        action: #selector(doAction), 
        for: .primaryActionTriggered)
}
@objc func doAction(_ sender:UIControl) {
    // ?
}

So what should we do when the control event fires and our doAction is called? We should pass a value down to the subscriber:

@objc func doAction(_ sender:UIControl) {
    guard let sender = self.sender else {return}
    _ = self.downstream?.receive(sender)
}

Our only remaining considerations have to do with management of the stored UIControl reference. Remember, we are registered as the control’s target. If we are cancelled, or if we are about to go out of existence, let’s play it safe by unregistering ourselves as target. We will also release the downstream as well as our reference to the control. A utility method will come in handy here:

private func finish() {
    self.sender?.removeTarget( self, 
        action: #selector(doAction), for: .primaryActionTriggered)
    self.sender = nil
    self.downstream = nil
}

Our cancel method calls that method, and our deinit calls it as well, and we’re done:

func cancel() {
    self.finish()
}
deinit {
    self.finish()
}

Our publisher is finished! I’ll make it available to every UIControl by injecting a publisher method by way of an extension:

extension UIControl {
    func publisher() -> ControlPublisher {
        ControlPublisher(control:self)
    }
}

Let’s try it! My view controller’s view contains a button, and the view controller has an outlet to that button. In the view controller’s viewDidLoad, I’ll create a pipeline from the button’s publisher:

self.myButton.publisher()
    .sink { _ in print("button!") }
        .store(in:&self.storage)

We run the app and tap the button. The console displays "button!" — our publisher is working.


Table of Contents