Python threading allows you to run parts of your code concurrently, making your programs more efficient. However, when you introduce threading without knowing about thread safety, you may run into issues such as race conditions. You can solve these issues with tools like locks, semaphores, events, conditions, and barriers.
By the end of this tutorial, you’ll be able to identify safety issues and prevent them by using the synchronization primitives in Python’s `threading` module to make your code thread-safe.
In this tutorial, you’ll learn:
- What thread safety is
- What race conditions are and how to avoid them
- How to identify thread safety issues in your code
- What different synchronization primitives exist in the `threading` module
- How to use synchronization primitives to make your code thread-safe
To get the most out of this tutorial, you’ll need to have basic experience working with multithreaded code using Python’s `threading` module and `ThreadPoolExecutor`.
Get Your Code: Click here to download the free sample code that you’ll use to learn about thread safety techniques in Python.
Take the Quiz: Test your knowledge with our interactive “Python Thread Safety: Using a Lock and Other Techniques” quiz. You’ll receive a score upon completion to help you track your learning progress.
Threading in Python
In this section, you’ll get a general overview of how Python handles threading. Before discussing threading in Python, it’s important to revisit two related terms that you may have heard about in this context:
- Concurrency: The ability of a system to handle multiple tasks by allowing their execution to overlap in time but not necessarily happen simultaneously.
- Parallelism: The simultaneous execution of multiple tasks that run at the same time to leverage multiple processing units, typically multiple CPU cores.
Python’s threading is a concurrency framework that allows you to spin up multiple threads that run concurrently, each executing pieces of code. This improves the efficiency and responsiveness of your application. When running multiple threads, the Python interpreter switches between them, handing the control of execution over to each thread.
By running the script below, you can observe the creation of four threads:
threading_example.py
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def threaded_function():
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        time.sleep(0.1)

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)
```
In this example, `threaded_function` prints the values zero to two that your `for` loop assigns to the loop variable `number`. Using a `ThreadPoolExecutor`, four threads are created to execute the threaded function. `ThreadPoolExecutor` is configured to run a maximum of four threads concurrently with `max_workers=4`, and each worker thread is named with a “Worker” prefix, as in `thread_name_prefix="Worker"`.
In `print()`, the `.name` attribute on `threading.current_thread()` is used to get the name of the current thread. This helps you identify which thread is executing each time. A call to `sleep()` is added inside the threaded function to increase the likelihood of a context switch.
You’ll learn what a context switch is in just a moment. First, run the script and take a look at the output:
```console
$ python threading_example.py
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2
```
Each line in the output represents a `print()` call from a worker thread, identified by `Worker_0`, `Worker_1`, `Worker_2`, and `Worker_3`. The number that follows the worker thread name shows the current iteration of the loop each thread is executing. Each thread takes turns executing `threaded_function`, and the execution happens in a concurrent rather than sequential manner.
For example, after `Worker_0` prints `number=0`, it’s not immediately followed by `Worker_0` printing `number=1`. Instead, you see outputs from `Worker_1`, `Worker_2`, and `Worker_3` printing `number=0` before `Worker_0` proceeds to `number=1`. You’ll notice from these interleaved outputs that multiple threads are running concurrently, taking turns to execute their part of the code.
This happens because the Python interpreter performs a context switch. This means that Python pauses the execution state of the current thread and passes control to another thread. When the context switches, Python saves the current execution state so that it can resume later. By switching the control of execution at specific intervals, multiple threads can execute code concurrently.
You can check the context switch interval of your Python interpreter by typing the following in the REPL:
```python
>>> import sys
>>> sys.getswitchinterval()
0.005
```
The output of calling `getswitchinterval()` is a number in seconds that represents the context switch interval of your Python interpreter. In this case, it’s 0.005 seconds, or five milliseconds. You can think of the switch interval as how often the Python interpreter checks if it should switch to another thread.
An interval of five milliseconds doesn’t mean that threads switch exactly every five milliseconds, but rather that the interpreter considers switching to another thread at these intervals.
The switch interval is defined in the Python docs as follows:
This floating-point value determines the ideal duration of the “timeslices” allocated to concurrently running Python threads. (Source)
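If you’re curious, you can also tune this value with `sys.setswitchinterval()`. The following REPL session is a quick experiment you can try yourself; the exact value you pick is a trade-off, since a smaller interval means more frequent switch checks but also more interpreter overhead:

```python
>>> import sys
>>> sys.setswitchinterval(0.001)  # Ask for more frequent switch checks
>>> sys.getswitchinterval()
0.001
```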
In the previous thread pool example, you’ll find a call to `sleep()` inside `threaded_function`, which delays the program execution by 0.1 seconds. This increases the chance of a context switch happening in between because the execution will take much longer than the context switch interval.
Due to context switching, programs can behave unexpectedly when run in a multithreaded environment. To manage this complexity, you need to understand thread safety, which ensures your programs run predictably and reliably. You’ll explore what thread safety means and why it’s essential for maintaining consistency in your multithreaded applications.
Understanding Thread Safety
Thread safety refers to the property of an algorithm or program being able to function correctly during simultaneous execution by multiple threads. Code is considered thread-safe if it behaves deterministically and produces the desired output when run in a multithreaded environment.
Thread safety issues occur because of two factors:
- Shared mutable data: Threads share the memory of their parent process, so all variables and data structures are shared across threads. This can lead to errors when working with shared, changeable data.
- Non-atomic operations: These occur in a multithreaded environment when operations involving multiple steps are interrupted by context switches. This can result in unexpected outcomes if threads are switched during the operation.
To effectively manage these issues, it’s important to understand how Python handles threading under the hood. This has everything to do with the Global Interpreter Lock (GIL), and you’ll learn about the GIL’s implications on threading next.
The GIL and Its Implications on Threading
Python’s Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes simultaneously. The GIL allows only one thread to execute at a single point in time. This can lead to performance penalties if you try to use multithreading in CPU-bound programs.
Note: While the GIL limits parallelism for CPU-bound tasks, threading can still be effective for IO-bound tasks.
When an operation is completed in a single bytecode instruction, it’s atomic. Because the GIL only allows one thread to run at a time, these atomic operations are safe from interference by other threads. This ensures that atomic operations are generally thread-safe, which means that you don’t have to worry about conflicts between threads with atomic operations.
Note: The GIL can be released during I/O operations or when calling C extensions, which can result in race conditions even for operations that appear atomic in Python code. Therefore, synchronization mechanisms like locks are still necessary to ensure thread safety when accessing shared mutable data.
Also, keep in mind that the GIL only deals with single bytecode instructions, which aren’t the same as single lines of Python code. If you want to learn more about why some lines of Python code that may appear atomic are actually not, then expand the collapsible section below:
Not all operations that appear atomic in Python code translate to single bytecode instructions. For example, updating shared mutable data involves multiple bytecode instructions and is therefore non-atomic:
```python
>>> import dis
>>> def single_line_increment(n):
...     n += 1
...     return n
...
>>> dis.dis(single_line_increment)
  2           0 LOAD_FAST                0 (n)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_FAST               0 (n)

  3           8 LOAD_FAST                0 (n)
             10 RETURN_VALUE
```
When you inspect the bytecode of `n += 1` using the `dis` module, you’ll notice that it’s non-atomic. The operation consists of four main instructions, each of which can potentially be interrupted by the Python interpreter:

- `LOAD_FAST` loads the current value of `n`.
- `LOAD_CONST` loads the constant `1`.
- `INPLACE_ADD` adds `n` and `1`.
- `STORE_FAST` stores the result back into `n`.

Between any of these four bytecode instructions, Python may release the GIL and allow another thread to execute. If the other thread also modifies `n`, then this may lead to a race condition and result in unpredictable behavior.
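If you’d like to see this failure mode outside of bytecode listings, here’s a minimal sketch of a lost update. It isn’t from the banking example that follows; it simply pulls the read and write steps of an increment apart with a short `sleep()` so that the race becomes reproducible:

```python
import time
from concurrent.futures import ThreadPoolExecutor

counter = 0

def unsafe_increment():
    global counter
    current = counter  # Read the shared value
    time.sleep(0.001)  # Invite a context switch mid-operation
    counter = current + 1  # Write back a possibly stale result

with ThreadPoolExecutor(max_workers=5) as executor:
    for _ in range(5):
        executor.submit(unsafe_increment)

print(counter)  # Often less than 5 because increments get lost
```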
To ensure thread safety when accessing shared mutable data, synchronization mechanisms like locks are still necessary, despite the presence of the GIL.
Up to Python 3.12, threading allows for concurrency rather than parallelism because of the GIL. The GIL limits parallelism but enables concurrency by allowing multiple threads to run concurrently. A GIL-free Python interpreter is available starting with Python 3.13 and can be enabled with a build-time flag. Note that the default Python interpreter will still have the GIL in 3.13.
As a result of the ongoing effort to remove the GIL, a GIL-free Python interpreter may be the default in a future Python release. However, the GIL removal is being implemented in a way that aims to maintain compatibility with existing multithreaded Python code, ensuring that the behavior of properly written multithreaded code remains consistent with the GIL-enabled versions.
Race Conditions
Now that you’ve explored the basics of thread safety, it’s time to look at race conditions and how the lack of proper synchronization between threads can lead to unpredictable and erroneous program behavior.
A race condition occurs when the outcome of a program depends on the sequence or timing of uncontrollable events like thread execution order. Race conditions can lead to logical errors and non-deterministic results when code is run.
If two threads simultaneously read and write to a shared variable without adequate synchronization, then they can interfere with each other, leading to incorrect results and behaviors. You can run the script below to simulate such a scenario where two threads try to modify an attribute simultaneously:
bank_multithreaded_withdrawal.py
```python
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            new_balance = self.balance - amount
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance
        else:
            raise ValueError("Insufficient balance")

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(account.withdraw, 500)
    executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")
```
Here, you can see a `BankAccount` object that gets initialized with a balance of 1000. The `.withdraw()` method is used to withdraw an amount from the account, provided there’s a sufficient balance. A call to `sleep()` is added inside `.withdraw()` to simulate a delay and make context switches more likely to occur. Even without this explicit delay, context switches can happen between statements in real-world scenarios.
When you run the code above, you may get this result:
```console
$ python bank_multithreaded_withdrawal.py
Final account balance: 300
```
The result isn’t guaranteed to be deterministic. When you run this script multiple times, the results can vary:
```console
$ python bank_multithreaded_withdrawal.py
Final account balance: 500
```
As you can see, this time the result is different. If these withdrawal operations are executed sequentially, then the code is expected to raise an exception since the account balance is only 1000 and is less than the total amount to be withdrawn, which is 1200—the sum of 500 and 700.
In the multithreaded scenario above, the flow of execution may happen as follows:
- The calls to `executor.submit()` result in the creation of two threads.
- The first thread checks the balance (1000) and finds it sufficient for a withdrawal of 500, so it proceeds with the withdrawal.
- Before the first thread saves the updated balance to the `.balance` attribute, a context switch happens, and the second thread starts and attempts to withdraw an amount of 700. It checks the balance and finds it sufficient for the withdrawal of 700.
- Both threads independently calculate new balances based on the original balance of 1000. The first thread attempts to update the `.balance` attribute to 500, while the second thread, unaware of the first thread’s action, calculates and tries to set the balance to 300.
- The thread that’s last to update “wins” and sets the balance to either 300 or 500.
You’ll notice here that the error is due to simultaneous operations on shared mutable data, which in this case is the `.balance` attribute that’s shared between threads. Data is being read and modified by one thread while it’s being manipulated by another. Thankfully, Python’s `threading` module provides a solution, which you’ll look at next.
Synchronization Primitives
You’ve seen how race conditions can be caused by either shared mutable data or non-atomic operations. Python’s `threading` module provides various synchronization primitives to prevent race conditions and allow for coordination across threads.
You can use synchronization primitives to do the following:
- Control the simultaneous execution of a block of code by threads
- Make multiple code statements atomic with respect to a thread
- Limit concurrent access by threads
- Coordinate between threads and perform actions based on the state of other threads
In the upcoming sections, you’ll explore specific synchronization primitives such as locks and semaphores, which play a crucial role in enforcing thread safety and coordination between threads.
Using Python Threading Locks for Mutual Exclusion
A lock is a synchronization primitive that can be used for exclusive access to a resource. Once a thread acquires a lock, no other threads can acquire it and proceed until the lock is released. You can use a lock to wrap a statement or group of statements that should be executed atomically. Next, you’ll take a look at the lock objects provided by Python’s `threading` module.
threading.Lock for Primitive Locking
You can create a `Lock` object by calling the `Lock()` constructor from Python’s `threading` module. A `Lock` object has two states: locked and unlocked. When it’s unlocked, a thread can acquire it by calling the `.acquire()` method on the `Lock`. The lock is then held by that thread, and other threads can’t acquire it. The `Lock` object is released by calling the `.release()` method so that other threads can acquire it.
Here’s a quick breakdown of these two methods:
- `.acquire()`: When the `Lock` object state is unlocked, `.acquire()` changes the `Lock` object to a locked state and returns immediately. If the `Lock` object is in a locked state, `.acquire()` blocks the program execution of other threads and waits for the `Lock` object to be released by the thread holding the lock.
- `.release()`: When the `Lock` object state is locked, `.acquire()` calls from other threads will block their execution until the thread holding the lock calls `.release()` on it. The method should only be called in the locked state because it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a `RuntimeError` is raised.
The `Lock` object can be used as a context manager when used with the `with` statement. This automates the acquiring and releasing of locks. When the program enters the `with` block, the `.acquire()` method on the `Lock` is automatically called. When the program exits the `with` block, the `.release()` method is called.
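For comparison, here’s a minimal sketch of what the context manager does for you under the hood. The `try` ... `finally` construct guarantees that the lock is released even if the protected code raises an exception:

```python
import threading

lock = threading.Lock()

# Roughly equivalent to: with lock: ...
lock.acquire()
try:
    ...  # Code that must run while holding the lock
finally:
    lock.release()  # Always runs, even if an exception was raised
```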
You can modify the banking example you used previously by adding a lock to make it thread-safe:
bank_thread_safe.py
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        with self.account_lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                print(f"Withdrawing {amount}...")
                time.sleep(0.1)  # Simulate a delay
                self.balance = new_balance
            else:
                raise ValueError("Insufficient balance")

    def deposit(self, amount):
        with self.account_lock:
            new_balance = self.balance + amount
            print(f"Depositing {amount}...")
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(account.withdraw, 700)
    executor.submit(account.deposit, 1000)
    executor.submit(account.withdraw, 300)

print(f"Final account balance: {account.balance}")
```
Here you have the `BankAccount` class, which initializes its objects with an initial balance passed as the argument. The `account` object is initialized with a balance of 1000. A `threading.Lock()` object is stored in the attribute `.account_lock`, which is used to synchronize access to the account balance.
Both the `.withdraw()` and `.deposit()` methods use a `with self.account_lock:` block. This ensures that only one thread at a time can execute the code inside these blocks.
In this example, you’ll notice that a `BankAccount` instance and three threads are created:
- The first thread withdraws 700.
- The second thread deposits 1000.
- The third thread withdraws 300.
For additional insight, you add a descriptive call to `print()` to both methods and also print the final account balance after all the threads are complete.
Now when you run the script, you get this output:
```console
$ python bank_thread_safe.py
Withdrawing 700...
Depositing 1000...
Withdrawing 300...
Final account balance: 1000
```
Unlike the previous example without locks, this implementation is thread-safe. The `Lock` ensures that only one thread at a time can modify the balance across the `.deposit()` and `.withdraw()` methods, both of which share the same lock.
The `with self.account_lock:` statement acquires the lock before entering the block and releases it after exiting. This solves the race condition problem you saw in the earlier example. Now, operations on the balance are atomic across threads, and they can’t be interrupted by other threads once they’ve started. This means that a thread will wait for a pending withdrawal or deposit operation to complete before it executes its own operation.
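One practical detail worth knowing: `.acquire()` takes optional `blocking` and `timeout` arguments and returns a Boolean, so a thread can give up instead of waiting forever. Here’s a small sketch, assuming you’d rather skip an operation than block indefinitely:

```python
import threading

lock = threading.Lock()

if lock.acquire(timeout=1.0):  # Wait at most one second for the lock
    try:
        ...  # Critical section
    finally:
        lock.release()
else:
    print("Couldn't acquire the lock in time, skipping.")
```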
threading.RLock for Reentrant Locking
If a lock isn’t released properly due to an error or oversight in the code, it can lead to a deadlock, where threads wait indefinitely for the lock to be released. The reasons for a deadlock include:
- Nested lock acquisition: A deadlock can occur if a thread attempts to acquire a lock it already holds. In conventional locks, trying to acquire the same lock multiple times within the same thread leads to the thread blocking itself, a situation that doesn’t resolve without external intervention.
- Multiple locks acquisition: A deadlock is likely when multiple locks are used, and threads acquire them in inconsistent order. If two threads each hold one lock and are waiting for the other, neither thread can proceed, resulting in a deadlock.
In the following example, you’ll explore a nested lock acquisition scenario. Notice how `BankAccount` now includes two methods that execute when you perform a cash deposit:
bank_deadlock.py
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.Lock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:  # This will cause a deadlock
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)

print(f"Final balance: {account.balance}")
```
Here you see `BankAccount` with a `.deposit()` method and an internal `._update_balance()` method. Both of these methods attempt to acquire the same lock stored in the `.lock` attribute.
Running this script leads to a deadlock scenario:
```console
$ python bank_deadlock.py
Thread Worker_0 waiting to acquire lock for .deposit()
Thread Worker_0 acquired lock for .deposit()
Thread Worker_1 waiting to acquire lock for .deposit()
Thread Worker_2 waiting to acquire lock for .deposit()
Thread Worker_0 waiting to acquire lock for ._update_balance()
```
You’ll notice that the execution stops due to a deadlock, and the program hangs indefinitely. The quick breakdown below explains what’s happening:
- Thread `Worker_0` acquires the lock in the `.deposit()` method.
- The same thread then tries to acquire the lock again in `._update_balance()`.
- Since the lock is non-reentrant, meaning it can’t be acquired again by the same thread, the program deadlocks.
- Threads `Worker_1` and `Worker_2` are waiting to acquire the lock, but they’ll never get it because the first thread, `Worker_0`, is deadlocked.
The lock objects created from `threading.Lock` are non-reentrant. Once a thread has acquired it, that same thread can’t acquire it again without first releasing it. The thread hangs indefinitely because `._update_balance()` tries to acquire the lock that’s already held by `.deposit()`.
This deadlock issue can be fixed by using `RLock`, which is a reentrant lock. It doesn’t block when a holding thread requests the lock again. In other words, an `RLock` allows a thread to acquire the lock multiple times before it releases the lock. This is useful in recursive functions or in situations where a thread needs to re-enter a locked resource that it has already locked.
Similar to `Lock`, an `RLock` object can be created by instantiating `RLock` from the `threading` module in the Python standard library. You can use `RLock` to prevent a deadlock scenario:
bank_rlock.py
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)

print(f"Final balance: {account.balance}")
```
Here, the `.lock` attribute is now an instance of `RLock` instead of `Lock`. The script will now run without a deadlock:
```console
$ python bank_rlock.py
Thread Worker_0 waiting to acquire lock for .deposit()
Thread Worker_0 acquired lock for .deposit()
Thread Worker_1 waiting to acquire lock for .deposit()
Thread Worker_2 waiting to acquire lock for .deposit()
Thread Worker_0 waiting to acquire lock for ._update_balance()
Thread Worker_0 acquired lock for ._update_balance()
Thread Worker_1 acquired lock for .deposit()
Thread Worker_1 waiting to acquire lock for ._update_balance()
Thread Worker_1 acquired lock for ._update_balance()
Thread Worker_2 acquired lock for .deposit()
Thread Worker_2 waiting to acquire lock for ._update_balance()
Thread Worker_2 acquired lock for ._update_balance()
Final balance: 300
```
You’ll notice that the execution is completed because the `RLock` object allows a thread to acquire the same lock multiple times. This is why the `._update_balance()` method can acquire the lock even though it’s already held by `.deposit()`. You can see that all three threads (`Worker_0`, `Worker_1`, and `Worker_2`) successfully acquire the lock for both `.deposit()` and `._update_balance()`.
This code completes without deadlock, and the final balance of 300 is correct, resulting from three threads depositing 100 each. `RLock` is helpful in scenarios like this, where you have nested lock acquisitions within the same thread. `RLock` also keeps a count of how many times it’s been acquired, and it must be released the same number of times to be fully unlocked.
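As a quick illustration of this counting behavior, consider the hypothetical recursive countdown below. Each recursive call re-acquires the same `RLock`, and each exit from the `with` block releases it once, so the lock is only fully unlocked when the outermost call returns:

```python
import threading

rlock = threading.RLock()

def countdown(n):
    with rlock:  # Re-acquired by the same thread on every recursive call
        if n > 0:
            print(f"Counting down: {n}")
            countdown(n - 1)

countdown(3)  # Works fine; a plain Lock would deadlock on the first recursion
```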
While `RLock` offers the advantage of allowing the same thread to acquire the lock multiple times without causing a deadlock, it comes with a slight performance overhead compared to `Lock`. This overhead arises because `RLock` needs to track the number of times the same thread has acquired it. Therefore, if your use case doesn’t require a thread to re-acquire a lock it already holds, it’s more efficient to use `Lock`.
Now that you understand the basics of using `Lock` and `RLock` for managing thread access, you’re ready to explore semaphores, which offer more advanced control over how threads interact with limited resources.
Limiting Access With Semaphores
A semaphore is useful when the number of resources is limited and a number of threads try to access these limited resources. It uses a counter to limit access by multiple threads to a critical section. The `Semaphore()` constructor accepts a `value` argument, which denotes the maximum number of concurrent threads acquiring it.
Similar to `Lock` objects, `Semaphore` objects have `.acquire()` and `.release()` methods, and they can be used as context managers. Each `.acquire()` call reduces a semaphore’s counter by one, and further `.acquire()` calls are blocked when the counter reaches zero.
When a `Semaphore` is used as a context manager, the context manager block is entered after a successful `.acquire()` call. The `.release()` method is automatically called when control exits the `with` block.
You can use this approach in scenarios where resources are limited, and a number of threads are trying to concurrently access the same resources. In a banking context, you may think of an example where multiple customers are waiting in the bank to be served by a limited number of tellers.
In the example below, `teller_semaphore` indicates the number of available tellers:
bank_semaphore.py
```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)

def now():
    return time.strftime("%H:%M:%S")

def serve_customer(name):
    print(f"{now()}: {name} is waiting for a teller.")
    with teller_semaphore:
        print(f"{now()}: {name} is being served by a teller.")

        # Simulate the time taken for the teller to serve the customer
        time.sleep(random.randint(1, 3))

        print(f"{now()}: {name} is done being served.")

customers = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=5) as executor:
    for customer_name in customers:
        thread = executor.submit(serve_customer, customer_name)

print(f"{now()}: All customers have been served.")
```
In this code snippet, you can see that the `Semaphore` object, `teller_semaphore`, has a value of `2`, representing two available tellers.
The `serve_customer()` function simulates a customer’s interaction with a teller and does the following:
- Prints when the customer starts waiting
- Uses a `with` statement to acquire and release the semaphore
- Simulates service time with a random `sleep()` call between one and three seconds
- Prints when the customer is done
A timestamp of the operation is added at the beginning of each `print()` call inside `serve_customer()` using `now()` as a helper. You can see that a `ThreadPoolExecutor` is used to create a pool of five threads, and five calls that represent five customers are made to `serve_customer()` using these worker threads.
This is what the program output looks like now when you run the script:
```console
$ python bank_semaphore.py
10:12:28: Customer 1 is waiting for a teller.
10:12:28: Customer 1 is being served by a teller.
10:12:28: Customer 2 is waiting for a teller.
10:12:28: Customer 2 is being served by a teller.
10:12:28: Customer 3 is waiting for a teller.
10:12:28: Customer 4 is waiting for a teller.
10:12:28: Customer 5 is waiting for a teller.
10:12:29: Customer 1 is done being served.
10:12:29: Customer 3 is being served by a teller.
10:12:30: Customer 3 is done being served.
10:12:30: Customer 4 is being served by a teller.
10:12:31: Customer 2 is done being served.
10:12:31: Customer 5 is being served by a teller.
10:12:32: Customer 4 is done being served.
10:12:33: Customer 5 is done being served.
10:12:33: All customers have been served.
```
You’ll notice that initially, five customers are waiting at the bank to be served by tellers. All customers start waiting at the same timestamp because the threads are created at roughly the same time.
Because `teller_semaphore` is acquired inside `serve_customer()`, the following sequence of events unfolds:
- At any point in time, there are at most two customers being served concurrently by the tellers. You can see this from the events happening at timestamp `10:12:28`. Customers 1 and 2 are chosen for service by the tellers, and the other customers have to wait.
- The remaining customers keep waiting and are admitted one at a time whenever an existing customer is done being served. You can see that at timestamp `10:12:29`, Customer 1 is done being served. This frees up a teller, and Customer 3, who was waiting, gets the chance to be served.
Using `Semaphore` here ensures that no more than two customers are being served at any given time, which effectively manages the limited teller resources.
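A related detail: a plain `Semaphore` accepts more `.release()` calls than `.acquire()` calls and silently raises its counter above the initial value. If an extra release would indicate a bug in your program, `threading.BoundedSemaphore` is the safer choice, as this small sketch shows:

```python
import threading

teller_semaphore = threading.BoundedSemaphore(2)

teller_semaphore.acquire()
teller_semaphore.release()  # Fine: the counter is back at 2
teller_semaphore.release()  # Raises ValueError: released too many times
```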
Using Synchronization Primitives for Communication and Coordination
In the previous sections, you learned about the synchronization primitives you can use to limit concurrent access to resources. In this section, you’ll explore synchronization primitives such as event, condition, and barrier objects, which facilitate communication and coordination among multiple threads.
Events for Signaling
You can use `Event` objects for signaling, allowing a thread to notify one or more threads about an action. An `Event` object can be created by instantiating `Event` from the `threading` module. Event objects maintain an internal flag that starts as `False`. You can set this flag to `True` with `.set()` and reset it to `False` with `.clear()`. Threads can wait for the flag to become `True` using `.wait()`, which blocks the thread until the flag is set.
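A detail that’s easy to miss: `.wait()` also accepts an optional timeout in seconds and returns the flag’s value, so a waiting thread can distinguish a real signal from a timeout. Here’s a minimal sketch:

```python
import threading

ready = threading.Event()

# .wait() returns True if the flag was set, False if the timeout expired
if ready.wait(timeout=5.0):
    print("Got the signal, proceeding.")
else:
    print("Timed out waiting for the signal.")
```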
You can see an example of an `Event` object in action in the banking scenario below:
bank_event.py
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

bank_open = threading.Event()
transactions_open = threading.Event()

def serve_customer(customer_data):
    print(f"{customer_data['name']} is waiting for the bank to open.")
    bank_open.wait()
    print(f"{customer_data['name']} entered the bank")
    if customer_data["type"] == "WITHDRAW_MONEY":
        print(f"{customer_data['name']} is waiting for transactions to open.")
        transactions_open.wait()
        print(f"{customer_data['name']} is starting their transaction.")

        # Simulate the time taken for performing the transaction
        time.sleep(2)

        print(
            f"{customer_data['name']} completed transaction and exited bank"
        )
    else:
        # Simulate the time taken for banking
        time.sleep(2)
        print(f"{customer_data['name']} has exited bank")

customers = [
    {"name": "Customer 1", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 2", "type": "CHECK_BALANCE"},
    {"name": "Customer 3", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 4", "type": "WITHDRAW_MONEY"},
]

with ThreadPoolExecutor(max_workers=4) as executor:
    for customer_data in customers:
        executor.submit(serve_customer, customer_data)

    print("Bank manager is preparing to open the bank.")
    time.sleep(2)
    print("Bank is now open!")
    bank_open.set()  # Signal that the bank is open

    time.sleep(3)
    print("Transactions are now open!")
    transactions_open.set()

print("All customers have completed their transactions.")
```
In this example, the `bank_open` and `transactions_open` `Event` objects are used to signal the opening of the bank and the commencement of transactions. You’ll notice that two `Event` objects are created:

- `bank_open`: Signals when the bank is open.
- `transactions_open`: Signals when transactions are allowed.
The `serve_customer()` function simulates customers’ actions in these ways:
- It waits for the bank to open using `bank_open.wait()`.
- For withdrawal transactions, it waits for transactions to open using `transactions_open.wait()`.
- It simulates the transaction or other banking activity with a `sleep()` call.
The program then starts by simulating the bank manager’s actions:
- Opens the bank by calling `bank_open.set()` after a delay of two seconds.
- Opens transactions by calling `transactions_open.set()` after a delay of three seconds.
You’ll also notice four threads representing the actions of the four customers in the `customers` list. A `ThreadPoolExecutor` is used here to create four threads representing these customers.
Here’s what the output looks like now:
```console
$ python bank_event.py
Customer 1 is waiting for the bank to open.
Customer 2 is waiting for the bank to open.
Customer 3 is waiting for the bank to open.
Customer 4 is waiting for the bank to open.
Bank manager is preparing to open the bank.
Bank is now open!
Customer 1 entered the bank
Customer 4 entered the bank
Customer 3 entered the bank
Customer 3 is waiting for transactions to open.
Customer 1 is waiting for transactions to open.
Customer 2 entered the bank
Customer 4 is waiting for transactions to open.
Customer 2 has exited bank
Transactions are now open!
Customer 4 is starting their transaction.
Customer 3 is starting their transaction.
Customer 1 is starting their transaction.
Customer 3 completed transaction and exited bank
Customer 1 completed transaction and exited bank
Customer 4 completed transaction and exited bank
All customers have completed their transactions.
```
By examining this output, you can observe the following:
- Customers wait for the bank to open before entering. Initially, four customers are waiting and only enter the bank after the manager opens it, which is done by `bank_open.set()`.
- Customers may enter the bank in any order once the bank is open.
- Multiple customers can enter the bank simultaneously once it’s open.
- Customers withdrawing money wait for an additional event, which in this case is for transactions to open. Customers 1, 3, and 4 waited for the `transactions_open` event to be set.
- Withdrawal transactions by Customers 1, 3, and 4 start concurrently once they’re allowed.
This example illustrates how you can use `Event` objects to coordinate actions across multiple threads. As you’ve seen, they’re particularly useful in scenarios where you need to signal state changes to multiple waiting threads simultaneously. This provides an efficient way to manage synchronization in your programs.
Next, you’ll dive into `Condition` objects, which provide a powerful mechanism for controlling the flow of your multithreaded programs.
Conditions for Conditional Waiting
A `Condition` object is built on top of a `Lock` or `RLock` object. It supports additional methods that allow threads to wait for certain conditions to be met, and to signal other threads that those conditions have changed.
`Condition` objects are always associated with a lock. The `lock` argument used in the `Condition()` constructor accepts either a `Lock` or `RLock` object. If this argument is omitted, a new `RLock` object is created and used as the underlying lock.
The various methods associated with `Condition` objects are shown in the table below:
| Method | Description |
|---|---|
| `.acquire()` | Used to acquire the underlying lock associated with the `Condition`. It must be called before a thread can wait on or signal a condition. |
| `.release()` | Releases the underlying lock. |
| `.wait(timeout=None)` | Blocks the thread until it’s notified or a specified timeout occurs. This method releases the lock before blocking and reacquires it upon notification or when the timeout expires. It’s used when a thread needs to wait for a specific condition to be true before proceeding. |
| `.notify(n=1)` | Wakes up one of the threads waiting for the condition, if any are waiting. If multiple threads are waiting, the method selects one to notify at random. |
| `.notify_all()` | Wakes up all threads waiting for the condition. It’s the broadest way to handle notifications, ensuring that all waiting threads are notified. It’s useful when a change affects all waiting threads or when all threads need to recheck the condition they’re waiting on. |
These `Condition` methods can be used to coordinate across threads, allowing you to effectively manage the flow of execution in a multithreaded environment. By using these methods, you can ensure that threads wait for specific conditions to be met before proceeding, notify one or multiple threads when a condition has changed, and maintain control over the sequence and timing of thread operations.
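In practice, these methods almost always appear together in the same pattern: hold the lock, wait in a loop that rechecks a predicate, and notify after changing the shared state. Here’s the skeleton of that pattern as a minimal sketch, with a shared `items` list standing in for your own state:

```python
import threading

condition = threading.Condition()
items = []  # Shared state protected by the condition's lock

def consumer():
    with condition:  # Acquire the underlying lock
        while not items:  # Recheck the predicate to guard against spurious wakeups
            condition.wait()  # Releases the lock while blocked, reacquires on wakeup
        print(f"Consumed {items.pop(0)}")

def producer():
    with condition:
        items.append("item")  # Modify the shared state under the lock
        condition.notify()  # Wake up one waiting thread
```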
Similar to locks, `Condition` objects support the context manager protocol. In the example below, you’ll see a `Condition` object, `customer_available_condition`, that notifies the bank teller of the presence of a new customer:
bank_condition.py
```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

customer_available_condition = threading.Condition()

# Customers waiting to be served by the Teller
customer_queue = []

def now():
    return time.strftime("%H:%M:%S")

def serve_customers():
    while True:
        with customer_available_condition:
            # Wait for a customer to arrive
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

            # Serve the customer
            customer = customer_queue.pop(0)
            print(f"{now()}: Teller is serving {customer}.")

        # Simulate the time taken to serve the customer
        time.sleep(random.randint(1, 5))
        print(f"{now()}: Teller has finished serving {customer}.")

def add_customer_to_queue(name):
    with customer_available_condition:
        print(f"{now()}: {name} has arrived at the bank.")
        customer_queue.append(name)
        customer_available_condition.notify()

customer_names = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=6) as executor:
    teller_thread = executor.submit(serve_customers)
    for name in customer_names:
        # Simulate customers arriving at random intervals
        time.sleep(random.randint(1, 3))
        executor.submit(add_customer_to_queue, name)
```
In this example, you can see that the `Condition` object `customer_available_condition` acts as both a lock and a way to communicate between threads. It’s used to coordinate between the teller and customers.
Here, the `customer_queue` is the shared resource protected by the condition. The `Condition` object is used with the `with` statement to ensure that its lock is properly acquired and released.
Notice how `serve_customers()` runs in an infinite loop and performs the following functions:
- Waits for customers using `customer_available_condition.wait()`.
- The `.wait()` call releases the `Condition` object’s lock while blocking. Because of this, `add_customer_to_queue()` can acquire the lock when a new customer arrives.
- Simulates the action of the teller by serving customers when they arrive, with a random delay representing the service time.
The `add_customer_to_queue()` function adds the customer to the queue and notifies the teller of a new customer using `.notify()`.
You can see that the script creates a `ThreadPoolExecutor` with a maximum of six workers and calls `serve_customers()` from one thread. Then, it calls `add_customer_to_queue()` from five threads, representing five different customers in the `customer_names` list.
This script generates the following output:
```console
$ python bank_condition.py
10:15:08: Teller is waiting for a customer.
10:15:09: Customer 1 has arrived at the bank.
10:15:09: Teller is serving Customer 1.
10:15:11: Customer 2 has arrived at the bank.
10:15:12: Teller has finished serving Customer 1.
10:15:12: Teller is serving Customer 2.
10:15:13: Teller has finished serving Customer 2.
10:15:13: Teller is waiting for a customer.
10:15:14: Customer 3 has arrived at the bank.
10:15:14: Teller is serving Customer 3.
10:15:15: Customer 4 has arrived at the bank.
10:15:17: Customer 5 has arrived at the bank.
10:15:18: Teller has finished serving Customer 3.
10:15:18: Teller is serving Customer 4.
10:15:22: Teller has finished serving Customer 4.
10:15:22: Teller is serving Customer 5.
10:15:25: Teller has finished serving Customer 5.
10:15:25: Teller is waiting for a customer.
```
From this output, you can observe the following:
- The teller alternates between serving customers and waiting for new ones.
- When a customer arrives while the teller is waiting, the teller immediately starts serving them. You can see that at timestamp `10:15:08`, the teller is waiting for a customer. As soon as Customer 1 arrives at `10:15:09`, the teller starts serving the customer.
- When a customer arrives while the teller is busy, as with Customers 2, 4, and 5, they wait in the queue until the teller is free.
To summarize, using `Condition` here has allowed for:
- The teller to efficiently wait for customers without busy-waiting
- Customers to notify the teller of their arrival
- Synchronization of access to the shared customer queue
`Condition` is commonly used in producer-consumer scenarios. In this case, the customers are producers adding to the queue, and the teller is a consumer taking from the queue. Now that you’ve seen how to use conditions to facilitate communication between producers and consumers, you’re ready to explore how to synchronize a fixed number of threads.
Barriers for Coordination
A barrier is a synchronization primitive that allows a group of threads to wait for each other before continuing execution. You can use `Barrier` objects to block program execution until a specified number of threads have reached the barrier point.
`Barrier` in Python’s `threading` module has the following signature:

```python
Barrier(parties, action=None, timeout=None)
```
It has one required and two optional arguments:
- `parties` specifies the number of threads for the `Barrier` object. The `.wait()` method waits for this number of threads to reach the barrier point before proceeding.
- `action` is an optional callable that will be called by one of the threads when it’s released. You can see it in use in the sketch after this list.
- `timeout` optionally specifies the timeout value for the `.wait()` method.
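Here’s a minimal sketch of the optional `action` argument, assuming you want a one-time setup step to run exactly once when the last thread reaches the barrier:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def open_doors():
    print("All tellers arrived, opening the doors!")

# One of the three waiting threads calls open_doors() upon release
barrier = threading.Barrier(3, action=open_doors)

def teller(name):
    print(f"{name} is ready.")
    barrier.wait()  # Blocks until all three tellers have called .wait()
    print(f"{name} starts serving.")

with ThreadPoolExecutor(max_workers=3) as executor:
    for number in range(1, 4):
        executor.submit(teller, f"Teller {number}")
```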
A `Barrier` object can be used in a banking scenario when you want to accept all customers into a bank only after all the bank tellers are ready. In this example, you’ll see that the variable `teller_barrier` holds a `Barrier` object initialized with three `parties`:
bank_barrier.py
```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_barrier = threading.Barrier(3)

def now():
    return time.strftime("%H:%M:%S")

def prepare_for_work(name):
    print(f"{now()}: {name} is preparing their counter.")

    # Simulate the delay to prepare the counter
    time.sleep(random.randint(1, 3))
    print(f"{now()}: {name} has finished preparing.")

    # Wait for all tellers to finish preparing
    teller_barrier.wait()

    print(f"{now()}: {name} is now ready to serve customers.")

tellers = ["Teller 1", "Teller 2", "Teller 3"]

with ThreadPoolExecutor(max_workers=3) as executor:
    for teller_name in tellers:
        executor.submit(prepare_for_work, teller_name)

print(f"{now()}: All tellers are ready to serve customers.")
```
In this code snippet, the `teller_barrier` ensures that all three tellers are ready before any of them start serving customers. `teller_barrier.wait()` is the synchronization point where each thread waits for the other threads.
You can also see that three tellers prepare their counters concurrently, taking different amounts of time. Despite the tellers finishing preparation at different times, they all start to serve customers simultaneously. A `ThreadPoolExecutor` is used to model the operations of the three tellers present in `tellers`.
When you run the script, it gives an output similar to the one shown below:
```console
$ python bank_barrier.py
10:19:42: Teller 1 is preparing their counter.
10:19:42: Teller 2 is preparing their counter.
10:19:42: Teller 3 is preparing their counter.
10:19:43: Teller 2 has finished preparing.
10:19:43: Teller 3 has finished preparing.
10:19:44: Teller 1 has finished preparing.
10:19:44: Teller 1 is now ready to serve customers.
10:19:44: Teller 3 is now ready to serve customers.
10:19:44: Teller 2 is now ready to serve customers.
10:19:44: All tellers are ready to serve customers.
```
This output gives you the following information about teller activity:
- Tellers start preparing at roughly the same time, as shown by the timestamp `10:19:42`.
- They finish preparing at different times due to random delays. You’ll notice that Teller 1 takes longer to finish than the others.
- Once the last teller finishes preparing at timestamp `10:19:44`, all tellers announce they’re ready to serve customers at the same timestamp.
Barrier objects are useful in scenarios where multiple threads need to wait for each other before proceeding, or when you need to synchronize the start of a particular phase across multiple threads.
Now that you’ve seen examples of various synchronization primitives in action, it’s time to explore when you should employ these tools to ensure thread safety and data integrity in your multithreaded applications.
Deciding When to Use Synchronization Primitives
When you run code in a multithreaded environment, race conditions are common and result in unexpected and non-deterministic output. To make a program thread-safe, you need to know when to use synchronization primitives to control and coordinate threads.
Here are some guidelines to help you decide when to use synchronization primitives when introducing multithreading in your applications:
- Check for atomicity requirements: You should keep in mind that operations from different threads can interleave in unpredictable ways due to context switches. If a block of statements needs to be executed as an atomic unit, then you need to enforce mutual exclusion with the proper synchronization primitives.
- Check for shared mutable data: When multiple threads operate on shared data, one thread may read data that’s currently being modified by another thread. You should either avoid sharing data by using thread-local storage, or if sharing is really necessary, introduce locks or other synchronization mechanisms to prevent race conditions.
- External library code might not be designed for thread safety: When writing code, you’ll often use third-party packages and methods from standard-library modules. Keep in mind that library code might not be designed with thread safety in mind. The bottom line is that you’re responsible for ensuring thread safety by using proper synchronization primitives.
When you’re unsure about the thread safety of a piece of code or operation, use appropriate synchronization primitives to safeguard against potential conflicts. As the CPython documentation wisely advises:
When in doubt, use a mutex. (Source)
Conclusion
You now know how to avoid race conditions in a multithreaded environment, and how to use the synchronization primitives provided by Python’s `threading` module. These tools are essential for ensuring that your code behaves correctly and predictably when multiple threads are involved, preventing issues that can arise from concurrent execution.
In this tutorial, you’ve learned how to:
- Identify race conditions in code
- Use `Lock` and `RLock` objects for mutual exclusion
- Use `Semaphore` objects to limit concurrent access to resources
- Leverage `Event` objects for simple signaling between threads
- Use `Condition` objects to make threads conditionally wait
- Use `Barrier` objects to coordinate thread execution
With this knowledge, you’re well-equipped to write thread-safe code that functions as expected in a multithreaded environment. By applying these synchronization techniques, you can enhance the reliability and efficiency of your applications, allowing them to fully leverage the benefits of concurrent execution while minimizing the risks of race conditions.