Asynchronous Generators in Python
In this lesson you’ll learn how to create an asynchronous generator:
async def square_odds(start, stop):
    for odd in odds(start, stop):
        await asyncio.sleep(2)
        yield odd ** 2
You’ll also see how to loop over values asynchronously using an async for loop.
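This is the async for loop you’ll write to consume the generator; it runs inside a coroutine driven by asyncio.run(), as shown later in the lesson:

async for so in square_odds(11, 17):
    print('so:', so)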
00:00
One of the last theoretical things I want to talk about is asynchronous generators. Asynchronous generators are basically an amalgam of this odds() function and randn(), meaning it’s a generator in that it produces values, but it’s asynchronous in that the values get produced asynchronously, which means the first value may come out after 1 second, the second value may come out 10 seconds later, and the third value may come out 7.3 seconds later.
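The odds() generator and randn() coroutine come from the earlier lessons and aren’t shown in this section; roughly speaking, they look something like this sketch of what the lesson assumes:

import asyncio
import random

def odds(start, stop):
    # Regular synchronous generator: yields the odd numbers from start to stop.
    for odd in range(start, stop + 1, 2):
        yield odd

async def randn():
    # Coroutine: simulates slow I/O, then returns a random integer.
    await asyncio.sleep(3)
    return random.randint(1, 10)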
00:27
So the values come out at different times, okay? The best way to show this is just an example, so I’m going to say async. It’s a function, it’s an asynchronous generator, so I’m going to use async. def means I’m going to create a function, and I’m going to call this thing square_odds(),
00:47
because I’m going to take some odd value and then I’m going to square it. I’m going to pass in some start value, some stop value, and I’m going to leverage this existing generator up here.
01:00
I’m going to say for odd in odds() and I’m going to pass in my start, my stop. Now, I’m going to wait some amount of time asynchronously, so I’m going to say await. This is what makes this function asynchronous: I’m using await.
01:27
And I’m going to sleep(), and let’s say I’m going to sleep() 2 seconds. But what I’m doing right here is pretending that I’m talking to a database or a file system or a web server. That’s what I’m actually simulating in this step right there.
01:44
So, imagine me calling a website or getting back some value from a database. That’s what that could be. And then I’m just going to return the value, okay? So I’m going to say yield. I’m going to yield out, because I’m using a generator, so I’m using yield.
02:00
So I’m using yield like I did in the generator, but I’m using await like I did in this coroutine, this asynchronous function. So that’s why this is like a mixture of the other two.
02:11
And I’m just going to yield that odd ** 2 (squared), because it’s square_odds().
02:19
Okay, now that I have this square_odds() asynchronous generator, let’s go ahead and execute it, or cause it to run. So normally, when you have a generator you would just use what’s called a for in loop. Because this is an asynchronous generator, you’re going to use an async for in loop.
02:36
So you’re going to say async for, let’s say so (square odd) in square_odds(). I’m going to pass in some start value, let’s say 11, and some stop value, say 17,
02:51
colon (:), and then let’s say print('so:', so).
02:59
So this allows me to loop over something that’s asynchronous, which means I could be calling an API, talking to a database, writing to a file system. You don’t really know how long it’s going to take.
03:11
And so when you’re looping over something that’s asynchronous, creating this async for is basically the perfect solution for that. So let’s see if this actually works.
03:22
This was my output from the initial generator.
03:28
That was from the coroutines. And then this is my square_odds(). So 11 squared, 12 squared, or sorry, 11, 13, 15, and then 17.
03:40
So that’s the square of all those, and then it finished. So yes, you can see I was able to loop asynchronously over those values as they came in one after the other.
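For reference, the square_odds() portion of that output is just the squares of 11, 13, 15, and 17, printed one line roughly every two seconds:

so: 121
so: 169
so: 225
so: 289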
03:53
So, this kind of wraps up the theoretical part of the lecture. What’s going to happen next is we’re going to build a very tiny application that kind of puts all this stuff together.
tevakrief on Sept. 12, 2019
Hello! Same question as karncx, why does it take 2s each time?
Thanks!
Hilman on Feb. 8, 2020
My understanding is that asyncio.gather() is the part that’s responsible for running the coroutines together, which basically means: do the tasks concurrently. As for the asynchronous generator, it just shows an example of one async task (a generator in this case).
You can try running multiple of them concurrently. Maybe change the code to something like this:
import asyncio

# Assumes the odds() generator from the lesson is already defined.
async def square_odds(start, stop):
    for odd in odds(start, stop):
        await asyncio.sleep(2)  # talking to database simulation
        yield odd ** 2

async def for_async(func_id):
    async for so in square_odds(11, 17):
        print(f"({func_id}) so: {so}")

async def main():
    await asyncio.gather(*(for_async(f"for_loop {i}") for i in range(3)))

asyncio.run(main())
rneilsen on Sept. 7, 2021
I didn’t really understand the point of this example at all. I can’t see what was happening asynchronously; it just seems to wait 2 seconds between delivering each result. It feels like the exact same thing would happen if we didn’t have any async stuff at all, just a function that spits out answers with a 2 second wait.
Bartosz Zaczyński RP Team on Sept. 7, 2021
@rneilsen Asynchronous processing is most suitable for handling so-called I/O-bound tasks, which spend most of their time waiting for data to arrive. Suppose you were a web browser that needed to download several resources, including images, style sheets, and script files, before rendering the page. To minimize the total wait time, you could start fetching all files at once without waiting until the previous one finishes completely.
Here’s an example that demonstrates this:
$ python example.py
Downloaded index.html
Downloaded style.css
Downloaded app.js
Downloaded logo.png
Synchronous time: 2.80 seconds
Downloaded app.js
Downloaded logo.png
Downloaded index.html
Downloaded style.css
Asynchronous time: 0.88 seconds
And here’s the corresponding code:
# example.py
import asyncio
import functools
import random
import time

async def main():
    files = ["index.html", "style.css", "app.js", "logo.png"]
    await synchronous(files)
    await asynchronous(files)

def timed(function):
    @functools.wraps(function)
    async def wrapper(*args, **kwargs):
        t1 = time.perf_counter()
        result = await function(*args, **kwargs)
        t2 = time.perf_counter()
        name = function.__name__.title()
        print(f"{name} time: {(t2 - t1):.2f} seconds\n")
        return result
    return wrapper

@timed
async def synchronous(files):
    for file in files:
        await download(file)

@timed
async def asynchronous(files):
    tasks = [download(f) for f in files]
    await asyncio.gather(*tasks)

async def download(file):
    await asyncio.sleep(random.random())
    print(f"Downloaded {file}")

if __name__ == '__main__':
    asyncio.run(main())
The synchronous code downloads the files one by one, whereas the asynchronous code kicks off the download of all the files simultaneously.
rneilsen on Sept. 7, 2021
I understand all that stuff. I don’t understand the specific square_odds example in this video. He made an async square_odds generator with a 2 second sleep timer, and then called it in an async for loop, and it just delivered its results one at a time, 2 seconds apart. The async for part didn’t seem to accomplish anything at all.
Bartosz Zaczyński RP Team on Sept. 8, 2021
@rneilsen Ah, I get it. Sorry for the misunderstanding.
You’re right. There’s no difference between having an async for and a regular for loop in this example. Both would work the same way by iterating a sequence without an apparent benefit, because the async for runs sequentially instead of concurrently.
However, you’ll need the async for to iterate over an asynchronous generator, which might be sucking up data from a database or the network asynchronously. The plain old for loop just wouldn’t interoperate with such a generator at all.
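For example, trying to consume the asynchronous generator from this lesson with a plain for loop fails right away (a quick sketch, assuming the square_odds() generator defined above):

for so in square_odds(11, 17):  # plain, synchronous for loop
    print(so)
# TypeError: 'async_generator' object is not iterable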
On the other hand, if you’d like to run multiple async generators in parallel, then it gets more tricky. One example you’ll find on StackOverflow turns your asynchronous generators into parallel tasks:
import asyncio, random, time

def synchronous_generator(start, stop):
    for number in range(start, stop + 1):
        time.sleep(random.random())
        yield number

async def asynchronous_generator(start, stop):
    for number in range(start, stop + 1):
        await asyncio.sleep(random.random())
        yield number

def merge_async_iters(*aiters):
    queue = asyncio.Queue(1)

    async def drain(aiter):
        async for item in aiter:
            await queue.put(item)

    async def merged():
        while not all(task.done() for task in tasks):
            yield await queue.get()

    tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
    return merged()

async def main():
    # No apparent difference between sync and async:
    for number in synchronous_generator(1, 5):
        print(number)

    async for number in asynchronous_generator(1, 5):
        print(number)

    # Running the iterations in parallel:
    gen1 = asynchronous_generator(1, 5)
    gen2 = asynchronous_generator(10, 15)
    async for number in merge_async_iters(gen1, gen2):
        print(number)

if __name__ == '__main__':
    asyncio.run(main())
In another answer, someone shows how you can also use the aiostream library if you don’t mind having an external dependency in your project.
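As a rough sketch of what that could look like (assuming aiostream is installed, e.g. pip install aiostream, and reusing the asynchronous_generator() defined above), the merge operator handles the interleaving for you:

import asyncio
from aiostream import stream

async def main():
    gen1 = asynchronous_generator(1, 5)
    gen2 = asynchronous_generator(10, 15)
    # stream.merge() interleaves values from both generators as they arrive.
    merged = stream.merge(gen1, gen2)
    async with merged.stream() as streamer:
        async for number in streamer:
            print(number)

asyncio.run(main())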
karncx on Aug. 30, 2019
Why does the async for take number of steps * 2 seconds? Why does it not save time like asyncio.gather?