Building Swift's Combine Framework From Scratch
Introduction
After spending 6+ months across 2 different iOS internships, I realized I still didn't have a concrete understanding of Combine (or RxSwift) and had somehow scraped by without knowing much about either. My goal here is to walk through building the Combine framework from the ground up, which will hopefully give you all a better understanding of its inner workings. Side note: throughout this post, I reference Joseph Heck's notes on Combine, the official Combine documentation, and the OpenCombine library.
Review of Combine
You can skip this if you already have a good understanding of reactive programming, publishers/subscribers, etc., as most of this is a quick, concise summary of important concepts covered in Joseph Heck's notes.
Combine is a framework for reactive programming, a model that revolves around reacting to changes in data or events (usually delivered as a stream). This differs from the more traditional request/response model, in which you ask for data and act on whatever response comes back. For example, say we have weather data whose temperature changes over time. With a request/response approach, we would have to keep asking for the temperature to stay up to date. With Combine, we instead "subscribe" to the weather data (the "publisher") and get notified whenever the temperature changes.
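Here is a small sketch in plain Swift (no Combine yet) that makes the difference concrete. PolledThermometer and WeatherStation are hypothetical types made up for this illustration. The push version is just a list of callbacks, far simpler than Combine, but it shows who drives the flow in each model.

```swift
// Hypothetical types for illustration only; not part of Combine.

// Pull (request/response): the caller has to keep asking for the latest value.
final class PolledThermometer {
    private(set) var temperature: Double = 20.0
    func update(to newValue: Double) { temperature = newValue }
}

// Push (reactive): interested parties register once and are notified on every change.
final class WeatherStation {
    private var observers: [(Double) -> Void] = []
    private(set) var temperature: Double = 20.0

    // Register a closure to be called whenever the temperature changes.
    func observe(_ onChange: @escaping (Double) -> Void) {
        observers.append(onChange)
    }

    func update(to newValue: Double) {
        temperature = newValue
        for notify in observers { notify(newValue) }
    }
}

// Pull: we only see whatever value is current when we happen to ask.
let polled = PolledThermometer()
polled.update(to: 21.5)
polled.update(to: 23.0)
print("Polled reading: \(polled.temperature)")

// Push: every change is delivered without asking.
let station = WeatherStation()
var received: [Double] = []
station.observe { received.append($0) }
station.update(to: 21.5)
station.update(to: 23.0)
print("Pushed readings: \(received)")
```

With polling, the 21.5 reading is never observed because nobody asked at the right moment. With push, every change is delivered.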
Generally, Combine is a good fit for situations where you want to set up something that reacts to a variety of inputs, such as user interfaces or a pipeline of asynchronous network requests.
There are 4 main concepts that we'll review quickly: Publishers, Subscribers, Operators, and Back Pressure. Throughout the explanations of each, I'll use an analogy to a radio station to help make the concepts more clear.
Publishers
Publishers provide data when requested and when it's available. With our radio station analogy:
| Radio Station Analogy | Publishers |
|---|---|
| Can broadcast various types of data like music or emergency signals | Can send various types of data |
| Doesn't care who's listening | Any type of subscriber can receive data from the publisher |
| Can have many listeners | Can have many subscribers |
Publishers have 2 associated types, <Output, Failure>. Output can be any type, and Failure can be any type that conforms to Error. Both must match the corresponding types of the subscriber; more on that in the subscribers section below.
Subscribers
Subscribers request the data provided by publishers through a subscription. With our radio station analogy:
| Radio Station Analogy | Subscribers |
|---|---|
| Listeners can listen to the broadcast | Gets data from publishers |
| Listeners can do things with the broadcast, such as play music, change the broadcast data, etc. | Subscribers can modify the data from publishers |
Subscribers also have 2 associated types, <Input, Failure>, with the same rules (Failure must conform to Error). However, in order to subscribe to a publisher, the subscriber's Input type MUST match the publisher's Output type, and its Failure type MUST match the publisher's Failure type.
When a subscriber "subscribes" (connects) to a publisher, it means that it can actively read the data the publisher outputs. You can think of subscriptions as the radio connection being tuned in to the radio station.
Operators
Operators are used to process, change, and transform the data provided by a publisher on its way to the subscriber. They act as both a subscriber (to the publisher upstream of them) and a publisher (to the subscriber downstream of them), so they can sit anywhere in between.
You can think of them as middlemen that can be chained together to process the data toward some end goal. In terms of our radio station analogy, they are like audio processors or mixing boards that modify the broadcast from the radio station.
Back Pressure
The idea behind back pressure is that subscribers control the flow of data. A subscriber tells the publisher it is subscribed to how much data it wants by requesting a Demand, and it can also cancel the subscription at any time!
Full Lifecycle
Combine Lifecycle Diagram from Joseph Heck's Notes
1. A subscriber subscribes to a publisher
2. The publisher acknowledges the subscription and sends that acknowledgement back to the subscriber
3. The subscriber can then request data from the publisher whenever it wants to
4. The publisher sends the requested values back
5. The subscriber can cancel the subscription any time after the connection is acknowledged (step 2)
6. The publisher can send a completion, which terminates the subscription
Protocols
Let's start by building out the necessary protocols for publishers, subscribers, and subscriptions. We will follow the lifecycle to build these out, so the code will jump around from one protocol to another.
Actually, before we even go through the steps, we can code out a few things. For publishers, we know that they have an Output type, which can be anything, and a Failure type, which can be any type conforming to Error:
protocol Publisher {
associatedtype Output
associatedtype Failure: Error
}
Subscribers are similar: every subscriber has an Input type that can be anything and a Failure type that must conform to Error:
protocol Subscriber {
associatedtype Input
associatedtype Failure: Error
}
Now, let's go through the lifecycle step by step to finish up our protocols.
For the first step, a subscriber will need to call some function from the publisher in order to subscribe. That tells us that in our publisher protocol, we will need to handle this subscription, since the subscribers are the ones that are initiating the subscription.
The walkthrough below builds the function up step by step. If you aren't interested in how we get there, skip ahead to the final Publisher protocol at the end of it.
How the subscribe Function Works
We can start out with this function:
func subscribe()
Since a subscriber is connecting to the publisher, we know that our function will need to accept a subscriber as a parameter:
func subscribe(subscriber: Subscriber)
However, in Swift, we can't use a protocol with associated types directly as a parameter type the way we can with an ordinary protocol, because the compiler needs to know the concrete Input and Failure types to guarantee type safety. (Swift 5.7+ does allow any Subscriber, but that erases exactly the types we need to check.) That means we need generics to tell the compiler that our function takes a subscriber of some specific type S that conforms to the Subscriber protocol:
func subscribe<S: Subscriber>(_ subscriber: S)
Basically, we have S which can be anything like TestSubscriber, SinkSubscriber, UISubscriber, etc. as long as it conforms to the Subscriber protocol.
Now there's one more problem to address. Let's say a publisher outputs integer values but a subscriber with an input type of string tries to subscribe. With our current function, that would compile just fine. However, if you recall, we need to make sure the input/output and failure types match. Thus, we get our final function:
protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func subscribe<S: Subscriber>(_ subscriber: S)
where S.Input == Output, S.Failure == Failure
}
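Here is a minimal sketch of what that where clause buys us. It uses stripped-down versions of the two protocols. The receive(_:) requirement is a simplified placeholder so the publisher has something to call, since the real Subscriber protocol is only finished later in this post. OneInt, IntCollector, and StringCollector are hypothetical types.

```swift
// Simplified stand-ins for the protocols in this section.
// `receive(_:)` is a placeholder so the example can deliver a value.
protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(_ input: Input)
}

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Failure
}

// A hypothetical publisher that hands one Int to whoever subscribes.
struct OneInt: Publisher {
    typealias Output = Int
    typealias Failure = Never
    let value: Int

    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Failure {
        // The where clause tells the compiler S.Input is Int,
        // so passing `value` type-checks without any casting.
        subscriber.receive(value)
    }
}

// Hypothetical subscribers with different Input types.
final class IntCollector: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    var values: [Int] = []
    func receive(_ input: Int) { values.append(input) }
}

final class StringCollector: Subscriber {
    typealias Input = String
    typealias Failure = Never
    var values: [String] = []
    func receive(_ input: String) { values.append(input) }
}

let ints = IntCollector()
OneInt(value: 7).subscribe(ints) // OK: Int == Int and Never == Never

let strings = StringCollector()
// OneInt(value: 7).subscribe(strings)
// ^ Uncommenting this fails to compile: S.Input (String) doesn't match Output (Int).
```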
In the second step, the publisher sends an acknowledgement to the subscriber to let it know the request was received and a subscription was formed. That tells us we need a function in the subscriber to process the subscription it receives. Here is the walkthrough for getting to the final version:
How the receive subscription Function Works
To get our final function, we can first start with something simple:
func receive(subscription: ???)
We call it receive because from the subscriber's perspective, it's receiving the subscription acknowledgement.
But here is the tricky part. Let's think about what comes next in the lifecycle: the subscriber will request data at any time (and may cancel the subscription, too!). That means we need to implement some sort of request function in our publisher, right?
Actually, no. A single publisher can have multiple subscribers, so if there's only one request() function, the publisher has no idea which subscriber is making the request.
Also, if you recall, a key part of Combine is back pressure, in which subscribers control the flow of data. If we just acknowledge without giving the subscriber any control mechanism, there is no way for subscribers to control how much data to request and when to cancel.
Thus, we need the publisher to create a Subscription object that represents one specific connection and give it to the subscriber. That means receive takes our Subscription object as its parameter (I will cover the Subscription protocol next).
protocol Subscriber {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
}
Next, let's build out the protocol for the Subscription object. In the third step, this object is how the subscriber requests data, since the receive function above hands it to the subscriber.
How the Subscription Protocol Works
Thinking about what subscribers need to do with the subscription, they need to do 2 main things: control the flow of data and cancel the subscription. Thus, the Subscription protocol needs 2 functions:
protocol Subscription {
func request()
func cancel()
}
For the request function, we want subscribers to be able to control how much data they can handle. Some subscribers might want one item at a time, or 10 at once, or even all items immediately. That means we need a parameter that expresses how much data they want.
You may think that passing in an integer is enough. I did too. However, you may quickly realize that there isn't a good way to express unlimited demand, like when a subscriber wants all the data.
To address that, let's create an enum called Demand that can represent both finite and unlimited demands:
enum Demand {
case unlimited
case max(Int) // Give up to this many
}
But wait. We also need to be able to combine demands.
Take this scenario, for example: a subscriber initially demands 5 items from a publisher. While the publisher is fulfilling that request (let's say it has sent 1 item), the subscriber requests 2 more.
The outstanding demand is now 5 - 1 + 2 = 6 items, which shows why we need a + operator to add these demands together.
We can't use the built-in + operator because we are adding Demand values, which can be either unlimited or some integer. That means we need to build a custom one, and it's a lot easier than it sounds. Anything unlimited added to an integer is unlimited, and unlimited + unlimited is still unlimited. Of course, an integer added to an integer is an integer. With that in mind, we can write our operator:
enum Demand {
case unlimited
case max(Int)
static func + (lhs: Demand, rhs: Demand) -> Demand {
switch (lhs, rhs) {
case (.unlimited, _), (_, .unlimited):
return .unlimited // if there is unlimited, it will always return unlimited
case (.max(let l), .max(let r)):
return .max(l + r)
}
}
}
Now we can pass a demand into our request function, leaving us with this:
func request(_ demand: Demand)
For the cancel() function, we can leave it as is, since cancellation doesn't need any parameters. You either cancel or you don't!
That leaves us with our final subscription protocol:
protocol Subscription {
func request(_ demand: Demand)
func cancel()
}
enum Demand {
case unlimited
case max(Int)
static func + (lhs: Demand, rhs: Demand) -> Demand {
switch (lhs, rhs) {
case (.unlimited, _), (_, .unlimited):
return .unlimited
case (.max(let l), .max(let r)):
return .max(l + r)
}
}
}
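Here is a quick check of the + operator that replays the 5 - 1 + 2 scenario from above. Two things in this sketch are my own additions, not part of the protocol we just wrote. First, the enum conforms to Equatable so results can be compared. Second, an overflowing .max is treated as .unlimited instead of crashing at Int.max. I haven't checked that this matches how Combine's real Subscribers.Demand handles overflow, so treat that part as an assumption.

```swift
enum Demand: Equatable {
    case unlimited
    case max(Int)

    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
        case (.unlimited, _), (_, .unlimited):
            return .unlimited // unlimited always wins
        case (.max(let l), .max(let r)):
            // Assumption: saturate to .unlimited instead of trapping on overflow.
            let (sum, overflow) = l.addingReportingOverflow(r)
            return overflow ? .unlimited : .max(sum)
        }
    }
}

// 5 requested, 1 already delivered (publisher-side bookkeeping), then 2 more requested.
let remaining = Demand.max(5 - 1)
let total = remaining + .max(2) // .max(6)
print(total)

let withUnlimited = Demand.max(3) + .unlimited
let saturated = Demand.max(Int.max) + .max(1)
print(withUnlimited, saturated)
```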
A quick summary so far:
- Publisher's Output/Failure = Subscriber's Input/Failure
- Subscriber calls subscribe() → Publisher creates a Subscription object and sends it back to the subscriber via receive(subscription:)
- Subscriber controls flow by requesting specific amounts of data through the subscription
- Subscriber demands can be .unlimited or .max(n); demands accumulate with the + operator
- Subscriber can cancel; publisher can send a completion (.finished or .failure)
- The subscription object represents one specific connection with independent flow control
Moving on to the next step of the lifecycle, the publisher needs a way to send data back to the subscriber after it gets a demand.
That tells us that we need to implement another type of receive function in our subscriber protocol so that publishers can call it to send data to that subscriber.
How the receive input Function Works
This function's purpose is to let publishers send data to the subscriber, so it should take a value of type Input. The publisher's output type must match the subscriber's input type!
That gives us this:
func receive(_ input: Input)
However, that isn't enough. Looping back to back pressure again (I know it sucks haha), the subscriber needs a way to adjust the demand as it processes items since at this point, the publisher is in control.
To make this more clear: the subscriber may get through its initial demand quickly and want more, or it may be slow and not want anything beyond what it has already asked for.
Therefore, the subscriber should return a Demand value indicating how much more data it wants on top of its outstanding demand. Returning .max(0) means "nothing extra for now." This way, the subscriber stays in control of the flow of data.
That leads us to our final function:
protocol Subscriber {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Input) -> Demand // <-- function added
}
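To see how the returned Demand keeps the subscriber in control, here is a sketch of the bookkeeping a publisher might do. deliver(_:initialDemand:receive:) is a hypothetical helper, not part of Combine. It walks an array synchronously, consumes one unit of demand per value, and adds whatever the receive closure returns.

```swift
enum Demand {
    case unlimited
    case max(Int)

    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
        case (.unlimited, _), (_, .unlimited): return .unlimited
        case (.max(let l), .max(let r)): return .max(l + r)
        }
    }
}

// Hypothetical helper: how a publisher might honor demand while pushing values.
// `receive` plays the role of Subscriber.receive(_:) and returns additional demand.
func deliver<T>(_ values: [T], initialDemand: Demand, receive: (T) -> Demand) -> Int {
    var outstanding = initialDemand
    var sent = 0
    for value in values {
        // Stop once the subscriber has no outstanding demand left.
        if case .max(let n) = outstanding, n <= 0 { break }
        // Delivering one value consumes one unit of finite demand.
        if case .max(let n) = outstanding { outstanding = .max(n - 1) }
        // Whatever the subscriber returns is added to what is still outstanding.
        outstanding = outstanding + receive(value)
        sent += 1
    }
    return sent
}

// Asks for 2 values up front and never asks for more.
var slow: [Int] = []
let slowCount = deliver([1, 2, 3, 4, 5], initialDemand: .max(2)) { value in
    slow.append(value)
    return .max(0)
}

// Asks for 1 value, then 1 more after each value it handles.
var steady: [Int] = []
let steadyCount = deliver([1, 2, 3, 4, 5], initialDemand: .max(1)) { value in
    steady.append(value)
    return .max(1)
}

print(slow, slowCount)
print(steady, steadyCount)
```

The first subscriber stops after 2 values because it never returns more demand. The second keeps the stream going by asking for one more value each time.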
Finally, let's handle the last step where publishers can send completions that terminate subscriptions. There are 3 ways data flow could possibly stop:
- Subscriber cancels (we covered this already)
- Publisher has no more data to send
- An error occurs
For the last 2 ways, the publisher needs a way to tell the subscriber the stream has ended. That means we need to implement a function in our subscriber protocol to handle this.
How the receive completion Function Works
Since we are receiving a notification that the stream has ended, we can start with this base function:
func receive(completion: ???)
But what information does the subscriber need to know?
Well, we know that 2 things can happen: all data was sent, or an error occurred. We could use a boolean, but we need to pass along error information when a failure occurs, and we might want to add more completion cases in the future. Thus, we will use an enum to represent our completion states:
enum Completion {
case finished
case failure(Error)
}
However, a plain Error isn't specific enough. Different publishers fail with different error types, and a plain Error would throw away which one the subscriber is getting.
Remember that our Subscriber protocol has a Failure associated type, so subscribers can have a variety of errors like network errors or database errors:
protocol Subscriber {
associatedtype Input
associatedtype Failure: Error // ← Different subscribers handle different errors!
}
That means the Completion enum needs to be generic over the error type:
enum Completion<Failure: Error> {
case finished
case failure(Failure)
}
Here is our final function and enum:
protocol Subscriber {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Input) -> Demand
func receive(completion: Completion<Failure>) // <- final function added
}
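Here is a sketch of why the generic Failure matters. NetworkError is a hypothetical error type. The handler receives a Completion<NetworkError> rather than a plain Error, so the switch can match its cases directly, and the compiler checks that every case is handled.

```swift
enum Completion<Failure: Error> {
    case finished
    case failure(Failure)
}

// A hypothetical, specific error type that a network-backed publisher might use.
enum NetworkError: Error {
    case timedOut
    case badStatus(code: Int)
}

// Completion is generic over Failure, so this handler knows it gets a NetworkError
// and can match specific cases directly instead of casting from `any Error`.
func describe(_ completion: Completion<NetworkError>) -> String {
    switch completion {
    case .finished:
        return "finished"
    case .failure(.timedOut):
        return "failed: timed out"
    case .failure(.badStatus(let code)):
        return "failed: HTTP \(code)"
    }
}

print(describe(.finished))
print(describe(.failure(.timedOut)))
print(describe(.failure(.badStatus(code: 503))))
```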
I am going to add one last thing: a protocol called Cancellable.
The Subscription protocol has both request() and cancel(), but users should only have access to cancel() since they shouldn't be able to call request() and interfere with the subscriber's internal flow control.
Later on when we actually implement a subscriber, we want to return a Cancellable instead of a Subscription since we want to give users just the "cancel button" they need without exposing the Subscription internals.
Here is the Cancellable protocol:
protocol Cancellable {
func cancel()
}
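Here is a minimal sketch of handing out only the "cancel button." CountingSubscription and CancelHandle are hypothetical types for illustration. CancelHandle stores the subscription privately and forwards cancel(), which is similar to the role Sink plays later in this post.

```swift
enum Demand {
    case unlimited
    case max(Int)
}

protocol Subscription {
    func request(_ demand: Demand)
    func cancel()
}

protocol Cancellable {
    func cancel()
}

// Hypothetical subscription that counts how often it is asked to cancel.
final class CountingSubscription: Subscription {
    private(set) var cancelCount = 0
    func request(_ demand: Demand) {
        // A real subscription would start sending values here.
    }
    func cancel() { cancelCount += 1 }
}

// Hypothetical wrapper that exposes only cancel() and keeps request(_:) out of reach.
final class CancelHandle: Cancellable {
    private var subscription: Subscription?
    init(_ subscription: Subscription) { self.subscription = subscription }

    func cancel() {
        subscription?.cancel()
        subscription = nil // drop the reference so a second cancel() does nothing
    }
}

let subscription = CountingSubscription()
let handle: Cancellable = CancelHandle(subscription)
handle.cancel()
handle.cancel()
// handle.request(.max(1)) // would not compile: Cancellable has no request(_:)
```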
Summary
Here is a quick summary of the protocols and the full code:
Full Protocol Code
Publisher Protocol
protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure
}
Subscriber Protocol
protocol Subscriber {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Input) -> Demand
func receive(completion: Completion<Failure>)
}
enum Completion<Failure: Error> {
case finished
case failure(Failure)
}
Subscription Protocol
protocol Subscription {
func request(_ demand: Demand)
func cancel()
}
enum Demand {
case unlimited
case max(Int)
static func + (lhs: Demand, rhs: Demand) -> Demand {
switch (lhs, rhs) {
case (.unlimited, _), (_, .unlimited):
return .unlimited
case (.max(let l), .max(let r)):
return .max(l + r)
}
}
}
Cancellable Protocol
protocol Cancellable {
func cancel()
}
Core Protocols:
- Publisher - Has Output and Failure associated types; provides subscribe<S: Subscriber>() where the types must match
- Subscriber - Has Input and Failure associated types; provides three receive functions for the complete lifecycle
- Subscription - Represents one specific connection; provides request(_ demand:) and cancel()
The Complete Lifecycle:
- Connection: Subscriber calls publisher.subscribe(subscriber) → Type compatibility is checked at compile time by subscribe's where clause
- Acknowledgment: Publisher creates a Subscription object → Calls subscriber.receive(subscription:)
- Flow Control: Subscriber stores the subscription → Calls subscription.request(demand) to specify how much data it wants
- Data Flow: Publisher sends data via subscriber.receive(_ input:) -> Demand → Subscriber returns additional demand
- Termination: Either the subscriber calls subscription.cancel() OR the publisher calls subscriber.receive(completion:) with .finished or .failure(error)
Key Concepts:
- Type Safety: Publisher's Output/Failure must match Subscriber's Input/Failure (enforced at compile time)
- Back-Pressure: Subscriber controls flow through Demand, which can be .unlimited or .max(n)
- Demand Accumulation: Demands add together: currentDemand = oldDemand + newDemand (unlimited always wins)
- Independent Connections: Each subscription is separate, so one publisher can have multiple subscribers with independent flow control
- Completion vs Cancellation: Publisher-initiated completion (.finished/.failure) vs subscriber-initiated cancellation (no completion sent)
Supporting Types:
- Demand enum - Represents .unlimited or .max(Int) with a custom + operator
- Completion<Failure: Error> enum - Represents .finished or .failure(Failure) for stream termination
Building Our First Publisher: Just
We'll finally make use of our protocols and have a concrete publisher that we can utilize!
Just is the simplest publisher in Combine. It takes a single value, publishes it to each subscriber as soon as that subscriber requests data, and then completes.
Think of it like a one-time announcement over the radio where the station broadcasts a message once and then goes silent.
Here is what we need Just to do:
- Conform to the Publisher protocol
- Create a subscription when someone subscribes
- Send the single value when requested
- Immediately complete after sending the value
To start, we know that Just needs to conform to Publisher, which means we need to define our Output and Failure types and initialize our value:
struct Just<Output>: Publisher { // Just is a generic type that can work with any value
let value: Output
typealias Failure = Never // Just can never fail!
init(_ value: Output) {
self.value = value
}
}
Next, we need to implement the subscribe function from our Publisher protocol:
func subscribe<S: Subscriber>(_ subscriber: S)
where S.Input == Output, S.Failure == Never {
// But what goes here?
}
We know that within our subscribe function, the publisher needs to create a subscription object for this specific connection and give that subscription to the subscriber.
That means we need to create a JustSubscription class that conforms to the Subscription protocol.
This subscription will handle the actual work of sending the value. To start, we can define our class:
private class JustSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Never {
// ...
}
We use a class because we need reference semantics (the same subscription object can be cancelled from anywhere that holds it) and we need to manage state (has the value been sent? is it cancelled?). It's also generic over S, since the subscriber can be of any type.
Within our class, besides implementing the request() and cancel() functions from the Subscription protocol, we also need some stored properties to keep track of the subscriber it is sending values to and the value to send:
private class JustSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Never {
private var subscriber: S? // Optional so we can set it to nil when cancelled
private let value: Output
init(value: Output, subscriber: S) {
self.value = value
self.subscriber = subscriber
}
}
Now, let's implement the request() function.
Remember that when the subscriber requests data, we need to:
- Send the value via subscriber.receive(value)
- Send completion via subscriber.receive(completion: .finished)
- Clean up by setting subscriber to nil
func request(_ demand: Demand) {
guard let subscriber = subscriber else {
return // Already sent or cancelled
}
_ = subscriber.receive(value)
subscriber.receive(completion: .finished)
self.subscriber = nil
}
Notice we ignore the Demand returned by receive(value).
Why? Because Just only has one value to send anyway.
There's nothing more to give, so any additional demand is irrelevant.
For the cancel() function, we just need to clean up:
func cancel() {
subscriber = nil
}
Finally, back in our subscribe function in the Just struct, we create the subscription and hand it to the subscriber:
func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Never {
let subscription = JustSubscription(
value: value,
subscriber: subscriber
)
subscriber.receive(subscription: subscription)
}
So combining all the code, we have created our first publisher!!
struct Just<Output>: Publisher {
typealias Failure = Never
let value: Output
init(_ value: Output) {
self.value = value
}
func subscribe<S: Subscriber>(_ subscriber: S)
where S.Input == Output, S.Failure == Never {
let subscription = JustSubscription(
value: value,
subscriber: subscriber
)
subscriber.receive(subscription: subscription)
}
private class JustSubscription<S: Subscriber>: Subscription
where S.Input == Output, S.Failure == Never {
private var subscriber: S?
private let value: Output
init(value: Output, subscriber: S) {
self.value = value
self.subscriber = subscriber
}
func request(_ demand: Demand) {
guard let subscriber = subscriber else {
return
}
_ = subscriber.receive(value)
subscriber.receive(completion: .finished)
self.subscriber = nil
}
func cancel() {
subscriber = nil
}
}
}
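We haven't built a subscriber yet, so here is a sketch that exercises Just with a small hand-written one. RecordingSubscriber is a hypothetical test helper (not something Combine provides). It logs each lifecycle event and lets us choose when to request data. The protocols and Just are condensed copies so the snippet compiles on its own.

```swift
// Condensed copies of the protocols built earlier.
enum Demand { case unlimited, max(Int) }
enum Completion<Failure: Error> { case finished, failure(Failure) }

protocol Subscription {
    func request(_ demand: Demand)
    func cancel()
}

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
    func receive(completion: Completion<Failure>)
}

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure
}

// Just, as built above.
struct Just<Output>: Publisher {
    typealias Failure = Never
    let value: Output
    init(_ value: Output) { self.value = value }

    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Never {
        subscriber.receive(subscription: JustSubscription(value: value, subscriber: subscriber))
    }

    private final class JustSubscription<S: Subscriber>: Subscription
    where S.Input == Output, S.Failure == Never {
        private var subscriber: S?
        private let value: Output
        init(value: Output, subscriber: S) { self.value = value; self.subscriber = subscriber }
        func request(_ demand: Demand) {
            guard let subscriber = subscriber else { return } // already sent or cancelled
            _ = subscriber.receive(value)
            subscriber.receive(completion: .finished)
            self.subscriber = nil
        }
        func cancel() { subscriber = nil }
    }
}

// Hypothetical test helper that logs every lifecycle event it sees.
final class RecordingSubscriber<Input>: Subscriber {
    typealias Failure = Never
    private(set) var events: [String] = []
    private(set) var subscription: Subscription?
    private let initialDemand: Demand? // nil means: don't request anything on subscribe
    init(initialDemand: Demand?) { self.initialDemand = initialDemand }

    func receive(subscription: Subscription) {
        events.append("subscription")
        self.subscription = subscription
        if let demand = initialDemand { subscription.request(demand) }
    }
    func receive(_ input: Input) -> Demand {
        events.append("value(\(input))")
        return .max(0) // no additional demand
    }
    func receive(completion: Completion<Never>) { events.append("finished") }
}

// Requests right away: value and completion arrive during subscribe(_:).
let eager = RecordingSubscriber<Int>(initialDemand: .max(1))
Just(42).subscribe(eager)

// Waits: nothing is sent until the subscriber asks.
let waiting = RecordingSubscriber<Int>(initialDemand: nil)
Just(42).subscribe(waiting)
let eventsBeforeRequest = waiting.events
waiting.subscription?.request(.max(1))

// Cancels first: the later request finds no subscriber and does nothing.
let cancelled = RecordingSubscriber<Int>(initialDemand: nil)
Just(42).subscribe(cancelled)
cancelled.subscription?.cancel()
cancelled.subscription?.request(.max(1))
print(eager.events, eventsBeforeRequest, waiting.events, cancelled.events)
```

The waiting case shows that Just doesn't send anything until demand arrives. The cancelled case shows that cancel() really does cut the connection.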
Building Our First Subscriber: Sink
Now that we have Just publishing data, we need something to receive it. Sink is the most commonly used subscriber in Combine.
It's a simple subscriber that executes closures when it receives values and completions.
Here is what we need Sink to do:
- Conform to the Subscriber protocol to receive data
- Store closures for handling values and completions
- Conform to Cancellable so users can cancel the subscription
- Request data when it receives the subscription
- Execute the appropriate closures when data or completion arrives
In order to conform to the Subscriber protocol, we need to implement all of the protocol's functions.
Also, like Just, we need some stored properties: handlers for received values and completions, plus a reference to the subscription.
However, the handlers for values and completions should be closures! This is because Sink doesn't know what you want to do with the data. When the data arrives, should we print it? Save it to a database? Update some UI?
Sink has no idea! That's why you pass in closures. They are your custom instructions for what to do when data arrives.
Without closures, you'd need to subclass Sink every time you wanted different behavior. With closures, Sink stays generic and flexible so you can just "plug in" the behavior you want at the call site.
With that in mind, we can begin coding up our class:
class Sink<Input, Failure: Error>: Subscriber, Cancellable {
private let receiveValue: (Input) -> Void
private let receiveCompletion: (Completion<Failure>) -> Void
private var subscription: Subscription?
init(
receiveCompletion: @escaping (Completion<Failure>) -> Void, // Note: The `@escaping` keyword tells Swift that "This closure will be called after the function that receives it has finished executing."
receiveValue: @escaping (Input) -> Void
) {
self.receiveCompletion = receiveCompletion
self.receiveValue = receiveValue
}
}
Now, we can implement our three receive() functions.
First, receive(subscription:) is called when the publisher acknowledges our subscription:
func receive(subscription: Subscription) {
self.subscription = subscription // store reference to subscription
subscription.request(.unlimited)
}
We store the subscription and immediately request unlimited data.
Why unlimited?
Because Sink is designed for simple cases where you just want all the data without complex back-pressure control.
Next, receive(_ input:) is called when the publisher sends data:
func receive(_ input: Input) -> Demand {
receiveValue(input) // Call the user's closure
return .unlimited // Keep requesting more
}
We simply call the closure that was passed in during initialization and return .unlimited to continue receiving all data.
Then, receive(completion:) is called when the stream ends:
func receive(completion: Completion<Failure>) {
receiveCompletion(completion) // Call the user's closure
subscription = nil // Clean up
}
We call the completion handler and set the subscription to nil since the stream is done.
Finally, we implement Cancellable to allow manual cancellation:
func cancel() {
subscription?.cancel()
subscription = nil
}
That's the core Sink implementation! But there's one more thing: we want a convenient way to use it. Nobody wants to write:
let sink = Sink<Int, Never>(
receiveCompletion: { _ in },
receiveValue: { print($0) }
)
publisher.subscribe(sink)
Instead, we want to write:
publisher.sink { print($0) }
To do this, we will add convenience extensions on Publisher.
Convenience Functions
We can start with the general version. It takes both closures and works with any publisher, including ones that can fail:
extension Publisher {
func sink(
receiveCompletion: @escaping (Completion<Failure>) -> Void,
receiveValue: @escaping (Output) -> Void
) -> Cancellable {
let sink = Sink<Output, Failure>(
receiveCompletion: receiveCompletion,
receiveValue: receiveValue
)
subscribe(sink)
return sink
}
}
This creates a Sink with the provided closures, subscribes it to the publisher, and returns it as a Cancellable. That hides the Sink creation and subscription details.
Now, users can write:
Just(8).sink(
receiveCompletion: { completion in
print("Done!")
},
receiveValue: { value in
print("Got: \(value)")
}
)
However, for publishers like Just that cannot fail, this is more than we need. Thus, we can add something even simpler to use:
extension Publisher where Failure == Never {
func sink(receiveValue: @escaping (Output) -> Void) -> Cancellable {
return sink(
receiveCompletion: { _ in }, // Ignore completion
receiveValue: receiveValue
)
}
}
Now we can write the simpler syntax we want:
Just(42).sink { value in
print("Got: \(value)")
}
// Or even shorter:
Just(42).sink { print($0) }
Combining everything we have, we get the full code for our Sink subscriber:
class Sink<Input, Failure: Error>: Subscriber, Cancellable {
private let receiveValue: (Input) -> Void
private let receiveCompletion: (Completion<Failure>) -> Void
private var subscription: Subscription?
init(
receiveCompletion: @escaping (Completion<Failure>) -> Void,
receiveValue: @escaping (Input) -> Void
) {
self.receiveCompletion = receiveCompletion
self.receiveValue = receiveValue
}
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Demand {
receiveValue(input)
return .unlimited
}
func receive(completion: Completion<Failure>) {
receiveCompletion(completion)
subscription = nil
}
func cancel() {
subscription?.cancel()
subscription = nil
}
}
extension Publisher {
func sink(
receiveCompletion: @escaping (Completion<Failure>) -> Void,
receiveValue: @escaping (Output) -> Void
) -> Cancellable {
let sink = Sink<Output, Failure>(
receiveCompletion: receiveCompletion,
receiveValue: receiveValue
)
subscribe(sink)
return sink
}
// For publishers that cannot fail, like `Just`
func sink(receiveValue: @escaping (Output) -> Void) -> Cancellable
where Failure == Never {
return sink(
receiveCompletion: { _ in },
receiveValue: receiveValue
)
}
}
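To check that the pieces fit together, here is everything so far condensed into one runnable file. The protocols, Just, Sink, and the sink convenience methods are compacted copies of the code above. The events array is only there to observe what the closures receive.

```swift
// Condensed copies of the protocols from earlier.
enum Demand { case unlimited, max(Int) }
enum Completion<Failure: Error> { case finished, failure(Failure) }
protocol Cancellable { func cancel() }
protocol Subscription {
    func request(_ demand: Demand)
    func cancel()
}
protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
    func receive(completion: Completion<Failure>)
}
protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure
}

// Just, condensed from the previous section.
struct Just<Output>: Publisher {
    typealias Failure = Never
    let value: Output
    init(_ value: Output) { self.value = value }
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Never {
        subscriber.receive(subscription: JustSubscription(value: value, subscriber: subscriber))
    }
    private final class JustSubscription<S: Subscriber>: Subscription
    where S.Input == Output, S.Failure == Never {
        private var subscriber: S?
        private let value: Output
        init(value: Output, subscriber: S) { self.value = value; self.subscriber = subscriber }
        func request(_ demand: Demand) {
            guard let subscriber = subscriber else { return }
            _ = subscriber.receive(value)
            subscriber.receive(completion: .finished)
            self.subscriber = nil
        }
        func cancel() { subscriber = nil }
    }
}

// Sink and its convenience methods, as built above.
final class Sink<Input, Failure: Error>: Subscriber, Cancellable {
    private let receiveValue: (Input) -> Void
    private let receiveCompletion: (Completion<Failure>) -> Void
    private var subscription: Subscription?
    init(receiveCompletion: @escaping (Completion<Failure>) -> Void,
         receiveValue: @escaping (Input) -> Void) {
        self.receiveCompletion = receiveCompletion
        self.receiveValue = receiveValue
    }
    func receive(subscription: Subscription) { self.subscription = subscription; subscription.request(.unlimited) }
    func receive(_ input: Input) -> Demand { receiveValue(input); return .unlimited }
    func receive(completion: Completion<Failure>) { receiveCompletion(completion); subscription = nil }
    func cancel() { subscription?.cancel(); subscription = nil }
}

extension Publisher {
    func sink(receiveCompletion: @escaping (Completion<Failure>) -> Void,
              receiveValue: @escaping (Output) -> Void) -> Cancellable {
        let sink = Sink<Output, Failure>(receiveCompletion: receiveCompletion, receiveValue: receiveValue)
        subscribe(sink)
        return sink
    }
    func sink(receiveValue: @escaping (Output) -> Void) -> Cancellable where Failure == Never {
        return sink(receiveCompletion: { _ in }, receiveValue: receiveValue)
    }
}

// Usage: record what the closures see.
var events: [String] = []
let first = Just(8).sink(
    receiveCompletion: { _ in events.append("done") },
    receiveValue: { events.append("got \($0)") }
)
let second = Just(42).sink { events.append("got \($0)") }
// Both streams already finished synchronously, so cancelling now is a harmless no-op.
first.cancel()
second.cancel()
print(events)
```

Because Just delivers synchronously, both streams finish during the sink call, so the returned Cancellables have nothing left to cancel. With a publisher that delivers values later, the returned Cancellable is how you would stop the stream early.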
Building Our First Operator: Map
Now we have a publisher (Just) and a subscriber (Sink), but the data just flows straight through. What if we want to transform it? That's where operators come in!
Map is one of the simplest and most useful operators in Combine.
It transforms each value from the upstream publisher using a function you provide, and sends that transformed data to the downstream subscriber.
Think of it like the .map function on arrays, but for streams of data.
The data flow pipeline with the operator works like this:
- The subscriber will subscribe to the operator (operator acts like a publisher)
- The operator will subscribe to the publisher (operator acts like a subscriber)
- The publisher sends the data down to the operator, where it is transformed
- The operator then sends the data down to the subscriber
A key insight that we now have is that operators have to act as both a publisher AND subscriber!
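A single type could in principle conform to both protocols, but that doesn't fit well. An operator like Map is a reusable description of a transformation that may be subscribed to many times, and each subscription needs its own link to a specific downstream subscriber.
So operators use a clever pattern: an outer type that's a Publisher and an inner type that's a Subscriber.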
Let's start with the outer type. Map needs to be a Publisher:
struct Map<Upstream: Publisher, Output>: Publisher {
typealias Failure = Upstream.Failure // Upstream.Failure b/c operator just passes failure down
let upstream: Upstream // The publisher we are transforming
let transform: (Upstream.Output) -> Output
func subscribe(???)
}
Taking a closer look at the code above: our Map operator wraps some publisher, which it refers to as upstream, and outputs values of a generic Output type.
transform is the function that converts Upstream.Output (what the upstream publisher emits) into Output (what the downstream subscriber receives).
Now, to finish adhering to the Publisher protocol, we need to implement the subscribe() function.
Within this function, we need to handle connecting our pipeline so that the subscriber can communicate with the operator and publisher.
That means we will need to create our Inner type that is able to subscribe to the publisher and send the received subscription downstream to the subscriber so that all components of our pipeline are connected.
func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
let inner = Inner(
downstream: subscriber,
transform: transform
)
upstream.subscribe(inner)
}
Now for coding up the Inner subscriber. Its purpose is to subscribe to the upstream publisher, transform the data, and send it downstream to the subscriber.
We can start by writing the class declaration:
private class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure
Basically, Inner is a subscriber that is generic over the downstream subscriber's type. Its where clause verifies that the downstream subscriber's Input type matches Map's transformed Output type, and that the Failure types match.
We can now write the functions from the Subscriber protocol within Inner:
private class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
typealias Input = Upstream.Output // Input type must match publisher output type
typealias Failure = Upstream.Failure
private let downstream: Downstream
private let transform: (Upstream.Output) -> Output
init(downstream: Downstream, transform: @escaping (Upstream.Output) -> Output) {
self.downstream = downstream
self.transform = transform
}
// Sends subscription to downstream subscriber to complete pipeline connection
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
// Transforms data from publisher and sends it downstream to subscriber
func receive(_ input: Upstream.Output) -> Demand {
let transformed = transform(input)
return downstream.receive(transformed) // Returns demand from subscriber
}
// Forwards completion to downstream subscriber
func receive(completion: Completion<Upstream.Failure>) {
downstream.receive(completion: completion)
}
}
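To watch Inner on its own, without any real publisher, here is a minimal sketch that drives it by hand. It assumes stripped-down versions of the Subscriber and Subscription protocols and of the Demand and Completion types from earlier (these shapes are assumptions, not necessarily the exact ones built before). MapInner is a top-level copy of Inner so it can be constructed directly, and Recorder and DummySubscription are hypothetical helpers that exist only for this demo.

```swift
// Assumed minimal stand-ins for the protocols built earlier; the exact shapes are illustrative.
enum Demand: Equatable { case none, unlimited, max(Int) }
enum Completion<Failure: Error> { case finished, failure(Failure) }
protocol Subscription: AnyObject { func request(_ demand: Demand) }
protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
    func receive(completion: Completion<Failure>)
}

// Top-level copy of Map's Inner, generic over the upstream output type so it can be built directly.
final class MapInner<UpstreamOutput, Downstream: Subscriber>: Subscriber {
    typealias Input = UpstreamOutput
    typealias Failure = Downstream.Failure
    private let downstream: Downstream
    private let transform: (UpstreamOutput) -> Downstream.Input
    init(downstream: Downstream, transform: @escaping (UpstreamOutput) -> Downstream.Input) {
        self.downstream = downstream
        self.transform = transform
    }
    // Subscription and completion pass straight through; only values are transformed.
    func receive(subscription: Subscription) { downstream.receive(subscription: subscription) }
    func receive(_ input: UpstreamOutput) -> Demand { downstream.receive(transform(input)) }
    func receive(completion: Completion<Failure>) { downstream.receive(completion: completion) }
}

// Hypothetical test subscriber that records what it is sent and asks for one value at a time.
final class Recorder<Input>: Subscriber {
    typealias Failure = Never
    var values: [Input] = []
    var gotSubscription = false
    var finished = false
    func receive(subscription: Subscription) { gotSubscription = true }
    func receive(_ input: Input) -> Demand {
        values.append(input)
        return .max(1)
    }
    func receive(completion: Completion<Never>) { finished = true }
}

// Hypothetical no-op subscription, just so there is something to forward.
final class DummySubscription: Subscription {
    func request(_ demand: Demand) {}
}

let recorder = Recorder<String>()
let inner = MapInner(downstream: recorder) { (n: Int) in "#\(n * 2)" }
inner.receive(subscription: DummySubscription())
let demand = inner.receive(10)   // Recorder gets "#20"; its demand comes back through Inner
inner.receive(completion: .finished)

print(recorder.values)                              // ["#20"]
print(demand == .max(1))                            // true
print(recorder.gotSubscription, recorder.finished)  // true true
```

Notice that Inner never decides how much to request on its own: whatever demand the downstream subscriber returns is handed straight back upstream.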
We should also put our map operator in a namespace to prevent name conflicts (many libraries have types called "Map") and to group all operator types together. While this isn't technically necessary, it will help keep our code organized.
enum Publishers { // <-- Name of namespace
struct Map<Upstream: Publisher, Output>: Publisher {
// ... our implementation
}
}
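A quick way to see what the namespace buys us: the sketch below declares a hypothetical, unrelated top-level Map type next to Publishers.Map, and both compile side by side. The Publishers.Map here is a simplified placeholder (its transform takes the upstream value directly and it does not conform to Publisher), not the full operator.

```swift
// Hypothetical, unrelated type that also wants the name "Map" (think map tiles in a game).
struct Map {
    let tiles: [String]
}

// A caseless enum can't be instantiated, so it works purely as a namespace.
enum Publishers {
    // Simplified placeholder for the operator: transform takes the upstream value itself.
    struct Map<Upstream, Output> {
        let upstream: Upstream
        let transform: (Upstream) -> Output
    }
}

let level = Map(tiles: ["forest", "river"])            // the top-level Map
let doubler = Publishers.Map(upstream: 21) { $0 * 2 }   // the namespaced Map

print(level.tiles.count)                    // 2
print(doubler.transform(doubler.upstream))  // 42
```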
Finally, we want a convenience method so we can call .map { ... } directly on any publisher:
extension Publisher {
func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T> {
return Publishers.Map(upstream: self, transform: transform)
}
}
We define map in an extension on Publisher because publishers are what you operate on and chain together.
Methods in these extensions either return new publishers (for chaining) or create subscribers and connect them (to end the chain).
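To see why defining map in a Publisher extension makes chaining work, here is a stripped-down sketch. It assumes a hypothetical Publisher protocol with only an Output associated type (no subscribe requirement) and a Just that simply stores its value; it is only meant to show how each map call wraps the previous publisher in a new Publishers.Map.

```swift
// Hypothetical, stripped-down Publisher: only the Output type matters for this demo (no subscribe).
protocol Publisher {
    associatedtype Output
}

// Hypothetical Just that simply stores its value.
struct Just<Output>: Publisher {
    let value: Output
}

enum Publishers {
    struct Map<Upstream: Publisher, Output>: Publisher {
        let upstream: Upstream
        let transform: (Upstream.Output) -> Output
    }
}

extension Publisher {
    // Available on every publisher; the result is itself a Publisher, so calls can chain.
    func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T> {
        Publishers.Map(upstream: self, transform: transform)
    }
}

let chained = Just(value: 10)
    .map { $0 * 2 }
    .map { "Doubled: \($0)" }

// Compiles only because each map call wraps the previous publisher in a new Publishers.Map.
let typed: Publishers.Map<Publishers.Map<Just<Int>, Int>, String> = chained
print(typed.transform(typed.upstream.transform(typed.upstream.upstream.value)))  // Doubled: 20
```

In the real operator the transforms only run once a subscriber attaches; here they are called by hand purely to show how the nested upstream properties line up.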
Now with everything put together, we get this:
extension Publisher {
func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T> {
return Publishers.Map(upstream: self, transform: transform)
}
}
enum Publishers {
struct Map<Upstream: Publisher, Output>: Publisher {
typealias Failure = Upstream.Failure
let upstream: Upstream
let transform: (Upstream.Output) -> Output
func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
let inner = Inner(
downstream: subscriber,
transform: transform
)
upstream.subscribe(inner)
}
private class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let downstream: Downstream
private let transform: (Upstream.Output) -> Output
init(downstream: Downstream, transform: @escaping (Upstream.Output) -> Output) {
self.downstream = downstream
self.transform = transform
}
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Upstream.Output) -> Demand {
let transformed = transform(input)
return downstream.receive(transformed)
}
func receive(completion: Completion<Upstream.Failure>) {
downstream.receive(completion: completion)
}
}
}
}
Putting It All Together
Let's say we have Just -> Map -> Sink:
let _ = Just(10)
.map { $0 * 2 }
.sink { value in
print("Doubled: \(value)")
}
Let's go through the entire pipeline and how it works.
First, subscribe() is called on the outer Map type with Sink as the subscriber. The where clause on that function ensures, at compile time, that Sink's Input type matches Map's Output type (the transformed type we're sending), and that the subscriber's Failure type matches the upstream publisher's Failure type (since Map doesn't transform errors, it just passes them through).
The subscribe() function also creates the Inner subscriber, which it uses to subscribe to the upstream publisher. In this case, Inner subscribes to Just.
Just then creates a subscription using JustSubscription and sends it back to Inner through Inner's receive(subscription:) function.
Inside that function, Inner passes the subscription down to Sink. At this point the full pipeline is connected, which means data can now be sent.
Sink stores that subscription in its own receive(subscription:) function and then uses it to send a demand upstream.
Sink sends that demand by calling request() on JustSubscription. That tells Just to send its value to Map's Inner subscriber.
Inner receives that value in its receive(_:) function, where it transforms it and sends the result downstream to Sink.
Sink's receive(_:) then returns its demand, which Inner passes back up to JustSubscription. Since Just has nothing left to send, it then calls receive(completion:) on Inner, which simply forwards the completion to Sink.
Finally, Sink will clean the subscription up by setting it to nil.
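To check the walkthrough above, here is a self-contained sketch of the same Just -> Map -> Sink pipeline with a print at every step. Just, JustSubscription, and Sink are simplified guesses at the versions built earlier (for example, this Sink always requests .unlimited and never cancels), and the protocols are minimal stand-ins whose exact shapes are assumptions. To keep it short, it calls subscribe(_:) with a Sink directly instead of going through a sink(receiveValue:) convenience method.

```swift
// Assumed minimal stand-ins for the earlier protocols and types; exact shapes are illustrative.
enum Demand: Equatable { case none, unlimited, max(Int) }
enum Completion<Failure: Error> { case finished, failure(Failure) }
protocol Subscription: AnyObject { func request(_ demand: Demand) }
protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
    func receive(completion: Completion<Failure>)
}
protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure
}
struct Just<Output>: Publisher {
    typealias Failure = Never
    let value: Output
    init(_ value: Output) { self.value = value }
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure {
        print("Just.subscribe: send a JustSubscription to Inner")
        subscriber.receive(subscription: JustSubscription(value: value, subscriber: subscriber))
    }
}
final class JustSubscription<S: Subscriber>: Subscription {
    private var subscriber: S?
    private let value: S.Input
    init(value: S.Input, subscriber: S) { self.value = value; self.subscriber = subscriber }
    func request(_ demand: Demand) {
        guard demand != Demand.none, let subscriber = subscriber else { return }
        self.subscriber = nil // Just emits at most once
        print("JustSubscription.request: send \(value) to Inner")
        _ = subscriber.receive(value) // nothing is left to send, so the returned demand is unused
        subscriber.receive(completion: .finished)
    }
}
final class Sink<Input>: Subscriber {
    typealias Failure = Never
    private var subscription: Subscription?
    private let receiveValue: (Input) -> Void
    init(receiveValue: @escaping (Input) -> Void) { self.receiveValue = receiveValue }
    func receive(subscription: Subscription) {
        print("Sink: store the subscription and request .unlimited")
        self.subscription = subscription
        subscription.request(.unlimited) // simplified: always asks for everything
    }
    func receive(_ input: Input) -> Demand { receiveValue(input); return .none }
    func receive(completion: Completion<Never>) {
        print("Sink: completion received, set subscription to nil")
        subscription = nil
    }
}
enum Publishers {
    struct Map<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure
        let upstream: Upstream
        let transform: (Upstream.Output) -> Output
        func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure {
            print("Map.subscribe: create Inner and subscribe it to the upstream publisher")
            upstream.subscribe(Inner(downstream: subscriber, transform: transform))
        }
        private final class Inner<Downstream: Subscriber>: Subscriber
        where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
            typealias Input = Upstream.Output
            typealias Failure = Upstream.Failure
            private let downstream: Downstream
            private let transform: (Upstream.Output) -> Output
            init(downstream: Downstream, transform: @escaping (Upstream.Output) -> Output) {
                self.downstream = downstream
                self.transform = transform
            }
            func receive(subscription: Subscription) {
                print("Inner: forward the subscription to Sink")
                downstream.receive(subscription: subscription)
            }
            func receive(_ input: Upstream.Output) -> Demand {
                let transformed = transform(input)
                print("Inner: transform \(input) into \(transformed) and send it to Sink")
                return downstream.receive(transformed)
            }
            func receive(completion: Completion<Upstream.Failure>) {
                print("Inner: forward completion to Sink")
                downstream.receive(completion: completion)
            }
        }
    }
}
extension Publisher {
    func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T> {
        Publishers.Map(upstream: self, transform: transform)
    }
}

var received: [Int] = []
Just(10).map { $0 * 2 }.subscribe(Sink<Int> { value in
    received.append(value)
    print("Doubled: \(value)")
})
```

Running it should print the steps in the same order as the walkthrough: Map.subscribe, then Just.subscribe, Inner forwarding the subscription, Sink requesting .unlimited, JustSubscription sending 10, Inner transforming 10 into 20, "Doubled: 20", and finally the completion passing through Inner to Sink.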
Conclusion (Not Really)
Wow, you made it all the way here! We went over the basics of Combine and its protocols, and even implemented our own publisher, subscriber, and operator. Together these form the basis of Combine and of how reactive programming is handled in Swift. By this point, you probably have a better understanding of how it all fits together, and you can stop reading here. However, if you want to see more operators being implemented, feel free to keep reading!

