Python and AMQP

Just stray links so far. A home-grown solution to message/queue persistence is probably quicker and easier than any on the following ...


Can use variety of transports, especially MongoDB

Kombu is a messaging library for Python.

The aim of Kombu is to make messaging in Python as easy as possible by providing an idiomatic high-level interface for the AMQ protocol, and also provide proven and tested solutions to common messaging problems.

Virtual transports makes it really easy to add support for non-AMQP transports. There is already built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro.

Transport Comparison Client Type Direct Topic Fanout
amqp Native Yes Yes Yes
qpid Native Yes Yes Yes
redis Virtual Yes Yes Yes (PUB/SUB)
mongodb Virtual Yes Yes Yes
beanstalk Virtual Yes Yes [1] No
SQS Virtual Yes Yes [1] Yes [2]
couchdb Virtual Yes Yes [1] No
zookeeper Virtual Yes Yes [1] No
in-memory Virtual Yes Yes [1] No
django Virtual Yes Yes [1] No
sqlalchemy Virtual Yes Yes [1] No
SLMQ Virtual Yes Yes [1] No

[1]: Declarations only kept in memory, so exchanges/queues must be declared by all clients that needs them.

[2]: Fanout supported via storing routing tables in SimpleDB. Disabled by default, but can be enabled by using the supports_fanout transport option.

Note that there are a fair number of dependencies ( such as curses and curl ) that may not work or install easily on Windows. Look like addons and not core functionality.

Kombu Using MongoDB



Depends on Kombu.

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (wait until ready).

Celery is used in production systems to process millions of tasks a day ...

Multi broker support - The recommended message broker is RabbitMQ, but support for Redis, Beanstalk, MongoDB, CouchDB, and databases (using SQLAlchemy or the Django ORM) is also available.

Broker Instructions

  • RabbitMQ
  • Redis

Experimental Transports

  • SQLAlchemy
  • Django Database
  • MongoDB
  • Amazon SQS
  • CouchDB
  • Beanstalk
  • IronMQ

Intro : Celery and MongoDB -

    pip install celery
    pip install -U celery-with-mongodb


from celery.schedules import crontab

    "host": "",
    "port": 27017,
    "database": "jobs", 
    "taskmeta_collection": "stock_taskmeta_collection",

#used to schedule tasks periodically and passing optional arguments 
#Can be very useful. Celery does not seem to support scheduled task but only periodic
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),

.Create Application and Worker)

from celery import Celery
import time 

#Specify mongodb host and datababse to connect to
BROKER_URL = 'mongodb://localhost:27017/jobs'

celery = Celery('EOD_TASKS',broker=BROKER_URL)

#Loads settings for Backend to store results of jobs 

def add(x, y):
    return x + y

Now that we have our application and configs created lets start it up!

celery -A tasks worker --loglevel=info

Examples shows command line intereface ...

... I needed a scheduler which could store the scheduling in a real database so I decided to use mongoDB for its simplicity.

# launched the celery beat

celery beat -A your_lib.your_celery_app -S your_lib.your_scheduler_file.MongoScheduler -l DEBUG

from celery import current_app, schedules
from celery.beat import Scheduler, ScheduleEntry
from mongoengine import (
    Document, EmbeddedDocument, StringField, EmbeddedDocumentField,
    ListField, DictField, BooleanField, DateTimeField, IntField

class PeriodicCronTask(Document):
    """ Models of the schedule document.

    class Crontab(EmbeddedDocument):

        minute = StringField(required=True)
        hour = StringField(required=True)
        day_of_week = StringField(default='*')
        day_of_month = StringField(default='*')
        month_of_year = StringField(default='*')

        def schedule(self):
            return schedules.crontab(

        def __unicode__(self):
            """See `~celery.schedules.CRON_REPR`.
            return u"<crontab: {0.minute} {0.hour} {0.day_of_week} {0.day_of_month} \
                 {0.month_of_year} (m/h/d/dM/MY)>".format(self)

    crontab = EmbeddedDocumentField(Crontab, required=True)
    name = StringField(required=True, unique=True)
    task = StringField(required=True)

    args = ListField(StringField())
    kwargs = DictField()
    last_run_at = DateTimeField()

    # what you want
    # enabled = BooleanField(default=False)
    # immediat = BooleanField(default=False)
    # total_run_count = IntField(default=0)

    def schedule(self):
        return self.crontab.schedule

    def __unicode__(self):
        return u'{} :: {}'.format(, self.crontab)

    def clean(self):
        """ Validation by mongoengine to ensure the crontab schedule format is right.

class MongoScheduleEntry(ScheduleEntry):

    def __init__(self, mongo_entry):

        self._task = mongo_entry
        params = {
            "task": self._task.task,
            "args": self._task.args,
            "kwargs": self._task.kwargs,
            "app": current_app._get_current_object(),

        super(MongoScheduleEntry, self).__init__(**params)
        self.schedule = self._task.schedule
        # useful for the *is_due* method
        self.last_run_at = self._task.last_run_at

    def _next_instance(self):
        self._task.last_run_at =
        self._task.total_run_count += 1

        return self.__class__(self._task)

    __next__ = next = _next_instance

    def is_due(self):
        is_due, next = super(MongoScheduleEntry, self).is_due()
        if not self._task.enabled and is_due:
            is_due = False

        return is_due, next

    def save(self):

class MongoScheduler(Scheduler):

    Entry = MongoScheduleEntry

    def __init__(self, *args, **kwargs):
        self.conf = get_configuration()
        self._mongo = connect("schedules", host=self.conf.MONGO_CELERY_URI)

        # Check the maximum time to sleep between re-checking the schedule
        self.max_interval = self.conf.CELERY_SCHEDULER_UPDATE_INTERVAL
        self._schedules = {}

        super(MongoScheduler, self).__init__(*args, **kwargs)

    def get_from_database(self):
        """ Retrieves entries from mongoDB.
        return { MongoScheduleEntry(doc) for doc in PeriodicCronTask.objects()}

    def setup_schedule(self):
        """ Optional, you can pass this method

    def schedule(self):
        self._schedules = self.get_from_database()
        return self._schedules

    def sync(self):
        for entry in self._schedules.values():

Creating simple realtime app with Celery, CherryPy? and MongoDb? -


Carrot has pluggable messaging back-ends, so it is possible to support several messaging systems. Currently, there is support for AMQP (py-amqplib, pika), STOMP (python-stomp). There’s also an in-memory backend for testing purposes, using the Python queue module.

Several AMQP message broker implementations exists, including RabbitMQ, ZeroMQ and Apache ActiveMQ. You’ll need to have one of these installed, personally we’ve been using RabbitMQ.

Carrot is an AMQP messaging queue framework.

AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security.


Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.

Pika was developed primarily for use with RabbitMQ, but should also work with other AMQP 0-9-1 brokers.

Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.

Currently pika only supports Python 2.6 and 2.7. Work to support 3.3+ is underway.

If you have not developed with Pika or RabbitMQ before ...


Python client for the Advanced Message Queuing Procotol (AMQP)


Python library for communicating with AMQP peers and brokers using Twisted

an AMQP 0.9.1 client library for Python >= 3.2.0

Any AMQP 0.9.1-compliant server is supported, but RabbitMQ is our primary target. Apache Qpid is confirmed to work, but only with “anonymous” authentication. A CRAM-MD5 auth mechanism is currently being developed and will be released shortly.

Python AMQP Messenger Examples

See JavaServers#Qpid


A gevent-based microframework for building RESTful web services that provide AMQP interfaces ...

Nucleon largely consists of glue between existing Python components, primarily gevent, paste, psycopg2, and an AMQP library called Puka.

nucleon.amqp is an AMQP 0.9.1 library built from the ground up for integration with gevent.

Good documentation - Messaging#RabbitMQ inspired.

Introduction to AMQP

Routing a message

Topic Exchange Patterns

Routing Topologies

Remote Procedure Call (RPC)


Task Distribution


Synchronous/Asynchronous Model

Synchronous Methods

Asynchronous Methods

Error Handling

Making AMQP Connections

Performing actions upon connect or reconnect


Queues and Exchanges

Exchange Management

Queue Management

Binding and Unbinding Queues

Exchange-to-Exchange Binding (RabbitMQ Extension)

Publishing Messages

Basic Message Properties

Returned Messages

Publish Confirmation (RabbitMQ Extension)

Consuming messages

Message Object

Acknowledging or Rejecting Messages

Poll a queue

RabbitMQ Extensions



The recommended library for Python to access RabbitMQ servers is Pika.

See Messaging#CloudAMQP

Also See


Search wiki for 'amqp'

Last modified 2 years ago Last modified on 04/17/2015 02:50:41 PM

Attachments (1)

Download all attachments as: .zip