© 2026 Kenny Wan
December 7, 2025

Building Swift's Combine Framework From Scratch

Table of Contents

Introduction
Review of Combine
Publishers
Subscribers
Operators
Back Pressure
Full Lifecycle
Protocols
Summary
Building Our First Publisher: Just
Building Our First Subscriber: Sink
Convenience Functions
Building Our First Operator: Map
Putting It All Together
Conclusion (Not Really)

Introduction

After spending 6+ months at 2 different iOS internships, I realized I still don't have a concrete understanding of Combine (or RxSwift) and somehow scraped by without knowing much of either. My goal here is to walk through the process of building the Combine framework from the ground up to hopefully allow you all to get a better understanding of the inner workings. A side note: I utilize and reference Joseph Heck's notes on Combine, the official Combine documentation, and the OpenCombine library.

Review of Combine

You can skip this if you already have a good understanding of reactive programming, publishers/subscribers, etc. as most of this is simply a quick, concise summary of important concepts touched upon in Joseph Heck's notes.

Combine is a framework for reactive programming, which is a model that revolves around reacting to changes in data/events (usually as a stream). This differs from more traditional request/response models which revolve around requesting data to make changes based on some response. For example, if we have weather data that outputs different temperatures over time, in a traditional request/response approach, we would constantly have to make requests to check the temperature to have the most recent data. However, with Combine, we would "subscribe" to the weather data (the "publisher") so that we constantly get notified whenever the weather changes.
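To make the contrast concrete, here is a minimal sketch in plain Swift (no Combine yet). WeatherStation, onChange, and Display are hypothetical names invented for this illustration; they are not part of Combine.

```swift
// Hypothetical weather source, used only to contrast the two styles.
final class WeatherStation {
    private(set) var temperature: Double = 20.0
    private var listeners: [(Double) -> Void] = []

    // Reactive style: register once, get called on every change.
    func onChange(_ listener: @escaping (Double) -> Void) {
        listeners.append(listener)
    }

    func update(to newValue: Double) {
        temperature = newValue
        listeners.forEach { $0(newValue) }
    }
}

// Hypothetical consumer that remembers every temperature it was told about.
final class Display {
    var shown: [Double] = []
}

let station = WeatherStation()
let display = Display()

// Request/response style: we only learn the value at the moment we ask.
let polled = station.temperature

// Reactive style: we are notified of each change as it happens.
station.onChange { display.shown.append($0) }
station.update(to: 21.5)
station.update(to: 19.0)

print(polled)         // 20.0
print(display.shown)  // [21.5, 19.0]
```

The polled value goes stale as soon as the temperature changes, while the display keeps up without ever asking again.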

Generally, good uses for Combine are for situations where you want to set up something that reacts to a variety of inputs, such as user interfaces or a pipeline of asynchronous network requests.

There are 4 main concepts that we'll review quickly: Publishers, Subscribers, Operators, and Back Pressure. Throughout the explanations of each, I'll use an analogy to a radio station to help make the concepts more clear.

Publishers

Publishers provide data when requested and when it's available. With our radio station analogy:

Radio Station Analogy | Publishers
Can broadcast various types of data like music or emergency signals | Can send various types of data
Doesn't care who's listening | Any type of subscriber can receive data from the publisher
Can have many listeners | Can have many subscribers

Publishers have 2 associated types: Output and Failure. Output can be any type and Failure can be any type that conforms to Error, as long as they match the types of the subscriber. More on that in the subscribers section below.

Subscribers

Subscribers request the data provided by publishers through a subscription. With our radio station analogy:

Radio Station Analogy | Subscribers
Listeners can listen to the broadcast | Gets data from publishers
Listeners can do things with the broadcast, such as play music, change the broadcast data, etc. | Subscribers can modify the data from publishers

Subscribers also have 2 associated types: Input and Failure, which follow the same rules. However, in order to subscribe to a publisher, the subscriber's Input type MUST match the publisher's Output type, and the subscriber's Failure type MUST match the publisher's Failure type.

When a subscriber "subscribes" (connects) to a publisher, it means that it can actively read the data the publisher outputs. You can think of subscriptions as the radio connection being tuned in to the radio station.

Operators

Operators are used to process, change, and transform the data provided by a publisher and requested by the subscriber. They behave as both a publisher and a subscriber: they subscribe to an upstream publisher and send the results on to downstream subscribers.

You can think of them as middle men that can be chained together to process the data in order to reach some end goal. In terms of our radio station analogy, they are like audio processors or audio mixing boards that can modify the broadcast from the radio station.

Back Pressure

The concept of back pressure is that subscribers control the flow of data and what kind of processing happens. Thus, a subscriber tells the publisher it is subscribed to how much data it wants by issuing a Demand. Subscribers can also cancel the subscription at any time!

Full Lifecycle

Combine Lifecycle Diagram from Joseph Heck's Notes
  1. A subscriber will subscribe to a publisher
  2. The publisher acknowledges the subscription and sends that acknowledgement back to the subscriber
  3. The subscriber can then request data from the publisher whenever it wants to
  4. The publisher sends the requested values back
  5. The subscriber can cancel the subscription anytime after the connection is acknowledged (step 2)
  6. The publisher can send a completion, which will terminate the subscription

Protocols

Let's start by building out the necessary protocols for publishers, subscribers, and subscriptions. We will follow the lifecycle to build these out, so the code will jump around from one protocol to another.

Actually, before we even go through the steps, we can code out a few things. For publishers, we know that they have an Output type, which can be anything, and a Failure type, which can be any type that conforms to Error:

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
}

And for subscribers, the same is true: every subscriber has an Input type and a Failure type, with the same rules:

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
}

Now, let's go through the lifecycle step by step to finish up our protocols.

For the first step, a subscriber will need to call some function from the publisher in order to subscribe. That tells us that in our publisher protocol, we will need to handle this subscription, since the subscribers are the ones that are initiating the subscription.

If you aren't interested in the steps to get there, you can skip to the finished function at the end of the walkthrough below:

How the subscribe Function Works

We can start out with this function:

func subscribe()

Since a subscriber is connecting to the publisher, we know that our function will need to accept a subscriber as a parameter:

func subscribe(subscriber: Subscriber)

However, in Swift, we cannot use a protocol with associated types as a plain parameter type like this, since the compiler needs to know the concrete Input and Failure types to ensure type safety. That means we need to use generics to tell the compiler that our function accepts a subscriber of some type S that conforms to the Subscriber protocol:

func subscribe<S: Subscriber>(_ subscriber: S)

Basically, we have S which can be anything like TestSubscriber, SinkSubscriber, UISubscriber, etc. as long as it conforms to the Subscriber protocol.

Now there's one more problem to address. Let's say a publisher outputs integer values but a subscriber with an input type of string tries to subscribe. With our current function, that would compile just fine. However, if you recall, we need to make sure the input/output and failure types match. Thus, we get our final function:

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error

    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Failure
}
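As a quick check of what the where clause buys us, here is a small sketch using the two protocols as they stand at this point in the walkthrough. IntPublisher, IntSubscriber, and StringSubscriber are hypothetical types made up for this illustration.

```swift
// The two protocols as defined so far.
protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
}

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error

    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Failure
}

// Hypothetical publisher of Int values that simply records who subscribed.
final class IntPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never

    private(set) var accepted: [String] = []

    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Failure {
        accepted.append(String(describing: S.self))
    }
}

// Hypothetical subscribers with different Input types.
struct IntSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
}

struct StringSubscriber: Subscriber {
    typealias Input = String
    typealias Failure = Never
}

let publisher = IntPublisher()
publisher.subscribe(IntSubscriber())        // compiles: Input and Output are both Int

// Uncommenting the next line should fail to compile,
// because StringSubscriber.Input (String) is not IntPublisher.Output (Int):
// publisher.subscribe(StringSubscriber())

print(publisher.accepted.count)  // 1
```

The mismatch is caught before the program ever runs, which is exactly what we wanted from the constraint.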

In the second step, the publisher sends out an acknowledgement to the subscriber to let the subscriber know it got the request and that a subscription was formed. That tells us we need some function in the subscriber to process the subscription it has received. Again, the full explanation and walkthrough follow, with the end result at the bottom:

How the receive subscription Function Works

To get our final function, we can first start with something simple:

func receive(subscription: ???)

We call it receive because from the subscriber's perspective, it's receiving the subscription acknowledgement.

But here is the tricky part. Let's think about what comes next in the lifecycle: the subscriber will request data at any time (and can also cancel the subscription!). That means we need to implement some sort of function in our publisher to handle requests, right?

Actually, no. A single publisher can have multiple subscribers, so if there's only one request() function, the publisher has no idea which subscriber is making the request.

Also, if you recall, a key part of Combine is back pressure, in which subscribers control the flow of data. If we just acknowledge without giving the subscriber any control mechanism, there is no way for subscribers to control how much data to request and when to cancel.

Thus, we need the publisher to create a Subscription object to represent a specific connection and give it to the subscriber. That means we can include our Subscription object in the parameter (I will cover this next).

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)

}

Next, let's build out the protocol for the Subscription object. In the third step, this is how the subscriber requests data, using the subscription object it was handed in the receive function above.

How the Subscription Protocol Works

Thinking about what subscribers need to do with the subscription, they need to do 2 main things: control the flow of data and cancel the subscription. Thus, the Subscription protocol needs 2 functions:

protocol Subscription {
    func request()
    func cancel()
}

For the request function, we want subscribers to be able to control how much data they receive. Some subscribers might want one item at a time, or 10 at once, or even all items immediately. That means we need a parameter that expresses how much data we want.

You may think that passing in an integer is enough. I mean, I did too. However, you may quickly realize that there isn't a good way to express unlimited demand, like when a subscriber wants all the data. To address that, let's create an enum called Demand that can represent both finite and unlimited demands:

enum Demand {
    case unlimited
    case max(Int)  // Give up to this many
}

But wait. We also need to be able to combine demands. Take this scenario for example: a subscriber initially demands 5 items from a publisher, and while the publisher is fulfilling that request (let's say it has sent 1 item), the subscriber requests 2 more. The total outstanding demand is now 5 - 1 + 2 = 6 items, which shows why we need a + operator to add these demands together.

We cannot just use the basic + operator because we are adding Demand enums, which can either be unlimited or some integer. That means we need to build a custom one and it's a lot easier than it sounds. We know that anything unlimited added with an integer will just be unlimited, and unlimited + unlimited is just unlimited. Of course, an integer added with an integer is an integer. With that in mind, we can finish our function:

enum Demand {
    case unlimited
    case max(Int)

    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
            case (.unlimited, _), (_, .unlimited):
                return .unlimited  // if either side is unlimited, the sum is unlimited
            case (.max(let l), .max(let r)):
                return .max(l + r)
        }
    }
}

Finally, we can pass a demand into our request function, leaving us with this:

func request(_ demand: Demand)

As for the cancel() function, we can just leave it as is, since cancellation doesn't need any parameters. You either cancel or don't!

That leaves us with our final subscription protocol:

protocol Subscription {
    func request(_ demand: Demand)
    func cancel()
}

enum Demand {
    case unlimited
    case max(Int)
    
    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
            case (.unlimited, _), (_, .unlimited):
                return .unlimited
            case (.max(let l), .max(let r)):
                return .max(l + r)
        }
    }
}
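To double-check the arithmetic from the scenario above, here is a small worked example. It assumes one addition that is not in the protocol code: Demand is marked Equatable, purely so the results can be compared.

```swift
// Same enum as above, plus Equatable (an assumption added for this example only).
enum Demand: Equatable {
    case unlimited
    case max(Int)

    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
        case (.unlimited, _), (_, .unlimited):
            return .unlimited
        case (.max(let l), .max(let r)):
            return .max(l + r)
        }
    }
}

// 5 items were requested and 1 has been delivered, so 4 are still outstanding.
let outstanding = Demand.max(5 - 1)

// The subscriber asks for 2 more while the first request is being fulfilled.
let updated = outstanding + Demand.max(2)
print(updated == Demand.max(6))  // true

// Once unlimited enters the sum, it stays unlimited.
let everything = updated + Demand.unlimited
print(everything == Demand.unlimited)  // true
```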

A quick summary so far:

  • Publisher's Output/Failure = Subscriber's Input/Failure
  • Subscriber calls subscribe() → Publisher creates and sends Subscription object via receive(subscription:) back to subscriber
  • Subscriber controls flow by requesting specific amounts of data through the subscription
  • Subscriber demands can be .unlimited or .max(n); demands accumulate with + operator
  • Subscriber can cancel; publisher can send completion (.finished or .failure)
  • The subscription object represents one specific connection with independent flow control

Moving on to the next step of the lifecycle, the publisher needs a way to send data back to the subscriber after it gets a demand. That tells us that we need to implement another type of receive function in our subscriber protocol so that publishers can call it to send data to that subscriber.

How the receive input Function Works

Since our function's purpose is for publishers to send data back to the subscriber, it only makes sense that it should take in some value of type Input, since the data type from the publisher must match the subscriber's! That gives us this:

func receive(_ input: Input)

However, that isn't enough. Looping back to back pressure again (I know it sucks haha), the subscriber needs a way to adjust the demand as it processes items since at this point, the publisher is in control.

To make this more clear, the subscriber may process the initial demand very quickly and want more, or it may be too slow and need to pause the stream of data from the publisher. Therefore, the subscriber should return a Demand value to indicate how much more data it wants. This way, the subscriber can control the flow of data. That leads us to our final function:

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)

    func receive(_ input: Input) -> Demand  // <-- function added
}
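To see how the returned Demand can drive the flow, here is a rough sketch of a subscription that only hands out values while demand is outstanding. ArraySubscription and SlowSubscriber are hypothetical types invented for this illustration (they are not Combine types), and the protocols are repeated as they stand at this point so the example compiles on its own.

```swift
enum Demand {
    case unlimited
    case max(Int)

    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
        case (.unlimited, _), (_, .unlimited): return .unlimited
        case (.max(let l), .max(let r)): return .max(l + r)
        }
    }
}

protocol Subscription {
    func request(_ demand: Demand)
    func cancel()
}

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
}

// Hypothetical subscription that delivers array elements only while demand remains.
final class ArraySubscription<S: Subscriber>: Subscription {
    private var subscriber: S?
    private var remaining: ArraySlice<S.Input>
    private var pending = Demand.max(0)
    private var isDelivering = false

    init(values: [S.Input], subscriber: S) {
        self.remaining = ArraySlice(values)
        self.subscriber = subscriber
    }

    func request(_ demand: Demand) {
        pending = pending + demand
        guard !isDelivering else { return }  // a request made mid-delivery is picked up by the running loop
        isDelivering = true
        defer { isDelivering = false }
        while let subscriber = subscriber, hasDemand, let next = remaining.popFirst() {
            consumeOne()
            pending = pending + subscriber.receive(next)  // add whatever extra demand comes back
        }
    }

    func cancel() { subscriber = nil }

    private var hasDemand: Bool {
        switch pending {
        case .unlimited: return true
        case .max(let n): return n > 0
        }
    }

    private func consumeOne() {
        if case .max(let n) = pending { pending = .max(n - 1) }
    }
}

// Hypothetical subscriber: asks for 2 items up front, then 1 more after the first value.
final class SlowSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Demand {
        received.append(input)
        return received.count == 1 ? .max(1) : .max(0)
    }
}

let slow = SlowSubscriber()
let connection = ArraySubscription(values: [10, 20, 30, 40, 50], subscriber: slow)
slow.receive(subscription: connection)
print(slow.received)  // [10, 20, 30]
```

Even though five values are available, only three are delivered: the initial demand of 2 plus the 1 extra item requested through the return value.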

Finally, let's handle the last step where publishers can send completions that terminate subscriptions. There are 3 ways data flow could possibly stop:

  1. Subscriber cancels (we covered this already)
  2. Publisher has no more data to send
  3. An error occurs

For the last 2 ways, the publisher needs a way to tell the subscriber the stream has ended. That means we need to implement a function in our subscriber protocol to handle this.

How the receive completion Function Works

Since we are receiving a notification that the stream has ended, we can start with this base function:

func receive(completion: ???)

But what information does the subscriber need to know?

Well, we know that there are 2 things that can happen: all data was sent, or an error occurred. We could use a boolean or something, but we need to pass error information if a failure occurs, and we might want to add more kinds of completion in the future. Thus, we will use an enum to represent our completion states:

enum Completion {
    case finished
    case failure(Error)
}

However, we can't simply pass in the Error since different publishers could have different error types. Remember that our Subscriber protocol has a Failure associated type, so subscribers can have a variety of errors like network errors or database errors:

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error  // ← Different subscribers handle different errors!
}

That means the Completion enum needs to be generic over the error type:

enum Completion<Failure: Error> {
    case finished
    case failure(Failure)
}

Here is our final function and enum:

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)

    func receive(_ input: Input) -> Demand

    func receive(completion: Completion<Failure>)  // <- final function added
}
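As a small illustration of why the generic parameter is useful, here is a sketch of handling a Completion with a specific error type. NetworkError and describe are hypothetical names made up for this example.

```swift
enum Completion<Failure: Error> {
    case finished
    case failure(Failure)
}

// Hypothetical error type, used only for this illustration.
enum NetworkError: Error {
    case timeout
}

// Hypothetical helper that turns any completion into a readable message.
func describe<Failure: Error>(_ completion: Completion<Failure>) -> String {
    switch completion {
    case .finished:
        return "finished"
    case .failure(let error):
        // `error` has the concrete Failure type here, not just `Error`.
        return "failed with \(error)"
    }
}

print(describe(Completion<NetworkError>.failure(.timeout)))  // failed with timeout
print(describe(Completion<Never>.finished))                  // finished
```

With Completion<Never>, the failure case can never actually be constructed, which is how a publisher can promise that it cannot fail.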

I am going to add one last thing: a protocol called Cancellable.

The Subscription protocol has both request() and cancel(), but users should only have access to cancel() since they shouldn't be able to call request() and interfere with the subscriber's internal flow control.

Later on when we actually implement a subscriber, we want to return a Cancellable instead of a Subscription since we want to give users just the "cancel button" they need without exposing the Subscription internals.

Here is the Cancellable protocol:

protocol Cancellable {
    func cancel()
}

Summary

Here is a quick summary of the protocols and the full code:

Full Protocol Code

Publisher Protocol

protocol Publisher {

    associatedtype Output
    
    associatedtype Failure: Error
    
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure
}

Subscriber Protocol

protocol Subscriber {

    associatedtype Input
    
    associatedtype Failure: Error
    
    func receive(subscription: Subscription)
    
    func receive(_ input: Input) -> Demand
    
    func receive(completion: Completion<Failure>)
}


enum Completion<Failure: Error> {
    case finished
    case failure(Failure)
}

Subscription Protocol

protocol Subscription {

    func request(_ demand: Demand)
    
    func cancel()
}

enum Demand {
    
    case unlimited
    
    case max(Int)
    
    static func + (lhs: Demand, rhs: Demand) -> Demand {
        switch (lhs, rhs) {
            case (.unlimited, _), (_, .unlimited):
                return .unlimited

            case (.max(let l), .max(let r)):
                return .max(l + r)
        }
    }
}

Cancellable Protocol

protocol Cancellable {
    func cancel()
}

Core Protocols:

  • Publisher - Has Output and Failure associated types; provides subscribe<S: Subscriber>() where types must match
  • Subscriber - Has Input and Failure associated types; provides three receive functions for the complete lifecycle
  • Subscription - Represents one specific connection; provides request(_ demand:) and cancel()

The Complete Lifecycle:

  1. Connection: Subscriber calls publisher.subscribe(subscriber) → The compiler has already verified type compatibility
  2. Acknowledgment: Publisher creates Subscription object → Calls subscriber.receive(subscription:)
  3. Flow Control: Subscriber stores subscription → Calls subscription.request(demand) to specify how much data it wants
  4. Data Flow: Publisher sends data via subscriber.receive(_ input:) -> Demand → Subscriber returns additional demand
  5. Termination: Either subscriber calls subscription.cancel() OR publisher calls subscriber.receive(completion:) with .finished or .failure(error)

Key Concepts:

  • Type Safety: Publisher's Output/Failure must match Subscriber's Input/Failure (enforced at compile-time)
  • Back-Pressure: Subscriber controls flow through Demand - can be .unlimited or .max(n)
  • Demand Accumulation: Demands add together: currentDemand = oldDemand + newDemand (unlimited always wins)
  • Independent Connections: Each subscription is separate - one publisher can have multiple subscribers with independent flow control
  • Completion vs Cancellation: Publisher-initiated completion (.finished/.failure) vs subscriber-initiated cancellation (no completion sent)

Supporting Types:

  • Demand enum - Represents .unlimited or .max(Int) with custom + operator
  • Completion<Failure: Error> enum - Represents .finished or .failure(Failure) for stream termination

Building Our First Publisher: Just

We'll finally make use of our protocols and have a concrete publisher that we can utilize!

Just is the simplest publisher in Combine. It takes a single value, immediately publishes it to any subscriber, and then completes. Think of it like a one-time announcement over the radio where the station broadcasts a message once and then goes silent.

Here is what we need Just to do:

  1. Conform to the Publisher protocol
  2. Create a subscription when someone subscribes
  3. Send the single value when requested
  4. Immediately complete after sending the value

To start, we know that Just needs to conform to Publisher, which means we need to define our Output and Failure types and initialize our value:

struct Just<Output>: Publisher {  // Just is a generic type that can work with any value
    let value: Output
    typealias Failure = Never  // Just can never fail!

    init(_ value: Output) {
        self.value = value
    }
}

Next, we need to implement the subscribe function from our Publisher protocol:

func subscribe<S: Subscriber>(_ subscriber: S) 
    where S.Input == Output, S.Failure == Never {
    // But what goes here?
}

We know that within our subscribe function, the publisher needs to create a subscription object for this specific connection and give that subscription to the subscriber.

That means we need to create a JustSubscription class that conforms to the Subscription protocol. This subscription will handle the actual work of sending the value. To start, we can define our class:

private class JustSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Never {
    // ...
}

We use a class because we need reference semantics (it can be cancelled from anywhere) and we need to manage state (has the value been sent? is it cancelled?). It is of course generic, since the subscriber can be of any type.

Within our class, besides implementing the request() and cancel() functions from the Subscription protocol, we also need some stored properties to keep track of the subscriber it is sending values to and the value to send:

private class JustSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Never {
    private var subscriber: S?  // Optional so we can set it to nil when cancelled
    private let value: Output

    init(value: Output, subscriber: S) {
        self.value = value
        self.subscriber = subscriber
    }
}

Now, let's implement the request() function. Remember that when the subscriber requests data, we need to:

  1. Send the value via subscriber.receive(value)
  2. Send completion via subscriber.receive(completion: .finished)
  3. Clean up by setting subscriber to nil

func request(_ demand: Demand) {
    guard let subscriber = subscriber else {
        return  // Already sent or cancelled
    }

    _ = subscriber.receive(value)
    subscriber.receive(completion: .finished)

    self.subscriber = nil
}

Notice we ignore the Demand returned by receive(value). Why? Because Just only has one value to send anyway. There's nothing more to give, so any additional demand is irrelevant.

For the cancel() function, we just need to clean up:

func cancel() {
    subscriber = nil
}

Finally, back in our subscribe function in the Just struct, we create the subscription and hand it to the subscriber:

func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Never {     
    let subscription = JustSubscription(
        value: value, 
        subscriber: subscriber
    )
    
    subscriber.receive(subscription: subscription)
}

So combining all the code, we have created our first publisher!!

struct Just<Output>: Publisher {
    typealias Failure = Never
    
    let value: Output
    
    init(_ value: Output) {
        self.value = value
    }
    
    func subscribe<S: Subscriber>(_ subscriber: S) 
        where S.Input == Output, S.Failure == Never {
        
        let subscription = JustSubscription(
            value: value,
            subscriber: subscriber
        )
        
        subscriber.receive(subscription: subscription)
    }
    
    private class JustSubscription<S: Subscriber>: Subscription 
        where S.Input == Output, S.Failure == Never {
        
        private var subscriber: S?
        private let value: Output
        
        init(value: Output, subscriber: S) {
            self.value = value
            self.subscriber = subscriber
        }
        
        func request(_ demand: Demand) {
            guard let subscriber = subscriber else {
                return
            }
            
            _ = subscriber.receive(value)
            subscriber.receive(completion: .finished)
            
            self.subscriber = nil
        }
        
        func cancel() {
            subscriber = nil
        }
    }
}
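Before building a real subscriber, we can sanity-check Just with a throwaway one. The sketch below repeats the protocols and a condensed copy of Just so it compiles on its own (the + operator on Demand is left out because it isn't needed here). RecordingSubscriber is a hypothetical helper invented for this test.

```swift
// Protocols from the earlier sections, repeated so the example is self-contained.
enum Demand { case unlimited, max(Int) }
enum Completion<Failure: Error> { case finished, failure(Failure) }

protocol Subscription {
    func request(_ demand: Demand)
    func cancel()
}

protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
    func receive(completion: Completion<Failure>)
}

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Failure
}

// Condensed copy of the Just publisher built above.
struct Just<Output>: Publisher {
    typealias Failure = Never
    let value: Output
    init(_ value: Output) { self.value = value }

    func subscribe<S: Subscriber>(_ subscriber: S)
        where S.Input == Output, S.Failure == Never {
        let subscription = JustSubscription(value: value, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }

    private class JustSubscription<S: Subscriber>: Subscription
        where S.Input == Output, S.Failure == Never {
        private var subscriber: S?
        private let value: Output

        init(value: Output, subscriber: S) {
            self.value = value
            self.subscriber = subscriber
        }

        func request(_ demand: Demand) {
            guard let subscriber = subscriber else { return }
            _ = subscriber.receive(value)
            subscriber.receive(completion: .finished)
            self.subscriber = nil
        }

        func cancel() { subscriber = nil }
    }
}

// Hypothetical subscriber that records every lifecycle event as text.
final class RecordingSubscriber<Input>: Subscriber {
    typealias Failure = Never
    private(set) var events: [String] = []

    func receive(subscription: Subscription) {
        events.append("subscribed")
        subscription.request(.max(1))  // ask for the single value
    }

    func receive(_ input: Input) -> Demand {
        events.append("value: \(input)")
        return .max(0)
    }

    func receive(completion: Completion<Never>) {
        events.append("finished")
    }
}

let recorder = RecordingSubscriber<Int>()
Just(8).subscribe(recorder)
print(recorder.events)  // ["subscribed", "value: 8", "finished"]
```

The recorded events follow the lifecycle from earlier: subscription acknowledged, value delivered, then completion.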

Building Our First Subscriber: Sink

Now that we have Just publishing data, we need something to receive it. Sink is the most commonly used subscriber in Combine. It's a simple subscriber that executes closures when it receives values and completions.

Here is what we need Sink to do:

  1. Conform to the Subscriber protocol to receive data
  2. Store closures for handling values and completions
  3. Conform to Cancellable so users can cancel the subscription
  4. Request data when it receives the subscription
  5. Execute the appropriate closures when data or completion arrives

In order to conform to the Subscriber protocol, we need to implement all of the protocol's functions. Also, like Just, we need to have some stored properties. We need to keep track of received values, completions, and a reference to the subscription.

However, the stored properties for received values and completions should be closures! This is because Sink doesn't know what you want to do with the data. When the data arrives, should we print it? Save it to a database? Update some UI? Sink has no idea! That's why you pass in closures. They are your custom instructions for what to do when data arrives.

Without closures, you'd need to subclass Sink every time you wanted different behavior. With closures, Sink stays generic and flexible so you can just "plug in" the behavior you want at the call site.

With that in mind, we can begin coding up our class:

class Sink<Input, Failure: Error>: Subscriber, Cancellable {
    
    private let receiveValue: (Input) -> Void
        
    private let receiveCompletion: (Completion<Failure>) -> Void
    
    private var subscription: Subscription?

    init(
        // Note: `@escaping` tells Swift that this closure may be stored and called after init has returned.
        receiveCompletion: @escaping (Completion<Failure>) -> Void,
        receiveValue: @escaping (Input) -> Void
    ) {
        self.receiveCompletion = receiveCompletion
        self.receiveValue = receiveValue
    }
}

Now, we can implement our three receive() functions.

First, receive(subscription:) is called when the publisher acknowledges our subscription:

func receive(subscription: Subscription) {
    self.subscription = subscription  // store reference to subscription
    subscription.request(.unlimited)
}

We store the subscription and immediately request unlimited data.

Why unlimited? Because Sink is designed for simple cases where you just want all the data without complex back-pressure control.

Next, receive(_ input:) is called when the publisher sends data:

func receive(_ input: Input) -> Demand {
    receiveValue(input)  // Call the user's closure
    return .unlimited    // Keep requesting more
}

We simply call the closure that was passed in during initialization and return .unlimited to continue receiving all data.

Then, receive(completion:) is called when the stream ends:

func receive(completion: Completion<Failure>) {
    receiveCompletion(completion)  // Call the user's closure
    subscription = nil              // Clean up
}

We call the completion handler and set the subscription to nil since the stream is done.

Finally, we implement Cancellable to allow manual cancellation:

func cancel() {
    subscription?.cancel()
    subscription = nil
}

That's the core Sink implementation! But there's one more thing—we want a convenient way to use it. Nobody wants to write:

let sink = Sink<Int, Never>(
    receiveCompletion: { _ in },
    receiveValue: { print($0) }
)
publisher.subscribe(sink)

Instead, we want to write:

publisher.sink { print($0) }

To do this, we will add convenience extensions on Publisher.

Convenience Functions

We can start by writing a general one that works for any publisher, including those that can fail:

extension Publisher {
    func sink(
        receiveCompletion: @escaping (Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> Cancellable {
        let sink = Sink<Output, Failure>(
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        
        subscribe(sink)
        
        return sink
    }
}

This creates a Sink object with the provided closures, subscribes it to the publisher, and returns it as a Cancellable. It hides the Sink creation and subscription details. Now, users can write:

Just(8).sink(
    receiveCompletion: { completion in
        print("Done!")
    },
    receiveValue: { value in
        print("Got: \(value)")
    }
)

However, this may even be more than we need for publishers like Just that cannot fail. Thus, we can code up something even simpler to use:

extension Publisher where Failure == Never {
    func sink(receiveValue: @escaping (Output) -> Void) -> Cancellable {
        return sink(
            receiveCompletion: { _ in },  // Ignore completion
            receiveValue: receiveValue
        )
    }
}

Now we can write the simpler syntax we want:

Just(42).sink { value in
    print("Got: \(value)")
}

// Or even shorter:
Just(42).sink { print($0) }

Combining everything we have, we get the full code for our Sink subscriber:

class Sink<Input, Failure: Error>: Subscriber, Cancellable {
    
    private let receiveValue: (Input) -> Void
        
    private let receiveCompletion: (Completion<Failure>) -> Void
    
    private var subscription: Subscription?
    
    init(
        receiveCompletion: @escaping (Completion<Failure>) -> Void,
        receiveValue: @escaping (Input) -> Void
    ) {
        self.receiveCompletion = receiveCompletion
        self.receiveValue = receiveValue
    }
    
    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.unlimited)
    }
    
    func receive(_ input: Input) -> Demand {
        receiveValue(input)
        return .unlimited
    }
    
    func receive(completion: Completion<Failure>) {
        receiveCompletion(completion)
        subscription = nil
    }
    
    func cancel() {
        subscription?.cancel()
        subscription = nil
    }
}

extension Publisher {
    func sink(
        receiveCompletion: @escaping (Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> Cancellable {
        let sink = Sink<Output, Failure>(
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        
        subscribe(sink)
        
        return sink
    }
    
    // For publishers that cannot fail, like `Just`
    func sink(receiveValue: @escaping (Output) -> Void) -> Cancellable
        where Failure == Never {
        return sink(
            receiveCompletion: { _ in },
            receiveValue: receiveValue
        )
    }
}

Building Our First Operator: Map

Now we have a publisher (Just) and a subscriber (Sink), but the data just flows straight through. What if we want to transform it? That's where operators come in!

Map is one of the simplest and most useful operators in Combine. It transforms each value from the upstream publisher using a function you provide, and sends that transformed data to the downstream subscriber. Think of it like the .map function on arrays, but for streams of data.

The data flow pipeline with the operator works like this:

  1. The subscriber will subscribe to the operator (operator acts like a publisher)
  2. The operator will subscribe to the publisher (operator acts like a subscriber)
  3. The publisher sends the data down to the operator, where it is transformed
  4. The operator then sends the data down to the subscriber

A key insight that we now have is that operators have to act as both a publisher AND subscriber!

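However, a single Map value can't cleanly play both roles. The publisher side exists before anyone has subscribed, while the subscriber side has to hold on to one specific downstream subscriber, and we need a fresh one for every subscribe() call. So operators use a clever pattern: an outer type that's a Publisher and an inner type that's a Subscriber.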

Let's start with the outer type. Map needs to be a Publisher:

struct Map<Upstream: Publisher, Output>: Publisher {
    typealias Failure = Upstream.Failure  // Upstream.Failure b/c operator just passes failure down
    
    let upstream: Upstream  // The publisher we are transforming
    let transform: (Upstream.Output) -> Output

    // subscribe(_:) still needs to be implemented (see below)
}

Let's take a closer look at the code above. Our Map operator is generic over some upstream publisher and its own Output type, and it stores that publisher as upstream. transform is the function that converts Upstream.Output into Output (that is, the upstream publisher's output into the input of the downstream subscriber).

Now, to finish conforming to the Publisher protocol, we need to implement the subscribe() function. Within this function, we need to connect our pipeline so that the subscriber can communicate with the operator and the publisher. That means we need to create our Inner type, which subscribes to the publisher and sends the subscription it receives downstream to the subscriber, so that all components of our pipeline are connected.

func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
    let inner = Inner(
        downstream: subscriber,
        transform: transform
    )
    
    upstream.subscribe(inner)
}

Now for coding up the Inner subscriber. Its purpose is to subscribe to the upstream publisher, transform the data, and send it downstream to the subscriber.

We can start by writing the class declaration:

private class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure

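Basically, Inner is a subscriber that is generic over the type of the downstream subscriber. Its where clause requires that the downstream subscriber's Input type matches the transformed Output type from Map, and that its Failure type matches the upstream publisher's Failure type.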

We can now write the functions from the Subscriber protocol within Inner:

private class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
    typealias Input = Upstream.Output  // Input type must match publisher output type
    typealias Failure = Upstream.Failure

    private let downstream: Downstream
    private let transform: (Upstream.Output) -> Output

    init(downstream: Downstream, transform: @escaping (Upstream.Output) -> Output) {
        self.downstream = downstream
        self.transform = transform
    }

    // Sends subscription to downstream subscriber to complete pipeline connection
    func receive(subscription: Subscription) {
        downstream.receive(subscription: subscription)
    }

    // Transforms data from publisher and sends it downstream to subscriber
    func receive(_ input: Upstream.Output) -> Demand {
        let transformed = transform(input)
        return downstream.receive(transformed)  // Returns demand from subscriber
    }

    // Forwards completion to downstream subscriber
    func receive(completion: Completion<Upstream.Failure>) {
        downstream.receive(completion: completion)
    }
}

We should also put our map operator in a namespace to prevent name conflicts (many libraries have types called "Map") and to group all operator types together. While this isn't technically necessary, it will help keep our code organized.

enum Publishers {  // <-- Name of namespace
    struct Map<Upstream: Publisher, Output>: Publisher {
        // ... our implementation
    }
}
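
To see why the namespace helps, here is a small standalone sketch. The Cartography namespace and the name properties are made up for illustration; the point is only that two types called Map can coexist when each lives inside its own caseless enum, and that such an enum can't be instantiated by accident.

```swift
// A caseless enum works as a pure namespace: it has no values of its own.
enum Publishers {
    struct Map { let name = "Publishers.Map" }
}

// Hypothetical second namespace that also has a type called Map.
enum Cartography {
    struct Map { let name = "Cartography.Map" }
}

// The qualified names keep the two types apart.
let operatorMap = Publishers.Map()
let worldMap = Cartography.Map()
print(operatorMap.name, worldMap.name)

// let p = Publishers()  // does not compile: an enum with no cases has no initializer
```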

Finally, we also want a convenience function for clean syntax to use in our code:

extension Publisher {
    func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T> {
        return Publishers.Map(upstream: self, transform: transform)
    }
}

We define map in an extension on Publisher because publishers are what you operate on and chain together. The methods in these extensions either return a new publisher (for chaining, like map) or create a subscriber and connect it (to end the chain, like sink).

Now with everything put together, we get this:

extension Publisher {
    
    func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T> {
        return Publishers.Map(upstream: self, transform: transform)
    }
}

enum Publishers {
    
    struct Map<Upstream: Publisher, Output>: Publisher {
        
        typealias Failure = Upstream.Failure
        
        let upstream: Upstream
        let transform: (Upstream.Output) -> Output
        
        func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            let inner = Inner(
                downstream: subscriber,
                transform: transform
            )
            
            upstream.subscribe(inner)
        }
        
        private class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
            typealias Input = Upstream.Output
            typealias Failure = Upstream.Failure
            
            private let downstream: Downstream
            private let transform: (Upstream.Output) -> Output
            
            init(downstream: Downstream, transform: @escaping (Upstream.Output) -> Output) {
                self.downstream = downstream
                self.transform = transform
            }
            
            func receive(subscription: Subscription) {
                downstream.receive(subscription: subscription)
            }
            
            func receive(_ input: Upstream.Output) -> Demand {
                let transformed = transform(input)
                
                return downstream.receive(transformed)
            }
            
            func receive(completion: Completion<Upstream.Failure>) {
                downstream.receive(completion: completion)
            }
        }
    }
}

Putting It All Together

Let's say we have Just -> Map -> Sink:

let _ = Just(10)
    .map { $0 * 2 }
    .sink { value in
        print("Doubled: \(value)")
    }

Let's go through the entire pipeline and how it works.

  1. The sink() convenience function creates a Sink and calls subscribe() on Map (the outer type). The where clause on subscribe() requires that the type Sink takes in (its Input type) matches Map's Output type (the transformed type we're sending), and that Sink's Failure type matches the upstream publisher's Failure type (since Map doesn't transform errors, it just passes them through).
  2. subscribe() creates the Inner subscriber and uses it to subscribe to the upstream publisher. In this case, Inner subscribes to Just.
  3. Just creates a subscription (a JustSubscription) and sends it to Inner through Inner's receive(subscription) function. Within that function, Inner passes the subscription down to Sink. The full pipeline is now connected, which means data can be sent.
  4. Sink stores the subscription in its own receive(subscription) function and then sends a demand by calling request() on the JustSubscription.
  5. That request tells Just to send its value to the Map operator. Inner receives the value in its receive(input) function, transforms it, and sends the result downstream to Sink.
  6. Sink's receive(input) function returns, and that return propagates back through Inner to Just, which then calls receive(completion) on Inner. Inner just forwards the completion down to Sink.
  7. Finally, Sink cleans up by setting its subscription to nil.
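
To check that ordering, the pipeline can be traced with a stripped-down version of everything we've built. This is a sketch under assumptions, not the exact code from the earlier sections: the protocols are minimal stand-ins (no cancel(), no .max demand), Just uses its memberwise initializer Just(value:), Sink logs instead of taking closures, and the Log hook and runPipeline() are hypothetical helpers added only to record the order of calls.

```swift
// Assumed minimal stand-ins for the article's types; cancel() and .max(n) demand are omitted.
enum Demand { case none, unlimited }
enum Completion<Failure: Error> { case finished, failure(Failure) }
typealias Log = (String) -> Void  // hypothetical tracing hook, not part of the article's API
protocol Subscription { func request(_ demand: Demand) }
protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func receive(subscription: Subscription)
    func receive(_ input: Input) -> Demand
    func receive(completion: Completion<Failure>)
}
protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Failure
}

struct Just<Output>: Publisher {
    typealias Failure = Never
    let value: Output
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Never {
        subscriber.receive(subscription: JustSubscription(value: value, subscriber: subscriber))
    }
}

final class JustSubscription<S: Subscriber>: Subscription {
    private let value: S.Input
    private var subscriber: S?
    init(value: S.Input, subscriber: S) {
        self.value = value
        self.subscriber = subscriber
    }
    func request(_ demand: Demand) {
        guard case .unlimited = demand, let subscriber = subscriber else { return }
        self.subscriber = nil  // Just emits its value at most once
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

struct Map<Upstream: Publisher, Output>: Publisher {
    typealias Failure = Upstream.Failure
    let upstream: Upstream
    let transform: (Upstream.Output) -> Output
    let log: Log
    func subscribe<S: Subscriber>(_ subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
        log("Map: creates Inner and subscribes it upstream")
        upstream.subscribe(Inner(downstream: subscriber, transform: transform, log: log))
    }
    private final class Inner<Downstream: Subscriber>: Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure
        let downstream: Downstream
        let transform: (Upstream.Output) -> Output
        let log: Log
        init(downstream: Downstream, transform: @escaping (Upstream.Output) -> Output, log: @escaping Log) {
            self.downstream = downstream
            self.transform = transform
            self.log = log
        }
        func receive(subscription: Subscription) {
            log("Inner: forwards subscription downstream")
            downstream.receive(subscription: subscription)
        }
        func receive(_ input: Upstream.Output) -> Demand {
            let transformed = transform(input)
            log("Inner: transforms \(input) into \(transformed)")
            return downstream.receive(transformed)  // Passes the downstream demand back up
        }
        func receive(completion: Completion<Upstream.Failure>) {
            log("Inner: forwards completion downstream")
            downstream.receive(completion: completion)
        }
    }
}

final class Sink<Input, Failure: Error>: Subscriber {
    private let log: Log
    init(log: @escaping Log) { self.log = log }
    func receive(subscription: Subscription) {
        log("Sink: requests .unlimited")
        subscription.request(.unlimited)
    }
    func receive(_ input: Input) -> Demand {
        log("Sink: receives \(input)")
        return .unlimited
    }
    func receive(completion: Completion<Failure>) { log("Sink: receives completion") }
}

// Builds Just -> Map -> Sink and returns the recorded order of calls.
func runPipeline() -> [String] {
    var trace: [String] = []
    let log: Log = { trace.append($0) }
    Map(upstream: Just(value: 10), transform: { $0 * 2 }, log: log).subscribe(Sink<Int, Never>(log: log))
    return trace
}

print(runPipeline().joined(separator: "\n"))
```

Running this prints seven lines, in the same order as the walkthrough: Map creates Inner, Inner forwards the subscription, Sink requests .unlimited, Inner transforms 10 into 20, Sink receives 20, Inner forwards the completion, and Sink receives it. Notice that everything happens synchronously inside the single subscribe() call.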

Conclusion (Not Really)

Wow. You made it all the way here where we went over the basics of Combine, the protocols, and even implemented our own publisher, subscriber, and operator! This forms the basis of Combine and how reactive programming is handled in Swift. By this point, you probably have a better understanding of how this all works and you can stop reading here. However, if you want to continue to see other operators and such being implemented, feel free to keep reading!
