Python and AMQP

Just stray links so far. A home-grown solution to message/queue persistence is probably quicker and easier than any of the following ...
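As a rough illustration of the home-grown route, here is a minimal sketch of a persistent FIFO queue built on Python's standard sqlite3 module. It assumes a single consumer process, because claiming a message is not made atomic across processes. It gives at-least-once delivery: a message is deleted only when it is acknowledged, and anything still in flight when the process dies is handed out again on the next start. `SQLiteQueue` and its methods are hypothetical names, not an existing library.

```python
import sqlite3


class SQLiteQueue:
    """Hypothetical minimal persistent FIFO queue on one SQLite table.

    Assumes a single consumer process. Messages are deleted only on ack(),
    so un-acked messages are redelivered after a restart (at-least-once).
    """

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " body TEXT NOT NULL,"
            " in_flight INTEGER NOT NULL DEFAULT 0)"
        )
        # Messages claimed by a previous (crashed) run become visible again.
        self.conn.execute("UPDATE messages SET in_flight = 0")
        self.conn.commit()

    def put(self, body):
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute("INSERT INTO messages (body) VALUES (?)", (body,))

    def get(self):
        """Claim the oldest visible message; return (id, body) or None."""
        with self.conn:
            row = self.conn.execute(
                "SELECT id, body FROM messages WHERE in_flight = 0 "
                "ORDER BY id LIMIT 1"
            ).fetchone()
            if row is not None:
                self.conn.execute(
                    "UPDATE messages SET in_flight = 1 WHERE id = ?", (row[0],)
                )
        return row

    def ack(self, message_id):
        """Permanently remove a processed message."""
        with self.conn:
            self.conn.execute("DELETE FROM messages WHERE id = ?", (message_id,))

    def __len__(self):
        # Counts both waiting and in-flight (claimed but un-acked) messages.
        return self.conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```

Passing a file path instead of the default `":memory:"` is what makes the queue survive restarts; the in-memory default is only convenient for trying it out.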

Kombu

Can use a variety of transports, notably MongoDB.

https://pypi.python.org/pypi/kombu

Kombu is a messaging library for Python.

The aim of Kombu is to make messaging in Python as easy as possible by providing an idiomatic high-level interface for the AMQ protocol, and to provide proven and tested solutions to common messaging problems.

Virtual transports make it really easy to add support for non-AMQP transports. There is already built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro.

Transport Comparison

Client      Type     Direct  Topic    Fanout
amqp        Native   Yes     Yes      Yes
qpid        Native   Yes     Yes      Yes
redis       Virtual  Yes     Yes      Yes (PUB/SUB)
mongodb     Virtual  Yes     Yes      Yes
beanstalk   Virtual  Yes     Yes [1]  No
SQS         Virtual  Yes     Yes [1]  Yes [2]
couchdb     Virtual  Yes     Yes [1]  No
zookeeper   Virtual  Yes     Yes [1]  No
in-memory   Virtual  Yes     Yes [1]  No
django      Virtual  Yes     Yes [1]  No
sqlalchemy  Virtual  Yes     Yes [1]  No
SLMQ        Virtual  Yes     Yes [1]  No

[1]: Declarations are only kept in memory, so exchanges/queues must be declared by all clients that need them.

[2]: Fanout supported via storing routing tables in SimpleDB. Disabled by default, but can be enabled by using the supports_fanout transport option.
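The Direct, Topic and Fanout columns refer to the three standard AMQP exchange types. Below is a small pure-Python sketch of how each type picks target queues from its bindings, following the AMQP 0-9-1 rules: direct delivers on an exact routing-key match; topic treats keys as dot-separated words where `*` matches exactly one word and `#` matches zero or more; fanout delivers to every bound queue and ignores the key. It only illustrates the semantics. It is not Kombu's virtual-transport code, and `route` / `topic_matches` are hypothetical names.

```python
def topic_matches(pattern, routing_key):
    """AMQP topic rule: '*' matches one word, '#' matches zero or more words."""
    return _match(pattern.split("."), routing_key.split("."))


def _match(pattern, words):
    if not pattern:
        return not words  # pattern used up: match only if the key is too
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # '#' either matches zero words, or consumes one word and stays active.
        return _match(rest, words) or (bool(words) and _match(pattern, words[1:]))
    if not words:
        return False
    if head == "*" or head == words[0]:
        return _match(rest, words[1:])
    return False


def route(exchange_type, bindings, routing_key):
    """Return the sorted queue names a message would be delivered to.

    bindings: list of (queue_name, binding_key) pairs on a single exchange.
    """
    if exchange_type == "fanout":
        matched = [queue for queue, _ in bindings]  # routing key is ignored
    elif exchange_type == "direct":
        matched = [queue for queue, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        matched = [queue for queue, key in bindings
                   if topic_matches(key, routing_key)]
    else:
        raise ValueError(f"unsupported exchange type: {exchange_type}")
    # A queue bound several times still receives only one copy.
    return sorted(set(matched))
```

For example, a queue bound with `stock.#` receives a message routed with the key `stock`, because `#` may match zero words, while `stock.*.ibm` does not.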

http://kombu.readthedocs.org/en/latest/

http://docs.kombu.me/en/latest/

http://oriolrius.cat/blog/2013/09/30/hello-world-using-kombu-library-and-python/

http://oriolrius.cat/blog/2012/03/09/amqp-and-rabbitmq-toc/

Note that there are a fair number of dependencies (such as curses and curl) that may not work or install easily on Windows. These look like add-ons rather than core functionality.

Kombu Using MongoDB

http://www.slideshare.net/mongodb/mongodb-as-message-queue

http://pydoc.net/Python/kombu/2.5.4/kombu.transport.mongodb/

See kombu-mongodb-example.py

Celery

Depends on Kombu.

http://www.celeryproject.org/

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

The execution units, called tasks, are executed concurrently on one or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (wait until ready).

Celery is used in production systems to process millions of tasks a day ...

Multi broker support - The recommended message broker is RabbitMQ, but support for Redis, Beanstalk, MongoDB, CouchDB, and databases (using SQLAlchemy or the Django ORM) is also available.

https://github.com/celery

http://docs.celeryproject.org/en/latest/getting-started/brokers/

Broker Instructions

  • RabbitMQ
  • Redis

Experimental Transports

  • SQLAlchemy
  • Django Database
  • MongoDB
  • Amazon SQS
  • CouchDB
  • Beanstalk
  • IronMQ

Intro : Celery and MongoDB - http://skillachie.com/2013/06/15/intro-celery-and-mongodb/

    pip install celery
    pip install -U celery-with-mongodb

Create celeryconfig.py

from celery.schedules import crontab

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs", 
    "taskmeta_collection": "stock_taskmeta_collection",
}

# Used to schedule tasks periodically, optionally passing arguments.
# Beat only handles periodic schedules; a one-off delayed run can instead be
# requested with add.apply_async(args, countdown=...) or eta=...
CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}
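Both `crontab(minute='*/1')` above and the string fields stored by the MongoDB scheduler further down use cron field syntax. Here is a small standalone sketch of how one such field expands into the values it selects. It covers only `*`, plain numbers, ranges `a-b`, `/step` on `*` or a range, and comma-separated lists. It is an illustration, not Celery's own parser (`celery.schedules.crontab` accepts more, such as day names), and `expand_cron_field` is a hypothetical name.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field into the sorted list of values it selects.

    low/high are the field's bounds, e.g. 0-59 for minutes, 0-23 for hours.
    """
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/", 1)
            step = int(step_text)
            if step < 1:
                raise ValueError("step must be >= 1")
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            if step != 1:
                raise ValueError("use '*' or a range with a step")
            start = end = int(part)
        if not (low <= start <= end <= high):
            raise ValueError(f"{part!r} is outside {low}-{high}")
        values.update(range(start, end + 1, step))
    return sorted(values)
```

For instance, a minute field of `*/15` selects minutes 0, 15, 30 and 45, and `*/1` selects every minute, which is what the `every-minute` entry relies on.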

Create tasks.py (Celery application and worker)

from celery import Celery
import time

# MongoDB host and database used as the message broker
BROKER_URL = 'mongodb://localhost:27017/jobs'

celery = Celery('EOD_TASKS', broker=BROKER_URL)

# Load the result-backend settings (and beat schedule) from celeryconfig.py
celery.config_from_object('celeryconfig')

@celery.task
def add(x, y):
    time.sleep(30)  # simulate a slow job
    return x + y

Now that we have our application and configs created, let's start it up!

celery -A tasks worker --loglevel=info

The examples show the command-line interface ...

http://thebault.co/celery-beat-scheduler-mongodb.html

... I needed a scheduler which could store the scheduling in a real database so I decided to use mongoDB for its simplicity.

# launch celery beat with the custom MongoDB scheduler

celery beat -A your_lib.your_celery_app -S your_lib.your_scheduler_file.MongoScheduler -l DEBUG

from celery import current_app, schedules
from celery.beat import Scheduler, ScheduleEntry
from mongoengine import (
    Document, EmbeddedDocument, StringField, EmbeddedDocumentField,
    ListField, DictField, BooleanField, DateTimeField, IntField, connect
)

class PeriodicCronTask(Document):
    """ Models of the schedule document.
    """

    class Crontab(EmbeddedDocument):

        minute = StringField(required=True)
        hour = StringField(required=True)
        day_of_week = StringField(default='*')
        day_of_month = StringField(default='*')
        month_of_year = StringField(default='*')

        @property
        def schedule(self):
            return schedules.crontab(
                minute=self.minute,
                hour=self.hour,
                day_of_week=self.day_of_week,
                day_of_month=self.day_of_month,
                month_of_year=self.month_of_year)

        def __str__(self):
            """See `~celery.schedules.CRON_REPR`.
            """
            return ("<crontab: {0.minute} {0.hour} {0.day_of_week} "
                    "{0.day_of_month} {0.month_of_year} (m/h/d/dM/MY)>").format(self)

    crontab = EmbeddedDocumentField(Crontab, required=True)
    name = StringField(required=True, unique=True)
    task = StringField(required=True)

    args = ListField(StringField())
    kwargs = DictField()
    last_run_at = DateTimeField()

    # Required: MongoScheduleEntry reads both fields.
    # With default=False, entries stay inactive until enabled is set to True.
    enabled = BooleanField(default=False)
    total_run_count = IntField(default=0)
    # Optional extra field, add what you want:
    # immediat = BooleanField(default=False)

    @property
    def schedule(self):
        return self.crontab.schedule

    def __str__(self):
        return '{} :: {}'.format(self.name, self.crontab)

    def clean(self):
        """ Validation by mongoengine to ensure the crontab schedule format is right.
        """
        self.crontab.schedule  # building the crontab raises on invalid fields


class MongoScheduleEntry(ScheduleEntry):

    def __init__(self, mongo_entry):

        self._task = mongo_entry
        params = {
            "name": self._task.name,
            "task": self._task.task,
            "args": self._task.args,
            "kwargs": self._task.kwargs,
            # Passed to the base class so *is_due* has a schedule and a
            # last_run_at (the base class falls back to "now" if it is None).
            "schedule": self._task.schedule,
            "last_run_at": self._task.last_run_at,
            "total_run_count": self._task.total_run_count,
            "app": current_app._get_current_object(),
        }

        super(MongoScheduleEntry, self).__init__(**params)

    def _next_instance(self):
        self._task.last_run_at = self.app.now()
        self._task.total_run_count += 1

        return self.__class__(self._task)

    __next__ = next = _next_instance

    def is_due(self):
        is_due, next = super(MongoScheduleEntry, self).is_due()
        if not self._task.enabled and is_due:
            is_due = False

        return is_due, next

    def save(self):
        self._task.save()


class MongoScheduler(Scheduler):

    Entry = MongoScheduleEntry

    def __init__(self, app, *args, **kwargs):
        # MONGO_CELERY_URI and CELERY_SCHEDULER_UPDATE_INTERVAL are custom
        # settings, read here from the Celery app configuration.
        self._mongo = connect("schedules", host=app.conf.MONGO_CELERY_URI)
        # Must exist before the base __init__ calls setup_schedule()/sync()
        self._schedules = {}

        super(MongoScheduler, self).__init__(app, *args, **kwargs)
        # Maximum time to sleep between re-checking the schedule; set after
        # the base __init__, which assigns its own default.
        self.max_interval = app.conf.CELERY_SCHEDULER_UPDATE_INTERVAL

    def get_from_database(self):
        """ Retrieves entries from mongoDB.
        """
        self.sync()
        return {doc.name: self.Entry(doc) for doc in PeriodicCronTask.objects()}

    def setup_schedule(self):
        """ Loads the schedule from MongoDB at start-up (optional override).
        """
        self.schedule

    @property
    def schedule(self):
        self._schedules = self.get_from_database()
        return self._schedules

    def sync(self):
        for entry in self._schedules.values():
            entry.save()

Creating simple realtime app with Celery, CherryPy and MongoDb - http://pawelmhm.github.io/python/2015/02/15/creating-realtime-scraper.html

Carrot

https://pypi.python.org/pypi/carrot/

Carrot has pluggable messaging back-ends, so it is possible to support several messaging systems. Currently, there is support for AMQP (py-amqplib, pika) and STOMP (python-stomp). There’s also an in-memory backend for testing purposes, using the Python queue module.

Several AMQP message broker implementations exist, including RabbitMQ, ZeroMQ and Apache ActiveMQ. You’ll need to have one of these installed; personally, we’ve been using RabbitMQ.

https://github.com/ask/carrot/

Carrot is an AMQP messaging queue framework.

AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security.

http://ask.github.io/carrot/introduction.html

http://pythonhosted.org//carrot/

Pika

https://pypi.python.org/pypi/pika

Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.

Pika was developed primarily for use with RabbitMQ, but should also work with other AMQP 0-9-1 brokers.
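Pika and the other pure-Python clients on this page ultimately read and write AMQP 0-9-1 frames on a socket. To give a feel for that wire level, here is a standalone sketch of the general frame layout from the AMQP 0-9-1 specification: a type octet, a channel short, a payload-size long, the payload, and a frame-end octet 0xCE, all big-endian. It uses only `struct`, does not decode method arguments, and is not Pika's code; `encode_frame` and `decode_frame` are hypothetical names.

```python
import struct

# Protocol header a client sends first when opening an AMQP 0-9-1 connection.
PROTOCOL_HEADER = b"AMQP\x00\x00\x09\x01"

FRAME_METHOD, FRAME_HEADER, FRAME_BODY, FRAME_HEARTBEAT = 1, 2, 3, 8
FRAME_END = 0xCE
_PREFIX = struct.Struct(">BHI")  # type (1 byte), channel (2), size (4) = 7 bytes


def encode_frame(frame_type, channel, payload):
    """Wrap payload bytes in an AMQP 0-9-1 frame."""
    return _PREFIX.pack(frame_type, channel, len(payload)) + payload + bytes([FRAME_END])


def decode_frame(data):
    """Parse one frame from the front of data.

    Returns (frame_type, channel, payload, remaining_bytes), or None when
    the buffer does not yet hold a complete frame.
    """
    if len(data) < _PREFIX.size:
        return None
    frame_type, channel, size = _PREFIX.unpack_from(data)
    end = _PREFIX.size + size
    if len(data) < end + 1:
        return None
    if data[end] != FRAME_END:
        raise ValueError("missing frame-end octet")
    return frame_type, channel, data[_PREFIX.size:end], data[end + 1:]
```

A heartbeat frame, for example, is type 8 on channel 0 with an empty payload, so `encode_frame(FRAME_HEARTBEAT, 0, b"")` produces the eight bytes `08 00 00 00 00 00 00 CE`.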

https://github.com/tonyg/pika

http://pika.readthedocs.org/en/latest/

Currently pika only supports Python 2.6 and 2.7. Work to support 3.3+ is underway.

If you have not developed with Pika or RabbitMQ before ...

https://github.com/pika

py-amqplib

http://barryp.org/software/py-amqplib/

Python client for the Advanced Message Queuing Protocol (AMQP)

https://code.google.com/p/py-amqplib/

Other

https://launchpad.net/txamqp

Python library for communicating with AMQP peers and brokers using Twisted

https://pypi.python.org/pypi/amqp

https://pypi.python.org/pypi/amqpy/

an AMQP 0.9.1 client library for Python >= 3.2.0

http://amqpy.readthedocs.org/en/latest/

Any AMQP 0.9.1-compliant server is supported, but RabbitMQ is our primary target. Apache Qpid is confirmed to work, but only with “anonymous” authentication. A CRAM-MD5 auth mechanism is currently being developed and will be released shortly.

https://pypi.python.org/pypi/librabbitmq

https://qpid.apache.org/releases/qpid-proton-0.8/messenger/python/examples/index.html

Python AMQP Messenger Examples

https://qpid.apache.org/releases/qpid-proton-0.8/protocol-engine/python/api/index.html

See JavaServers#Qpid

Nucleon

https://pypi.python.org/pypi/nucleon

A gevent-based microframework for building RESTful web services that provide AMQP interfaces ...

Nucleon largely consists of glue between existing Python components, primarily gevent, paste, psycopg2, and an AMQP library called Puka.

http://nucleon.readthedocs.org/en/latest/

http://pythonhosted.org/nucleon.amqp/

nucleon.amqp is an AMQP 0.9.1 library built from the ground up for integration with gevent.

Good documentation - Messaging#RabbitMQ inspired.

http://pythonhosted.org/nucleon.amqp/#api-documentation

Introduction to AMQP

Routing a message

Topic Exchange Patterns

Routing Topologies

Remote Procedure Call (RPC)

Map-Reduce

Task Distribution

Publish-Subscribe

Synchronous/Asynchronous Model

Synchronous Methods

Asynchronous Methods

Error Handling

Making AMQP Connections

Performing actions upon connect or reconnect

Channels

Queues and Exchanges

Exchange Management

Queue Management

Binding and Unbinding Queues

Exchange-to-Exchange Binding (RabbitMQ Extension)

Publishing Messages

Basic Message Properties

Returned Messages

Publish Confirmation (RabbitMQ Extension)

Consuming messages

Message Object

Acknowledging or Rejecting Messages

Poll a queue

RabbitMQ Extensions

Transactions

Glossary


https://devcenter.heroku.com/articles/cloudamqp#use-with-python

The recommended library for Python to access RabbitMQ servers is Pika.

See Messaging#CloudAMQP

Also See

https://pypi.python.org/pypi?%3Aaction=search&term=amqp&submit=search

https://code.activestate.com/pypm/search:amqp/

Notes/Messaging#AMQPMessaging

Search wiki for 'amqp'

Last modified on 04/17/2015 02:50:41 PM

Attachments (1)
