The first time I learned about AsyncStream, I immediately thought of Combine and its streams of values. You can compare the two in some ways, but they’re definitely not identical. Still, I truly believe you’ll find new ways to solve problems you would otherwise solve with Combine, so this is going to be an interesting lesson.

Just like with task groups, AsyncStream comes in a throwing and a non-throwing variant. Most of this lesson focuses on the throwing variant, but the same principles apply to the non-throwing one.

What is an AsyncStream or AsyncThrowingStream?

You can see an AsyncThrowingStream as a stream of elements that could potentially end with a thrown error. Values are delivered over time, and a finish event closes the stream. That finish event is either a success or, if an error occurs, a failure.

An AsyncStream is similar to the throwing variant but will never result in a thrown error. A non-throwing async stream finishes through an explicit finish() call or when the stream is cancelled.
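
To make the difference between both variants concrete, here is a minimal sketch. The names countdown, failingCountdown, and CountdownError are made up for this illustration and are not part of the sample project. The non-throwing stream can only end with finish(), while the throwing one can also end with an error.

```swift
/// Hypothetical error type, used only for this illustration.
enum CountdownError: Error {
    case aborted
}

/// A non-throwing stream: it ends through `finish()` or cancellation.
func countdown(from start: Int) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        for value in stride(from: start, through: 1, by: -1) {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

/// A throwing stream: it can end successfully or with an error.
func failingCountdown(from start: Int) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream<Int, Error> { continuation in
        continuation.yield(start)
        continuation.finish(throwing: CountdownError.aborted)
    }
}

// Iterating the non-throwing variant needs no `try`.
for await value in countdown(from: 3) {
    print("Countdown: \(value)")
}

// Iterating the throwing variant requires `try` and error handling.
do {
    for try await value in failingCountdown(from: 3) {
        print("Failing countdown: \(value)")
    }
} catch {
    print("Stream failed with \(error)")
}
```

Values that were yielded before the error are still delivered first, and only then does the loop throw.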

How to use AsyncThrowingStream

An AsyncThrowingStream can be an excellent replacement for existing closure-based code, like progress and completion handlers. To better understand what I mean, I’ll introduce you to a scenario I encountered back in the day while working on the WeTransfer app.

In the app, we had an existing class based on closures called the FileDownloader:

struct FileDownloader {
    enum Status {
        case downloading(Float)
        case finished(Data)
    }

    func download(_ url: URL, progressHandler: @escaping (Float) -> Void, completion: @escaping (Result<Data, Error>) -> Void) throws {
        // .. Download implementation
    }
}

The file downloader takes a URL, reports progress, and completes with a result containing the downloaded data or an error on failure.

While a download runs, the file downloader reports a series of values: progress updates first, followed by the final data or an error. That makes the FileDownloader a perfect example of a piece of code that you can rewrite to use AsyncThrowingStream, with the Status enum describing each value in the stream.

However, converting the existing method would require you to rewrite your code at the implementation level as well, so let’s define an overload instead. Using overloads is a great tip in general when migrating code. We’ll dive much deeper into this later in this course.

extension FileDownloader {
    /// Define a download overload which provides an `AsyncThrowingStream`.
    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
        return AsyncThrowingStream { continuation in
            do {
                /// Call into the original closure-based method.
                try self.download(url, progressHandler: { progress in
                    /// Send progress updates through the stream.
                    continuation.yield(.downloading(progress))
                }, completion: { result in
                    switch result {
                    case .success(let data):
                        /// Send a finished message to the stream.
                        continuation.yield(.finished(data))

                        /// Terminate the continuation.
                        continuation.finish()
                    case .failure(let error):

                        /// Finish and terminate the continuation with the error:
                        continuation.finish(throwing: error)
                    }
                })
            } catch {
                /// Finish and terminate the continuation with the error:
                continuation.finish(throwing: error)
            }
        }
    }
}

Let’s zoom in on the part of this code that handles the result:

switch result {
case .success(let data):
    continuation.yield(.finished(data))
    continuation.finish()
case .failure(let error):
    continuation.finish(throwing: error)
}

It’s essential to call finish() after you’ve yielded the final status update. Otherwise, the stream stays alive, and code at the implementation level that awaits results will never continue. Unfortunately, as of today, the compiler does not flag this programming mistake.

We could replace the switch statement in the above code by making use of another yield method, accepting a Result enum as an argument:

/// In the .success(_:) case, this returns the associated value from the iterator’s next() method.
/// In the .failure(_:) case, this call terminates the stream with the result’s error by calling finish(throwing:).
continuation.yield(with: result.map { .finished($0) })
continuation.finish()

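The rewrite simplifies our code and removes the switch statement. We do have to map our Result to match the expected Status value. If we yield a failing result, the stream finishes by throwing the contained error, and the finish() call that follows has no further effect. For a successful result, that finish() call is still what closes the stream.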
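
To see both paths of yield(with:) in isolation, consider the following sketch. It does not use the real FileDownloader. The Status and DownloadError types are simplified stand-ins, and statusStream(for:) and collect(_:) are hypothetical helpers I made up for this example.

```swift
/// Simplified stand-ins for the types used in this lesson.
enum Status: Equatable {
    case downloading(Float)
    case finished(String)
}

enum DownloadError: Error {
    case example
}

/// Builds a stream that reports one progress update and then forwards `result`.
func statusStream(for result: Result<String, Error>) -> AsyncThrowingStream<Status, Error> {
    AsyncThrowingStream<Status, Error> { continuation in
        continuation.yield(.downloading(1.0))

        // A failure terminates the stream with the error.
        // A success only yields the value, so `finish()` is still required.
        continuation.yield(with: result.map { Status.finished($0) })
        continuation.finish()
    }
}

/// Collects everything the stream delivers, plus the error that ended it (if any).
func collect(_ stream: AsyncThrowingStream<Status, Error>) async -> (statuses: [Status], error: Error?) {
    var statuses: [Status] = []
    do {
        for try await status in stream {
            statuses.append(status)
        }
        return (statuses, nil)
    } catch {
        return (statuses, error)
    }
}

let success = await collect(statusStream(for: .success("file contents")))
print("Success delivered \(success.statuses.count) statuses, failed: \(success.error != nil)")

let failure = await collect(statusStream(for: .failure(DownloadError.example)))
print("Failure delivered \(failure.statuses.count) statuses, failed: \(failure.error != nil)")
```

In the failing case, the progress update still arrives before the loop throws, since buffered values are delivered ahead of the termination.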

Iterating over an AsyncThrowingStream

You can start iterating over the stream of values once you’ve configured your async throwing stream. In the case of our FileDownloader example, it will look as follows:

do {
    let fileDownloader = FileDownloader()

    for try await status in fileDownloader.download(url) {
        switch status {
        case .downloading(let progress):
            print("Downloading progress: \(progress)")
        case .finished(let data):
            print("Downloading completed with data: \(data)")
        }
    }
    print("Download finished and stream closed")
} catch {
    print("Download failed with \(error)")
}

We handle each status update, and the catch block handles any error that occurs. You iterate using a for try await ... in loop based on the AsyncSequence interface. A non-throwing AsyncStream works the same way, just without the try.

The print statements in the above code example help you understand the lifecycle of an AsyncThrowingStream. You can replace the print statements to handle the progress updates and process the data to visualize it for your users. In the sample code for this module, you’ll find an actual working example of the FileDownloader which supports downloading images.

Debugging an AsyncStream

If a stream fails to report values, we could debug it by placing breakpoints on the stream’s yield calls. It could also be that the “Download finished and stream closed” print statement above never runs, which means your code at the implementation level never continues. The latter could be the result of an unfinished stream.

To validate, we could make use of the onTermination callback:

func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
    return AsyncThrowingStream { continuation in

        /// Configure a termination callback to understand the lifetime of your stream.
        continuation.onTermination = { @Sendable terminationReason in
            print("Stream termination reason: \(terminationReason)")
        }

        // ..
    }
}

The callback is called when the stream terminates and helps you identify when and why that happens. When you cancel a task that’s iterating over an AsyncThrowingStream, and you’ve set an onTermination callback, that callback gets called right away. This gives you a chance to do any cleanup you need before the stream ends. After that, the stream either yields nil or throws an error to mark the end of the iteration. Once it’s finished, the stream clears out the termination callback.

In case of a thrown error, the output could look as follows:

Stream termination reason: finished(Optional(FileDownloader.FileDownloadingError.example))

The above output will only be possible when using an AsyncThrowingStream. In the case of a regular AsyncStream, the finished output looks as follows:

Stream termination reason: finished

While the result of cancellation looks like this for both types of streams:

Stream termination reason: cancelled

As said, you can also use this termination callback for any cleanup after the stream finishes. Examples could be removing any observers or cleaning disk space after the file download.
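
As a sketch of such cleanup, the following example stops a background producer as soon as the stream terminates. The ticks(every:) and firstTicks(_:) functions are hypothetical and only serve this illustration. In your own code, the callback could just as well remove an observer or delete a temporary file.

```swift
/// Hypothetical stream that emits an increasing counter until it terminates.
func ticks(every interval: Duration) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        // The producer runs in its own task and keeps yielding values.
        let producer = Task {
            var tick = 0
            while !Task.isCancelled {
                continuation.yield(tick)
                tick += 1
                try? await Task.sleep(for: interval)
            }
        }

        // Cleanup: stop producing once the stream finishes or gets cancelled.
        continuation.onTermination = { _ in
            producer.cancel()
        }
    }
}

/// Reads the given number of ticks and then stops iterating.
func firstTicks(_ count: Int) async -> [Int] {
    var received: [Int] = []
    for await tick in ticks(every: .milliseconds(10)) {
        received.append(tick)
        if received.count >= count { break }
    }
    return received
}

let received = await firstTicks(3)
print("Received ticks: \(received)")
```

Without the onTermination callback, the producer task would keep running and yielding into a stream that nobody reads anymore.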

Canceling an AsyncStream

An AsyncStream or AsyncThrowingStream can be cancelled when the enclosing task gets cancelled. The iteration runs inside that task, so cancelling the task cancels the stream as well, in line with the cancellation behavior we’ve learned about in previous lessons. An example could look as follows:

let task = Task.detached {
    do {
        for try await status in download(url) {
            switch status {
            case .downloading(let progress):
                print("Downloading progress: \(progress)")
            case .finished(let data):
                print("Downloading completed with data: \(data)")
            }
        }
    } catch {
        print("Download failed with \(error)")
    }
}
task.cancel()

A stream is cancelled when it goes out of scope or when the enclosing task is cancelled. As mentioned before, the cancellation will trigger the onTermination callback accordingly. Therefore, the above could print something like:

Starting file download...
Downloading progress: 0.1
Stream termination reason: cancelled

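There is no way to explicitly cancel a stream, since the stream itself has no cancel() method. To stop it, you either cancel the task that iterates over it or call finish() on the continuation.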
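
The two ways of ending a stream can be sketched as follows. This assumes Swift 5.9 or later, since it uses the makeStream(of:) factory method to get hold of the continuation outside of the build closure. The function names finishFromProducer and cancelConsumer are made up for this example.

```swift
/// Ends the stream from the producing side by finishing the continuation.
func finishFromProducer() async -> [Int] {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    let consumer = Task {
        var received: [Int] = []
        for await value in stream {
            received.append(value)
        }
        return received
    }

    continuation.yield(1)
    continuation.yield(2)
    // Finishing the continuation ends the consumer's loop.
    continuation.finish()

    return await consumer.value
}

/// Ends the iteration from the consuming side by cancelling the task.
func cancelConsumer() async -> Bool {
    // The continuation is never finished here: only cancellation ends the loop.
    let (stream, _) = AsyncStream.makeStream(of: Int.self)

    let consumer = Task {
        for await _ in stream {}
        return Task.isCancelled
    }

    consumer.cancel()
    return await consumer.value
}

let finishedValues = await finishFromProducer()
print("Received before finish: \(finishedValues)")

let endedByCancellation = await cancelConsumer()
print("Loop ended by cancellation: \(endedByCancellation)")
```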

Configuring a buffer policy

By default, a stream buffers every value until a consumer reads it. This means that if you start awaiting results after all values have already been emitted, you’ll still receive all of them.

We can illustrate this using the following code example:

let indexStream = AsyncStream<Int> { continuation in
    (0...5).forEach { index in
        print("Yielding index \(index)...")
        continuation.yield(index)
    }

    continuation.finish()
}

/// Wait to ensure all index values are emitted.
try? await Task.sleep(for: .seconds(1))

print("Start awaiting indexes...")

/// Iterate over the stream of values.
for await index in indexStream {
    print("Received index: \(index)")
}

/* Prints out:
 Yielding index 0...
 Yielding index 1...
 Yielding index 2...
 Yielding index 3...
 Yielding index 4...
 Yielding index 5...
 Start awaiting indexes...
 Received index: 0
 Received index: 1
 Received index: 2
 Received index: 3
 Received index: 4
 Received index: 5
 */

This might not always be what you want. For example, if you start monitoring file system changes or location updates, you’re often only interested in the latest value.

For these cases, we can configure a so-called buffer policy. This policy describes what a stream should do with a value when there’s nobody awaiting the results yet.

The default buffer policy is set to .unbounded and was demonstrated in the previous code example. We could change this buffer policy to only buffer the latest value (“we only have room to store 1 value”):

let bufferingNewestStream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
    (0...5).forEach { index in
        print("Yielding index \(index)...")
        continuation.yield(index)
    }

    continuation.finish()
}

/// Wait to ensure all index values are emitted.
try? await Task.sleep(for: .seconds(1))

print("Start awaiting indexes...")

/// Iterate over the stream of values.
for await index in bufferingNewestStream {
    print("Received index: \(index)")
}

/* Prints out:
 Yielding index 0...
 Yielding index 1...
 Yielding index 2...
 Yielding index 3...
 Yielding index 4...
 Yielding index 5...
 Start awaiting indexes...
 Received index: 5
 */

As you can see, by the time we start awaiting indexes, we only receive the latest of all emitted indexes.

Similarly, you can decide to only store the oldest value:

let bufferingOldestStream = AsyncStream<Int>(bufferingPolicy: .bufferingOldest(1)) { continuation in
    (0...5).forEach { index in
        print("Yielding index \(index)...")
        continuation.yield(index)
    }

    continuation.finish()
}

/// Wait to ensure all index values are emitted.
try? await Task.sleep(for: .seconds(1))

print("Start awaiting indexes...")

/// Iterate over the stream of values.
for await index in bufferingOldestStream {
    print("Received index: \(index)")
}

/* Prints out:
 Yielding index 0...
 Yielding index 1...
 Yielding index 2...
 Yielding index 3...
 Yielding index 4...
 Yielding index 5...
 Start awaiting indexes...
 Received index: 0
 */

Finally, you can set the buffer policy to .bufferingNewest(0) to only receive new values emitted after you start awaiting results. This is demonstrated in the following code example, where we use a sleep to ensure the latest value is emitted after we start awaiting:

let bufferingNewerOnlyStream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(0)) { continuation in
    print("Yielding index 1...")
    continuation.yield(1)

    Task {
        try? await Task.sleep(for: .seconds(2))
        print("Yielding index 2...")
        continuation.yield(2)
        continuation.finish()
    }
}

/// Wait to ensure the first index is emitted before we start awaiting.
try? await Task.sleep(for: .seconds(1))

print("Start awaiting indexes...")

/// Iterate over the stream of values.
for await index in bufferingNewerOnlyStream {
    print("Received index: \(index)")
}

/* Prints out:
 Yielding index 1...
 Start awaiting indexes...
 Yielding index 2...
 Received index: 2
 */

With a buffer size of zero, a value is discarded right away when nobody is awaiting results, which means it will never be read. The same applies to values that get pushed out of a larger buffer while nobody is reading.
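
You don’t have to guess whether a value made it into the buffer: yield returns a YieldResult that tells you what happened. The following sketch, assuming Swift 5.9 or later for makeStream(of:bufferingPolicy:) and using a made-up function name, logs the result of each yield for a buffer of size 1.

```swift
/// Yields three values into a buffer of size 1 and records what happened to each.
func yieldResultsWithSmallBuffer() async -> (log: [String], received: [Int]) {
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingNewest(1)
    )

    var log: [String] = []
    for index in 0...2 {
        switch continuation.yield(index) {
        case .enqueued:
            log.append("\(index) enqueued")
        case .dropped(let droppedValue):
            // With `.bufferingNewest`, the older buffered value gets pushed out.
            log.append("\(index) pushed out \(droppedValue)")
        case .terminated:
            log.append("\(index) ignored, stream terminated")
        @unknown default:
            log.append("\(index) unknown result")
        }
    }
    continuation.finish()

    // Only start reading after all values were yielded.
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return (log, received)
}

let outcome = await yieldResultsWithSmallBuffer()
outcome.log.forEach { print($0) }
print("Received: \(outcome.received)")
```

Checking the result can be useful when dropping a value should not go unnoticed, for example to log it or to slow down the producer.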

Limitations of an AsyncStream

The comparison with Combine has often been made, but there’s one major difference that can completely influence whether you find it a good alternative for Combine publishers.

An asynchronous stream is designed for single consumers

A stream is designed for single subscribers, while Combine publishers can have many.

Depending on your use case, this might be fine. However, if you want to have multiple consumers for the same stream of data, you might want to look into the AsyncExtensions GitHub repository. It’s not officially supported by Apple, so I’m not going to dive into details on how the library works. However, its AsyncCurrentValueSubject allows you to write similar code to Combine while accepting multiple consumers.

To better illustrate this limitation, I’d like to look at the following code example:

let indexStream = AsyncStream<Int> { continuation in
    (0...5).forEach { index in
        continuation.yield(index)
    }

    continuation.finish()
}

Task {
    print("Index monitor 1 started...")

    for await index in indexStream {
        print("Monitor 1: \(index)")
    }
}

Task {
    print("Index monitor 2 started...")

    for await index in indexStream {
        print("Monitor 2: \(index)")
    }
}

We have a stream that yields six indexes. We subscribe to the stream from two different tasks, and you’ll notice a different outcome each time you run this code:

Index monitor 1 started...
Index monitor 2 started...
Monitor 2: 1
Monitor 2: 2
Monitor 1: 0
Monitor 1: 4
Monitor 1: 5
Monitor 2: 3

As you can see, it’s unpredictable which consumer will receive which value. It’s important to be aware of this, since nothing stops you from resubscribing to an existing stream.
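
If you only need something simple, one way around this limitation is to give each consumer its own stream. The following is a minimal sketch under a few assumptions: the Broadcaster actor is a hypothetical type I made up for this example, it requires Swift 5.9 or later for makeStream, and it leaves out details a real implementation needs, such as removing continuations of consumers that stopped listening.

```swift
/// Hypothetical helper that forwards every value to one stream per consumer.
actor Broadcaster<Element: Sendable> {
    private var continuations: [AsyncStream<Element>.Continuation] = []

    /// Creates a dedicated stream for a new consumer.
    func subscribe() -> AsyncStream<Element> {
        let (stream, continuation) = AsyncStream<Element>.makeStream()
        continuations.append(continuation)
        return stream
    }

    /// Sends the value to every subscribed consumer.
    func send(_ value: Element) {
        for continuation in continuations {
            continuation.yield(value)
        }
    }

    /// Finishes all streams so that every consumer's loop ends.
    func finish() {
        for continuation in continuations {
            continuation.finish()
        }
        continuations.removeAll()
    }
}

let broadcaster = Broadcaster<Int>()
let firstStream = await broadcaster.subscribe()
let secondStream = await broadcaster.subscribe()

for index in 0...2 {
    await broadcaster.send(index)
}
await broadcaster.finish()

for await index in firstStream {
    print("Monitor 1: \(index)")
}
for await index in secondStream {
    print("Monitor 2: \(index)")
}
```

Since each stream has its own buffer, both monitors receive every index in order instead of competing for values.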

Once terminated, no more values will flow

This might make sense, but as soon as a stream gets terminated, it will no longer be able to emit any values. You can still subscribe to the stream and start waiting for values, but your new task will finish immediately since the termination event has already occurred.

To illustrate this, we can create a stream that directly terminates:

let indexStream = AsyncStream<Int> { continuation in
    continuation.onTermination = { _ in
        print("Index stream terminated.")
    }

    /// Terminate right away to demonstrate no more values will be emitted.
    continuation.finish()

    (0...5).forEach { index in
        continuation.yield(index)
    }
}

/// Wait to ensure the index stream gets terminated before monitoring.
try? await Task.sleep(for: .seconds(1))

print("Index monitor 1 started...")

for await index in indexStream {
    print("Monitor 1: \(index)")
}

print("Index monitor finished")

This will print out the following:

Index stream terminated.
Index monitor 1 started...
Index monitor finished
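
You can also detect this situation on the producing side. As a small sketch with a made-up function name, the result of yield tells you when a value was ignored because the stream had already terminated:

```swift
/// Returns true if yielding after `finish()` reports a terminated stream.
func yieldAfterFinishIsIgnored() -> Bool {
    var wasTerminated = false

    // The build closure runs immediately while the stream is created.
    let stream = AsyncStream<Int> { continuation in
        continuation.finish()

        if case .terminated = continuation.yield(1) {
            wasTerminated = true
        }
    }

    // The stream itself is not needed any further in this example.
    _ = stream
    return wasTerminated
}

print("Yield after finish was ignored: \(yieldAfterFinishIsIgnored())")
```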

Summary

Both AsyncStream and AsyncThrowingStream provide a convenient way to send values over time. You can use them in common scenarios that you would normally solve using a Combine publisher pipeline or a combination of closure callbacks. Keep an eye on termination, and be aware that streams remain alive until they are cancelled, failed, or finished.

In the next lesson, I’ll give guidance on when you should go for an AsyncStream, an AsyncSequence, or a regular asynchronous method.