Stackless Event Loops

This is a follow-on article to Stackless Python Meets Twisted Matrix. This time, we look at how to use function decorators to turn an ordinary-looking function into a looping event dispatcher, which is useful for the Observer design pattern.

Through the syntactic power of decorators, one can convert any function into a continuously running event loop. This relies on Stackless Python, which has been discussed in earlier articles on this web site and on many others.

The premise behind this event loop is this: a tasklet runs and dispatches incoming “events” to a handler function. To the outside caller, it appears to be a regular function call, but the decorator arranges for the “event handler” to run in a separate tasklet. If desired, this premise can be extended to let the event loop run in its own thread.
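
To make the premise concrete without requiring Stackless, here is a minimal sketch that swaps in standard-library threads and queues for tasklets and channels. The names event_loop, dispatch and on_price_change are hypothetical and exist only for this illustration; the article's real implementation follows later.

```python
import queue
import threading

handled_in_main = []  # records whether the handler ran on the main thread


def event_loop(handler, inbox):
    """Receive (reply, args, kwargs) requests until a None sentinel arrives."""
    while True:
        request = inbox.get()
        if request is None:
            return
        reply, args, kwargs = request
        reply.put(handler(*args, **kwargs))


def dispatch(inbox, *args, **kwargs):
    """Looks like a plain call to the caller, but the handler runs in the loop."""
    reply = queue.Queue(maxsize=1)
    inbox.put((reply, args, kwargs))
    return reply.get()


def on_price_change(symbol, price):
    handled_in_main.append(threading.current_thread() is threading.main_thread())
    return "%s is now %.2f" % (symbol, price)


inbox = queue.Queue()
loop = threading.Thread(target=event_loop, args=(on_price_change, inbox), daemon=True)
loop.start()

message = dispatch(inbox, "ACME", 12.5)

inbox.put(None)  # ask the loop to exit
loop.join()
```

The caller only ever sees dispatch returning a value; the handler itself executes inside the loop's thread, which is the same division of labour the tasklet version aims for.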

First, let’s look at the class that does all the heavy lifting.

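import stackless
from twisted.internet import defer
from twisted.python import failure

# blocking_tasklet, deferred_tasklet and _wrapper are the helpers
# described in the earlier article.

class ChannelProcessor:
    def __init__( self, action ):
        self.channel = stackless.channel( )
        self.action = action
        self.running = True
        # Start the event loop in its own tasklet.
        self.process( )

    def stop( self ):
        self.running = False
        # Anything that is not a 3-tuple wakes the loop so it can exit.
        self.channel.send( 1 )

    @blocking_tasklet
    def __call__( self, *args, **kwargs ):
        # Hand the loop a private reply channel along with the arguments.
        c = stackless.channel( )
        self.channel.send( (c,args,kwargs) )
        rv = c.receive( )
        if isinstance( rv, failure.Failure ):
            raise rv.value
        return rv

    @deferred_tasklet
    def process( self ):
        while self.running:
            vals = self.channel.receive( )
            # The tuple check keeps the stop( ) message (an int) from hitting len( ).
            if isinstance( vals, tuple ) and len( vals ) == 3:
                c,args,kwargs = vals
                # When d fires, its result (or Failure) is sent back on c.
                d = defer.Deferred( )
                d.addBoth( c.send )
                _wrapper( d, self.action, *args, **kwargs )
            else:
                self.running = False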

This code makes use of the decorators described in an earlier article, available here. As you will notice, the core of the event loop is contained in the process method, which runs in its own tasklet (because it is decorated with deferred_tasklet). Your application does not have to use Twisted for this code to work, but Twisted must be installed for it to run (unless you change the mechanics of deferred_tasklet).

process simply loops until told otherwise (via the stop method), receiving data from its channel. If the data is a tuple of 3 items, it calls the original function (stored in the action member). The return value from the event handler is sent on a channel, which was received as the first element of the tuple.

An outside caller enters this mechanism via the __call__ method. This method creates a new channel and then passes that channel, together with the parameters it was called with, along the object’s channel. It then waits for data to be sent back. After a quick look at the returned data, it either returns it in the usual way or, if it received a failure, raises the wrapped exception.
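
The round trip described above, including the error path, can be sketched with plain threads and queues. This is an analogue under stated assumptions, not the Stackless/Twisted code: QueueProcessor, _STOP and parse_port are hypothetical names, and an exception instance plays the role that failure.Failure plays in the original.

```python
import queue
import threading


class QueueProcessor:
    """Thread-and-queue analogue of the ChannelProcessor idea (not Stackless)."""

    _STOP = object()  # sentinel that tells the loop to exit

    def __init__(self, action):
        self.action = action
        self.inbox = queue.Queue()
        self.worker = threading.Thread(target=self._process, daemon=True)
        self.worker.start()

    def stop(self):
        self.inbox.put(self._STOP)
        self.worker.join()

    def __call__(self, *args, **kwargs):
        reply = queue.Queue(maxsize=1)  # private reply "channel" for this call
        self.inbox.put((reply, args, kwargs))
        outcome = reply.get()  # block until the loop answers
        if isinstance(outcome, BaseException):
            raise outcome  # re-raise the handler's error in the caller
        return outcome

    def _process(self):
        while True:
            request = self.inbox.get()
            if request is self._STOP:
                break
            reply, args, kwargs = request
            try:
                reply.put(self.action(*args, **kwargs))
            except Exception as exc:
                reply.put(exc)  # ship the error back instead of losing it


def parse_port(text):
    port = int(text)
    if not 0 < port < 65536:
        raise ValueError("port out of range: %d" % port)
    return port


safe_parse = QueueProcessor(parse_port)
good = safe_parse("8080")
try:
    safe_parse("99999")
    error = None
except ValueError as exc:
    error = exc
safe_parse.stop()
```

One limitation of this simplified version: a handler that legitimately returns an exception object would have it raised in the caller. Wrapping errors in a dedicated type, as the original does with failure.Failure, avoids that ambiguity.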

Now, for the decorator:

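import types
# Assumption: decorator comes from the third-party "decorator" module.
from decorator import decorator

@decorator
def channel_processor( f, *args, **kwargs ):
    func_obj = None
    if isinstance( f, types.MethodType ):
        # Bound method: keep one processor per object, stored on the instance.
        target = f.im_self
        target_name = f.func_name + '_cp'
        if not hasattr( target, target_name ):
            func_obj = ChannelProcessor( f )
            setattr( target, target_name, func_obj )
        else:
            func_obj = getattr( target, target_name )
    elif not hasattr( f, "_cp" ):
        # Plain function: create the processor on first call and cache it.
        setattr( f, '_cp', ChannelProcessor( f ) )
        func_obj = f._cp
    else:
        func_obj = f._cp
    return func_obj( *args, **kwargs )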

Here, we create a ChannelProcessor object and store it on the decorated function as an attribute named _cp. If it already exists, we reuse it. Either way, we then call the object, which leads into the __call__ method shown above.
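
The create-once-then-reuse step can be seen in isolation with a small sketch. It assumes a hypothetical stand-in class, RecordingProcessor, that simply calls the function directly and counts calls, and it uses a hand-written decorator (cached_processor) built on functools.wraps rather than the decorator helper used in the article.

```python
import functools


class RecordingProcessor:
    """Hypothetical stand-in for ChannelProcessor: calls the action directly."""

    def __init__(self, action):
        self.action = action
        self.calls = 0

    def __call__(self, *args, **kwargs):
        self.calls += 1
        return self.action(*args, **kwargs)


def cached_processor(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Create the processor on first use and cache it on the function.
        if not hasattr(f, "_cp"):
            f._cp = RecordingProcessor(f)
        return f._cp(*args, **kwargs)
    return wrapper


@cached_processor
def greet(name):
    return "hello " + name


first = greet("a")
second = greet("b")
# functools.wraps exposes the undecorated function as __wrapped__.
processor = greet.__wrapped__._cp
```

Both calls go through the same processor object, so any state it holds (in the article's case, the channel and the running tasklet) lives for as long as the function does.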

A special case is made if we are an object’s method, instead of a regular function. This does not happen in the regular use-case of a decorator (when using the @decorator_name syntax). It only happens when we do something like:

class A:
    def __init__( self ):
        self.func = channel_processor( self.func )

    def func( self, *args ):
        print "Here i am with arg list:", args

Use this approach if each object needs its own tasklet to handle events. The standard decorator syntax instead gives each function its own event-handling tasklet.
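
The difference between the two modes comes down to where the processor is stored. The sketch below is written for Python 3, where the bound-method attributes are __self__ and __name__ rather than im_self and func_name; CallCounter, processor_for and Listener are hypothetical names used only for this illustration.

```python
import types


class CallCounter:
    """Hypothetical stand-in for ChannelProcessor: counts and forwards calls."""

    def __init__(self, action):
        self.action = action
        self.calls = 0

    def __call__(self, *args, **kwargs):
        self.calls += 1
        return self.action(*args, **kwargs)


def processor_for(f):
    if isinstance(f, types.MethodType):
        # Bound method: store the processor on the instance, under "<name>_cp".
        target, name = f.__self__, f.__name__ + "_cp"
    else:
        # Plain function: store the processor on the function itself.
        target, name = f, "_cp"
    if not hasattr(target, name):
        setattr(target, name, CallCounter(f))
    return getattr(target, name)


class Listener:
    def __init__(self):
        # Shadow the method with a processor that belongs to this instance.
        self.on_event = processor_for(self.on_event)

    def on_event(self, value):
        return value * 2


a = Listener()
b = Listener()
doubled = a.on_event(3)
a.on_event(1)
```

Here a and b each carry their own on_event_cp, so events sent to one object never queue behind events sent to the other. Storing on the instance is also a practical necessity: bound method objects do not accept new attributes, so a processor cannot be attached to the method object itself.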

I tend to use the per-function methodology, but as usual, your mileage may vary.

