Event-based Programming for Python

Oftentimes, you need to have objects that communicate with each other via events. This is a very useful setup, for example, in a GUI — where these events represent things like mouse clicks, key strokes, or button presses. That’s not what I developed these classes for, since I was more interested in simulating things and the event system seemed like the most natural fit, but the recipe is still relevant to other event handling needs.

We’re going to build upon earlier discussions, most notably about Stackless event loops by adding in some concrete examples of using that recipe.

The Observer pattern strikes again, as I’m defining the relationship between the event generator and the event listener as one of Observable and Observer. We’ll make use of the channel_processor function decorator described in Stackless event loops.

class EventObserver( object ):
    def __init__( self, *args, **kwargs ):
        super( EventObserver, self ).__init__( *args, **kwargs )

    @channel_processor
    def event_transieve( self, *args, **kwargs ):
        evt = kwargs.get( 'data', None )
        self.process_event( evt )

    def process_event( self, event ):
        pass

    def event_notify( self, event ):
        self.event_transieve( event )

This is straight-forward enough. The only trickery (if you could call it that) is in the event_transieve method. And all that does is take whatever is passed as the keyword argument data and call the method process_event. In this base class implementation, that function does nothing.

One bit of niftiness does occur, however, when the event_transieve method is invoked. Through the use of function decorators (and therefore transparent to the calling client) this method actually spans across tasklets, granting some semblance of concurrency.

class EventObservable( object ):
    def __init__( self, *args, **kwargs ):
        super( EventObservable, self ).__init__( *args, **kwargs )
        self.__event_observers = weakref.WeakKeyDictionary( )
        self.__events = [ ]

    def attach_event_observer( self, obs, level=1 ):
        if obs not in self.__event_observers:
            self.__event_observers[obs] = level
        return self

    def detach_event_observer( self, obs ):
        if obs in self.__event_observers:
            del self.__event_observers[obs]
        return self

    @channel_processor
    def dispatcher( self, *args, **kwargs ):
        data = kwargs.get( 'data', None )
        wlist = []
        for key in self.__event_observers.keys( ):
            val = self.__event_observers[key]
            seok = SortableEventObserverKey( key, val )
            heapq.heappush( wlist, seok )
         while len( wlist ):
             obs = heapq.heappop( wlist )
             obs( evt )

    def dispatch_event( self, event ):
        self.dispatcher( event )
        return event

Now, we can safely ignore the attach_event_observer and methods — they only exist to implement the Observer pattern. The only method we really care about at the moment is dispatcher.

In this method we simply loop over all the currently registered observers, invoking (ultimately) their event_notify method. If you don’t see how that happens, just be patient and wait until we look at the SortableEventObserverKey helper class and it’s definition of the __call__ method.

class SortableEventObserverKey( object ):
    def __init__( self, kval, weight, *args, **kwargs ):
        super( SortableEventObserverKey, self ).__init__( *args, **kwargs )
        self.__value = kval
        self.__weight = weight

    def __cmp__( self, other ):
        return cmp( self.__weight, other.__weight )

    def __call__( self, event ):
        return self.__value.event_notify( event )

    def __repr__( self ):
        return "%s, %s" % ( self.__value, self.__weight )

Now, I hate that I had to throw something like that into the discussion. The helper class only exists to make the comparison functions easier when using the heap queue. For anyone unfamiliar with heaps, a heap ensures that the highest weighted object is at the front of the queue and will be the first one taken out of the structure.

class EventParticipant( EventObservable, EventObserver ):
    def __init__( self, *args, **kwargs ):
        super( EventParticipant, self ).__init__( *args, **kwargs )
        event_manager = kwargs.get( "event_manager", "events" )
        self.event_manager = pytypes.get_event_manager( event_manager )

    def generate_event( self, event_type, *data, **kwargs ):
        evt = self.event_manager.create_event( self, event_type, *data, **kwargs )
        return self.dispatch_event( evt )

Here’s the easy class to implement. It defines the EventParticipant, which is both the Observable and the Observer. This is, utlimately, the class that I extend for my simulations since my program domain requires for the objects to both generate events and be interested in other object’s events. Simply extending from this class gives you that ability in a nice, clean, and concurrent fashion (or, at least as concurrent as Stackless gets you).

Leave a Reply

Your email address will not be published. Required fields are marked *