Parallelism, Concurrency, and AsyncIO in Python - by example

Last updated October 12th, 2020

This post looks at how to speed up CPU-bound and IO-bound operations with multiprocessing, threading, and AsyncIO.

Concurrency vs Parallelism

Concurrency and parallelism are similar terms, but they are not the same thing.

Concurrency is the ability to make progress on multiple tasks during overlapping time periods. Tasks can start, run, and complete while other tasks are still in progress, even though they are not necessarily executing at the same instant. In the case of a single CPU, multiple tasks are run with the help of context switching, where the state of a task is stored so that it can be resumed later.

Parallelism, meanwhile, is the ability to run multiple tasks at the same time across multiple CPU cores.

Though they can increase the speed of your application, concurrency and parallelism should not be used everywhere. The use case depends on whether the task is CPU-bound or IO-bound.

Tasks that are limited by the CPU are CPU-bound. For example, mathematical computations are CPU-bound since they finish faster as more processing power becomes available. Parallelism is for CPU-bound tasks. In theory, if a task is divided into n subtasks, each of these subtasks can run in parallel, reducing the total time to roughly 1/n of the original non-parallel task. Concurrency is preferred for IO-bound tasks, as you can do something else while waiting for the IO to complete.
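
To make the 1/n estimate concrete, here is a minimal sketch. The function name ideal_parallel_time and the 16-second figure are illustrative assumptions, not measurements from this post. Real runs fall short of this ideal because starting workers and passing data between them takes time.

```python
def ideal_parallel_time(sequential_seconds, n_workers):
    """Best-case run time if the work splits evenly across n_workers."""
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    return sequential_seconds / n_workers


# Hypothetical task that takes 16 seconds on a single core.
for workers in (1, 2, 4, 8):
    print(workers, ideal_parallel_time(16.0, workers))
```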

A good example of CPU-bound work comes from data science. Data scientists deal with huge chunks of data. For data preprocessing, they can split the data into multiple batches and run them in parallel, effectively decreasing the total time to process. Increasing the number of cores generally results in faster processing.

Web scraping is IO-bound: the task puts little load on the CPU, since most of the time is spent reading from and writing to the network. Other common IO-bound tasks include database calls and reading and writing files to disk. Web applications, such as those built with Django and Flask, are typically IO-bound.

If you're interested in learning more about the differences between threads, multiprocessing, and async in Python, check out the Speeding Up Python with Concurrency, Parallelism, and asyncio post.

Scenario

With that, let's take a look at how to speed up the following tasks:

# tasks.py

import os
from multiprocessing import current_process
from threading import current_thread

import requests


def make_request(num):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    requests.get("https://httpbin.org/ip")


async def make_request_async(num, client):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    await client.get("https://httpbin.org/ip")


def get_prime_numbers(num):
    # cpu-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    numbers = []

    prime = [True for i in range(num + 1)]
    p = 2

    while p * p <= num:
        if prime[p]:
            for i in range(p * 2, num + 1, p):
                prime[i] = False
        p += 1

    prime[0] = False
    prime[1] = False

    for p in range(num + 1):
        if prime[p]:
            numbers.append(p)

    return numbers

All of the code examples in this post can be found in the parallel-concurrent-examples-python repo.

Notes:

  • make_request makes a single HTTP request to https://httpbin.org/ip. We'll call it X number of times.
  • make_request_async makes the same HTTP request asynchronously with HTTPX.
  • get_prime_numbers calculates the prime numbers, via the Sieve of Eratosthenes method, from two to the provided limit.
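
To see what the sieve produces, here is a compact sketch run on a small limit. The name sieve is hypothetical, and unlike get_prime_numbers it starts crossing out multiples at p * p rather than p * 2, a common optimization that gives the same result.

```python
def sieve(limit):
    """Return all primes <= limit using the Sieve of Eratosthenes."""
    if limit < 2:
        return []
    is_prime = [True] * (limit + 1)
    is_prime[0] = is_prime[1] = False
    p = 2
    while p * p <= limit:
        if is_prime[p]:
            # Multiples below p * p were already crossed out by smaller primes.
            for multiple in range(p * p, limit + 1, p):
                is_prime[multiple] = False
        p += 1
    return [n for n, flag in enumerate(is_prime) if flag]


print(sieve(30))  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```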

We'll be using the following libraries from the standard library to speed up the above tasks:

Library             Class/Method          Processing Type
------------------  --------------------  ----------------------------
threading           Thread                concurrent
concurrent.futures  ThreadPoolExecutor    concurrent
asyncio             gather                concurrent (via coroutines)
multiprocessing     Pool                  parallel
concurrent.futures  ProcessPoolExecutor   parallel

IO-bound Operation

Again, IO-bound tasks spend more time on IO than on the CPU.

Since web scraping is IO-bound, we should use threading to speed up the processing, as retrieving the HTML (IO) is slower than parsing it (CPU).

Scenario: How to speed up a Python-based web scraping and crawling script?

Sync Example

Let's start with a benchmark.

# io-bound_sync.py

import time

from tasks import make_request


def main():
    for num in range(1, 101):
        make_request(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we made 100 HTTP requests using the make_request function. Since requests happen synchronously, each task is executed sequentially.

Elapsed run time: 15.710984757 seconds.

So, that's roughly 0.16 seconds per request.

Threading Example

# io-bound_concurrent_1.py

import threading
import time

from tasks import make_request


def main():
    tasks = []

    for num in range(1, 101):
        tasks.append(threading.Thread(target=make_request, args=(num,)))
        tasks[-1].start()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, the same make_request function is called 100 times. This time the threading library is used to create a thread for each request.

Elapsed run time: 1.020112515 seconds.

The total time decreases from ~16s to ~1s.

Since we're using separate threads for each request, you might be wondering why the whole thing didn't take ~0.16s to finish. The extra time is largely the overhead of creating and managing threads. The Global Interpreter Lock (GIL) in Python also makes sure that only one thread executes Python bytecode at a time; it is released while a thread waits on IO, which is why threading still helps for IO-bound work.
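
The overlap can be reproduced without touching the network. In this minimal sketch, fake_io is a hypothetical stand-in for make_request, and time.sleep plays the role of a blocking call that releases the GIL while it waits.

```python
import threading
import time


def fake_io(delay):
    # time.sleep stands in for a blocking network call; like real socket IO,
    # it releases the GIL while waiting.
    time.sleep(delay)


start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# Five 0.2 s waits overlap, so the total is close to 0.2 s rather than 1 s.
print(f"Elapsed: {elapsed:.2f} seconds")
```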

concurrent.futures Example

# io-bound_concurrent_2.py

import time
from concurrent.futures import ThreadPoolExecutor, wait

from tasks import make_request


def main():
    futures = []

    with ThreadPoolExecutor() as executor:
        for num in range(1, 101):
            futures.append(executor.submit(make_request, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

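Here we used concurrent.futures.ThreadPoolExecutor to achieve multithreading. After all the futures are created, we used wait to wait for all of them to complete. Leaving the with block also waits for pending tasks, so the explicit wait acts as a safeguard rather than a requirement.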

Elapsed run time: 1.340592231 seconds

concurrent.futures.ThreadPoolExecutor is an abstraction around the threading library, which makes it easier to use. In the previous example, we assigned each request to its own thread, so 100 threads were used in total. ThreadPoolExecutor, by contrast, defaults the number of worker threads to min(32, os.cpu_count() + 4), so fewer requests run at once. If you want more control over multithreading, use the threading library instead.
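
The worker count can be overridden with the max_workers argument, and each future's return value can be read with result(). The sketch below assumes a hypothetical fake_request that sleeps instead of calling httpbin, so it runs offline. The helper name run and the choice of 10 tasks are also illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_request(num):
    # Stand-in for make_request: sleep briefly instead of hitting the network.
    time.sleep(0.05)
    return num * 2


def run(max_workers):
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its input so results can be matched up.
        future_to_num = {executor.submit(fake_request, n): n for n in range(10)}
        for future in as_completed(future_to_num):
            # result() returns the value or re-raises the task's exception.
            results[future_to_num[future]] = future.result()
    return results


start = time.perf_counter()
results = run(max_workers=10)  # one thread per task, like the threading example
print(f"{len(results)} results in {time.perf_counter() - start:.2f} seconds")
```

Lowering max_workers below the number of tasks makes the tasks run in batches, which is one likely reason the pool-based run above is a little slower than the one-thread-per-request version.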

AsyncIO Example

# io-bound_concurrent_3.py

import asyncio
import time

import httpx

from tasks import make_request_async


async def main():
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[make_request_async(num, client) for num in range(1, 101)]
        )


if __name__ == "__main__":
    start_time = time.perf_counter()

    # asyncio.run creates the event loop, runs main(), and closes the loop.
    asyncio.run(main())

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

httpx is used here since requests does not support async operations.

Here, we used asyncio to achieve concurrency.

Elapsed run time: 0.553961068 seconds

asyncio is faster than the other methods here because of how task switching works. threading makes use of OS (Operating System) threads, so the threads are managed by the OS and thread switching is preemptive: the OS decides when to switch. asyncio uses coroutines, which are defined by the Python interpreter. With coroutines, the program itself decides when to switch tasks, and it does so only at await points, where a task would otherwise sit idle waiting for IO. This is handled by the event loop in asyncio.
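
The cooperative switching can be observed directly. In this minimal sketch, worker and log are hypothetical names, and asyncio.sleep(0) stands in for a real awaitable IO call such as client.get.

```python
import asyncio


async def worker(name, log):
    log.append(f"{name} start")
    # await hands control back to the event loop, which runs the next ready task.
    await asyncio.sleep(0)
    log.append(f"{name} end")


async def main():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


log = asyncio.run(main())
print(log)  # ['a start', 'b start', 'a end', 'b end']
```

Both workers start before either one finishes, all on a single thread: each switch happens exactly where the code says await.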

CPU-bound Operation

Scenario: How to speed up a simple data processing script?

Sync Example

Again, let's start with a benchmark.

# cpu-bound_sync.py

import time

from tasks import get_prime_numbers


def main():
    for num in range(1000, 16000):
        get_prime_numbers(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we executed the get_prime_numbers function for every number from 1000 up to (but not including) 16000.

Elapsed run time: 17.863046316 seconds.

Multiprocessing Example

# cpu-bound_parallel_1.py

import time
from multiprocessing import Pool, cpu_count

from tasks import get_prime_numbers


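def main():
    # Leave one core free, but always use at least one worker process.
    with Pool(max(1, cpu_count() - 1)) as p:
        # map passes each number as the single argument to get_prime_numbers.
        p.map(get_prime_numbers, range(1000, 16000))
        p.close()
        p.join()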


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we used multiprocessing to calculate the prime numbers.

Elapsed run time: 2.9848740599999997 seconds.

concurrent.futures Example

# cpu-bound_parallel_2.py

import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

from tasks import get_prime_numbers


def main():
    futures = []

    with ProcessPoolExecutor(max(1, cpu_count() - 1)) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(get_prime_numbers, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we achieved multiprocessing using concurrent.futures.ProcessPoolExecutor. Once the jobs are added to futures, wait(futures) waits for them to finish.

Elapsed run time: 4.452427557 seconds.

concurrent.futures.ProcessPoolExecutor is built on top of the multiprocessing module. Like ThreadPoolExecutor, it trades fine-grained control for a simpler interface. If you want more control over multiprocessing, use multiprocessing.Pool. concurrent.futures provides an abstraction over both multiprocessing and threading, making it easy to switch between the two.
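
Because both executors share the same interface, switching between them can be a one-argument change. This is a minimal sketch, assuming hypothetical helpers square and run_all. The process-based call sits under the __main__ guard, as in the scripts above, so that worker processes can import the module safely.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    # Defined at module level so worker processes can import (pickle) it.
    return n * n


def run_all(executor_cls, fn, items, max_workers=2):
    """Run fn over items with whichever executor class is passed in."""
    with executor_cls(max_workers=max_workers) as executor:
        # executor.map returns results in input order.
        return list(executor.map(fn, items))


if __name__ == "__main__":
    numbers = [1, 2, 3, 4]
    print(run_all(ThreadPoolExecutor, square, numbers))
    print(run_all(ProcessPoolExecutor, square, numbers))
```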

Conclusion

It's worth noting that using multiprocessing to execute the make_request function will be much slower than the threading flavor since the processes will need to wait for the IO. The multiprocessing approach will be faster than the sync approach, though.

Similarly, using concurrency for CPU-bound tasks is not worth the effort when compared to parallelism.
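
One way to check this on your own machine is to time the same CPU-bound work sequentially and with a thread pool. The sketch below uses a hypothetical, deliberately naive count_primes function. On a standard (GIL-enabled) CPython build the two timings are usually similar, though exact numbers depend on the machine and Python version, so no expected output is shown.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def count_primes(limit):
    # Deliberately naive, pure-Python loop so the work is CPU-bound.
    return sum(
        1 for n in range(2, limit) if all(n % d for d in range(2, int(n**0.5) + 1))
    )


limits = [20_000] * 4

start = time.perf_counter()
sequential = [count_primes(limit) for limit in limits]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(count_primes, limits))
threaded_time = time.perf_counter() - start

print(f"Sequential: {sequential_time:.2f} seconds")
print(f"Threaded:   {threaded_time:.2f} seconds")
```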

That being said, using concurrency or parallelism to execute your scripts adds complexity. Your code will generally be harder to read, test, and debug, so only use them when absolutely necessary for long-running scripts.

concurrent.futures is where I generally start since:

  1. It's easy to switch back and forth between concurrency and parallelism
  2. The dependent libraries don't need to support asyncio (requests vs httpx)
  3. It's cleaner and easier to read than the other approaches

Grab the code from the parallel-concurrent-examples-python repo on GitHub.

Amal Shaji

Amal is a full-stack developer interested in deep learning for computer vision and autonomous vehicles. He enjoys working with Python, PyTorch, Go, FastAPI, and Docker. He writes to learn and is a professional introvert.
