12.13 Polling Multiple Thread Queues


Problem

You have a collection of thread queues, and you would like to be able to poll them for incoming items, much in the same way as you might poll a collection of network connections for incoming data.

Solution

A common solution to polling problems relies on a little-known trick: a hidden loopback network connection. Essentially, the idea is as follows: for each queue (or any object) that you want to poll, you create a pair of connected sockets. You then write on one of the sockets to signal the presence of data. The other socket is then passed to select() or a similar function to poll for the arrival of data. Here is some sample code that illustrates this idea:

    import queue
    import socket
    import os

    class PollableQueue(queue.Queue):
        def __init__(self):
            super().__init__()
            # Create a pair of connected sockets
            if os.name == 'posix':
                self._putsocket, self._getsocket = socket.socketpair()
            else:
                # Compatibility on non-POSIX systems
                server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                server.bind(('127.0.0.1', 0))
                server.listen(1)
                self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._putsocket.connect(server.getsockname())
                self._getsocket, _ = server.accept()
                server.close()

        def fileno(self):
            # Expose the read socket's descriptor so select() can poll the queue
            return self._getsocket.fileno()

        def put(self, item):
            super().put(item)
            # Signal the arrival of one item
            self._putsocket.send(b'x')

        def get(self):
            # Consume the signal byte that matches the item being removed
            self._getsocket.recv(1)
            return super().get()

In this code, a new kind of Queue instance is defined where there is an underlying pair of connected sockets. The socketpair() function on Unix machines can establish such sockets easily. On Windows, you have to fake it using code similar to that shown (it looks a bit weird, but a server socket is created and a client immediately connects to it afterward). The normal get() and put() methods are then redefined slightly to perform a small bit of I/O on these sockets. The put() method writes a single byte of data to one of the sockets after putting data on the queue. The get() method reads a single byte of data from the other socket when removing an item from the queue.
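
To see the signaling in isolation, the following minimal sketch checks readiness with a zero-timeout select() before and after a put(). It uses a condensed copy of PollableQueue that assumes socket.socketpair() is available on the platform (true on POSIX systems, and on Windows from Python 3.5). The variable names ready_before, ready_after, and ready_final are illustrative only.

```python
import queue
import select
import socket

class PollableQueue(queue.Queue):
    # Condensed copy of the recipe's class; assumes socket.socketpair() exists
    def __init__(self):
        super().__init__()
        self._putsocket, self._getsocket = socket.socketpair()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

q = PollableQueue()

# Nothing queued yet: a zero-timeout select() reports nothing readable
ready_before, _, _ = select.select([q], [], [], 0)

q.put('spam')

# One signal byte is pending on the read socket, so the queue is readable
ready_after, _, _ = select.select([q], [], [], 0)
item = q.get()

# get() consumed the signal byte, so the queue is no longer readable
ready_final, _, _ = select.select([q], [], [], 0)

print(ready_before, ready_after == [q], item, ready_final)
```

Because each put() sends exactly one byte and each get() receives exactly one, the number of pending bytes tracks the number of queued items, which is what keeps the socket's readiness in step with the queue's contents.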

The fileno() method is what makes the queue pollable using a function such as select(). Essentially, it just exposes the underlying file descriptor of the socket used by the get() function.

Here is an example of some code that defines a consumer which monitors multiple queues for incoming items:

    import select
    import threading

    def consumer(queues):
        '''
        Consumer that reads data on multiple queues simultaneously
        '''
        while True:
            can_read, _, _ = select.select(queues, [], [])
            for r in can_read:
                item = r.get()
                print('Got:', item)

    q1 = PollableQueue()
    q2 = PollableQueue()
    q3 = PollableQueue()
    t = threading.Thread(target=consumer, args=([q1, q2, q3],))
    t.daemon = True
    t.start()

    # Feed data to the queues
    q1.put(1)
    q2.put(10)
    q3.put('hello')
    q2.put(15)
    ...

If you try it, you’ll find that the consumer indeed receives all of the put items, regardless of which queues they are placed in.
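
The consumer above runs forever in a daemon thread, which makes the result awkward to check programmatically. As a sketch under stated assumptions, the variant below stops after a fixed number of items so the thread can be joined and the collected items inspected. The collect() function and its count and results parameters are hypothetical additions, not part of the recipe, and the condensed PollableQueue again assumes socket.socketpair() is available.

```python
import queue
import select
import socket
import threading

class PollableQueue(queue.Queue):
    # Condensed copy of the recipe's class; assumes socket.socketpair() exists
    def __init__(self):
        super().__init__()
        self._putsocket, self._getsocket = socket.socketpair()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

def collect(queues, count, results):
    # Hypothetical helper: gather `count` items from any of the queues, then return
    while len(results) < count:
        can_read, _, _ = select.select(queues, [], [])
        for r in can_read:
            results.append(r.get())

q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()
results = []
t = threading.Thread(target=collect, args=([q1, q2, q3], 4, results))
t.start()

# Feed data to the queues
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)

t.join(timeout=5)
print(results)
```

The order in which items from different queues arrive depends on thread scheduling, but items placed on the same queue (10 and 15 here) are always received in the order they were put.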

Discussion

The problem of polling non-file-like objects, such as queues, is often a lot trickier than it looks. For instance, if you don’t use the socket technique shown, your only option is to write code that cycles through the queues and uses a timer, like this:

    import time

    def consumer(queues):
        while True:
            for q in queues:
                if not q.empty():
                    item = q.get()
                    print('Got:', item)

            # Sleep briefly to avoid 100% CPU
            time.sleep(0.01)

This might work for certain kinds of problems, but it’s clumsy and introduces other weird performance problems. For example, if new data is added to a queue, it won’t be detected for as long as 10 milliseconds (an eternity on a modern processor).

You run into even further problems if the preceding polling is mixed with the polling of other objects, such as network sockets. For example, if you want to poll both sockets and queues at the same time, you might have to use code like this:

    import select

    def event_loop(sockets, queues):
        while True:
            # polling with a timeout
            can_read, _, _ = select.select(sockets, [], [], 0.01)
            for r in can_read:
                # handle_read() is application-defined (not shown here)
                handle_read(r)
            for q in queues:
                if not q.empty():
                    item = q.get()
                    print('Got:', item)

The solution shown solves a lot of these problems by simply putting queues on an equal footing with sockets. A single select() call can be used to poll for activity on both. It is not necessary to use timeouts or other time-based hacks to periodically check. Moreover, if data gets added to a queue, the consumer will be notified almost instantaneously. Although there is a tiny amount of overhead associated with the underlying I/O, it is often worth it to have better response time and simplified coding.
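
As a rough illustration of that point, the sketch below watches an ordinary socket and a PollableQueue with one blocking select() call and no timeout. Several things here are assumptions made for the example: the connected pair a/b merely stands in for a real network connection, the expected and events parameters of event_loop() are hypothetical additions so that the loop terminates, and the condensed PollableQueue assumes socket.socketpair() is available.

```python
import queue
import select
import socket
import threading

class PollableQueue(queue.Queue):
    # Condensed copy of the recipe's class; assumes socket.socketpair() exists
    def __init__(self):
        super().__init__()
        self._putsocket, self._getsocket = socket.socketpair()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

def event_loop(sockets, queues, expected, events):
    # Hypothetical loop: record `expected` events from either source, then return
    while len(events) < expected:
        # One select() call covers sockets and queues alike; no timeout needed
        can_read, _, _ = select.select(sockets + queues, [], [])
        for r in can_read:
            if isinstance(r, PollableQueue):
                events.append(('queue', r.get()))
            else:
                events.append(('socket', r.recv(1024)))

# a/b stand in for a network connection; b is the end being polled
a, b = socket.socketpair()
q = PollableQueue()
events = []

t = threading.Thread(target=event_loop, args=([b], [q], 2, events))
t.start()

a.send(b'ping')   # activity on the socket
q.put('job')      # activity on the queue

t.join(timeout=5)
print(events)
a.close()
b.close()
```

The loop blocks inside select() until either source has data, so it neither spins nor sleeps on a timer between events.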

