
11.12 Understanding Event-Driven I/O


Problem

You have heard about packages based on “event-driven” or “asynchronous” I/O, but you’re not entirely sure what it means, how it actually works under the covers, or how it might impact your program if you use it.

Solution

At a fundamental level, event-driven I/O is a technique that takes basic I/O operations (e.g., reads and writes) and converts them into events that must be handled by your program. For example, whenever data is received on a socket, it turns into a “receive” event that is handled by some sort of callback method or function that you supply to respond to it. As a possible starting point, an event-driven framework might start with a base class that implements a series of basic event handler methods like this:

class EventHandler:
    def fileno(self):
        'Return the associated file descriptor'
        raise NotImplementedError('must implement')

    def wants_to_receive(self):
        'Return True if receiving is allowed'
        return False

    def handle_receive(self):
        'Perform the receive operation'
        pass

    def wants_to_send(self):
        'Return True if sending is requested'
        return False

    def handle_send(self):
        'Send outgoing data'
        pass

Instances of this class then get plugged into an event loop that looks like this:

import select

def event_loop(handlers):
    while True:
        # Ask every handler what it is currently interested in
        wants_recv = [h for h in handlers if h.wants_to_receive()]
        wants_send = [h for h in handlers if h.wants_to_send()]
        # Block until at least one handler is ready
        can_recv, can_send, _ = select.select(wants_recv, wants_send, [])
        for h in can_recv:
            h.handle_receive()
        for h in can_send:
            h.handle_send()

That’s it! The key to the event loop is the select() call, which polls file descriptors for activity. Prior to calling select(), the event loop simply queries all of the handlers to see which ones want to receive or send. It then supplies the resulting lists to select(). As a result, select() returns the lists of objects that are ready to receive or send. The corresponding handle_receive() or handle_send() methods are triggered.

To write applications, specific instances of EventHandler classes are created. For example, here are two simple handlers that illustrate two UDP-based network services:

import socket
import time

class UDPServer(EventHandler):
    def __init__(self, address):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(address)

    def fileno(self):
        return self.sock.fileno()

    def wants_to_receive(self):
        return True

class UDPTimeServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(1)
        self.sock.sendto(time.ctime().encode('ascii'), addr)

class UDPEchoServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(8192)
        self.sock.sendto(msg, addr)

if __name__ == '__main__':
    handlers = [ UDPTimeServer(('',14000)), UDPEchoServer(('',15000)) ]
    event_loop(handlers)

To test this code, try sending messages to it from another Python interpreter:

>>> from socket import *
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'',('localhost',14000))
0
>>> s.recvfrom(128)
(b'Tue Sep 18 14:29:23 2012', ('127.0.0.1', 14000))
>>> s.sendto(b'Hello',('localhost',15000))
5
>>> s.recvfrom(128)
(b'Hello', ('127.0.0.1', 15000))
>>>
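
To see in isolation what select() reports to the event loop, the following minimal sketch may help. It is not part of the recipe: the helper name readable() and the use of socket.socketpair() as a stand-in for real network sockets are assumptions made purely for illustration.

```python
import select
import socket

def readable(socks, timeout):
    """Return the subset of socks that select() reports as ready to read."""
    ready, _, _ = select.select(socks, [], [], timeout)
    return ready

# Two connected pairs; only the first pair will carry any data.
a, b = socket.socketpair()
c, d = socket.socketpair()

before = readable([b, d], 0.1)   # nothing has been sent yet
a.send(b'x')                     # make b readable
after = readable([b, d], 1.0)    # returns as soon as something is ready

print('ready before send:', len(before))
print('ready after send: ', len(after), '(is it b?', after == [b], ')')

for s in (a, b, c, d):
    s.close()
```

The event loop in the solution does the same thing, except that the lists passed to select() are rebuilt from the handlers on every iteration.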

Implementing a TCP server is somewhat more complex, since each client involves the instantiation of a new handler object. Here is an example of a TCP echo server, in which TCPEchoClient is the handler created for each connected client:

class TCPServer(EventHandler):
    def __init__(self, address, client_handler, handler_list):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        self.sock.bind(address)
        self.sock.listen(1)
        self.client_handler = client_handler
        self.handler_list = handler_list

    def fileno(self):
        return self.sock.fileno()

    def wants_to_receive(self):
        return True

    def handle_receive(self):
        client, addr = self.sock.accept()
        # Add the client to the event loop's handler list
        self.handler_list.append(self.client_handler(client, self.handler_list))

class TCPClient(EventHandler):
    def __init__(self, sock, handler_list):
        self.sock = sock
        self.handler_list = handler_list
        self.outgoing = bytearray()

    def fileno(self):
        return self.sock.fileno()

    def close(self):
        self.sock.close()
        # Remove myself from the event loop's handler list
        self.handler_list.remove(self)

    def wants_to_send(self):
        return True if self.outgoing else False

    def handle_send(self):
        nsent = self.sock.send(self.outgoing)
        self.outgoing = self.outgoing[nsent:]

class TCPEchoClient(TCPClient):
    def wants_to_receive(self):
        return True

    def handle_receive(self):
        data = self.sock.recv(8192)
        if not data:
            self.close()
        else:
            self.outgoing.extend(data)

if __name__ == '__main__':
    handlers = []
    handlers.append(TCPServer(('',16000), TCPEchoClient, handlers))
    event_loop(handlers)

The key to the TCP example is the addition and removal of clients from the handler list. On each connection, a new handler is created for the client and added to the list. When the connection is closed, each client must take care to remove itself from the list.

If you run this program and try connecting with Telnet or some similar tool, you’ll see it echoing received data back to you. It should easily handle multiple clients.
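
The add-and-remove bookkeeping described above is roughly what the standard library's selectors module offers through register() and unregister(). The sketch below is not the recipe's code: the function names accept(), echo(), run_once(), and demo() are hypothetical, and the demo drives a single client through one echo on a loopback port chosen by the operating system.

```python
import selectors
import socket

def accept(sel, server):
    # A new connection: register a per-client callback (the "add" step).
    client, _ = server.accept()
    sel.register(client, selectors.EVENT_READ, echo)

def echo(sel, client):
    data = client.recv(8192)
    if data:
        # Simplification: a real server would buffer unsent bytes,
        # as the recipe's `outgoing` bytearray does.
        client.sendall(data)
    else:
        # Peer closed the connection: the "remove" step.
        sel.unregister(client)
        client.close()

def run_once(sel, timeout=1.0):
    # One pass of the polling loop: dispatch to each ready callback.
    for key, _ in sel.select(timeout):
        key.data(sel, key.fileobj)

def demo():
    sel = selectors.DefaultSelector()
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 0))      # port 0: let the OS pick a free port
    server.listen()
    sel.register(server, selectors.EVENT_READ, accept)

    client = socket.create_connection(server.getsockname())
    client.sendall(b'ping')
    run_once(sel)                      # accepts the connection
    run_once(sel)                      # echoes the data
    client.settimeout(5)
    reply = client.recv(8192)
    client.close()
    run_once(sel)                      # sees EOF and unregisters the client

    remaining = len(sel.get_map())     # only the listening socket is left
    sel.unregister(server)
    server.close()
    sel.close()
    return reply, remaining

if __name__ == '__main__':
    print(demo())
```

Here the selector object plays the role of the handler list, so the per-client code no longer needs a reference to a shared list in order to remove itself.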

Discussion

Virtually all event-driven frameworks operate in a manner that is similar to that shown in the solution. The actual implementation details and overall software architecture might vary greatly, but at the core, there is a polling loop that checks sockets for activity and performs operations in response.

One potential benefit of event-driven I/O is that it can handle a very large number of simultaneous connections without ever using threads or processes. That is, the select() call (or equivalent) can be used to monitor hundreds or thousands of sockets and respond to events occurring on any of them. Events are handled one at a time by the event loop, without the need for any other concurrency primitives.

The downside to event-driven I/O is that there is no true concurrency involved. If any of the event handler methods blocks or performs a long-running calculation, it blocks the progress of everything. There is also the problem of calling out to library functions that aren’t written in an event-driven style. There is always the risk that some library call will block, causing the event loop to stall.

Problems with blocking or long-running calculations can be solved by sending the work out to a separate thread or process. However, coordinating threads and processes with an event loop is tricky. Here is an example of code that will do it using the concurrent.futures module:

from concurrent.futures import ThreadPoolExecutor
import os
import socket

class ThreadPoolHandler(EventHandler):
    def __init__(self, nworkers):
        if os.name == 'posix':
            self.signal_done_sock, self.done_sock = socket.socketpair()
        else:
            # Emulate a socket pair with a loopback TCP connection
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self.signal_done_sock = socket.socket(socket.AF_INET,
                                                  socket.SOCK_STREAM)
            self.signal_done_sock.connect(server.getsockname())
            self.done_sock, _ = server.accept()
            server.close()

        self.pending = []
        self.pool = ThreadPoolExecutor(nworkers)

    def fileno(self):
        return self.done_sock.fileno()

    # Callback that executes when the thread is done
    def _complete(self, callback, r):
        self.pending.append((callback, r.result()))
        self.signal_done_sock.send(b'x')

    # Run a function in a thread pool
    def run(self, func, args=(), kwargs={},*,callback):
        r = self.pool.submit(func, *args, **kwargs)
        r.add_done_callback(lambda r: self._complete(callback, r))

    def wants_to_receive(self):
        return True

    # Run callback functions of completed work
    def handle_receive(self):
        # Invoke all pending callback functions
        for callback, result in self.pending:
            callback(result)
            self.done_sock.recv(1)
        self.pending = []

In this code, the run() method is used to submit work to the pool along with a callback function that should be triggered upon completion. The actual work is then submitted to a ThreadPoolExecutor instance. However, a really tricky problem concerns the coordination of the computed result and the event loop. To do this, a pair of sockets are created under the covers and used as a kind of signaling mechanism. When work is completed by the thread pool, it executes the _complete() method in the class. This method queues up the pending callback and result before writing a byte of data on one of these sockets. The fileno() method is programmed to return the other socket. Thus, when this byte is written, it will signal to the event loop that something has happened. The handle_receive() method, when triggered, will then execute all of the callback functions for previously submitted work. Frankly, it’s enough to make one’s head spin.

Here is a simple server that shows how to use the thread pool to carry out a long-running calculation:

# A really bad Fibonacci implementation
def fib(n):
    if n < 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

class UDPFibServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(128)
        n = int(msg)
        pool.run(fib, (n,), callback=lambda r: self.respond(r, addr))

    def respond(self, result, addr):
        self.sock.sendto(str(result).encode('ascii'), addr)

if __name__ == '__main__':
    pool = ThreadPoolHandler(16)
    handlers = [ pool, UDPFibServer(('',16000))]
    event_loop(handlers)

To try this server, simply run it and try some experiments with another Python program:

from socket import *
sock = socket(AF_INET, SOCK_DGRAM)
for x in range(40):
    sock.sendto(str(x).encode('ascii'), ('localhost', 16000))
    resp = sock.recvfrom(8192)
    print(resp[0])

You should be able to run this program repeatedly from many different windows and have it operate without stalling other programs, even though it gets slower and slower as the numbers get larger.

Having gone through this recipe, should you use its code? Probably not. Instead, you should look for a more fully developed framework that accomplishes the same task. However, if you understand the basic concepts presented here, you’ll understand the core techniques used to make such frameworks operate. As an alternative to callback-based programming, event-driven code will sometimes use coroutines. See Recipe 12.12 for an example.
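
As one illustration of what a more fully developed framework can look like, here is a hedged sketch of a UDP echo service written with the standard library's asyncio module. It is not taken from the recipe: the class names EchoProtocol and OneShotClient and the coroutine echo_once() are hypothetical, and the demo binds to a loopback port chosen by the operating system rather than the fixed ports used earlier.

```python
import asyncio

class EchoProtocol(asyncio.DatagramProtocol):
    # Callback style: asyncio's event loop invokes these methods.
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)

class OneShotClient(asyncio.DatagramProtocol):
    # Sends one message and hands the first reply to a future.
    def __init__(self, message, reply):
        self.message = message
        self.reply = reply

    def connection_made(self, transport):
        transport.sendto(self.message)

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data)

async def echo_once(message):
    # Coroutine style: awaiting replaces explicit callback chaining.
    loop = asyncio.get_running_loop()
    server, _ = await loop.create_datagram_endpoint(
        EchoProtocol, local_addr=('127.0.0.1', 0))
    address = server.get_extra_info('sockname')
    reply = loop.create_future()
    client, _ = await loop.create_datagram_endpoint(
        lambda: OneShotClient(message, reply), remote_addr=address)
    try:
        return await asyncio.wait_for(reply, timeout=5)
    finally:
        client.close()
        server.close()

if __name__ == '__main__':
    print(asyncio.run(echo_once(b'Hello')))
```

The protocol classes correspond loosely to the handler classes of this recipe, while the polling loop and readiness checks are supplied by the framework.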

