In an earlier post, I wrote about different ways that you can bridge your existing asynchronous code over to Swift’s new Concurrency system that leverages async / await. The mechanisms shown there work well for code that produces a single result that can be modeled as a single value.
However, in some cases this isn’t possible because your existing code will provide multiple values over time. This is the case for things like download progress, the user’s current location, and other similar situations.
Generally speaking, these kinds of patterns can be modeled as AsyncSequence objects that you can iterate over using an asynchronous for loop. A basic example of this would be the lines property on URL:
let url = URL(string: "https://donnywals.com")!
for try await line in url.lines {
    // use line
}
But what’s the best way to build your own async sequences? Implementing the AsyncSequence protocol and building your own AsyncIterator sounds tedious and error-prone. Luckily, there’s no reason for you to be doing any of that.
In this post, I’ll show you how you can leverage Swift’s AsyncStream to build custom async sequences that produce values whenever you need them to.
Producing a simple async stream
An async stream can be produced in various ways. The easiest way to create an async stream is to use the AsyncStream(unfolding:) initializer. Its usage looks a bit as follows:
let stream = AsyncStream(unfolding: {
    return Int.random(in: 0..<Int.max)
})
Of course, this example isn’t particularly useful on its own, but it does show how simple the concept of AsyncStream(unfolding:) is. We use this version of AsyncStream whenever we can produce and return values for our async stream. The closure that’s passed to unfolding is async, which means we can await asynchronous operations from within our unfolding closure. Your unfolding closure will be called every time you’re expected to begin producing a value for your stream. In practice this means that your closure is called, you perform some work, you return a value, and then your closure is called again. This repeats until the for loop is cancelled, the task that contains your async for loop is cancelled, or until you return nil from your unfolding closure.
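To make the "return nil to end the stream" behavior a little more concrete, here is a minimal sketch of a countdown stream. The Countdown actor and the makeCountdown(from:) and collectCountdown() functions are hypothetical names that I made up for illustration. The actor is only there because the unfolding closure can't safely mutate a captured local variable.

```swift
// Hypothetical helper: holds the mutable state the unfolding closure needs.
actor Countdown {
    private var remaining: Int

    init(from start: Int) {
        remaining = start
    }

    // Returns the next number, or nil once the countdown is exhausted.
    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

// Hypothetical factory: each call to the unfolding closure awaits the actor.
func makeCountdown(from start: Int) -> AsyncStream<Int> {
    let countdown = Countdown(from: start)
    return AsyncStream<Int>(unfolding: {
        // Returning nil here ends the stream.
        await countdown.next()
    })
}

// Usage: the loop ends on its own once the closure returns nil.
func collectCountdown() async -> [Int] {
    var values: [Int] = []
    for await value in makeCountdown(from: 3) {
        values.append(value)
    }
    return values
}
```

Calling collectCountdown() should give you [3, 2, 1]; the closure is only asked for a new value once the loop is ready for one.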
The AsyncStream(unfolding:) way to produce a stream of values is quite convenient, but it’s particularly useful in situations where:
- You want to perform async work that needs to be awaited to produce elements
- You need to handle back pressure when bridging an API you own
When you’re bridging an existing API that’s based on delegates, or an API that leverages callbacks to communicate results, you probably won’t be able to use AsyncStream(unfolding:). While it’s the simplest and least error-prone way to build an async stream, it’s also the way that I’ve found to be most limiting, and it doesn’t often fit well with bridging existing code over to Swift Concurrency.
More flexibility can be found in the continuation based API for AsyncStream.
Producing an async stream with a continuation
When an asynchronous closure doesn’t quite fit your use case for creating your own async stream, a continuation based approach might be a much better solution for you. With a continuation you have the ability to construct an async stream object and send values over the async stream whenever values become available.
We can do this by creating an AsyncStream using the AsyncStream(build:) initializer:
let stream2 = AsyncStream { cont in
    cont.yield(Int.random(in: 0..<Int.max))
}
The example above creates an AsyncStream that produces a single integer value. This value is produced by calling yield on the continuation. Every time we have a value to send, we should call yield on the continuation with the value that we want to send.
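As a rough sketch of calling yield more than once, the code below wraps a callback based function in an AsyncStream. The reportProgress(steps:onProgress:onDone:) function is a made-up stand-in for existing code you might want to bridge, and progressStream(steps:) is a hypothetical wrapper around it; neither is a real API.

```swift
// Hypothetical callback based API standing in for existing code.
func reportProgress(steps: Int, onProgress: (Double) -> Void, onDone: () -> Void) {
    for step in 0..<steps {
        onProgress(Double(step + 1) / Double(steps))
    }
    onDone()
}

// Hypothetical wrapper: every callback invocation becomes a value on the stream.
func progressStream(steps: Int) -> AsyncStream<Double> {
    AsyncStream<Double> { continuation in
        reportProgress(
            steps: steps,
            onProgress: { fraction in
                // Send each value as soon as the callback delivers it.
                continuation.yield(fraction)
            },
            onDone: {
                // No more values will follow, so end the stream.
                continuation.finish()
            }
        )
    }
}
```

Iterating over progressStream(steps: 4) with a for await loop should produce 0.25, 0.5, 0.75, and 1.0 before the loop ends.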
If we’re building an AsyncStream that wraps a delegate based API, we can hold on to our continuation in the delegate object and call yield whenever a relevant delegate method is called.
For example, we could call continuation.yield from within a CLLocationManagerDelegate whenever a new user location is made available to us:
import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
    // The stream is created lazily; the continuation is stored so delegate callbacks can use it.
    lazy var stream: AsyncStream<CLLocation> = {
        AsyncStream { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
            self.continuation = continuation
        }
    }()
    var continuation: AsyncStream<CLLocation>.Continuation?

    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        // Forward every new location to the stream.
        for location in locations {
            continuation?.yield(location)
        }
    }
}
The example above is a very naive starting point for creating an async stream of user locations. There are a couple of things we don’t fully take into account, such as cancelling and starting location observation or asking for location permissions.
At its core though, this example is a great starting point for experimenting with async streams.
Note that this approach will not wait for consumers of your async stream to fully consume a value before you can send your next value down the stream. Instead, all values that you send will be buffered in your async stream by default, which may or may not be what you want.
In practical terms this means that when you send values down your stream faster than the consuming for loop can process them, you’ll end up with a buffer filled with values that will be delivered to the consuming for loop with a delay. This might be exactly what you need, but if the values you send are somewhat time sensitive and ephemeral, it could make sense to drop values if the consuming for loop isn’t ready to receive them.
We could decide that we never want to hold on to more than 1 location and that we only want to buffer the last known location to avoid processing stale data. We can do this by setting a buffering policy on our async stream:
lazy var stream: AsyncStream<CLLocation> = {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
        self.continuation = continuation
    }
}()
This code passes a bufferingPolicy of .bufferingNewest(1) to our AsyncStream. This means that we will only buffer a single value if the consuming for loop isn’t processing items fast enough, and we will discard older values in favor of keeping only the latest location.
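To get a feel for what the different buffering policies do, here is a small sketch that doesn't depend on Core Location. The bufferedValues(policy:) function is a hypothetical helper: it yields five integers before anybody is iterating, finishes the stream, and then collects whatever survived in the buffer.

```swift
// Hypothetical helper: yields 1 through 5 up front, then reports what the consumer receives.
func bufferedValues(policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        // Nobody is consuming yet, so every value has to go through the buffer.
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}

// bufferedValues(policy: .unbounded)          returns [1, 2, 3, 4, 5]
// bufferedValues(policy: .bufferingNewest(1)) returns [5]
// bufferedValues(policy: .bufferingOldest(1)) returns [1]
```

With .bufferingNewest(1) only the most recent value is kept, which matches the "last known location" behavior described above.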
If our stream comes to a natural close, you can call finish() on your continuation to end the stream of values.
If your stream might fail with an error, you can also choose to create an AsyncThrowingStream instead of an AsyncStream. The key difference is that consumers of a throwing stream must await new values using try await instead of just await. To make your stream throw an error you can either call finish(throwing:) on your continuation or you can call yield(with:) using a Result object that represents a failure.
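A minimal sketch of a throwing stream could look as follows. HypotheticalStreamError, failingStream(), and consumeFailingStream() are names I invented for this example; the stream sends two values and then fails.

```swift
// Hypothetical error type used only for this example.
struct HypotheticalStreamError: Error {}

// A stream that yields two values and then ends with an error.
func failingStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream(Int.self) { continuation in
        continuation.yield(1)
        continuation.yield(2)
        // Ends the stream; the consumer's loop throws this error after the buffered values.
        continuation.finish(throwing: HypotheticalStreamError())
    }
}

// Consumers need try await, and should be prepared for the loop to throw.
func consumeFailingStream() async -> (values: [Int], failed: Bool) {
    var values: [Int] = []
    do {
        for try await value in failingStream() {
            values.append(value)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

In this sketch the consumer still receives the values that were sent before the failure, and only then lands in the catch block.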
While the basics of building an AsyncStream aren’t particularly complex, we do need to think carefully about how we manage the lifecycles of the things we create. Especially because we’re not supposed to make our continuations outlive our streams, which is a very easy mistake to make when you’re bridging existing delegate based code.
Managing your stream’s lifecycle
There are essentially two ways for an async stream to end. First, the stream might naturally stop producing values because no further values will be produced. You will call finish on your continuation and you can perform any cleanup that you need to do at the same time. For example, you could set the continuation that you’re holding on to to nil to make sure you can’t accidentally use it anymore.
Alternatively, your stream can end because the task that’s used to run your async stream is cancelled. Consider the following:
let locations = AsyncLocationStream()
let task = Task {
    for await location in locations.stream {
        print(location)
    }
}
task.cancel()
When something like the above happens, we will want to make sure that we don’t call yield on our continuation anymore unless we start a new stream with a new, active, continuation.
We can detect and respond to the end of our stream by setting an onTermination handler on our continuation:
self.continuation?.onTermination = { result in
    print(result)
    self.continuation = nil
}
Ideally we set this handler immediately when we first create our async stream.
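Here is a sketch of what setting the handler right inside the build closure could look like, and how it tells a cancelled consumer apart from a stream that finished normally. TerminationLog and observeTermination(cancelConsumer:) are hypothetical names; the lock based log is just my assumption for a simple way to record what the handler saw, since the handler has to be safe to call from any thread.

```swift
import Foundation

// Hypothetical helper: a tiny thread-safe log the termination handler can write to.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var reasons: [String] = []

    func record(_ reason: String) {
        lock.lock()
        defer { lock.unlock() }
        reasons.append(reason)
    }

    var recorded: [String] {
        lock.lock()
        defer { lock.unlock() }
        return reasons
    }
}

// Hypothetical demo: ends the stream either by cancelling the consumer or by calling finish().
func observeTermination(cancelConsumer: Bool) async -> [String] {
    let log = TerminationLog()
    var continuation: AsyncStream<Int>.Continuation?

    let stream = AsyncStream(Int.self) { cont in
        // Set the handler immediately, before any values are produced.
        cont.onTermination = { termination in
            switch termination {
            case .finished: log.record("finished")
            case .cancelled: log.record("cancelled")
            @unknown default: log.record("unknown")
            }
        }
        continuation = cont
    }

    let consumer = Task {
        for await _ in stream {}
    }

    if cancelConsumer {
        consumer.cancel()
    } else {
        continuation?.finish()
    }

    await consumer.value
    return log.recorded
}
```

In this sketch, cancelling the consuming task records "cancelled", while calling finish() on the continuation records "finished". Either way the handler is a good place to stop the underlying work.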
In addition to the stream being cancelled or otherwise going out of scope, we could break out of our loop, which will eventually cause our task to finish. Generally speaking, this is not something that will end your async stream, so if you want breaking out of your loop to end your stream, you will need to take this into account yourself.
Personally, I’ve found that the easiest way to make sure you do some cleanup is to have a method on your stream producing object that cancels the stream, instead of just breaking out of an async for loop. That way, you can perform cleanup and not have a stream that’s sending values even though nobody is listening.
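A rough sketch of that idea, using plain integers instead of locations so it's easy to try out, might look like this. NumberFeed, send(_:), stop(), and demoStop() are all hypothetical names, and the class isn't thread-safe; a real implementation would need to synchronize access to the continuation.

```swift
// Hypothetical stream producing object with an explicit way to end its stream.
final class NumberFeed {
    let stream: AsyncStream<Int>
    private var continuation: AsyncStream<Int>.Continuation?

    init() {
        var captured: AsyncStream<Int>.Continuation?
        stream = AsyncStream(Int.self) { captured = $0 }
        continuation = captured
    }

    // Sends a value if the stream is still active; does nothing after stop().
    func send(_ value: Int) {
        continuation?.yield(value)
    }

    // Ends the stream and drops the continuation so it can't be used by accident.
    func stop() {
        continuation?.finish()
        continuation = nil
        // This is also where you would stop the underlying work, for example location updates.
    }
}

// Usage: values sent after stop() never reach the consumer.
func demoStop() async -> [Int] {
    let feed = NumberFeed()
    feed.send(1)
    feed.send(2)
    feed.stop()
    feed.send(3) // ignored, the continuation is gone

    var received: [Int] = []
    for await value in feed.stream {
        received.append(value)
    }
    return received
}
```

Because stop() finishes the stream, the consuming for loop ends on its own after receiving 1 and 2, and the producer knows it no longer has to do any work.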
It’s also important to keep in mind that the pattern I showed earlier will only work if one consumer uses your location stream object. You cannot have multiple for loops iterating over a single stream in Swift Concurrency because by default, async sequences lack the ability to share their iterations with multiple loops.
In Summary
In this post, you learned a lot about async streams and how you can produce your own async sequences. First, you saw the unfolding approach of building an async stream and you learned that this approach is relatively straightforward but might not be very useful if you need to bridge existing delegate or callback based APIs.
After exploring unfolding for a bit, we took a look at the build closure for async streams. You learned that this approach leverages a continuation object that can be called to produce values if and when needed.
You saw a very rudimentary example of an object that could bridge a CLLocationManager into async await, and you learned a bit about correctly managing your continuations to prevent sending values into an already completed stream.
If you have any questions or comments for me about this post, please feel free to reach out on Twitter or on Mastodon.