Buffer

.buffer (Publishers.Buffer) is apparently intended to reconcile the timing between the upstream’s production of values and the downstream’s demand for those values. It accumulates values from upstream into its internal buffer, which is of predefined size, until the buffer fills up due to backpressure from downstream. At that point, it throws away values from upstream or throws an error, depending on how you’ve configured the operator. The buffer itself effectively behaves as a FIFO queue: values are passed downstream in the order in which they arrived.

Some examples will clarify, but first let’s talk about the parameters:

size:
The size of the buffer.
prefetch:
The strategy (Publishers.PrefetchStrategy) for requesting values from upstream. There are two possibilities (the names do not describe the behavior very well):
  • .byRequest, despite the name, means that the operator makes an .unlimited demand right at the start.

  • .keepFull means that the operator pauses as soon as a value arrives that can’t go into the buffer.

whenFull:
The policy to be followed (Publishers.BufferingStrategy) when a value arrives from upstream but the buffer is full. There are three possibilities:
  • .dropNewest means throw away the new value that doesn’t fit into the buffer.

  • .dropOldest means throw away the first value from the buffer and append the new value at the end.

  • .customError means fail; it takes an associated value that’s a function producing an Error, which is sent downstream as a failure.
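
The examples that follow use only the two drop policies, so here, for completeness, is a minimal sketch of how .customError might be configured. (This is just an illustration, not part of the discussion that follows; the error type is hypothetical, and because a Timer publisher’s failure type is Never, we first have to widen it with .setFailureType.)

struct BufferOverflowError: Error {}
let erroringPublisher = Timer.publish(every: 0.2, on: .main, in: .common).autoconnect()
    .setFailureType(to: BufferOverflowError.self)
    .buffer(size: 4, prefetch: .byRequest,
        whenFull: .customError({ BufferOverflowError() }))

If that buffer ever overflows, the error produced by the closure is sent downstream as a failure and the pipeline terminates.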

To make sense of .buffer, we need a pipeline that exerts backpressure from below. Here’s one:

let pub = Timer.publish(every: 0.2, on: .main, in: .common).autoconnect()
    .scan(0) { i, _ in i + 1 } // replace each firing's Date with a count: 1, 2, 3, ...
    .buffer(size: 4, prefetch: .byRequest, whenFull: .dropNewest)
    .flatMap(maxPublishers: .max(1)) { // demand just one value at a time...
        Just($0).delay(for: 2, scheduler: DispatchQueue.main) // ...and hold it for two seconds
    }

At the top of the pipeline we have a publisher that produces values every one-fifth of a second. At the bottom we have a .flatMap with a .max(1) demand setting, so that it asks for just one value at a time, while transforming its received value into a publisher with a two-second .delay. This means that when a value arrives into the .flatMap, it sits there for two seconds before being published, at which point the .flatMap requests a new value.
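
To run the pipeline and see its output, we need to subscribe at the end of it; here’s a minimal sketch of one way to do that (the name of the storage set is arbitrary):

var storage = Set<AnyCancellable>()
pub
    .sink { print($0) }
    .store(in: &storage)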

If there were no .buffer operator in the story, this pipeline would simply produce one successive Int every two seconds. The backpressure from .flatMap applies directly to the publisher at the top: every time the .flatMap asks for a new value, the upstream produces the next value in the series. But now we have introduced a buffer between them — and the buffer can hold only four values at a time. What will happen? Here’s what we get, one value every two seconds, at the end of the pipeline:

1
2
3
4
5
13
24
35
...

Here’s what happened:

  1. The buffer starts by asking for unlimited values, and the .flatMap starts by asking for one value. After one-fifth of a second, the first value arrives and the buffer passes it on to the .flatMap. The 1 is now sitting in the .flatMap, waiting for two seconds to be published, exerting backpressure on the buffer.

  2. Meanwhile, values keep arriving into the buffer: the 2, the 3, the 4, the 5… But now the buffer is full (because it only holds four values), and the 6 arrives. That’s an overflow. We have told the buffer to .dropNewest, so the 6 is thrown away. And so on for the 7, the 8, and all the numbers up to 12.

  3. At that point, .flatMap publishes the 1 and asks for a new value. The buffer responds by feeding it the 2, freeing up a space in the buffer. So when the 13 arrives, there’s room for it, and into the buffer it goes.

  4. Now the buffer is full again; while we wait two seconds for the .flatMap to clear, the 14 arrives and is thrown away, then the 15, and so on.

  5. Eventually we reach a steady state where only every 11th value from the top of the pipeline arrives at the bottom; all the others are thrown away because they encounter a full buffer.
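
If you want to watch all of this happening in real time, one approach (just a sketch; these .print calls are not part of the pipeline we’ve been discussing, and loggingPub is an arbitrary name) is to wrap the .buffer in Combine’s .print operator, which logs every subscription, demand, and value that passes through it:

let loggingPub = Timer.publish(every: 0.2, on: .main, in: .common).autoconnect()
    .scan(0) { i, _ in i + 1 }
    .print("before buffer") // logs every value arriving from upstream, plus the buffer's unlimited demand
    .buffer(size: 4, prefetch: .byRequest, whenFull: .dropNewest)
    .print("after buffer") // logs the .flatMap's one-at-a-time demand and the values the buffer actually passes along
    .flatMap(maxPublishers: .max(1)) {
        Just($0).delay(for: 2, scheduler: DispatchQueue.main)
    }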

If we change .dropNewest to .dropOldest, the result is the same except that we achieve the steady state much sooner:

1
9
20
31
42
...

Here’s what happened:

  1. As before, the 1 passes into the .flatMap and the 2, 3, 4, and 5 fill up the buffer.

  2. Now our policy is .dropOldest, so when the 6 arrives it goes into the buffer and the 2 is thrown away; the 7 arrives and goes into the buffer and the 3 is thrown away; and so on.

  3. Eventually the buffer consists of 9, 10, 11, 12 — and at that moment the 1 is finally published from the .flatMap, which requests a new value, and is handed the 9 from the start of the buffer. And so on.

So much for the .byRequest strategy. Now let’s switch to the .keepFull strategy:

.buffer(size: 4, prefetch: .keepFull, whenFull: .dropNewest)

Here’s what we get:

1
2
3
4
5
7
9
...

The buffer now follows a policy where it keeps asking for values until there’s an overflow. When that happens, the buffer stops asking for values until the buffer has an open slot. So:

  1. The 1 passes through to the .flatMap and sits there, exerting backpressure.

  2. Meanwhile the 2, 3, 4 and 5 arrive and fill the buffer.

  3. The 6 arrives and causes an overflow; the 6 is thrown away and the buffer stops asking for values.

  4. Eventually the .flatMap publishes the 1 and receives the 2 from the buffer, opening a slot in the buffer. The buffer therefore asks for more values: it receives the 7, which goes into the open slot, and then the 8. That’s an overflow, so the 8 is thrown away and the buffer stops asking for values. And so on.

Finally, if we change .dropNewest to .dropOldest, the pipeline produces 1, 3, 5, 7, and so on, for reasons that should now be obvious.
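
That is, with this configuration:

.buffer(size: 4, prefetch: .keepFull, whenFull: .dropOldest)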

It’s difficult to determine whether these behaviors are bugs or reflect the intended .buffer functionality. It’s a curious beast and, given the oddities of its behavior, it seems unlikely that you’ll use it. But there it is, for better or for worse.

