You’d like to define tasks with behavior similar to “actors” in the so-called “actor model.”
The “actor model” is one of the oldest and simplest approaches to concurrency and distributed computing. In fact, its underlying simplicity is part of its appeal. In a nutshell, an actor is a concurrently executing task that simply acts upon messages sent to it. In response to these messages, it may decide to send further messages to other actors. Communication with actors is one way and asynchronous. Thus, the sender of a message does not know when a message actually gets delivered, nor does it receive a response or acknowledgment that the message has been processed.

Actors are straightforward to define using a combination of a thread and a queue. For example:
from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        '''
        Send a message to the actor
        '''
        self._mailbox.put(msg)

    def recv(self):
        '''
        Receive an incoming message
        '''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        '''
        Close the actor, thus shutting it down
        '''
        self.send(ActorExit)

    def start(self):
        '''
        Start concurrent execution
        '''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        '''
        Run method to be implemented by the user
        '''
        while True:
            msg = self.recv()

# Sample actor that prints every message it receives
class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)
In this example, Actor instances are things that you simply send a message to using their send() method. Under the covers, this places the message on a queue and hands it off to an internal thread that runs to process the received messages. The close() method is programmed to shut down the actor by placing a special sentinel value (ActorExit) on the queue. Users define new actors by inheriting from Actor and redefining the run() method to implement their custom processing. The usage of the ActorExit exception is such that user-defined code can be programmed to catch the termination request and handle it if appropriate (the exception is raised by the recv() method and propagated).

If you relax the requirement of concurrent and asynchronous message delivery, actor-like objects can also be minimally defined by generators. For example:
def print_actor():
    while True:
        try:
            msg = yield      # Get a message
            print('Got:', msg)
        except GeneratorExit:
            print('Actor terminating')
            return           # Stop here; yielding again after close() raises RuntimeError
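A generator-based actor has to be advanced to its first yield before it can accept messages. The following is a minimal usage sketch under that assumption; collecting_actor and received are hypothetical names chosen for illustration, and the generator records messages in a list instead of printing them so the result is easy to inspect.

```python
def collecting_actor(log):
    """Generator-based actor that appends each received message to log."""
    try:
        while True:
            msg = yield               # Suspend until a message is sent
            log.append(msg)
    except GeneratorExit:
        log.append('terminated')      # close() was called; finish quietly

received = []
actor = collecting_actor(received)
next(actor)              # Prime the generator: run it up to the first yield
actor.send('Hello')      # Handled synchronously, in the caller's thread
actor.send('World')
actor.close()            # Raises GeneratorExit inside the generator
print(received)          # ['Hello', 'World', 'terminated']
```

Unlike the thread-based version, each send() call here returns only after the actor has finished handling the message, which is the relaxed delivery requirement mentioned above.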
Part of the appeal of actors is their underlying simplicity. In practice, there is just one core operation, send(). Plus, the general concept of a “message” in actor-based systems is something that can be expanded in many different directions. For example, you could pass tagged messages in the form of tuples and have actors take different courses of action like this:
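class TaggedActor(Actor):
    def run(self):
        while True:
            # Split the message into its tag and the remaining arguments
            tag, *payload = self.recv()
            getattr(self, 'do_' + tag)(*payload)

    # Methods corresponding to different message tags
    def do_A(self, x):
        print('Running A', x)

    def do_B(self, x, y):
        print('Running B', x, y)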
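To try the tag-dispatch idea without threads, the same (tag, *payload) unpacking can be paired with a generator-based actor. This is only a sketch: TaggedHandlers, tagged_actor, and the 'unknown' fallback are hypothetical additions for illustration. With the thread-based TaggedActor the calls look the same (for instance, send(('A', 1)) after start()), but a tag with no matching do_ method would raise AttributeError inside the actor's thread.

```python
class TaggedHandlers:
    """Hypothetical handler collection using the do_<tag> naming convention."""
    def __init__(self):
        self.calls = []

    def do_A(self, x):
        self.calls.append(('A', x))

    def do_B(self, x, y):
        self.calls.append(('B', x, y))

def tagged_actor(handlers):
    """Generator-based actor that dispatches (tag, *payload) tuples."""
    while True:
        tag, *payload = yield
        method = getattr(handlers, 'do_' + tag, None)
        if method is None:
            # Assumption for this sketch: record unknown tags instead of failing
            handlers.calls.append(('unknown', tag))
        else:
            method(*payload)

handlers = TaggedHandlers()
actor = tagged_actor(handlers)
next(actor)                   # Advance to the first yield
actor.send(('A', 1))          # Dispatches to do_A(1)
actor.send(('B', 2, 3))       # Dispatches to do_B(2, 3)
actor.send(('C',))            # No do_C method, so it is recorded as unknown
actor.close()
print(handlers.calls)         # [('A', 1), ('B', 2, 3), ('unknown', 'C')]
```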
As another example, here is a variation of an actor that allows arbitrary functions to be executed in a worker and results to be communicated back using a special Result object:
from threading import Event

class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value
        self._evt.set()

    def result(self):
        # Block until the worker has stored a value
        self._evt.wait()
        return self._result

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))
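A short usage sketch for the worker follows. It repeats condensed copies of the Actor, Result, and Worker classes only so that it runs on its own; the call to pow() is an arbitrary choice for illustration.

```python
from queue import Queue
from threading import Thread, Event

class ActorExit(Exception):
    pass

class Actor:
    """Condensed copy of the thread-and-queue Actor described above."""
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        self._mailbox.put(msg)

    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        self.send(ActorExit)

    def start(self):
        self._terminated = Event()
        Thread(target=self._bootstrap, daemon=True).start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value
        self._evt.set()

    def result(self):
        self._evt.wait()
        return self._result

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))

worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)   # Returns immediately with a Result placeholder
print(r.result())              # Blocks until the worker has run pow(2, 3); prints 8
worker.close()
worker.join()
```

Note that if the submitted function raises an exception, run() exits without calling set_result(), so a caller waiting in result() would block indefinitely; a fuller version would need to capture and report the error.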
Last, but not least, the concept of “sending” a task a message is something that can be scaled up into systems involving multiple processes or even large distributed systems. For example, the send() method of an actor-like object could be programmed to transmit data on a socket connection or deliver it via some kind of messaging infrastructure (e.g., AMQP, ZMQ, etc.).
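As a rough illustration of the socket case, the sketch below defines an actor-like object whose send() writes a pickled, length-prefixed message to a socket. SocketSender, receive_loop, and the framing format are hypothetical choices made for this example, and socket.socketpair() stands in for a real network connection.

```python
import pickle
import socket
import struct
from threading import Thread

class SocketSender:
    """Hypothetical actor-like proxy: send() writes to a socket, not a queue."""
    def __init__(self, sock):
        self._sock = sock

    def send(self, msg):
        data = pickle.dumps(msg)
        # 4-byte big-endian length prefix marks the message boundary
        self._sock.sendall(struct.pack('!I', len(data)) + data)

    def close(self):
        self._sock.close()

def _read_exactly(sock, n):
    """Read exactly n bytes, or return None if the peer closed the connection."""
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            return None
        chunks.append(chunk)
        n -= len(chunk)
    return b''.join(chunks)

def receive_loop(sock, handler):
    """Receiving side: decode each framed message and pass it to handler."""
    while True:
        header = _read_exactly(sock, 4)
        if header is None:
            break                      # Sender closed the connection
        (size,) = struct.unpack('!I', header)
        body = _read_exactly(sock, size)
        if body is None:
            break                      # Connection closed mid-message
        # Unpickling is only safe when the sender is trusted
        handler(pickle.loads(body))
    sock.close()

left, right = socket.socketpair()      # Connected pair standing in for a network link
received = []
t = Thread(target=receive_loop, args=(right, received.append))
t.start()

remote = SocketSender(left)
remote.send('Hello')
remote.send(('B', 2, 3))
remote.close()                         # Closing the socket ends the receive loop
t.join()
print(received)                        # ['Hello', ('B', 2, 3)]
```

The calling code still uses only send() and close(), so the transport can change without affecting the code that talks to the actor.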