You have multiple threads in your program and you want to safely communicate or exchange data between them.
Perhaps the safest way to send data from one thread to another is to use a Queue from the queue library. To do this, you create a Queue instance that is shared by the threads. Threads then use put() or get() operations to add or remove items from the queue. For example:
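    from queue import Queue
    from threading import Thread

    # A thread that produces data
    def producer(out_q):
        while True:
            # Produce some data
            ...
            out_q.put(data)

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()
            # Process the data
            ...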
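The fragment above leaves out how the threads are started and how they stop. A minimal runnable sketch, assuming a fixed item count `n` (a hypothetical parameter added only so the demo terminates), might look like this:

```python
from queue import Queue
from threading import Thread

def producer(out_q, n):
    # Put n integers on the queue (n is a hypothetical bound for this demo)
    for i in range(n):
        out_q.put(i)

def consumer(in_q, n, results):
    # Take exactly n items off the queue and record them in arrival order
    for _ in range(n):
        results.append(in_q.get())

# Create the shared queue and launch both threads
q = Queue()
results = []
t1 = Thread(target=consumer, args=(q, 5, results))
t2 = Thread(target=producer, args=(q, 5))
t1.start()
t2.start()
t1.join()
t2.join()
print(results)   # [0, 1, 2, 3, 4]
```

With one producer and one consumer, items arrive in the order they were queued, because Queue is first-in, first-out.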
Queue instances already have all of the required locking, so they can be safely shared by as many threads as you wish.

When using queues, it can be somewhat tricky to coordinate the shutdown of the producer and consumer. A common solution to this problem is to rely on a special sentinel value, which when placed in the queue, causes consumers to terminate. For example:
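    from queue import Queue
    from threading import Thread

    # Object that signals shutdown
    _sentinel = object()

    # A thread that produces data
    def producer(out_q):
        while running:
            # Produce some data
            ...
            out_q.put(data)

        # Put the sentinel on the queue to indicate completion
        out_q.put(_sentinel)

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()

            # Check for termination
            if data is _sentinel:
                in_q.put(_sentinel)
                break

            # Process the data
            ...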
A subtle feature of this example is that the consumer, upon receiving the special sentinel value, immediately places it back onto the queue. This propagates the sentinel to other consumer threads that might be listening on the same queue, thus shutting them all down one after the other.

Although queues are the most common thread communication mechanism, you can build your own data structures as long as you add the required locking and synchronization. The most common way to do this is to wrap your data structures with a condition variable. For example, here is how you might build a thread-safe priority queue, as discussed in Recipe 1.5:
    import heapq
    import threading

    class PriorityQueue:
        def __init__(self):
            self._queue = []
            self._count = 0
            self._cv = threading.Condition()

        def put(self, item, priority):
            with self._cv:
                # Negate the priority so the highest priority pops first;
                # the counter keeps equal-priority items in insertion order
                heapq.heappush(self._queue, (-priority, self._count, item))
                self._count += 1
                self._cv.notify()

        def get(self):
            with self._cv:
                # Wait until another thread has added an item
                while len(self._queue) == 0:
                    self._cv.wait()
                return heapq.heappop(self._queue)[-1]
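A short usage sketch may help show the two behaviors the condition variable provides: get() waits for an item, and items come back highest priority first. The class is repeated so the block runs on its own. The item names and priorities are made up for illustration.

```python
import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()

    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

pq = PriorityQueue()
received = []

# The consumer thread waits inside get() until an item is available
t = threading.Thread(target=lambda: received.append(pq.get()))
t.start()
pq.put('wake up', 1)
t.join()

# With all items queued first, get() returns them highest priority first
for name, prio in [('low', 1), ('high', 5), ('mid', 3)]:
    pq.put(name, prio)
ordered = [pq.get() for _ in range(3)]
print(received, ordered)   # ['wake up'] ['high', 'mid', 'low']
```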
Thread communication with a queue is a one-way and nondeterministic process. In general, there is no way to know when the receiving thread has actually received a message and worked on it. However, Queue objects do provide some basic completion features, as illustrated by the task_done() and join() methods in this example:
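    from queue import Queue
    from threading import Thread

    # A thread that produces data
    def producer(out_q):
        while running:
            # Produce some data
            ...
            out_q.put(data)

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()

            # Process the data
            ...
            # Indicate completion
            in_q.task_done()

    # Create the shared queue and launch both threads
    q = Queue()
    t1 = Thread(target=consumer, args=(q,))
    t2 = Thread(target=producer, args=(q,))
    t1.start()
    t2.start()

    # Wait for all produced items to be consumed
    q.join()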
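As a runnable illustration of the completion features, the following sketch queues five numbers and uses join() to wait until task_done() has been called for each of them. Doubling each value is a stand-in for real work, and the daemon consumer thread is a choice made for this demo so the program can exit while the thread is still waiting.

```python
from queue import Queue
from threading import Thread

def consumer(in_q, processed):
    while True:
        data = in_q.get()
        processed.append(data * 2)   # stand-in for real work
        in_q.task_done()             # mark this item as fully handled

q = Queue()
processed = []
# daemon=True lets the program exit even though the consumer loops forever
Thread(target=consumer, args=(q, processed), daemon=True).start()

for i in range(5):
    q.put(i)

q.join()    # returns once task_done() has been called for every item
print(processed)   # [0, 2, 4, 6, 8]
```

Note that join() waits for the count of unfinished items to reach zero. It does not wait for the consumer thread itself to exit.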
If a thread needs to know immediately when a consumer thread has processed a particular item of data, you should pair the sent data with an Event object that allows the producer to monitor its progress. For example:
    from queue import Queue
    from threading import Thread, Event

    # A thread that produces data
    def producer(out_q):
        while running:
            # Produce some data
            ...
            # Make a (data, event) pair and hand it to the consumer
            evt = Event()
            out_q.put((data, evt))
            ...
            # Wait for the consumer to process the item
            evt.wait()

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data, evt = in_q.get()
            # Process the data
            ...
            # Indicate completion
            evt.set()
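The handshake can be traced with a small runnable sketch. The shared `log` list and the use of None as a stop marker are assumptions added for the demonstration. They are not part of the recipe.

```python
from queue import Queue
from threading import Thread, Event

def consumer(in_q, log):
    while True:
        data, evt = in_q.get()
        if data is None:          # None is a hypothetical stop marker for this demo
            break
        log.append(('processed', data))
        evt.set()                 # tell the producer this item is done

def producer(out_q, log, items):
    for data in items:
        evt = Event()
        out_q.put((data, evt))
        evt.wait()                # block until the consumer has handled this item
        log.append(('confirmed', data))
    out_q.put((None, None))

q = Queue()
log = []
t = Thread(target=consumer, args=(q, log))
t.start()
producer(q, log, ['a', 'b'])
t.join()
print(log)
# [('processed', 'a'), ('confirmed', 'a'), ('processed', 'b'), ('confirmed', 'b')]
```

Because the producer waits on each event before sending the next item, the two threads run in lockstep. That ordering guarantee comes at the cost of any overlap between producing and consuming.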
Writing threaded programs based on simple queuing is often a good way to maintain sanity. If you can break everything down to simple thread-safe queuing, you’ll find that you don’t need to litter your program with locks and other low-level synchronization. Also, communicating with queues often leads to designs that can be scaled up to other kinds of message-based communication patterns later on. For instance, you might be able to split your program into multiple processes, or even a distributed system, without changing much of its underlying queuing architecture.

One caution with thread queues is that putting an item in a queue doesn’t make a copy of the item. Thus, communication actually involves passing an object reference between threads. If you are concerned about shared state, it may make sense to only pass immutable data structures (e.g., integers, strings, or tuples) or to make deep copies of the queued items. For example:

    from queue import Queue
    from threading import Thread
    import copy

    # A thread that produces data
    def producer(out_q):
        while True:
            # Produce some data
            ...
            out_q.put(copy.deepcopy(data))

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()
            # Process the data
            ...
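The difference between queuing a reference and queuing a copy can be seen without any threads at all. The following single-threaded sketch uses a made-up `record` dictionary and mutates it after it has been queued:

```python
import copy
from queue import Queue

q = Queue()
record = {'values': [1, 2, 3]}

q.put(record)                  # enqueues a reference to the same dict
q.put(copy.deepcopy(record))   # enqueues an independent copy

record['values'].append(4)     # the producer mutates the object after queuing it

shared = q.get()
copied = q.get()
print(shared['values'])   # [1, 2, 3, 4]
print(copied['values'])   # [1, 2, 3]
```

The first item reflects the later mutation because it is the very same object. The deep copy is unaffected.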
Queue objects provide a few additional features that may prove to be useful in certain contexts. If you create a Queue with an optional size, such as Queue(N), it places a limit on the number of items that can be enqueued before put() blocks the producer. Adding an upper bound to a queue might make sense if there is a mismatch in speed between a producer and consumer, for instance, if a producer is generating items at a much faster rate than they can be consumed. On the other hand, making a queue block when it’s full can also have an unintended cascading effect throughout your program, possibly causing it to deadlock or run poorly. In general, the problem of “flow control” between communicating threads is a much harder problem than it seems. If you ever find yourself trying to fix a problem by fiddling with queue sizes, it could be an indicator of a fragile design or some other inherent scaling problem.

Both the get() and put() methods support nonblocking operation and timeouts. For example:
    import queue
    q = queue.Queue()

    try:
        data = q.get(block=False)
    except queue.Empty:
        ...

    try:
        q.put(item, block=False)
    except queue.Full:
        ...

    try:
        data = q.get(timeout=5.0)
    except queue.Empty:
        ...
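To see these exceptions actually raised, here is a small sketch that uses a queue bounded at two items. The `events` list is only there to record what happened, and the 0.1-second timeout is an arbitrary value chosen to keep the demo quick.

```python
import queue

q = queue.Queue(2)        # at most two items may be queued
events = []

q.put('a')
q.put('b')

try:
    q.put('c', block=False)       # queue is full: fails immediately
except queue.Full:
    events.append('full')

try:
    q.put('c', timeout=0.1)       # waits up to 0.1 s for a free slot
except queue.Full:
    events.append('full after timeout')

q.get()
q.get()

try:
    q.get(block=False)            # queue is empty: fails immediately
except queue.Empty:
    events.append('empty')

print(events)   # ['full', 'full after timeout', 'empty']
```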
Both of these options can be used to avoid the problem of just blocking indefinitely on a particular queuing operation. For example, a nonblocking put() could be used with a fixed-size queue to implement different kinds of handling code for when a queue is full, such as issuing a log message and discarding the item:
    def producer(q):
        ...
        try:
            q.put(item, block=False)
        except queue.Full:
            log.warning('queued item %r discarded!', item)
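The fragment assumes that `log` and `item` already exist. One way to make it self-contained is sketched below, with `log` taken to be a standard logging logger and `offer()` a hypothetical helper name introduced here for illustration:

```python
import logging
import queue

log = logging.getLogger(__name__)   # assumed to be what `log` refers to

def offer(q, item):
    """Try to enqueue item; log and return False if the queue is full."""
    try:
        q.put(item, block=False)
        return True
    except queue.Full:
        log.warning('queued item %r discarded!', item)
        return False

q = queue.Queue(2)
accepted = [offer(q, n) for n in range(4)]
print(accepted)   # [True, True, False, False]
```

Returning a flag lets the caller decide whether a dropped item matters. Whether discarding is acceptable at all depends on the application.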
A timeout is useful if you’re trying to make consumer threads periodically give up on operations such as q.get() so that they can check things such as a termination flag, as described in Recipe 12.1:
    _running = True

    def consumer(q):
        while _running:
            try:
                item = q.get(timeout=5.0)
                # Process item
                ...
            except queue.Empty:
                pass
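A runnable variant of this idea is sketched below. It swaps the module-level `_running` flag for a threading.Event named `stop`, which is a substitution made here so the flag can be set cleanly from another thread. The short 0.05-second timeout and the use of task_done()/join() to wait for the item are also choices made only for the demo.

```python
import queue
import threading

stop = threading.Event()   # plays the role of the _running flag

def consumer(q, seen):
    while not stop.is_set():
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            continue           # nothing arrived; loop back and re-check the flag
        seen.append(item)
        q.task_done()

q = queue.Queue()
seen = []
t = threading.Thread(target=consumer, args=(q, seen))
t.start()

q.put('job')
q.join()       # wait until the consumer has handled the item
stop.set()     # ask the consumer to exit
t.join()       # returns within roughly one timeout period
print(seen)    # ['job']
```

The timeout bounds how long the thread can take to notice the stop request, so a shorter timeout means faster shutdown at the cost of more frequent wakeups.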
Lastly, there are utility methods q.qsize(), q.full(), q.empty() that can tell you the current size and status of the queue. However, be aware that all of these are unreliable in a multithreaded environment. For example, a call to q.empty() might tell you that the queue is empty, but in the time that has elapsed since making the call, another thread could have added an item to the queue. Frankly, it’s best to write your code not to rely on such functions.
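One way to avoid depending on q.empty() is to attempt the operation and handle the exception instead of checking first. As a sketch, a hypothetical `drain()` helper could collect whatever is currently available like this:

```python
import queue

def drain(q):
    """Remove and return whatever items can be taken from q right now."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

q = queue.Queue()
for n in range(3):
    q.put(n)

print(drain(q))   # [0, 1, 2]
print(drain(q))   # []
```

Each get_nowait() call either returns an item or raises queue.Empty, so there is no gap between checking the queue and acting on the answer.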