Python Thread Safety: Using a Lock and Other Techniques

by Adarsh Divakaran · Oct 23, 2024

Python threading allows you to run parts of your code concurrently, which can make your code more efficient. However, if you introduce threading without knowing about thread safety, you may run into issues such as race conditions. You can solve these with synchronization tools like locks, semaphores, events, conditions, and barriers.

By the end of this tutorial, you’ll be able to identify thread safety issues and prevent them by using the synchronization primitives in Python’s threading module to make your code thread-safe.

In this tutorial, you’ll learn:

  • What thread safety is
  • What race conditions are and how to avoid them
  • How to identify thread safety issues in your code
  • What different synchronization primitives exist in the threading module
  • How to use synchronization primitives to make your code thread-safe

To get the most out of this tutorial, you’ll need to have basic experience working with multithreaded code using Python’s threading module and ThreadPoolExecutor.


Threading in Python

In this section, you’ll get a general overview of how Python handles threading. Before discussing threading in Python, it’s important to revisit two related terms that you may have heard about in this context:

  • Concurrency: The ability of a system to handle multiple tasks by allowing their execution to overlap in time but not necessarily happen simultaneously.
  • Parallelism: The simultaneous execution of multiple tasks that run at the same time to leverage multiple processing units, typically multiple CPU cores.

Python’s threading is a concurrency framework that allows you to spin up multiple threads that run concurrently, each executing pieces of code. This improves the efficiency and responsiveness of your application. When running multiple threads, the Python interpreter switches between them, handing the control of execution over to each thread.

By running the script below, you can observe the creation of four threads:

Python threading_example.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def threaded_function():
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        time.sleep(0.1)

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)

In this example, threaded_function prints the values zero to two that the for loop assigns to the loop variable number. A ThreadPoolExecutor creates four threads to execute the threaded function. It’s configured to run a maximum of four threads concurrently with max_workers=4, and each worker thread is named with a “Worker” prefix, as in thread_name_prefix="Worker".

In print(), the .name attribute of threading.current_thread() is used to get the name of the current thread, which helps you identify which thread is running at each moment. A call to sleep() is added inside the threaded function to increase the likelihood of a context switch.

You’ll learn what a context switch is in just a moment. First, run the script and take a look at the output:

Shell
$ python threading_example.py
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2

Each line in the output represents a print() call from a worker thread, identified by Worker_0, Worker_1, Worker_2, and Worker_3. The number that follows the worker thread name shows the current iteration of the loop each thread is executing. Each thread takes turns executing the threaded_function, and the execution happens in a concurrent rather than sequential manner.

For example, after Worker_0 prints number=0, it’s not immediately followed by Worker_0 printing number=1. Instead, you see outputs from Worker_1, Worker_2, and Worker_3 printing number=0 before Worker_0 proceeds to number=1. You’ll notice from these interleaved outputs that multiple threads are running at the same time, taking turns to execute their part of the code.

This happens because the Python interpreter performs a context switch. This means that Python pauses the execution state of the current thread and passes control to another thread. When the context switches, Python saves the current execution state so that it can resume later. By switching the control of execution at specific intervals, multiple threads can execute code concurrently.

You can check the context switch interval of your Python interpreter by typing the following in the REPL:

Python
>>> import sys
>>> sys.getswitchinterval()
0.005

Calling getswitchinterval() returns a number in seconds that represents the context switch interval of your Python interpreter. In this case, it’s 0.005 seconds, or five milliseconds. You can think of the switch interval as how often the Python interpreter checks whether it should switch to another thread.

An interval of five milliseconds doesn’t mean that threads switch exactly every five milliseconds, but rather that the interpreter considers switching to another thread at these intervals.
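
If you’re curious, you can also tune this value with sys.setswitchinterval(). Here’s a minimal sketch; note that a smaller interval only makes the interpreter consider switching more often, so the exact interleaving of threads remains nondeterministic:

Python
>>> import sys
>>> sys.setswitchinterval(0.001)
>>> sys.getswitchinterval()
0.001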

The switch interval is defined in the Python docs as follows:

This floating-point value determines the ideal duration of the “timeslices” allocated to concurrently running Python threads. (Source)

In the previous thread pool example, you’ll find a call to sleep() inside the threaded_function, which delays the program execution by 0.1 seconds. This increases the chance of a context switch happening in between because the execution will take much longer than the context switch interval.

Due to context switching, programs can behave unexpectedly when run in a multithreaded environment. To manage this complexity, you need to understand thread safety, which ensures your programs run predictably and reliably. You’ll explore what thread safety means and why it’s essential for maintaining consistency in your multithreaded applications.

Understanding Thread Safety

Thread safety refers to the property of an algorithm or program being able to function correctly during simultaneous execution by multiple threads. Code is considered thread-safe if it behaves deterministically and produces the desired output when run in a multithreaded environment.

Thread safety issues occur because of two factors:

  1. Shared mutable data: Threads share the memory of their parent process, so all variables and data structures are shared across threads. This can lead to errors when working with shared, changeable data.
  2. Non-atomic operations: These occur in a multithreaded environment when operations involving multiple steps are interrupted by context switches. This can result in unexpected outcomes if threads are switched during the operation, as the sketch below demonstrates.
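
To make these two factors concrete, here’s a minimal sketch where several threads repeatedly increment a shared counter with the non-atomic += operation. The names and numbers are illustrative, and because the outcome depends on context-switch timing and your interpreter version, the final count may or may not come up short on any given run:

Python counter_race.py
import threading

counter = 0  # Shared mutable data

def increment():
    global counter
    for _ in range(100_000):
        counter += 1  # Non-atomic: read, add, and store can interleave

threads = [threading.Thread(target=increment) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Four threads incrementing 100,000 times each should yield 400,000,
# but lost updates from interleaved increments can leave it lower.
print(f"Final counter value: {counter}")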

To effectively manage these issues, it’s important to understand how Python handles threading under the hood. This has everything to do with the Global Interpreter Lock (GIL), and the GIL’s implications for threading are what you’ll learn about next.

The GIL and Its Implications on Threading

Python’s Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes simultaneously. The GIL allows only one thread to execute at a single point in time. This can lead to performance penalties if you try to use multithreading in CPU-bound programs.

When an operation is completed in a single bytecode instruction, it’s atomic. Because the GIL only allows one thread to run at a time, atomic operations are safe from interference by other threads. This makes atomic operations generally thread-safe, so you don’t have to worry about conflicts between threads when performing them.

Also, keep in mind that the GIL only deals with single bytecode instructions, which aren’t the same as single lines of Python code. If you want to learn more about why some lines of Python code that may appear atomic are actually not, then expand the collapsible section below:

Not all operations that appear atomic in Python code translate to single bytecode instructions. For example, updating shared mutable data involves multiple bytecode instructions and is therefore non-atomic:

Python
>>> import dis

>>> def single_line_increment(n):
...     n += 1
...     return n
...

>>> dis.dis(single_line_increment)
  2           0 LOAD_FAST                0 (n)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_FAST               0 (n)

  3           8 LOAD_FAST                0 (n)
             10 RETURN_VALUE

When you inspect the bytecode of n += 1 using the dis module, you’ll notice that it’s non-atomic. The operation consists of four main instructions, each of which can potentially be interrupted by the Python interpreter:

  1. LOAD_FAST loads the current value of n.
  2. LOAD_CONST loads the constant 1.
  3. INPLACE_ADD adds n and 1.
  4. STORE_FAST stores the result back into n.

Between any of these four bytecode instructions, Python may release the GIL and allow another thread to execute. If the other thread also modifies n, then this may lead to a race condition and result in unpredictable behavior.

To ensure thread safety when accessing shared mutable data, synchronization mechanisms like locks are still necessary, despite the presence of the GIL.

Up to Python 3.12, threading allows for concurrency rather than parallelism: the GIL prevents threads from running in parallel, but it still lets multiple threads run concurrently. A GIL-free Python interpreter is available starting with Python 3.13 and can be enabled with a build-time flag. Note that the default Python interpreter still has the GIL in 3.13.

As a result of the ongoing effort to remove the GIL, a GIL-free Python interpreter may be the default in a future Python release. However, the GIL removal is being implemented in a way that aims to maintain compatibility with existing multithreaded Python code, ensuring that the behavior of properly written multithreaded code remains consistent with the GIL-enabled versions.

Race Conditions

Now that you’ve explored the basics of thread safety, it’s time to look at race conditions and how the lack of proper synchronization between threads can lead to unpredictable and erroneous program behavior.

A race condition occurs when the outcome of a program depends on the sequence or timing of uncontrollable events like thread execution order. Race conditions can lead to logical errors and non-deterministic results when code is run.

If two threads simultaneously read and write to a shared variable without adequate synchronization, then they can interfere with each other, leading to incorrect results and behaviors. You can run the script below to simulate such a scenario where two threads try to modify an attribute simultaneously:

Python bank_multithreaded_withdrawal.py
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            new_balance = self.balance - amount
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance
        else:
            raise ValueError("Insufficient balance")

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(account.withdraw, 500)
    executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")

Here, you can see a BankAccount object that gets initialized with a balance of 1000. The .withdraw() method is used to withdraw an amount from the account, provided there’s a sufficient balance. A call to sleep() is added inside .withdraw() to simulate a delay and make context switches more likely to occur. Even without this explicit delay, context switches can happen between statements in real-world scenarios.

When you run the code above, you may get this result:

Shell
$ python bank_multithreaded_withdrawal.py
Final account balance: 300

The result isn’t guaranteed to be deterministic. When you run this script multiple times, the results can vary:

Shell
$ python bank_multithreaded_withdrawal.py
Final account balance: 500

As you can see, this time the result is different. If these withdrawal operations are executed sequentially, then the code is expected to raise an exception since the account balance is only 1000 and is less than the total amount to be withdrawn, which is 1200—the sum of 500 and 700.

In the multithreaded scenario above, the flow of execution may happen as follows:

  • The calls to executor.submit() result in the creation of two threads.
  • The first thread checks the balance (1000) and finds it sufficient for a withdrawal of 500, so it proceeds with the withdrawal.
  • Before the first thread saves the updated balance to the .balance attribute, a context switch happens, and the second thread starts and attempts to withdraw an amount of 700. It checks the balance and finds it sufficient for the withdrawal of 700.
  • Both threads independently calculate new balances based on the original balance of 1000. The first thread attempts to update the .balance attribute to 500, while the second thread, unaware of the first thread’s action, calculates and tries to set the balance to 300.
  • The thread that’s last to update “wins” and sets the balance to either 300 or 500.

You’ll notice here that the error is due to simultaneous operations on shared mutable data, which in this case is the .balance attribute that’s shared between threads. Data is being read and modified by one thread while it’s being manipulated by another. Thankfully, Python’s threading module provides a solution, which you’ll look at next.

Synchronization Primitives

You’ve seen how race conditions can be caused by either shared mutable data or non-atomic operations. Python’s threading module provides various synchronization primitives to prevent race conditions and allow for coordination across threads.

You can use synchronization primitives to do the following:

  • Control the simultaneous execution of a block of code by threads
  • Make multiple code statements atomic with respect to a thread
  • Limit concurrent access by threads
  • Coordinate between threads and perform actions based on the state of other threads

In the upcoming sections, you’ll explore specific synchronization primitives such as locks and semaphores, which play a crucial role in enforcing thread safety and coordination between threads.

Using Python Threading Locks for Mutual Exclusion

A lock is a synchronization primitive that can be used for exclusive access to a resource. Once a thread acquires a lock, no other threads can acquire it and proceed until the lock is released. You can use a lock to wrap a statement or group of statements that should be executed atomically. Next, you’ll take a look at the lock objects provided by Python’s threading module.

threading.Lock for Primitive Locking

You can create a Lock object by calling the Lock() constructor from Python’s threading module. A Lock object has two states, locked and unlocked. When it’s unlocked, a thread can acquire it by calling its .acquire() method. The lock is then held by that thread, and other threads can’t acquire it until it’s released with a call to .release().

Here’s a quick breakdown of these two methods:

  • .acquire(): When the Lock object is unlocked, .acquire() changes it to the locked state and returns immediately. When the Lock object is already locked, .acquire() blocks the calling thread until the thread holding the lock releases it. The method also accepts a blocking flag and a timeout, which you’ll see in the sketch after this list.

  • .release(): Changes the Lock object to the unlocked state and returns immediately, allowing one of the blocked threads to acquire it. It should only be called in the locked state. If an attempt is made to release an unlocked lock, a RuntimeError is raised.
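
As a minimal sketch of the blocking and timeout parameters mentioned above, note that .acquire() returns a Boolean that tells you whether the lock was actually obtained, so you can avoid waiting forever:

Python
import threading

lock = threading.Lock()

# Try to take the lock without blocking. Returns True on success.
if lock.acquire(blocking=False):
    try:
        pass  # Critical section
    finally:
        lock.release()

# Or wait up to one second before giving up.
if lock.acquire(timeout=1.0):
    try:
        pass  # Critical section
    finally:
        lock.release()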

A Lock object can also be used as a context manager in a with statement, which automates acquiring and releasing the lock. When the program enters the with block, the .acquire() method on the Lock is automatically called. When the program exits the with block, the .release() method is called.
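
In other words, the with form is roughly equivalent to wrapping the critical section in a try ... finally block. Here’s a minimal sketch showing both forms side by side:

Python
import threading

lock = threading.Lock()

# Explicit acquire and release:
lock.acquire()
try:
    pass  # Critical section
finally:
    lock.release()

# Equivalent, with the lock as a context manager:
with lock:
    pass  # Critical section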

You can modify the banking example you used previously by adding a lock to make it thread-safe:

Python bank_thread_safe.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        with self.account_lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                print(f"Withdrawing {amount}...")
                time.sleep(0.1)  # Simulate a delay
                self.balance = new_balance
            else:
                raise ValueError("Insufficient balance")

    def deposit(self, amount):
        with self.account_lock:
            new_balance = self.balance + amount
            print(f"Depositing {amount}...")
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(account.withdraw, 700)
    executor.submit(account.deposit, 1000)
    executor.submit(account.withdraw, 300)

print(f"Final account balance: {account.balance}")

Here you have the BankAccount class, which initializes its objects with an initial balance passed as the argument. The account object is initialized with a balance of 1000. A threading.Lock() object is stored in the attribute .account_lock, which is used to synchronize access to the account balance.

Both the .withdraw() and .deposit() methods use a with self.account_lock: block. This ensures that only one thread at a time can execute the code inside these blocks.

In this example, you’ll notice that a BankAccount instance and three threads are created:

  1. The first thread withdraws 700.
  2. The second thread deposits 1000.
  3. The third thread withdraws 300.

For additional insight, you add a descriptive call to print() to both methods and also print the final account balance after all the threads are complete.

Now when you run the script, you get this output:

Shell
$ python bank_thread_safe.py
Withdrawing 700...
Depositing 1000...
Withdrawing 300...
Final account balance: 1000

Unlike the previous example without locks, this implementation is thread-safe. The lock ensures that only one thread at a time can modify the balance across the .deposit() and .withdraw() methods, both of which share the same lock object.

The with self.account_lock: statement acquires the lock before entering the block and releases it after exiting. This solves the race condition problem you saw in the earlier example. Now, operations on the balance are atomic across threads, and they can’t be interrupted by other threads once they’ve started. This means that a thread will wait for a pending withdrawal or deposit operation to complete before it executes its operation.

threading.RLock for Reentrant Locking

If a lock isn’t released properly due to an error or oversight in the code, it can lead to a deadlock, where threads wait indefinitely for the lock to be released. Common causes of deadlocks include:

  • Nested lock acquisition: A deadlock can occur if a thread attempts to acquire a lock it already holds. With conventional locks, trying to acquire the same lock multiple times within the same thread causes the thread to block itself, a situation that doesn’t resolve without external intervention.

  • Multiple lock acquisition: A deadlock is likely when multiple locks are used and threads acquire them in an inconsistent order. If two threads each hold one lock while waiting for the other’s, neither thread can proceed.

In the following example, you’ll explore a nested lock acquisition scenario. Notice how BankAccount now includes two methods that execute when you perform a cash deposit:

Python bank_deadlock.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.Lock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:  # This will cause a deadlock
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)

print(f"Final balance: {account.balance}")

Here you see BankAccount with a .deposit() method and an internal ._update_balance() method. Both of these methods attempt to acquire the same lock stored in the .lock attribute.

Running this script leads to a deadlock scenario:

Shell
$ python bank_deadlock.py
Thread Worker_0 waiting to acquire lock for .deposit()
Thread Worker_0 acquired lock for .deposit()
Thread Worker_1 waiting to acquire lock for .deposit()
Thread Worker_2 waiting to acquire lock for .deposit()
Thread Worker_0 waiting to acquire lock for ._update_balance()

You’ll notice that execution stops due to a deadlock, and the program hangs indefinitely. The quick breakdown below explains what’s happening:

  1. Thread Worker_0 acquires the lock in the .deposit() method.
  2. The same thread then tries to acquire the lock again in ._update_balance().
  3. Since the lock is non-reentrant, meaning it can’t be acquired again by the same thread, the program deadlocks.
  4. Threads Worker_1 and Worker_2 are waiting to acquire the lock, but they’ll never get it because the first thread, Worker_0, is deadlocked.

The lock objects created from threading.Lock are non-reentrant. Once a thread has acquired it, that same thread can’t acquire it again without first releasing it. The thread hangs indefinitely because ._update_balance() tries to acquire the lock that’s already held by .deposit().

This deadlock issue can be fixed by using RLock, which is a reentrant lock. It doesn’t block when a holding thread requests the lock again. In other words, an RLock allows a thread to acquire the lock multiple times before it releases the lock. This is useful in recursive functions or in situations where a thread needs to re-enter a locked resource that it has already locked.

Similar to Lock, an RLock object can be created by instantiating RLock from the threading module in the Python standard library. You can use RLock to prevent a deadlock scenario:

Python bank_rlock.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)

Here, the .lock attribute is now an instance of RLock instead of Lock. The script will now run without a deadlock:

Shell
$ python bank_rlock.py
Thread Worker_0 waiting to acquire lock for .deposit()
Thread Worker_0 acquired lock for .deposit()
Thread Worker_1 waiting to acquire lock for .deposit()
Thread Worker_2 waiting to acquire lock for .deposit()
Thread Worker_0 waiting to acquire lock for ._update_balance()
Thread Worker_0 acquired lock for ._update_balance()
Thread Worker_1 acquired lock for .deposit()
Thread Worker_1 waiting to acquire lock for ._update_balance()
Thread Worker_1 acquired lock for ._update_balance()
Thread Worker_2 acquired lock for .deposit()
Thread Worker_2 waiting to acquire lock for ._update_balance()
Thread Worker_2 acquired lock for ._update_balance()
Final balance: 300

You’ll notice that the execution is completed because the RLock object allows a thread to acquire the same lock multiple times. This is why the ._update_balance() method can acquire the lock even though it’s already held by .deposit(). You can see that all three threads (Worker_0, Worker_1, and Worker_2) successfully acquire the lock for both .deposit() and ._update_balance().

This code completes without deadlock, and the final balance of 300 is correct, resulting from three threads depositing 100. RLock is helpful in scenarios like this, where you have nested lock acquisitions within the same thread. RLock also keeps a count of how many times it’s been acquired, and it must be released the same number of times to be fully unlocked.

While RLock offers the advantage of allowing the same thread to acquire the lock multiple times without causing a deadlock, it comes with a slight performance overhead compared to Lock. This overhead arises because RLock needs to track the number of times the same thread has acquired it. Therefore, if your use case doesn’t require a thread to re-acquire a lock it already holds, it’s more efficient to use Lock.
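
If you’d like to measure this overhead yourself, a quick micro-benchmark with timeit might look like the sketch below. The script name and the exact numbers are illustrative; results vary by machine and Python version, so treat the output as indicative only:

Python lock_overhead.py
import threading
import timeit

lock = threading.Lock()
rlock = threading.RLock()

# Time one million uncontended acquire/release cycles for each lock type.
t_lock = timeit.timeit("with lock: pass", globals={"lock": lock}, number=1_000_000)
t_rlock = timeit.timeit("with rlock: pass", globals={"rlock": rlock}, number=1_000_000)

print(f"Lock:  {t_lock:.3f} seconds")
print(f"RLock: {t_rlock:.3f} seconds")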

Now that you understand the basics of using Lock and RLock for managing thread access, you’re ready to explore semaphores, which offer more advanced control over how threads interact with limited resources.

Limiting Access With Semaphores

A semaphore is useful when a limited number of resources must be shared among many threads. It uses a counter to limit access by multiple threads to a critical section. The Semaphore() constructor accepts a value argument, which denotes the maximum number of threads that can acquire it concurrently.

Similar to Lock objects, Semaphore objects have .acquire() and .release() methods and can be used as a context manager. Each .acquire() call decrements a semaphore’s counter by one, and further .acquire() calls block when the counter reaches zero.

When Semaphore is used as a context manager, the context manager block is entered after a successful .acquire() call. The .release() method is automatically called when the control exits the with block.
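
You can observe the counter mechanics directly in the REPL. The sketch below also shows threading.BoundedSemaphore, a variant that raises a ValueError if you release the semaphore more times than you’ve acquired it, which can help catch programming errors:

Python
>>> import threading
>>> semaphore = threading.Semaphore(2)
>>> semaphore.acquire()  # Counter goes from 2 to 1
True
>>> semaphore.acquire()  # Counter goes from 1 to 0
True
>>> semaphore.acquire(blocking=False)  # Counter is 0, so this fails
False
>>> semaphore.release()  # Counter goes from 0 back to 1

>>> bounded = threading.BoundedSemaphore(1)
>>> bounded.release()
Traceback (most recent call last):
  ...
ValueError: Semaphore released too many times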

You can use this approach in scenarios where resources are limited, and a number of threads are trying to concurrently access the same resources. In a banking context, you may think of an example where multiple customers are waiting in the bank to be served by a limited number of tellers.

In the example below, teller_semaphore indicates the number of available tellers:

Python bank_semaphore.py
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)

def now():
    return time.strftime("%H:%M:%S")

def serve_customer(name):
    print(f"{now()}: {name} is waiting for a teller.")
    with teller_semaphore:
        print(f"{now()}: {name} is being served by a teller.")
        # Simulate the time taken for the teller to serve the customer
        time.sleep(random.randint(1, 3))
        print(f"{now()}: {name} is done being served.")

customers = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=5) as executor:
    for customer_name in customers:
        thread = executor.submit(serve_customer, customer_name)

print(f"{now()}: All customers have been served.")

In this code snippet, you can see that the Semaphore object, teller_semaphore, has a value of 2, representing two available tellers.

The serve_customer() function simulates a customer’s interaction with a teller and does the following:

  • Prints when the customer starts waiting
  • Uses a with statement to acquire and release the semaphore
  • Simulates service time with a random sleep() call between one and three seconds
  • Prints when the customer is done

A timestamp of the operation is added at the beginning of each print() call inside serve_customer() using now() as a helper. You can see that a ThreadPoolExecutor is used to create a pool of five threads, and five calls that represent five customers are made to serve_customer() using these worker threads.

This is what the program output looks like now when you run the script:

Shell
$ python bank_semaphore.py
10:12:28: Customer 1 is waiting for a teller.
10:12:28: Customer 1 is being served by a teller.
10:12:28: Customer 2 is waiting for a teller.
10:12:28: Customer 2 is being served by a teller.
10:12:28: Customer 3 is waiting for a teller.
10:12:28: Customer 4 is waiting for a teller.
10:12:28: Customer 5 is waiting for a teller.
10:12:29: Customer 1 is done being served.
10:12:29: Customer 3 is being served by a teller.
10:12:30: Customer 3 is done being served.
10:12:30: Customer 4 is being served by a teller.
10:12:31: Customer 2 is done being served.
10:12:31: Customer 5 is being served by a teller.
10:12:32: Customer 4 is done being served.
10:12:33: Customer 5 is done being served.
10:12:33: All customers have been served.

You’ll notice that initially, five customers are waiting at the bank to be served by tellers. All customers start waiting at the same timestamp because the threads are created at roughly the same time.

Because teller_semaphore is acquired inside serve_customer(), the following sequence of events unfolds:

  • At any point in time, there are two active customers being served concurrently by the tellers. You can see this from the events happening at timestamp 10:12:28. Customers 1 and 2 are chosen for service by the tellers, and the other customers have to wait.
  • Remaining customers keep waiting and are admitted one at a time whenever an existing customer is done being served. You can see that at timestamp 10:12:29, Customer 1 is done being served. This frees up a teller, and Customer 3, who was waiting, gets the chance to be served.

Using Semaphore here ensures that no more than two customers are being served at any given time, which effectively manages the limited teller resources.

Using Synchronization Primitives for Communication and Coordination

In the previous sections, you learned about the synchronization primitives you can use to limit concurrent access to resources. In this section, you’ll explore synchronization primitives such as event, condition, and barrier objects, which facilitate communication and coordination among multiple threads.

Events for Signaling

You can use Event objects for signaling, allowing a thread to notify one or more threads about an action. An Event object can be created by instantiating Event from the threading module. Event objects maintain an internal flag that starts as False. You can set this flag to True with .set() and reset it to False with .clear(). Threads can wait for the flag to become True using .wait(), which blocks the thread until the flag is set.
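
As a minimal sketch of the flag mechanics, .wait() also accepts a timeout and returns the flag’s value, and .is_set() lets you check the flag without blocking:

Python
>>> import threading
>>> event = threading.Event()
>>> event.is_set()
False
>>> event.wait(timeout=0.1)  # Times out because the flag is still False
False
>>> event.set()
>>> event.wait()  # Returns immediately now that the flag is True
True
>>> event.clear()
>>> event.is_set()
False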

You can see an example of an Event object in action in the banking scenario below:

Python bank_event.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor

bank_open = threading.Event()
transactions_open = threading.Event()

def serve_customer(customer_data):
    print(f"{customer_data['name']} is waiting for the bank to open.")

    bank_open.wait()
    print(f"{customer_data['name']} entered the bank")
    if customer_data["type"] == "WITHDRAW_MONEY":
        print(f"{customer_data['name']} is waiting for transactions to open.")
        transactions_open.wait()
        print(f"{customer_data['name']} is starting their transaction.")

        # Simulate the time taken for performing the transaction
        time.sleep(2)

        print(
            f"{customer_data['name']} completed transaction and exited bank"
        )
    else:
        # Simulate the time taken for banking
        time.sleep(2)
        print(f"{customer_data['name']} has exited bank")

customers = [
    {"name": "Customer 1", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 2", "type": "CHECK_BALANCE"},
    {"name": "Customer 3", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 4", "type": "WITHDRAW_MONEY"},
]

with ThreadPoolExecutor(max_workers=4) as executor:
    for customer_data in customers:
        executor.submit(serve_customer, customer_data)

    print("Bank manager is preparing to open the bank.")
    time.sleep(2)
    print("Bank is now open!")
    bank_open.set()  # Signal that the bank is open

    time.sleep(3)
    print("Transactions are now open!")
    transactions_open.set()

print("All customers have completed their transactions.")

In this example, the bank_open and transactions_open Event objects are used to signal the opening of the bank and the commencement of transactions. You’ll notice that two Event objects are created:

  1. bank_open: Signals when the bank is open.
  2. transactions_open: Signals when transactions are allowed.

The serve_customer() function simulates customers’ actions in these ways:

  • It waits for the bank to open using bank_open.wait().
  • For withdrawal transactions, it waits for transactions to open using transactions_open.wait().
  • It simulates the transaction or other banking activity with a sleep() call.

The program then starts by simulating the bank manager’s actions:

  • Opens the bank by calling bank_open.set() after a delay of two seconds.
  • Opens transactions by calling transactions_open.set() after a delay of three seconds.

You’ll also notice that a ThreadPoolExecutor is used to create four threads, one representing each of the four customers in the customers list.

Here’s what the output looks like now:

Shell
$ python bank_event.py
Customer 1 is waiting for the bank to open.
Customer 2 is waiting for the bank to open.
Customer 3 is waiting for the bank to open.
Customer 4 is waiting for the bank to open.
Bank manager is preparing to open the bank.
Bank is now open!
Customer 1 entered the bank
Customer 4 entered the bank
Customer 3 entered the bank
Customer 3 is waiting for transactions to open.
Customer 1 is waiting for transactions to open.
Customer 2 entered the bank
Customer 4 is waiting for transactions to open.
Customer 2 has exited bank
Transactions are now open!
Customer 4 is starting their transaction.
Customer 3 is starting their transaction.
Customer 1 is starting their transaction.
Customer 3 completed transaction and exited bank
Customer 1 completed transaction and exited bank
Customer 4 completed transaction and exited bank
All customers have completed their transactions.

By examining this output, you can observe the following:

  • Customers wait for the bank to open before entering. Initially, four customers are waiting and only enter the bank after the manager opens it, which is done by bank_open.set().
  • Customers may enter the bank in any order once the bank is open.
  • Multiple customers can enter the bank simultaneously once it’s open.
  • Customers withdrawing money wait for an additional event, which in this case is for transactions to open. Customers 1, 3, and 4 wait for the transactions_open event to be set.
  • Withdrawal transactions by Customers 1, 3, and 4 start concurrently once they’re allowed.

This example illustrates how you can use Event objects to coordinate actions across multiple threads. As you’ve seen, they’re particularly useful in scenarios where you need to signal state changes to multiple waiting threads simultaneously. This provides an efficient way to manage synchronization in your programs.

Next, you’ll dive into Condition objects, which provide a powerful mechanism for controlling the flow of your multithreaded programs.

Conditions for Conditional Waiting

A Condition object is built on top of a Lock or RLock object. It supports additional methods that allow threads to wait for certain conditions to be met, and to signal other threads that those conditions have changed.

Condition objects are always associated with a lock. The lock argument used in the Condition() constructor accepts either a Lock or RLock object. If this argument is omitted, a new RLock object is created and used as the underlying lock.

The various methods associated with Condition objects are described below:

  • .acquire(): Acquires the underlying lock associated with the Condition. It must be called before a thread can wait on or signal the condition.

  • .release(): Releases the underlying lock.

  • .wait(timeout=None): Blocks the thread until it’s notified or the specified timeout occurs. This method releases the lock before blocking and reacquires it upon notification or when the timeout expires. It’s used when a thread needs to wait for a specific condition to become true before proceeding.

  • .notify(n=1): Wakes up at most n of the threads waiting for the condition, if any are waiting. If multiple threads are waiting, an arbitrary one is selected, so you shouldn’t rely on a particular wake-up order.

  • .notify_all(): Wakes up all threads waiting for the condition. It’s the broadest way to handle notifications, ensuring that all waiting threads are notified. It’s useful when a change affects all waiting threads or when all threads need to recheck the condition they’re waiting on.

These Condition methods can be used to coordinate across threads, allowing you to effectively manage the flow of execution in a multithreaded environment. By using these methods, you can ensure that threads wait for specific conditions to be met before proceeding, notify one or multiple threads when a condition has changed, and maintain control over the sequence and timing of thread operations.
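
The threading module also offers Condition.wait_for(), which wraps the common wait-in-a-loop idiom for you. Here’s a minimal sketch of both forms; the items list and function names are illustrative, and the bank example below uses the explicit loop:

Python
import threading

condition = threading.Condition()
items = []  # Shared resource protected by the condition

def consume():
    with condition:
        # Canonical pattern: re-check the predicate after every wake-up.
        while not items:
            condition.wait()
        print(items.pop(0))

def consume_with_wait_for():
    with condition:
        # .wait_for() loops over .wait() until the predicate is truthy.
        condition.wait_for(lambda: items)
        print(items.pop(0))

def produce(item):
    with condition:
        items.append(item)
        condition.notify()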

Similar to locks, Condition objects support the context manager protocol. In the example below, you’ll see a Condition object, customer_available_condition, that notifies the bank teller of the presence of a new customer:

Python bank_condition.py
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

customer_available_condition = threading.Condition()

# Customers waiting to be served by the Teller
customer_queue = []

def now():
    return time.strftime("%H:%M:%S")

def serve_customers():
    while True:
        with customer_available_condition:
            # Wait for a customer to arrive
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

            # Serve the customer
            customer = customer_queue.pop(0)
            print(f"{now()}: Teller is serving {customer}.")

        # Simulate the time taken to serve the customer
        time.sleep(random.randint(1, 5))
        print(f"{now()}: Teller has finished serving {customer}.")

def add_customer_to_queue(name):
    with customer_available_condition:
        print(f"{now()}: {name} has arrived at the bank.")
        customer_queue.append(name)

        customer_available_condition.notify()

customer_names = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=6) as executor:
    teller_thread = executor.submit(serve_customers)
    for name in customer_names:
        # Simulate customers arriving at random intervals
        time.sleep(random.randint(1, 3))
        executor.submit(add_customer_to_queue, name)

In this example, you can see that the Condition object customer_available_condition acts as both a lock and a way to communicate between threads. It’s used to coordinate between the teller and customers.

Here, the customer_queue is the shared resource protected by the condition. The Condition object is used with the with statement to ensure that the Condition object’s lock is properly acquired and released.

Notice how serve_customers() runs in an infinite loop and does the following:

  • Waits for customers using customer_available_condition.wait(). While blocked, .wait() releases the Condition object’s lock, so add_customer_to_queue() can acquire it when a new customer arrives.
  • Simulates the action of the teller by serving customers when they arrive, with a random delay representing the service time.

The add_customer_to_queue() function adds the customer to the queue and notifies the teller of a new customer using .notify().

You can see that the script creates a ThreadPoolExecutor with a maximum of six workers and calls serve_customers() from one thread. Then, it calls add_customer_to_queue() from five threads, representing five different customers in the customer_names list.

This script generates the following output:

Shell
$ python bank_condition.py
10:15:08: Teller is waiting for a customer.
10:15:09: Customer 1 has arrived at the bank.
10:15:09: Teller is serving Customer 1.
10:15:11: Customer 2 has arrived at the bank.
10:15:12: Teller has finished serving Customer 1.
10:15:12: Teller is serving Customer 2.
10:15:13: Teller has finished serving Customer 2.
10:15:13: Teller is waiting for a customer.
10:15:14: Customer 3 has arrived at the bank.
10:15:14: Teller is serving Customer 3.
10:15:15: Customer 4 has arrived at the bank.
10:15:17: Customer 5 has arrived at the bank.
10:15:18: Teller has finished serving Customer 3.
10:15:18: Teller is serving Customer 4.
10:15:22: Teller has finished serving Customer 4.
10:15:22: Teller is serving Customer 5.
10:15:25: Teller has finished serving Customer 5.
10:15:25: Teller is waiting for a customer.

From this output, you can observe the following:

  • The teller alternates between serving customers and waiting for new ones.
  • When a customer arrives while the teller is waiting, the teller immediately starts serving them. You can see that at timestamp 10:15:08, the teller is waiting for a customer. As soon as Customer 1 arrives at 10:15:09, the teller starts serving the customer.
  • When a customer arrives while the teller is busy, as with Customers 2, 4, and 5, they wait in the queue until the teller is free.

To summarize, using Condition here has allowed for:

  1. The teller to efficiently wait for customers without busy-waiting
  2. Customers to notify the teller of their arrival
  3. Synchronization of access to the shared customer queue

Condition is commonly used in producer-consumer scenarios. In this case, the customers are producers adding to the queue, and the teller is a consumer taking from the queue. Now that you’ve seen how to use conditions to facilitate communication between producers and consumers, you’re ready to explore how to synchronize a fixed number of threads.

Barriers for Coordination

A barrier is a synchronization primitive that allows a group of threads to wait for each other before continuing execution. You can use Barrier objects to block program execution until a specified number of threads have reached the barrier point.

Barrier in Python’s threading module has the following signature:

Python Syntax
Barrier(parties, action=None, timeout=None)

It has one required and two optional arguments:

  1. parties specifies the number of threads that must reach the barrier. The .wait() method blocks until this number of threads have called it, and only then do they all proceed past the barrier point.
  2. action is an optional callable that’s called by one of the threads when they’re all released. You’ll see it in the sketch after this list.
  3. timeout optionally specifies the default timeout value for the .wait() method.
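
Here’s a minimal sketch of those optional arguments in use. The action callable runs exactly once per barrier cycle, in one of the waiting threads, and .wait() returns a distinct index between 0 and parties - 1 that you can use to elect a single thread for one-off work. The function names are illustrative:

Python barrier_action.py
import threading
from concurrent.futures import ThreadPoolExecutor

def open_doors():
    # Called exactly once, by one of the threads, when all parties arrive.
    print("All parties arrived: opening the doors.")

barrier = threading.Barrier(3, action=open_doors, timeout=5)

def worker():
    index = barrier.wait()  # Blocks until all three threads arrive
    if index == 0:
        # Exactly one thread per cycle receives index 0.
        print("Elected thread performs one-off setup.")

with ThreadPoolExecutor(max_workers=3) as executor:
    for _ in range(3):
        executor.submit(worker)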

A Barrier object can be used in a banking scenario when you want to accept all customers into a bank only after all the bank tellers are ready. In this example, you’ll see that the variable teller_barrier holds a Barrier object initialized with three parties:

Python bank_barrier.py
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_barrier = threading.Barrier(3)

def now():
    return time.strftime("%H:%M:%S")

def prepare_for_work(name):
    print(f"{now()}: {name} is preparing their counter.")

    # Simulate the delay to prepare the counter
    time.sleep(random.randint(1, 3))
    print(f"{now()}: {name} has finished preparing.")

    # Wait for all tellers to finish preparing
    teller_barrier.wait()
    print(f"{now()}: {name} is now ready to serve customers.")

tellers = ["Teller 1", "Teller 2", "Teller 3"]

with ThreadPoolExecutor(max_workers=3) as executor:
    for teller_name in tellers:
        executor.submit(prepare_for_work, teller_name)

print(f"{now()}: All tellers are ready to serve customers.")

In this code snippet, the teller_barrier ensures that all three tellers are ready before any of them start serving customers. teller_barrier.wait() is the synchronization point where each thread waits for other threads.

You can also see that three tellers prepare their counters concurrently, taking different amounts of time. Despite the tellers finishing preparation at different times, they all start to serve customers simultaneously. ThreadPoolExecutor is used to model the operations of the three tellers present in tellers.

When you run the script, it gives an output similar to the one shown below:

Shell
$ python bank_barrier.py
10:19:42: Teller 1 is preparing their counter.
10:19:42: Teller 2 is preparing their counter.
10:19:42: Teller 3 is preparing their counter.
10:19:43: Teller 2 has finished preparing.
10:19:43: Teller 3 has finished preparing.
10:19:44: Teller 1 has finished preparing.
10:19:44: Teller 1 is now ready to serve customers.
10:19:44: Teller 3 is now ready to serve customers.
10:19:44: Teller 2 is now ready to serve customers.
10:19:44: All tellers are ready to serve customers.

This output gives you the following information about teller activity:

  • Tellers start preparing at roughly the same time, as shown by the timestamp 10:19:42.
  • They finish preparing at different times due to random delays. You’ll notice that Teller 1 takes longer to finish than the others.
  • Once the last teller finishes preparing at timestamp 10:19:44, all tellers announce they’re ready to serve customers at the same timestamp.

Barrier objects are useful in scenarios where multiple threads need to wait for each other before proceeding, or when you need to synchronize the start of a particular phase across multiple threads.

Now that you’ve seen examples of various synchronization primitives in action, it’s time to explore when you should employ these tools to ensure thread safety and data integrity in your multithreaded applications.

Deciding When to Use Synchronization Primitives

When you run code in a multithreaded environment, race conditions are common and result in unexpected and non-deterministic output. To make a program thread-safe, you need to know when to use synchronization primitives to control and coordinate threads.

Here are some guidelines to help you decide when to use synchronization primitives when introducing multithreading in your applications:

  • Check for atomicity requirements: You should keep in mind that operations from different threads can interleave in unpredictable ways due to context switches. If a block of statements needs to be executed as an atomic unit, then you need to implement proper mutual exclusion synchronization primitives.

  • Check for shared mutable data: When multiple threads operate on shared data, one thread may read data that’s currently being modified by another thread. You should either avoid sharing data by using thread-local storage, as sketched after this list, or, if sharing is really necessary, introduce locks or other synchronization mechanisms to prevent race conditions.

  • External library code might not be designed for thread safety: When writing code, you’ll often use third-party packages and methods from the standard-library modules. Keep in mind that library code might not be designed with thread safety in mind. The bottom line is that you’re responsible for ensuring thread safety by using proper synchronization primitives.
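
For the thread-local storage mentioned above, threading.local() gives each thread its own independent view of an object’s attributes. Here’s a minimal sketch, with illustrative names:

Python thread_local.py
import threading
from concurrent.futures import ThreadPoolExecutor

# Each thread sees its own independent attributes on this object.
local_data = threading.local()

def worker(value):
    local_data.value = value  # Not visible to other threads
    print(f"{threading.current_thread().name}: value={local_data.value}")

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for number in range(3):
        executor.submit(worker, number)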

When you’re unsure about the thread safety of a piece of code or operation, use appropriate synchronization primitives to safeguard against potential conflicts. As the CPython documentation wisely advises:

When in doubt, use a mutex. (Source)

Conclusion

You now know how to avoid race conditions in a multithreaded environment, and how to use the synchronization primitives provided by Python’s threading module. These tools are essential for ensuring that your code behaves correctly and predictably when multiple threads are involved, preventing issues that can arise from concurrent execution.

In this tutorial, you’ve learned how to:

  • Identify race conditions in code
  • Use Lock and RLock objects for mutual exclusion
  • Use Semaphore objects to limit concurrent access to resources
  • Leverage Event objects for simple signaling between threads
  • Use Condition objects to make threads conditionally wait
  • Use Barrier objects to coordinate thread execution

With this knowledge, you’re well-equipped to write thread-safe code that functions as expected in a multithreaded environment. By applying these synchronization techniques, you can enhance the reliability and efficiency of your applications, allowing them to fully leverage the benefits of concurrent execution while minimizing the risks of race conditions.
