Writing a Commissaire Service

High Level

  • Subclass commissaire_service.service.CommissaireService
  • Define an on_{{ method }} method for each method you want to expose on the message bus
  • Define how to run the service (Directly or via a ServiceManager)

Specifics

Create the Subclass

Every Commissaire service must subclass commissaire_service.service.CommissaireService (or reimplement its functionality).

from commissaire_service.service import CommissaireService


class MyService(CommissaireService):
    """
    This is MyService.
    """
    pass

Define Exposed Methods

CommissaireService uses the on_{{ method }} convention to expose methods to remote callers. To expose a method as ping, define a method on your service called on_ping. Every on_{{ method }} method takes one or more arguments. The first, required argument is message, which is the message itself, in case the method needs extra information from it.

To return a result to the caller over the message bus, use the return statement as in any normal method. To signal an error, raise an appropriate exception. Both are transformed into proper messages, returned to the message bus, and passed to the caller.

Note

message must always be the first argument!

def on_ping(self, message):
    """
    Exposed as ping. Takes no bus arguments.
    """
    return 'pong'

def on_echo(self, message, words):
    """
    Exposed as echo. Takes one bus argument of words.
    """
    return words

def on_fail(self, message):
    """
    Exposed as fail. Takes no bus arguments.
    """
    raise NotImplementedError('I was never created')
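
To make the convention concrete, here is a minimal, self-contained sketch of how a dispatcher could map an incoming method name to an on_{{ method }} handler and turn a return value or an exception into a reply. It is not Commissaire's implementation: SketchService, dispatch, and the 'method' / 'params' / 'result' / 'error' keys are all assumptions made for illustration.

```python
class SketchService:
    """Stand-in for a service class; NOT the real CommissaireService."""

    def on_ping(self, message):
        return 'pong'

    def on_echo(self, message, words):
        return words

    def on_fail(self, message):
        raise NotImplementedError('I was never created')

    def dispatch(self, message):
        """Look up on_<method> and turn its outcome into a reply dict."""
        # Hypothetical message layout: {'method': ..., 'params': {...}}
        handler = getattr(self, 'on_' + message['method'], None)
        if handler is None:
            return {'error': 'no such method: ' + message['method']}
        try:
            # "message" is always passed first; bus arguments follow by name.
            result = handler(message, **message.get('params', {}))
        except Exception as error:
            # A raised exception becomes an error reply instead of crashing.
            return {'error': '{}: {}'.format(type(error).__name__, error)}
        return {'result': result}


service = SketchService()
print(service.dispatch({'method': 'ping'}))
print(service.dispatch({'method': 'echo', 'params': {'words': 'hi'}}))
print(service.dispatch({'method': 'fail'}))
```

The point of the sketch is only that handlers stay ordinary methods: they return values or raise, and the surrounding machinery is responsible for packaging the outcome for the caller.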

Storage Integration

Most services will want to interact with Commissaire’s Storage service in some way, perhaps to read or update records in permanent storage, or just to be notified of changes. This case is common enough that Commissaire provides a convenience class, StorageClient, for interacting with the Storage service.

The StorageClient API uses Model instances as inputs and outputs, and handles all the JSON encoding and decoding for you.

from commissaire import models
from commissaire.storage import client
from commissaire_service.service import CommissaireService


class MyService(CommissaireService):
    """
    This demonstrates how to use StorageClient.
    """

    def __init__(self, exchange_name, connection_url, config_file=None):
        ...
        # Chain up to CommissaireService.__init__()
        ...

        # StorageClient requires a BusMixin interface, which
        # our parent class -- CommissaireService -- provides.
        self.storage = client.StorageClient(self)

        # Invoke a method when a new Host record is created.
        #
        # Can also listen for: client.NOTIFY_EVENT_DELETED
        #                      client.NOTIFY_EVENT_UPDATED
        #                      client.NOTIFY_EVENT_ANY
        self.storage.register_callback(
            self.host_created_cb, models.Host,
            client.NOTIFY_EVENT_CREATED)

    @client.NotifyCallback
    def host_created_cb(self, event, model, message):
        # "event" will always be "created" since we specified
        # client.NOTIFY_EVENT_CREATED when registering this
        # callback (see above).
        #
        # "model" will always be a models.Host instance since
        # we specified it when registering this callback (see
        # above). We could have also passed None to catch the
        # creation of any type of record in permanent storage.
        pass

    def on_do_something_cool(self, message, host):
        # Fetch a cluster for some reason.  storage.get_cluster()
        # returns a models.Cluster instance instead of a bunch of
        # JSON to decode.
        cluster = self.storage.get_cluster('my_cluster')
        ...
        # Do something cool with the requested host.
        ...

        # Say we updated the state of the models.Host instance.
        # This writes the updated state back to permanent storage.
        self.storage.save(host)

Running the Service

The simplest way to run a CommissaireService is to create an instance and call its run method.

#: The arguments used to create new kombu.Queue instances
queue_kwargs = [
    {'name': 'my_queue', 'routing_key': 'queues.my_queue.*'},
]

try:
    MyService(
        exchange_name='my_exchange',
        connection_url='redis://127.0.0.1:6379/',
        qkwargs=queue_kwargs
    ).run()
except Exception as error:
    # Handle it ;-)
    pass
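
The routing_key in queue_kwargs above ends in a wildcard. Assuming the exchange follows standard AMQP topic semantics, where * stands for exactly one dot-separated word, the following sketch shows which routing keys such a pattern would accept. Whether Commissaire's exchange is configured this way is an assumption, and topic_matches is a hypothetical helper that handles only the * wildcard.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches pattern; only '*' is supported."""
    pattern_words = pattern.split('.')
    key_words = routing_key.split('.')
    # '*' replaces exactly one word, so the word counts must agree.
    if len(pattern_words) != len(key_words):
        return False
    return all(
        (p == '*' and k != '') or p == k
        for p, k in zip(pattern_words, key_words)
    )


print(topic_matches('queues.my_queue.*', 'queues.my_queue.ping'))  # True
print(topic_matches('queues.my_queue.*', 'queues.my_queue'))       # False
print(topic_matches('queues.my_queue.*', 'queues.my_queue.a.b'))   # False
```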

A more likely pattern is to run multiple instances of a service on the same queue so that more requests can be handled. To do this, wrap the service in a ServiceManager. It follows a similar pattern to CommissaireService, with a few extra inputs at the front that are required for running multiple processes.

Note

Debugging with multiple processes can be much harder. If you need to debug a service, it is recommended to use the CommissaireService directly so that no Exception information is swallowed between the process pool and the service.

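# Assumption: ServiceManager is importable from the same module
# as CommissaireService.
from commissaire_service.service import ServiceManager

#: The arguments used to create new kombu.Queue instances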
queue_kwargs = [
    {'name': 'my_queue', 'routing_key': 'queues.my_queue.*'},
]

try:
    ServiceManager(
        service_class=MyService,
        process_count=3,
        exchange_name='my_exchange',
        connection_url='redis://127.0.0.1:6379/',
        qkwargs=queue_kwargs
    ).run()
except Exception as error:
    pass

Code Example

See simpleservice.