Python Concurrency with asyncio
A week ago, I was working on a project that involved calling a REST API endpoint 32 million times to retrieve a certain type of document. The input to the API was a presigned URL that was valid for only a few days, so I did not have the luxury of doing things in a sequential manner. A rough calculation of the time taken to perform the task using a simple for loop made me realize that the task is a nice little use case for parallelizing. That’s when I started looking at asyncio. In my first go at the task, I ventured along with the standard approach of using multithreading functions in Python. However, there was always an itch to see if I could get better performance using asyncio and multithreading. The book “Python Concurrency with asyncio” by Matthew Fowler helped me understand the basics of concurrent and parallel computing with asyncio. Subsequently I went back and performed the task of pinging an API 32 million times to retrieve 32 million JSON documents using asyncio and multithreading. In this post, I will summarize a few chapters that I found useful in getting my work done.
Getting to know asyncio
asyncio was introduced in Python 3.4 as an additional way to handle highly concurrent workloads outside of multithreading and multiprocessing. The chapter introduces the concepts of concurrency and parallelism using the example of baking a cake. Concurrency means that the subtasks involved in a piece of work are not done in a sequential manner; instead, we make progress by switching between them. Parallelism means that subtasks are not only handled concurrently but are also executed at the same time.
The difference between concurrency and parallelism is illustrated via a cake-baking scenario.
There are two types of multitasking: preemptive multitasking and cooperative multitasking. In the preemptive model, the OS decides which work is currently being executed and switches between tasks via a process called time slicing; when the OS switches between work, we call it preempting. In cooperative multitasking, we explicitly code points in our application where we let other tasks run; the tasks operate in a cooperative mode.
asyncio uses cooperative multitasking to achieve concurrency. When our application reaches a point where it could wait a while for a result to come back, we explicitly mark this in code. This allows other code to run while we wait for the result to come back in the background.
The chapter gives a good primer on multiprocessing and multithreading by walking through a few code snippets.
A process is a running application that has a memory space that other applications cannot access.
Threads can be thought of as lighter-weight processes. They do not have their own memory as a process does; instead, they share the memory of the process that created them. Threads are associated with the process that created them. A process will always have at least one thread associated with it, known as the main thread. A process can also create other threads, called worker threads, which can perform other work concurrently alongside the main thread.
A good visual highlights the difference between a multithreaded program and a multiprocessing program.
The GIL (global interpreter lock) is the most infamous topic in the Python community. It prevents a Python process from executing more than one Python bytecode instruction at any given time. This means that even if we have multiple threads on multiple cores, only one thread can run Python code at once.
Why does the GIL exist?
The answer lies in how memory is managed in CPython, where memory is managed primarily by a process known as reference counting. Reference counting works by keeping track of who currently needs access to a particular Python object: a reference count is an integer tracking how many places reference that object. When the reference count reaches zero, no one is referencing the object and it can be deleted from memory.
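Reference counting is easy to observe from Python itself; a small illustration (not from the book) using sys.getrefcount:

```python
import sys

# getrefcount reports one extra reference, because passing the object
# to the function itself creates a temporary reference.
numbers = [1, 2, 3]
print(sys.getrefcount(numbers))  # 2: `numbers` plus the function argument

other = numbers                  # a second name now references the list
print(sys.getrefcount(numbers))  # 3

del other                        # dropping a reference decrements the count
print(sys.getrefcount(numbers))  # back to 2
```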
The conflict with threads arises from the fact that reference counting in CPython is not thread safe. If two or more threads modify a shared variable, that variable may end up in an unexpected state.
Is the GIL ever released?
The GIL is released when an I/O operation happens. This lets us employ threads to do concurrent work when it comes to I/O, but not for CPU-bound Python code itself.
Why is the GIL released only for I/O operations but not for CPU-bound operations?
The answer lies in the system calls that are made in the background. In the case of I/O, the low-level system calls happen outside of the Python runtime. This allows the GIL to be released, because it is not interacting with Python objects directly; the GIL is only reacquired when the data received is translated back into a Python object. At the OS level, then, the I/O operations execute concurrently. This model gives us concurrency but not parallelism: because of the GIL, the best we can do in Python is concurrency of our I/O operations.
asyncio exploits the fact that I/O operations release the GIL to give us concurrency even with only one thread. When we utilize asyncio, we create objects called coroutines. A coroutine can be thought of as executing a lightweight thread. Much like we can have multiple threads running at the same time, each with its own concurrent I/O operation, we can have many coroutines running alongside one another. While we are waiting for our I/O-bound coroutines to finish, we can still execute other Python code, thus giving us concurrency. It is important to note that asyncio does not circumvent the GIL: if we have a CPU-bound task, we still need to use multiple processes to execute it concurrently.
asyncio uses the event notification mechanism of the underlying OS to achieve concurrency. The underlying OS uses non-blocking I/O mode for all the I/O operations. When we hit an I/O operation, we hand it over to the OS event notification system to keep track of it for us. Once we have done this hand-off, our Python thread is free to keep running other Python code or to add more non-blocking sockets for the OS to track. When our I/O operation finishes, we “wake up” the task that was waiting for the result and then proceed to run any other Python code that came after that I/O operation.
The way asyncio keeps track of the tasks waiting for I/O notifications is via the event loop.
An event loop is at the heart of every asyncio application. It keeps a queue of tasks, which are nothing but wrappers around coroutines. A coroutine can pause execution when it hits an I/O-bound operation, letting the event loop run other tasks that are not waiting for an I/O operation to complete.
The chapter includes a good visual to illustrate the event loop.
All the basic concepts of concurrency and multiprocessing are very well explained in the first chapter using a combination of visuals and code snippets.
asyncio basics
The coroutine construct is the key element of the asyncio framework. A coroutine is like a regular Python function, but with the superpower that it can pause its execution when it encounters an operation that could take a while to complete. When that long-running operation is complete, we can “wake up” the paused coroutine and finish executing any other code in it. While a paused coroutine is waiting for the operation to finish, we can run other code. This running of other code while waiting is what gives our application concurrency.
What can one do after defining a coroutine? One can use the asyncio.run function to create an event loop and run the coroutine on it. await is used to pause a coroutine; control passes back only after the awaited coroutine is done executing. Using async functions along with await and asyncio.run, one cannot really run a block of code concurrently, unless one creates a task.
Once we create a task, we usually must use the await keyword on it at some point in the application. If we do not, the task would be scheduled to run, but it would almost immediately be stopped and cleaned up when asyncio.run shuts down the event loop.
There are also other functions in the asyncio module, such as cancel, wait_for and shield, that can be used by the programmer so that the application never waits forever for a certain task to get done.
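For example, wait_for lets us bound how long we are willing to wait; a small sketch, assuming a slow operation invented for illustration:

```python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(10)  # simulate an operation that takes too long
    return 'finished'

async def main() -> None:
    task = asyncio.create_task(slow_operation())
    try:
        # Give the task at most 1 second before it is cancelled
        result = await asyncio.wait_for(task, timeout=1)
        print(result)
    except asyncio.TimeoutError:
        print('Timed out, task was cancelled:', task.cancelled())

asyncio.run(main())
```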
In order to understand the difference between a task and a coroutine, as both can be used with await, the chapter introduces futures and awaitables. A future is a Python object that contains a single value that you expect to get at some point in the future but may not yet have. A task can be thought of as a combination of both a coroutine and a future: when we create a task, we are creating an empty future and running the coroutine, and when the coroutine completes with either a result or an exception, we set the result or exception of the future. The common thread between coroutines, tasks and futures is that they are all implementations of the Awaitable abstract class.
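A tiny sketch of a future in action (the helper names are mine):

```python
import asyncio

async def set_after(future: asyncio.Future, value: str) -> None:
    await asyncio.sleep(1)
    future.set_result(value)  # completing the future wakes up its awaiter

async def main() -> None:
    future = asyncio.get_running_loop().create_future()
    asyncio.create_task(set_after(future, 'hello'))
    # Awaiting a future suspends until someone sets its result
    print(await future)

asyncio.run(main())
```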
There are two common pitfalls of using coroutines and tasks. The first involves running a CPU-bound function with the asyncio framework; since a CPU-bound function does not release the GIL, the asyncio library will not give any performance gains. The second pitfall is using blocking I/O libraries such as requests. These libraries are blocking by default, and hence one cannot take advantage of the asyncio model; it is better to use libraries that implement non-blocking sockets.
A first asyncio application
This chapter introduces a basic example of socket communication between a client and a server. By default, socket communication is blocking, and hence a client has to wait until the server finishes processing other requests that came in earlier. One can of course make the socket communication non-blocking so that parallel requests can be processed by the server. However, polling the sockets ourselves has a cost, as it is a CPU-heavy operation; instead, it is better to use the event notification APIs built into the operating system.
Operating systems have efficient APIs built in that let us watch sockets for incoming data and other events. While the actual API depends on the operating system, all of these I/O notification systems operate on similar concepts. We give them a list of sockets we want to monitor for events, and instead of constantly checking each socket to see if it has data, the operating system tells us explicitly when sockets have data. Because this is implemented at the hardware level, very little CPU is used during the monitoring, allowing for efficient resource usage. These notification systems are the core of how asyncio achieves concurrency.
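A minimal sketch of an echo server built on asyncio’s streams API, which delegates the socket watching to the OS notification system (not the book’s exact listing):

```python
import asyncio

async def handle_client(reader: asyncio.StreamReader,
                        writer: asyncio.StreamWriter) -> None:
    # read() suspends this coroutine until the OS reports incoming data
    while data := await reader.read(1024):
        writer.write(data)       # echo the bytes back to the client
        await writer.drain()
    writer.close()

async def main() -> None:
    # Each connecting client gets its own handle_client coroutine
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8000)
    async with server:
        await server.serve_forever()

asyncio.run(main())
```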
Concurrent web requests
This chapter introduces the aiohttp library, which uses non-blocking sockets to make web requests and returns coroutines for those requests, which we can then await for a result.
One cannot take an existing Python library, use it with asyncio and hope to get performance gains. Most libraries do not perform well with asyncio because they use blocking sockets. For example, the requests library will block the thread that it runs in, and since asyncio is single-threaded, our entire event loop will halt until that request finishes.
The first advantage that aiohttp offers is connection pooling, i.e. creating a reusable pool of connections that can be used for performing requests. The chapter talks about all the important functions in the aiohttp library.
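The basic shape of an aiohttp request looks roughly like this (the URL is a placeholder):

```python
import asyncio
import aiohttp

async def main() -> None:
    # A ClientSession holds a reusable pool of connections; create it once
    # and share it across requests rather than once per request.
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.example.com') as response:
            print(response.status)
            body = await response.text()
            print(len(body))

asyncio.run(main())
```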
For a while I did not understand the reason for using gather, but reading this chapter made the purpose of the function very clear. It takes in a sequence of awaitables and lets us run them concurrently, all in one line of code. gather will automatically wrap coroutines into tasks and put them on the event loop.
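A sketch of gather over a batch of requests (placeholder URLs):

```python
import asyncio
import aiohttp

async def fetch_status(session: aiohttp.ClientSession, url: str) -> int:
    async with session.get(url) as response:
        return response.status

async def main() -> None:
    urls = ['https://www.example.com' for _ in range(10)]  # placeholders
    async with aiohttp.ClientSession() as session:
        # gather wraps each coroutine in a task and runs them concurrently;
        # results come back in the same order as the input awaitables.
        statuses = await asyncio.gather(
            *[fetch_status(session, url) for url in urls]
        )
        print(statuses)

asyncio.run(main())
```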
The gather function has a few drawbacks. The first is that it isn’t easy to cancel our tasks if one throws an exception. The second is that we must wait for all of our coroutines to finish before we can process the results.
The chapter then walks the reader through the wait function, which gives more fine-grained control over asynchronous requests.
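For instance, wait can return as soon as the first task completes; a small sketch with invented jobs:

```python
import asyncio

async def job(i: int) -> int:
    await asyncio.sleep(i)
    return i

async def main() -> None:
    tasks = [asyncio.create_task(job(i)) for i in (1, 2, 3)]
    # Unlike gather, wait lets us act as soon as the first task finishes
    done, pending = await asyncio.wait(tasks,
                                       return_when=asyncio.FIRST_COMPLETED)
    print([t.result() for t in done])  # [1]
    for t in pending:                  # we can cancel whatever is left
        t.cancel()

asyncio.run(main())
```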
How much you get out of the chapter depends on what you want to accomplish and on the idiosyncrasies of the server that you are pinging. In my case, I had to download 32 million documents from 32 million presigned URLs, each of which expired after a certain time. There was no choice for me other than to use parallel processing, as the simple route of a sequential download would have made the task infeasible. However, the server had a peculiar feature that limited the API calls per second and throttled the requests; hence I had to use other hacks to make sure that the aiohttp tasks were not crossing a specific threshold when firing requests.
Handling CPU bound work
asyncio has an API for interoperating with Python’s multiprocessing library. This lets us use async/await syntax as well as asyncio APIs with multiple processes. Thus we can get the benefits of the asyncio library even with CPU-bound code.
The basic idea behind the multiprocessing library:
Using this library, one can spawn separate subprocesses to handle work. Each subprocess will have its own Python interpreter and be subject to the GIL, but instead of one interpreter we will have several, each with its own GIL. Assuming we run on a machine with multiple CPU cores, this means that we can parallelize any CPU-bound workload effectively. Even if we have more processes than cores, our OS will use preemptive multitasking to allow our multiple tasks to run concurrently. This setup is both concurrent and parallel.
One can use the Process class to spawn several processes and get things done in a concurrent fashion. However, it is better to use process pools, i.e. a collection of Python processes that we can use to run functions in parallel. The Pool function from the multiprocessing library gives the ability to run tasks in parallel. Process pools are good for simple use cases, but Python offers an abstraction on top of them via the concurrent.futures module. This module contains executors for both processes and threads that can be used on their own but also interoperate with asyncio.
When we submit a task to a process pool, it may not run immediately because the processes in the pool may be busy with other tasks. How does the process pool handle this? In the background, a process pool executor keeps a queue of tasks. When we submit a task to the pool, its arguments are pickled and put on the task queue. Each worker process asks for a task from the queue when it is ready for work; when a worker process pulls a task off the queue, it unpickles the arguments and begins to execute the task.
Using the event loop
The use of executors makes the whole process far more pleasant, as it abstracts across threads and processes. There is one limitation, however: the order of iteration over the results is deterministic, following the order of the target function arguments. I think the main takeaway is that one can use the event loop that asyncio relies on and make sure that process pool executors get a chance to use it. Regular code involving process pool executors is easy to tweak to make it work with the event loop.
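The tweak amounts to submitting pool work through the event loop; a sketch, assuming a made-up CPU-bound function:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def cpu_bound(n: int) -> int:
    return sum(i * i for i in range(n))

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns awaitables, so process pool work can be
        # combined with other asyncio code via gather
        calls = [partial(cpu_bound, n) for n in (10_000_000, 20_000_000)]
        results = await asyncio.gather(
            *[loop.run_in_executor(pool, call) for call in calls]
        )
        print(results)

if __name__ == '__main__':
    asyncio.run(main())
```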
The chapter gives a quick primer on MapReduce and then shows an example of using asyncio on a MapReduce job to improve its performance.
Shared locks
The section on shared locks is very interesting. I learned about the two kinds of shared data supported by the multiprocessing module: values and arrays. These shared data types can be used to make sure that processes can access global variables, and to prevent race conditions one can use the lock features available on these shared data objects. Sharing data between process pools is a bit more nuanced, but the code given in the chapter is easy to understand.
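A minimal sketch of a shared Value guarded by its lock (the workload is invented):

```python
from multiprocessing import Process, Value

def increment(shared) -> None:
    for _ in range(50_000):
        # get_lock() returns the lock guarding the shared value; without it,
        # concurrent += operations can interleave and lose updates
        with shared.get_lock():
            shared.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)  # 'i' = a shared C integer initialized to 0
    procs = [Process(target=increment, args=(counter,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # reliably 100000 thanks to the lock
```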
Handling blocking work with threads
The organization of the chapter is similar to the previous chapter; however, the focus is on threads. If one wants to use asyncio with legacy code that has blocking calls, then this chapter is worth going over.
I found the discussion around race conditions interesting. Since threads share the memory of the process that spawned them, we do not have to go through the route of using shared data type objects. However, to implement the lock functionality, there is a Lock class within the threading module that one can use. To illustrate this point, the author uses a URL download reporter example.
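A minimal sketch of that idea (my own illustration, with hypothetical names): a shared progress counter guarded by a threading.Lock, updated by many download threads.

```python
import threading
import time

class DownloadReporter:
    """Tracks completed downloads across many worker threads."""

    def __init__(self) -> None:
        self._count = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        # Only one thread at a time may update the shared counter
        with self._lock:
            self._count += 1

    @property
    def count(self) -> int:
        with self._lock:
            return self._count

reporter = DownloadReporter()

def download(url: str) -> None:
    time.sleep(0.1)  # stand-in for the actual blocking download
    reporter.increment()

threads = [threading.Thread(target=download, args=(f'url-{i}',))
           for i in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f'{reporter.count} downloads completed')
```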
After understanding the use of locks, I realized that I could have used this sort of code in my “32 million API pings” task to keep track of the progress instead of keeping track of files downloaded.
Synchronization
This chapter proved to be priceless for my task, as it gave me a good understanding of Semaphore, which I could then use to set a limit on the number of requests fired at the API.
asyncio synchronization primitives can help us prevent bugs unique to a single-threaded concurrency model.
Firstly, can there be single-threaded concurrency bugs? The chapter gives a nice example to illustrate the point that concurrency issues can happen in a single-threaded process too.
A simple example proves the point that concurrency issues can happen with the asyncio library too.
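A sketch along those lines (assuming a shared users dictionary; the names are mine):

```python
import asyncio

users = {'alice': 'conn_a', 'bob': 'conn_b'}  # shared mutable state

async def message_all_users() -> None:
    for user in users:          # iterating over the shared dict
        await asyncio.sleep(1)  # suspension point: other coroutines run here
        print(f'Sent message to {user}')

async def disconnect_user() -> None:
    await asyncio.sleep(0.5)
    del users['bob']            # mutates the dict mid-iteration
    print('Disconnected bob')

async def main() -> None:
    # Raises RuntimeError: dictionary changed size during iteration
    await asyncio.gather(message_all_users(), disconnect_user())

asyncio.run(main())
```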
The above code throws an exception: when message_all_users pauses, another coroutine runs and modifies a variable that it is using.
These are the types of bugs you tend to see in a single-threaded concurrency model. You hit a suspension point with await, and another coroutine runs and modifies some shared state, changing it in an undesired way for the first coroutine once it resumes. The key difference between multithreaded and single-threaded concurrency bugs is that in a multithreaded application, race conditions are possible anywhere you modify mutable state; in a single-threaded concurrency model, the mutable state has to be modified around an await point.
How can one solve the above problem? Via asyncio locks, which are awaitable objects that suspend coroutine execution when they are blocked. This means that when a coroutine is blocked waiting to acquire a lock, other code can run. This is very useful in situations where you want a set of coroutines not to get caught in a race condition.
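A sketch of the fix using an asyncio.Lock, reusing the invented names from the previous snippet:

```python
import asyncio

users = {'alice': 'conn_a', 'bob': 'conn_b'}
lock = asyncio.Lock()

async def message_all_users() -> None:
    # Holding the lock across the await points keeps `users` stable
    async with lock:
        for user in users:
            await asyncio.sleep(1)
            print(f'Sent message to {user}')

async def disconnect_user() -> None:
    await asyncio.sleep(0.5)
    async with lock:  # suspends until message_all_users releases the lock
        del users['bob']
        print('Disconnected bob')

async def main() -> None:
    await asyncio.gather(message_all_users(), disconnect_user())

asyncio.run(main())
```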
Semaphores
I found this functionality extremely useful in my task involving the download of 32 million documents. There was a rate limit applied by the API, and hence my plain and simple asyncio program was getting HTTP 429 response codes. Semaphore came to my rescue.
A semaphore acts much like a lock in that we can acquire it and release it, with the major difference being that we can acquire it multiple times, up to a limit we specify. Internally, a semaphore keeps track of this limit; each time we acquire the semaphore we decrement the count, and each time we release it we increment it. If the count reaches zero, any further attempts to acquire the semaphore will block until someone else calls release and increments the count. A semaphore is useful whenever you are hitting an API that limits concurrent access to an endpoint.
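A sketch of the pattern I used, with an invented limit and placeholder URLs:

```python
import asyncio
import aiohttp

semaphore = asyncio.Semaphore(10)  # at most 10 requests in flight at once

async def fetch(session: aiohttp.ClientSession, url: str) -> int:
    async with semaphore:  # blocks once 10 coroutines hold the semaphore
        async with session.get(url) as response:
            return response.status

async def main() -> None:
    urls = ['https://www.example.com' for _ in range(1000)]  # placeholders
    async with aiohttp.ClientSession() as session:
        statuses = await asyncio.gather(*[fetch(session, u) for u in urls])
        print(len(statuses))

asyncio.run(main())
```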
Unfortunately, using a semaphore did not solve my problem completely, as the API that I was hitting also had a per-second limit on the number of concurrent calls. Using ONLY a semaphore can make the code bursty, meaning that it has the potential to fire, say, 10 requests at the same moment, creating a potential spike in traffic. This may not be desirable if we are concerned about spikes of load on the API we are calling. Hence I used a hack that I found online, based on the ‘leaky bucket’ algorithm, that helped me fire API calls at a fixed number per second. Here is the link to the article and here is the code that was helpful in my task.
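The actual snippet lives at the link above; here is only a minimal sketch of the leaky-bucket idea (my own illustration, not the article’s code): allow roughly `rate` acquisitions per second, replenishing the allowance continuously instead of in bursts.

```python
import asyncio
import time

class LeakyBucketLimiter:
    """Allow at most `rate` acquisitions per second, smoothing out bursts."""

    def __init__(self, rate: float) -> None:
        self.rate = rate
        self.allowance = rate
        self.last_check = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # "Leak": replenish allowance in proportion to elapsed time,
            # capped at one second's worth of requests.
            self.allowance = min(
                self.rate,
                self.allowance + (now - self.last_check) * self.rate,
            )
            self.last_check = now
            if self.allowance >= 1:
                self.allowance -= 1
                return
            await asyncio.sleep(1 / self.rate)  # yield and try again

# Usage: limiter = LeakyBucketLimiter(rate=100), then call
# `await limiter.acquire()` in each coroutine before firing its request.
```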
BoundedSemaphore
There is a variant of Semaphore called BoundedSemaphore that can be used to get more fine-grained control over the way the lock is acquired and released. With a plain Semaphore, it is valid to call release more times than we call acquire. If we always use an async with block, this situation can never happen; however, if there are places in the app where the lock is released outside an async with block, for whatever reason, one can use BoundedSemaphore to restrict the number of releases one can call.
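A tiny sketch of the difference (invented example):

```python
import asyncio

async def main() -> None:
    semaphore = asyncio.BoundedSemaphore(1)
    async with semaphore:
        print('inside the semaphore')
    # One release more than the number of acquires: a plain Semaphore would
    # silently accept this and raise its limit; BoundedSemaphore raises
    # ValueError instead.
    semaphore.release()

asyncio.run(main())
```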
Takeaway
Thanks to this book, I was able to answer the following questions relating to parallel processing:
- How is asyncio different from the regular multiprocessing and concurrent.futures modules?
- Is there a speed-up in I/O bound tasks when one uses asyncio as compared to concurrent.futures on a 32-core machine?
- How do I make sure that I do not exceed the API rate limit while making millions of requests?
- How do I make sure that I do not run out of memory, given that asyncio can spawn literally thousands of requests at the same time?
- How do I use aiohttp to improve I/O bound tasks that involve pinging an external REST endpoint?
- How do I use aiofiles to turn blocking code that serializes a JSON response into non-blocking code?
- Why is it better to use a ClientSession in aiohttp?
- What is the performance benefit of using aiomultiprocess over asyncio?
- What is the performance benefit of using asyncio over ThreadPoolExecutor?
- What is blocking code?
- What is non-blocking code?
- What's the use of the asyncio.sleep function?
- What is a coroutine?
- What is a task?
- How is a coroutine different from a task?
- What is an awaitable?
- What is the difference between a process and a thread?
- If you are using a ThreadPoolExecutor on a multicore CPU, will the threads run on a single core or multiple cores?
- If you are using a ThreadPoolExecutor on a multicore CPU, will the threads be concurrently processed?
- What do concurrent and parallel operation mean?
- What is the default max_workers in a ProcessPoolExecutor?
- What is the default max_workers in a ThreadPoolExecutor?
- What is a Semaphore and how can one use it in a task that involves the asyncio library?
- What is a BoundedSemaphore and how can one use it in a task that involves the asyncio library?
- Why are thread pool executors and process pool executors better to work with than threads and processes directly?
- Can different processes access shared memory without creating a race condition?
- How can one use asyncio with blocking libraries such as requests?
- What is the limitation of the GIL?
- Why is the GIL not much of an issue in I/O bound tasks?
- How does one access the event loop in a program?
- What is the use of the await keyword in the asyncio framework? Where should one use it?
- What is the connection between generators and coroutines?
- What are the similarities and dissimilarities between generators and coroutines?
- Python is multithreaded but is not simultaneously multithreaded. What does this mean?
- Multi-threaded does not strictly mean single-core. What does this statement mean?
- How does one avoid locks in a situation that involves a thread pool executor and async code?
I found this book very useful in successfully accomplishing my task of making 32 million REST API calls. Clearly, without the knowledge from this book, it would have taken forever to get the task done. The author has taken pains to explain various aspects of the library in an elaborate manner, making this a good beginner book for understanding the asyncio library. I think this is one of the best Python books that I have read this year.