Concurrency in Python

Sanket Saxena
8 min read · Jun 15, 2024

This article covers techniques and best practices for Python’s three concurrency models: multithreading, multiprocessing, and asyncio. We’ll work through examples and practical scenarios to help you improve performance and choose the appropriate concurrency model for each task.

1. Understanding Concurrency Models in Python

Synchronous Execution

In Python, synchronous execution is straightforward and sequential. Each operation must complete before the next one starts. This model is simple but can be inefficient for I/O-bound tasks where the CPU waits idly for I/O operations to complete.

Asynchronous Execution

Asynchronous execution handles I/O-bound tasks more efficiently: instead of blocking while an I/O operation completes, a coroutine yields control at each await so other work can run. This is achieved using the asyncio library, which provides an event loop to manage and schedule tasks.

Example:

import asyncio

async def fetch_data():
    await asyncio.sleep(2)  # Simulating a non-blocking I/O operation
    return "Data fetched"

async def main():
    data = await fetch_data()
    print(data)

asyncio.run(main())

2. Multithreading and Multiprocessing

Core Concepts

Multithreading involves running multiple threads within a single process, sharing the same memory space. Multiprocessing involves multiple processes, each with its own memory space, which allows true parallelism on multi-core systems.

Comparison:

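  • Multithreading: Suitable for I/O-bound tasks. In CPython, the Global Interpreter Lock (GIL) prevents multiple native threads from executing Python bytecode at the same time, so threads do not speed up CPU-bound code. The GIL is released while a thread blocks on I/O, however, which lets other threads run in the meantime.
  • Multiprocessing: Suitable for CPU-bound tasks, because each process has its own interpreter and its own GIL and can run on a separate CPU core.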
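
To make the I/O-bound case concrete, here is a minimal sketch that times the same simulated I/O work run sequentially and in a thread pool. The fake_io helper is hypothetical, and time.sleep is only a stand-in for a real blocking call, so the timings are illustrative rather than a benchmark.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(n):
    # Hypothetical helper: time.sleep stands in for a blocking network or disk call
    time.sleep(0.2)
    return n

# Sequential: each call waits for the previous one to finish
start = time.perf_counter()
sequential_results = [fake_io(n) for n in range(4)]
sequential_time = time.perf_counter() - start

# Threaded: the four waits overlap because sleeping threads release the GIL
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded_results = list(executor.map(fake_io, range(4)))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The threaded run should finish in roughly the time of one call, since the waits overlap. The same experiment with a CPU-heavy function would likely show no gain from threads in CPython.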

Example: Multithreading

import threading

def worker():
    print("Thread is working")

threads = []

for i in range(5):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

# Wait for every thread to finish
for thread in threads:
    thread.join()

Example: Multiprocessing

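from multiprocessing import Process

def worker():
    print("Process is working")

if __name__ == "__main__":  # Needed when child processes are started with "spawn"
    processes = []
    for i in range(5):
        process = Process(target=worker)
        processes.append(process)
        process.start()

    for process in processes:
        process.join()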

3. Creating and Managing Threads

Using the threading Module

Python’s threading module simplifies the creation and management of threads.

Example: Creating Threads

import threading

def print_numbers():
    for i in range(10):
        print(i)

thread = threading.Thread(target=print_numbers)
thread.start()
thread.join()

Thread Lifecycle and States

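Conceptually, a thread moves through states such as new, runnable, blocked or waiting, and terminated. Python’s threading module does not expose these states by name; Thread.is_alive() only reports whether a thread has been started and has not yet finished. Understanding the lifecycle still helps in managing thread behavior effectively.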

Example: Demonstrating Thread States

import threading
import time

def worker():
    time.sleep(2)
    print("Worker thread")

thread = threading.Thread(target=worker)
print(f"Thread State before start: {thread.is_alive()}")  # False: not started yet
thread.start()
print(f"Thread State after start: {thread.is_alive()}")   # True: worker is still sleeping
thread.join()
print(f"Thread State after join: {thread.is_alive()}")    # False: worker has finished

Daemon vs. Non-Daemon Threads

Daemon threads run in the background and do not block the program from exiting. Non-daemon threads must complete before the program can terminate.

Example: Daemon and Non-Daemon Threads

import threading
import time

def daemon_worker():
    time.sleep(5)
    print("Daemon thread")  # Normally never printed: the program exits first

def non_daemon_worker():
    print("Non-Daemon thread")

daemon_thread = threading.Thread(target=daemon_worker, daemon=True)
non_daemon_thread = threading.Thread(target=non_daemon_worker)

daemon_thread.start()
non_daemon_thread.start()

Importance of if __name__ == "__main__"

This construct ensures that code runs only when the script is executed directly, not when it is imported as a module. It matters most for multiprocessing: with the spawn start method (the default on Windows and macOS), each child process imports the main module, so unguarded process-creation code would run again in every child.

Example:

if __name__ == "__main__":
    print("Script executed directly")

4. Synchronization Mechanisms

Need for Thread Safety

In concurrent code, and especially in ASGI (Asynchronous Server Gateway Interface) applications, shared state must be protected to prevent race conditions and data corruption. An ASGI server handles many requests at once, so multiple tasks or threads may touch the same resource concurrently. Without synchronization, these accesses can leave data in an inconsistent state and produce unpredictable behavior.

Common Issues in Concurrent Code

  1. Race Conditions: These occur when multiple tasks or threads attempt to modify shared data simultaneously. The outcome depends on the specific sequence of events, leading to non-deterministic and often incorrect behavior.
  2. Data Corruption: Without proper synchronization, shared data can be left in an inconsistent state, causing application errors and data integrity issues.
  3. Deadlocks: Improperly managed locks can lead to deadlocks, where two or more tasks wait indefinitely for each other to release resources.
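
A race condition can be hard to reproduce on demand, so the following sketch forces one. It is an illustration under stated assumptions: the run_unsafe and run_safe helpers are hypothetical, and the threading.Barrier is there only to guarantee that every thread reads the counter before any thread writes it back.

```python
import threading

N_THREADS = 5

def run_unsafe():
    state = {"counter": 0}
    barrier = threading.Barrier(N_THREADS)

    def increment():
        value = state["counter"]      # read the shared value
        barrier.wait()                # force every thread to read before any write
        state["counter"] = value + 1  # write back a stale value (lost update)

    threads = [threading.Thread(target=increment) for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["counter"]

def run_safe():
    state = {"counter": 0}
    lock = threading.Lock()

    def increment():
        with lock:  # the read-modify-write happens as one unit
            state["counter"] += 1

    threads = [threading.Thread(target=increment) for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["counter"]

unsafe_total = run_unsafe()
safe_total = run_safe()
print(unsafe_total, safe_total)  # prints: 1 5
```

Five increments collapse into one in the unsafe version because every thread writes back the same stale value; the locked version keeps all five.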

Ensuring Thread Safety

To manage concurrent access to shared resources, Python provides synchronization primitives such as locks, semaphores, and condition variables in both the threading module (for threads) and the asyncio module (for coroutines). A lock admits one thread or task at a time, a semaphore admits up to a fixed number, and a condition lets threads or tasks wait until shared state changes.
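
As a minimal sketch of the asyncio variant, the snippet below uses asyncio.Semaphore to cap how many coroutines run a section at once. The limited_job and run_jobs names and the stats dictionary are hypothetical, introduced only to record the peak concurrency.

```python
import asyncio

async def limited_job(sem, stats):
    async with sem:  # at most `limit` coroutines are inside this block
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for real awaited I/O
        stats["active"] -= 1

async def run_jobs(limit=2, jobs=6):
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, stats) for _ in range(jobs)))
    return stats["peak"]

peak = asyncio.run(run_jobs())
print(peak)  # prints: 2
```

Six jobs are started, but the recorded peak never exceeds the semaphore's limit of two.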

Locks and RLocks

Locks are used to prevent race conditions by ensuring that only one thread can access a resource at a time. RLocks (reentrant locks) allow the same thread to acquire the lock multiple times.

Example: Using Lock and RLock

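import threading

lock = threading.Lock()
rlock = threading.RLock()

def critical_section():
    with lock:
        print("Critical section")

def reentrant_section():
    with rlock:
        with rlock:  # Same thread re-acquires the RLock; a plain Lock would deadlock here
            print("Re-entered critical section")

thread1 = threading.Thread(target=critical_section)
thread2 = threading.Thread(target=critical_section)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

reentrant_section()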

Semaphores

Semaphores control access to a resource by maintaining a counter. They are useful for managing a fixed number of resources.

Example: Using Semaphore

import threading

semaphore = threading.Semaphore(3)  # At most 3 threads may hold it at once

def access_resource():
    with semaphore:
        print("Accessing resource")

threads = []
for _ in range(5):
    thread = threading.Thread(target=access_resource)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Conditions and Events

Conditions and Events are synchronization primitives used for signaling between threads.

Example: Using Condition

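import threading

condition = threading.Condition()
ready = False  # Shared predicate, guarded by the condition's lock

def worker():
    with condition:
        # wait_for re-checks the predicate, so a notify sent before the wait is not lost
        condition.wait_for(lambda: ready)
        print("Worker thread activated")

thread = threading.Thread(target=worker)
thread.start()

with condition:
    ready = True
    condition.notify()

thread.join()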
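
For the Event half of this section, a minimal sketch might look like the following. An Event is a simple flag: wait() blocks until some thread calls set(). The start_signal and log names are illustrative only.

```python
import threading

start_signal = threading.Event()
log = []

def worker():
    start_signal.wait()  # blocks until the event is set
    log.append("worker released")

thread = threading.Thread(target=worker)
thread.start()

log.append("main setting event")
start_signal.set()  # wakes every thread waiting on the event
thread.join()

print(log)  # prints: ['main setting event', 'worker released']
```

Unlike a Condition, an Event stays set once signaled, so a thread that calls wait() after set() returns immediately and no wakeup is lost.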

Monitors

Monitors combine locks and condition variables to manage resource access and coordination between threads.

Example: Implementing Monitor Pattern

import threading

class Monitor:
    def __init__(self):
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)

    def critical_section(self):
        with self.condition:  # Acquires the underlying lock
            # Perform critical operations
            self.condition.notify()

monitor = Monitor()
thread = threading.Thread(target=monitor.critical_section)
thread.start()
thread.join()

5. Advanced Threading Concepts

Thread Pools and Executors

A thread pool reuses a fixed number of worker threads to run many tasks, which avoids the cost of creating and destroying a thread for each task.

Example: Using ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

def task(n):
    print(f"Processing {n}")

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(task, range(10))
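
When the return values matter, one option is executor.submit() together with as_completed(), which yields futures as they finish. This is a small sketch; the square function and the results dictionary are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    return n * n

results = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the input that produced it
    futures = {executor.submit(square, n): n for n in range(5)}
    for future in as_completed(futures):
        results[futures[future]] = future.result()

print(sorted(results.items()))
```

Completion order is not guaranteed, which is why the sketch keys the results by input rather than relying on arrival order.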

Thread Communication

Using queue.Queue enables thread-safe communication between threads.

Example: Thread Communication with Queue

import queue
import threading

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print(f"Produced {i}")

def consumer():
    while True:
        item = q.get()
        if item is None:  # Sentinel value: no more items
            break
        print(f"Consumed {item}")

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
q.put(None)  # Tell the consumer to stop
consumer_thread.join()

Deadlocks and How to Avoid Them

Deadlocks occur when two or more threads are waiting indefinitely for resources held by each other. Avoiding deadlocks requires careful design and implementation.

Example: Avoiding Deadlocks

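import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

# Both tasks acquire the locks in the same order (lock1, then lock2).
# If task2 took lock2 first, each thread could end up holding one lock
# while waiting forever for the other.

def task1():
    with lock1:
        print("Task 1 acquired lock1")
        with lock2:
            print("Task 1 acquired lock2")

def task2():
    with lock1:
        print("Task 2 acquired lock1")
        with lock2:
            print("Task 2 acquired lock2")

thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)

thread1.start()
thread2.start()
thread1.join()
thread2.join()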

Race Conditions

Race conditions occur when multiple threads access shared resources concurrently, leading to unpredictable outcomes. Using synchronization mechanisms can prevent race conditions.

Example: Handling Race Conditions

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    with lock:  # Makes the read-modify-write atomic with respect to other threads
        counter += 1

threads = []
for _ in range(100):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")

Livelocks and Starvation

Livelocks occur when threads are active but unable to progress. Starvation happens when threads are perpetually denied resources. Proper resource management and scheduling can mitigate these issues.

Example:

import threading
import time

# Livelock example with two threads trying to acquire two locks.
# Each thread holds one lock, tries the other with a timeout, then backs off.

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1():
    while True:
        with lock1:
            time.sleep(0.1)
            if lock2.acquire(timeout=0.1):
                print("Thread 1: Acquired lock2")
                lock2.release()
                break
        print("Thread 1: Released lock1")
        time.sleep(0.1)

def thread2():
    while True:
        with lock2:
            time.sleep(0.1)
            if lock1.acquire(timeout=0.1):
                print("Thread 2: Acquired lock1")
                lock1.release()
                break
        print("Thread 2: Released lock2")
        time.sleep(0.1)

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)

t1.start()
t2.start()

t1.join()
t2.join()

# Starvation example (simulated). Python's threading module has no thread-priority
# API, so "priority" here only controls how many iterations each worker runs.
class Worker(threading.Thread):
    def __init__(self, name, priority):
        super().__init__(name=name)
        self.priority = priority

    def run(self):
        for _ in range(self.priority):
            print(f"Worker {self.name}: working with priority {self.priority}")
            time.sleep(0.1)

# High priority threads
high_priority_worker1 = Worker("High1", 10)
high_priority_worker2 = Worker("High2", 10)

# Low priority thread
low_priority_worker = Worker("Low", 1)

high_priority_worker1.start()
high_priority_worker2.start()
low_priority_worker.start()

high_priority_worker1.join()
high_priority_worker2.join()
low_priority_worker.join()

In this example:

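  1. Livelock: Two threads each hold one lock and repeatedly try, with a timeout, to acquire the other, backing off and retrying when the attempt fails. While their timing stays in lockstep, both threads are active but neither makes progress. In practice, scheduling jitter usually lets one thread succeed after some retries.
  2. Starvation: The threading module has no thread priorities, so the example only simulates them: the "low-priority" worker runs a single iteration while each "high-priority" worker runs ten. Real starvation occurs when a thread is repeatedly passed over for a lock or for CPU time.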

Proper synchronization and resource management techniques can prevent these issues.
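
One such technique is to acquire locks in a single global order, which removes the need for timeout-and-retry loops in the first place. The sketch below is one possible approach, not the only one: acquire_in_order is a hypothetical helper, and sorting by id() is just a convenient way to get a consistent order within one process.

```python
import threading
from contextlib import contextmanager

@contextmanager
def acquire_in_order(*locks):
    # Sort by id() so every thread takes the locks in the same global order
    ordered = sorted(locks, key=id)
    for lock in ordered:
        lock.acquire()
    try:
        yield
    finally:
        for lock in reversed(ordered):
            lock.release()

lock_a = threading.Lock()
lock_b = threading.Lock()
finished = []

def transfer(name, first, second):
    # Callers may pass the locks in any order; the helper normalizes it
    with acquire_in_order(first, second):
        finished.append(name)

t1 = threading.Thread(target=transfer, args=("t1", lock_a, lock_b))
t2 = threading.Thread(target=transfer, args=("t2", lock_b, lock_a))
t1.start()
t2.start()
t1.join()
t2.join()

print(sorted(finished))  # prints: ['t1', 't2']
```

Because both threads end up requesting the locks in the same order, neither can hold one lock while waiting for the other, so both the deadlock and the retry loop that causes the livelock are avoided.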

6. Asyncio Event Loop and Control Flow

The asyncio event loop is the core component that drives asynchronous programming in Python. It is responsible for scheduling and executing tasks, handling I/O operations, and managing multiple asynchronous functions concurrently. The event loop runs in a single thread and uses non-blocking I/O operations to efficiently manage multiple tasks.

How the Asyncio Event Loop Works

The event loop continuously checks for and processes events, such as I/O completion or task readiness. Here’s a detailed breakdown of its workings:

  1. Task Scheduling: Coroutines are wrapped in tasks and scheduled on the event loop, either explicitly with asyncio.create_task() or through higher-level functions like asyncio.gather(), which allow multiple coroutines to run concurrently.
  2. Running the Event Loop: The event loop is started using asyncio.run(), which creates a new loop, runs the given coroutine until it completes, cancels any tasks still pending, and closes the loop. During this time, the loop handles task execution, context switching, and I/O events.
  3. Task Execution: Tasks run cooperatively. When a task reaches an await that cannot complete immediately (e.g., await asyncio.sleep(1)), the event loop suspends it and switches to another task that is ready to run, ensuring efficient use of resources.
  4. Handling Callbacks and Futures: The event loop manages callbacks and futures, ensuring that they are executed when their conditions are met. Futures represent values that will be available at some point, and callbacks are functions that are called when a future is resolved.
  5. Non-Blocking I/O: The event loop performs I/O operations in a non-blocking manner. This means that while waiting for I/O operations to complete, other tasks can continue to run, thus maximizing the utilization of the system’s resources.
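
The callback handling in step 4 can be sketched with add_done_callback(), which asks the loop to call a function once a task or future has finished. The compute and on_done names and the log list are illustrative only.

```python
import asyncio

log = []

async def compute():
    await asyncio.sleep(0.05)  # stand-in for awaited I/O
    return 42

def on_done(fut):
    # Called by the event loop once the task has a result
    log.append(f"callback got {fut.result()}")

async def main():
    task = asyncio.create_task(compute())
    task.add_done_callback(on_done)
    result = await task
    await asyncio.sleep(0)  # yield once so the loop has run the callback
    log.append(f"main got {result}")

asyncio.run(main())
print(log)  # prints: ['callback got 42', 'main got 42']
```

The callback is a plain function, not a coroutine; the loop invokes it with the finished task as its only argument.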

Example: Basic Event Loop

import asyncio

async def main():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

asyncio.run(main())

Task Scheduling and Management

Using asyncio.create_task(), you can create and schedule tasks that the event loop will run concurrently. This method allows for fine-grained control over task execution.

Example: Creating and Running Tasks

import asyncio

async def task1():
    print("Task 1 start")
    await asyncio.sleep(2)
    print("Task 1 end")

async def task2():
    print("Task 2 start")
    await asyncio.sleep(1)
    print("Task 2 end")

async def main():
    # create_task schedules both coroutines immediately, so they overlap
    task_1 = asyncio.create_task(task1())
    task_2 = asyncio.create_task(task2())
    await asyncio.gather(task_1, task_2)

asyncio.run(main())

Using asyncio.gather

asyncio.gather allows you to wait for multiple coroutines to complete. This function is essential for running multiple tasks concurrently and collecting their results.

Example: Using asyncio.gather

import asyncio

async def fetch_data():
    await asyncio.sleep(1)
    return "Data"

async def fetch_more_data():
    await asyncio.sleep(2)
    return "More Data"

async def main():
    # Results come back in argument order, regardless of completion order
    results = await asyncio.gather(fetch_data(), fetch_more_data())
    print(results)

asyncio.run(main())
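
By default, the first exception raised by any awaitable propagates out of asyncio.gather. Passing return_exceptions=True collects exceptions alongside normal results instead. A minimal sketch, with hypothetical ok and fails coroutines:

```python
import asyncio

async def ok():
    await asyncio.sleep(0.01)
    return "ok"

async def fails():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # Exceptions are returned as values in the result list instead of being raised
    return await asyncio.gather(ok(), fails(), return_exceptions=True)

results = asyncio.run(main())
for item in results:
    if isinstance(item, Exception):
        print(f"failed: {item!r}")
    else:
        print(f"succeeded: {item}")
```

This keeps one failing coroutine from hiding the results of the others, at the cost of having to check each item's type.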

Advanced Control Flow with asyncio.wait

asyncio.wait provides more control over the execution of multiple tasks, allowing you to handle completed and pending tasks separately.

Example: Using asyncio.wait

import asyncio

async def task():
    await asyncio.sleep(1)
    return "Task completed"

async def main():
    tasks = [asyncio.create_task(task()) for _ in range(3)]
    # done holds finished tasks; pending holds any still running after the timeout
    done, pending = await asyncio.wait(tasks, timeout=2)
    for d in done:
        print(d.result())

asyncio.run(main())
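
The pending set becomes useful when you only need the first result. The sketch below, which assumes two hypothetical jobs of different durations, uses return_when=asyncio.FIRST_COMPLETED and then cancels whatever is still running.

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [
        asyncio.create_task(job("fast", 0.05)),
        asyncio.create_task(job("slow", 1.0)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Await the cancelled tasks so the cancellation is actually processed
    await asyncio.gather(*pending, return_exceptions=True)
    return [t.result() for t in done], [t.cancelled() for t in pending]

finished, cancelled = asyncio.run(main())
print(finished, cancelled)  # prints: ['fast'] [True]
```

Note that asyncio.wait does not cancel anything on its own; tasks left in pending keep running unless you cancel them.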

7. Futures in Asynchronous Programming

Futures represent values that may not yet be available. They are used to manage the results of asynchronous operations.

Creating Futures

Futures can be created and manipulated using the asyncio library.

Example: Basic Future Usage

import asyncio

async def set_future_value(future):
    await asyncio.sleep(1)
    future.set_result("Future is done")

async def main():
    loop = asyncio.get_running_loop()  # Preferred over get_event_loop() inside a coroutine
    future = loop.create_future()
    await asyncio.create_task(set_future_value(future))
    result = await future
    print(result)

asyncio.run(main())

Integrating with Executors

Futures can be integrated with thread and process pools using ThreadPoolExecutor or ProcessPoolExecutor.

Example: Using ThreadPoolExecutor

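import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    time.sleep(1)  # Simulating a blocking I/O operation
    return "I/O Result"

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # Runs blocking_io in a worker thread without blocking the event loop
        result = await loop.run_in_executor(pool, blocking_io)
    print(result)

asyncio.run(main())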
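
On Python 3.9 and later, asyncio.to_thread offers a shorter way to do the same thing using the loop's default thread pool. A minimal sketch, assuming a hypothetical blocking_read function:

```python
import asyncio
import time

def blocking_read(label):
    time.sleep(0.1)  # stand-in for a blocking call such as file or socket I/O
    return f"{label} done"

async def main():
    # Both blocking calls run in worker threads, so their waits overlap
    return await asyncio.gather(
        asyncio.to_thread(blocking_read, "a"),
        asyncio.to_thread(blocking_read, "b"),
    )

results = asyncio.run(main())
print(results)  # prints: ['a done', 'b done']
```

An explicit executor, as in the example above, remains the better fit when you need to control the pool size or use a ProcessPoolExecutor for CPU-bound work.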

8. Performance and Optimization

Single Process Execution

While simple, single-process execution may not be efficient for large or concurrent tasks.

Example: Single Process Task

import time

def process_task():
    time.sleep(2)
    print("Task complete")

# Five tasks run one after another, so this takes about 10 seconds
for _ in range(5):
    process_task()

Multi-processing with ProcessPoolExecutor

Using ProcessPoolExecutor allows parallel execution of tasks, taking advantage of multiple CPU cores.

Example: Using ProcessPoolExecutor

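import os
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(n):
    # executor.map passes each item of the iterable as the argument
    print(f"Task {n} running on process {os.getpid()}")
    return os.getpid()

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_bound_task, range(10)))
    print(results)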
