Writing a Commissaire Service
High Level
- Subclass commissaire_service.service.CommissaireService
- Define all on_{{ method }} methods to expose them on the message bus
- Define how to run the service (directly or via a ServiceManager)
Specifics
Create the Subclass
All Commissaire Services must subclass commissaire_service.service.CommissaireService (or reimplement its functionality).
from commissaire_service.service import CommissaireService


class MyService(CommissaireService):
    """
    This is MyService.
    """
    pass
Define Exposed Methods
CommissaireService uses the on_{{ method }} convention for exposing methods to remote callers. To expose a method as ping, define a method on your service called on_ping. Every on_{{ method }} method takes one or more arguments. The first, required argument is message, which is the message itself, in case the method needs extra information.
To return results to the caller via the message bus, use the return statement as you would in a normal method. If there is an error, raise the proper exception. Exceptions are transformed into proper messages, returned to the message bus, and passed to the caller.
Note
message must always be the first argument!
def on_ping(self, message):
    """
    Exposed as ping. Takes no bus arguments.
    """
    return 'pong'

def on_echo(self, message, words):
    """
    Exposed as echo. Takes one bus argument of words.
    """
    return words

def on_fail(self, message):
    """
    Exposed as fail. Takes no bus arguments.
    """
    raise NotImplementedError('I was never created')
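The convention above can be tried out without a message bus. The following is a minimal sketch, not Commissaire's actual dispatch code: MiniService, its dispatch method, and the dict-based message layout ('method', 'params', 'result', 'error') are all hypothetical stand-ins chosen for illustration. It only shows the idea of looking up on_{{ method }}, passing message first, returning the result, and turning a raised exception into an error reply.

```python
class MiniService:
    """Hypothetical stand-in for CommissaireService; no message bus involved."""

    def dispatch(self, message):
        # Find the handler by prefixing the requested method name with "on_".
        handler = getattr(self, 'on_' + message['method'], None)
        if handler is None:
            return {'error': 'unknown method: ' + message['method']}
        try:
            # "message" always goes first, followed by the bus arguments.
            result = handler(message, **message.get('params', {}))
        except Exception as error:
            # A raised exception becomes an error reply instead of a crash.
            return {'error': type(error).__name__ + ': ' + str(error)}
        return {'result': result}


class MyService(MiniService):
    def on_ping(self, message):
        return 'pong'

    def on_echo(self, message, words):
        return words

    def on_fail(self, message):
        raise NotImplementedError('I was never created')


service = MyService()
ping_reply = service.dispatch({'method': 'ping'})
echo_reply = service.dispatch({'method': 'echo', 'params': {'words': 'hi'}})
fail_reply = service.dispatch({'method': 'fail'})
```

Here ping_reply and echo_reply carry the handlers' return values, while fail_reply carries a description of the NotImplementedError rather than propagating it.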
Storage Integration
Most services will want to interact with Commissaire’s Storage service in some way, perhaps to read or update records in permanent storage or just to be notified of changes. This is such a common case that Commissaire provides a convenience class named StorageClient to easily interact with the Storage service.
The StorageClient API uses Model instances as inputs and outputs, and handles all the JSON encoding and decoding for you.
from commissaire import models
from commissaire.storage import client

from commissaire_service.service import CommissaireService


class MyService(CommissaireService):
    """
    This demonstrates how to use StorageClient.
    """

    def __init__(self, exchange_name, connection_url, config_file=None):
        ...
        # Chain up to CommissaireService.__init__()
        ...
        # StorageClient requires a BusMixin interface, which
        # our parent class -- CommissaireService -- provides.
        self.storage = client.StorageClient(self)
        # Invoke a method when a new Host record is created.
        #
        # Can also listen for: client.NOTIFY_EVENT_DELETED
        #                      client.NOTIFY_EVENT_UPDATED
        #                      client.NOTIFY_EVENT_ANY
        self.storage.register_callback(
            self.host_created_cb, models.Host,
            client.NOTIFY_EVENT_CREATED)

    @client.NotifyCallback
    def host_created_cb(self, event, model, message):
        # "event" will always be "created" since we specified
        # client.NOTIFY_EVENT_CREATED when registering this
        # callback (see above).
        #
        # "model" will always be a models.Host instance since
        # we specified it when registering this callback (see
        # above). We could have also passed None to catch the
        # creation of any type of record in permanent storage.
        pass

    def on_do_something_cool(self, message, host):
        # Fetch a cluster for some reason. storage.get_cluster()
        # returns a models.Cluster instance instead of a bunch of
        # JSON to decode.
        cluster = self.storage.get_cluster('my_cluster')
        ...
        # Do something cool with the requested host.
        ...
        # Say we updated the state of the models.Host instance.
        # This writes the updated state back to permanent storage.
        self.storage.save(host)
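The filtering that register_callback describes (a model type plus an event, with None acting as "any model") can be sketched in isolation. The code below is an illustration under stated assumptions, not StorageClient's implementation: MiniNotifier, the Host stand-in, the EVENT_* values, and the two-argument callback signature are hypothetical, and the real callbacks also receive message.

```python
EVENT_CREATED = 'created'   # hypothetical stand-ins for the
EVENT_DELETED = 'deleted'   # client.NOTIFY_EVENT_* constants
EVENT_ANY = 'any'


class Host:
    """Hypothetical stand-in for models.Host."""

    def __init__(self, address):
        self.address = address


class MiniNotifier:
    """Hypothetical registry that filters callbacks by model type and event."""

    def __init__(self):
        self._callbacks = {}

    def register_callback(self, callback, model_type=None, event=EVENT_ANY):
        # model_type=None means "any type of record".
        self._callbacks.setdefault((model_type, event), []).append(callback)

    def notify(self, event, model):
        # Try the exact registration first, then the wildcard ones.
        keys = [
            (type(model), event), (type(model), EVENT_ANY),
            (None, event), (None, EVENT_ANY),
        ]
        for key in keys:
            for callback in self._callbacks.get(key, []):
                callback(event, model)


seen_hosts = []
seen_events = []
notifier = MiniNotifier()
# Only fires when a Host is created.
notifier.register_callback(
    lambda event, model: seen_hosts.append(model.address),
    Host, EVENT_CREATED)
# Fires for every event on every model type.
notifier.register_callback(lambda event, model: seen_events.append(event))

notifier.notify(EVENT_CREATED, Host('192.168.1.10'))
notifier.notify(EVENT_DELETED, Host('192.168.1.10'))
```

After the two notifications, the Host-created callback has run once and the catch-all callback has run twice.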
Running the Service
The simplest way to run a CommissaireService is to create an instance and use its run method.
#: The arguments used to create new kombu.Queue instances
queue_kwargs = [
    {'name': 'my_queue', 'routing_key': 'queues.my_queue.*'},
]

try:
    MyService(
        exchange_name='my_exchange',
        connection_url='redis://127.0.0.1:6379/',
        qkwargs=queue_kwargs
    ).run()
except Exception as error:
    # Handle it ;-)
    pass
A more likely pattern is to run multiple instances of a service on the same queue so that more requests can be handled. This can be done by wrapping the service in a ServiceManager. It follows a similar pattern to CommissaireService, prepending a few inputs (service_class and process_count in the example) that are required for running multiple processes.
Note
Debugging with multiple processes can be much harder. If you need to debug a service, it is recommended to use the CommissaireService directly to ensure no Exception information gets lost between the process pool and the service.
#: The arguments used to create new kombu.Queue instances
queue_kwargs = [
    {'name': 'my_queue', 'routing_key': 'queues.my_queue.*'},
]

try:
    ServiceManager(
        service_class=MyService,
        process_count=3,
        exchange_name='my_exchange',
        connection_url='redis://127.0.0.1:6379/',
        qkwargs=queue_kwargs
    ).run()
except Exception as error:
    pass
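How the manager's own inputs relate to the service inputs can be sketched as follows. This is only an illustration of the argument pattern and deliberately starts no processes: MiniManager, build_services, and EchoService are hypothetical names, and the real ServiceManager may handle its arguments differently.

```python
class EchoService:
    """Hypothetical stand-in for a CommissaireService subclass."""

    def __init__(self, exchange_name, connection_url, qkwargs):
        self.exchange_name = exchange_name
        self.connection_url = connection_url
        self.qkwargs = qkwargs


class MiniManager:
    """Hypothetical stand-in for ServiceManager (starts no real processes)."""

    def __init__(self, service_class, process_count, **service_kwargs):
        # The first two arguments belong to the manager itself; everything
        # else is kept and handed unchanged to every service instance.
        self.service_class = service_class
        self.process_count = process_count
        self.service_kwargs = service_kwargs

    def build_services(self):
        # A real manager would run one process per instance instead of
        # just returning the instances.
        return [
            self.service_class(**self.service_kwargs)
            for _ in range(self.process_count)
        ]


manager = MiniManager(
    service_class=EchoService,
    process_count=3,
    exchange_name='my_exchange',
    connection_url='redis://127.0.0.1:6379/',
    qkwargs=[{'name': 'my_queue', 'routing_key': 'queues.my_queue.*'}],
)
services = manager.build_services()
```

Each of the three instances receives the same exchange, connection URL, and queue arguments, which is what lets them share one queue.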
Code Example
See simpleservice.