Python and AMQP
Just stray links so far. A home-grown solution to message/queue persistence is probably quicker and easier than any of the following ...
Kombu
Can use a variety of transports, notably MongoDB
https://pypi.python.org/pypi/kombu
Kombu is a messaging library for Python.
The aim of Kombu is to make messaging in Python as easy as possible by providing an idiomatic high-level interface for the AMQ protocol, and also provide proven and tested solutions to common messaging problems.
Virtual transports makes it really easy to add support for non-AMQP transports. There is already built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro.
Transport Comparison

Client | Type | Direct | Topic | Fanout |
amqp | Native | Yes | Yes | Yes |
qpid | Native | Yes | Yes | Yes |
redis | Virtual | Yes | Yes | Yes (PUB/SUB) |
mongodb | Virtual | Yes | Yes | Yes |
beanstalk | Virtual | Yes | Yes [1] | No |
SQS | Virtual | Yes | Yes [1] | Yes [2] |
couchdb | Virtual | Yes | Yes [1] | No |
zookeeper | Virtual | Yes | Yes [1] | No |
in-memory | Virtual | Yes | Yes [1] | No |
django | Virtual | Yes | Yes [1] | No |
sqlalchemy | Virtual | Yes | Yes [1] | No |
SLMQ | Virtual | Yes | Yes [1] | No |

[1]: Declarations are only kept in memory, so exchanges/queues must be declared by all clients that need them.
[2]: Fanout is supported by storing routing tables in SimpleDB. It is disabled by default, but can be enabled with the supports_fanout transport option.
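For orientation, a minimal sketch of Kombu's high-level SimpleQueue API, using the in-memory virtual transport so it runs without a broker (the queue name and payload are illustrative):

from kombu import Connection

# In-memory virtual transport: handy for trying the API without a broker.
with Connection('memory://') as conn:
    queue = conn.SimpleQueue('hello')
    queue.put({'msg': 'hello world'}, serializer='json')

    message = queue.get(block=True, timeout=1)
    print(message.payload)   # {'msg': 'hello world'}
    message.ack()
    queue.close()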
http://kombu.readthedocs.org/en/latest/
http://docs.kombu.me/en/latest/
http://oriolrius.cat/blog/2013/09/30/hello-world-using-kombu-library-and-python/
http://oriolrius.cat/blog/2012/03/09/amqp-and-rabbitmq-toc/
Note that there are a fair number of dependencies (such as curses and curl) that may not work or install easily on Windows. They look like add-ons rather than core functionality.
Kombu Using MongoDB
http://www.slideshare.net/mongodb/mongodb-as-message-queue
http://pydoc.net/Python/kombu/2.5.4/kombu.transport.mongodb/
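A hedged sketch of using MongoDB as the Kombu transport with the lower-level Producer/Consumer API; the host, database, exchange and queue names are assumptions, and the attached kombu-mongodb-example.py is a fuller example:

from kombu import Connection, Exchange, Queue

task_exchange = Exchange('tasks', type='direct')
task_queue = Queue('tasks', task_exchange, routing_key='tasks')

def handle(body, message):
    # Print and acknowledge each received message.
    print('got %r' % (body,))
    message.ack()

# The database name in the URL ('kombu' here) is just an assumption.
with Connection('mongodb://localhost:27017/kombu') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish({'job': 42}, exchange=task_exchange,
                     routing_key='tasks', declare=[task_queue])

    with conn.Consumer(task_queue, callbacks=[handle]):
        conn.drain_events(timeout=5)   # raises socket.timeout if nothing arrives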
Celery
Depends on Kombu.
http://www.celeryproject.org/
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (wait until ready).
Celery is used in production systems to process millions of tasks a day ...
Multi broker support - The recommended message broker is RabbitMQ, but support for Redis, Beanstalk, MongoDB, CouchDB, and databases (using SQLAlchemy or the Django ORM) is also available.
http://docs.celeryproject.org/en/latest/getting-started/brokers/
Broker Instructions
- RabbitMQ
- Redis
Experimental Transports
- SQLAlchemy
- Django Database
- MongoDB
- Amazon SQS
- CouchDB
- Beanstalk
- IronMQ
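Switching between the brokers listed above mostly comes down to the BROKER_URL setting; a hedged sketch of typical URL forms (hosts, ports and credentials are placeholders):

# celeryconfig.py -- pick exactly one BROKER_URL; all values below are placeholders
BROKER_URL = 'amqp://guest:guest@localhost:5672//'        # RabbitMQ (recommended)
# BROKER_URL = 'redis://localhost:6379/0'                 # Redis
# BROKER_URL = 'mongodb://localhost:27017/jobs'           # MongoDB (experimental)
# BROKER_URL = 'sqs://ACCESS_KEY_ID:SECRET_ACCESS_KEY@'   # Amazon SQS (experimental)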
Intro : Celery and MongoDB - http://skillachie.com/2013/06/15/intro-celery-and-mongodb/
pip install celery
pip install -U celery-with-mongodb
Create celeryconfig.py
from celery.schedules import crontab

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs",
    "taskmeta_collection": "stock_taskmeta_collection",
}

# Used to schedule tasks periodically, passing optional arguments.
# Can be very useful. Celery does not seem to support one-off scheduled tasks, only periodic ones.
CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 2),
    },
}
Create tasks.py (Celery application and worker)
import time

from celery import Celery

# Specify the MongoDB host and database to connect to
BROKER_URL = 'mongodb://localhost:27017/jobs'

celery = Celery('EOD_TASKS', broker=BROKER_URL)

# Load settings for the backend that stores job results
celery.config_from_object('celeryconfig')


@celery.task
def add(x, y):
    time.sleep(30)
    return x + y
Now that we have our application and configs created, let's start it up!
celery -A tasks worker --loglevel=info
The examples show command-line usage ...
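To actually dispatch the task, the usual Celery calling API applies; a minimal sketch (assumes the worker above is running and the MongoDB result backend from celeryconfig.py):

from tasks import add

# Send the task to the broker; a running worker will pick it up.
result = add.delay(1, 2)

print(result.ready())          # False until the worker finishes (the task sleeps 30s)
print(result.get(timeout=60))  # -> 3, fetched from the MongoDB result backend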
http://thebault.co/celery-beat-scheduler-mongodb.html
... I needed a scheduler which could store the scheduling in a real database so I decided to use mongoDB for its simplicity.
# launched the celery beat
celery beat -A your_lib.your_celery_app -S your_lib.your_scheduler_file.MongoScheduler -l DEBUG
from celery import current_app, schedules
from celery.beat import Scheduler, ScheduleEntry
from mongoengine import (
    connect, Document, EmbeddedDocument, StringField, EmbeddedDocumentField,
    ListField, DictField, BooleanField, DateTimeField, IntField
)


class PeriodicCronTask(Document):
    """ Models of the schedule document. """

    class Crontab(EmbeddedDocument):
        minute = StringField(required=True)
        hour = StringField(required=True)
        day_of_week = StringField(default='*')
        day_of_month = StringField(default='*')
        month_of_year = StringField(default='*')

        @property
        def schedule(self):
            return schedules.crontab(
                minute=self.minute,
                hour=self.hour,
                day_of_week=self.day_of_week,
                day_of_month=self.day_of_month,
                month_of_year=self.month_of_year)

        def __unicode__(self):
            """See `~celery.schedules.CRON_REPR`. """
            return u"<crontab: {0.minute} {0.hour} {0.day_of_week} {0.day_of_month} {0.month_of_year} (m/h/d/dM/MY)>".format(self)

    crontab = EmbeddedDocumentField(Crontab, required=True)
    name = StringField(required=True, unique=True)
    task = StringField(required=True)
    args = ListField(StringField())
    kwargs = DictField()
    last_run_at = DateTimeField()
    # what you want
    # enabled = BooleanField(default=False)
    # immediat = BooleanField(default=False)
    # total_run_count = IntField(default=0)

    @property
    def schedule(self):
        return self.crontab.schedule

    def __unicode__(self):
        return u'{} :: {}'.format(self.name, self.crontab)

    def clean(self):
        """ Validation by mongoengine to ensure the crontab schedule format is right. """
        self.crontab.schedule


class MongoScheduleEntry(ScheduleEntry):

    def __init__(self, mongo_entry):
        self._task = mongo_entry
        params = {
            "name": self._task.name,
            "task": self._task.task,
            "args": self._task.args,
            "kwargs": self._task.kwargs,
            "app": current_app._get_current_object(),
        }
        super(MongoScheduleEntry, self).__init__(**params)
        self.schedule = self._task.schedule
        # useful for the *is_due* method
        self.last_run_at = self._task.last_run_at

    def _next_instance(self):
        self._task.last_run_at = self.app.now()
        self._task.total_run_count += 1
        return self.__class__(self._task)

    __next__ = next = _next_instance

    def is_due(self):
        is_due, next = super(MongoScheduleEntry, self).is_due()
        if not self._task.enabled and is_due:
            is_due = False
        return is_due, next

    def save(self):
        self._task.save()


class MongoScheduler(Scheduler):

    Entry = MongoScheduleEntry

    def __init__(self, *args, **kwargs):
        # get_configuration() is the blog's application-specific settings helper (not shown).
        self.conf = get_configuration()
        self._mongo = connect("schedules", host=self.conf.MONGO_CELERY_URI)
        # Check the maximum time to sleep between re-checking the schedule
        self.max_interval = self.conf.CELERY_SCHEDULER_UPDATE_INTERVAL
        self._schedules = {}
        super(MongoScheduler, self).__init__(*args, **kwargs)

    def get_from_database(self):
        """ Retrieves entries from mongoDB. """
        self.sync()
        return {doc.name: MongoScheduleEntry(doc)
                for doc in PeriodicCronTask.objects()}

    def setup_schedule(self):
        """ Optional, you can pass this method """
        self.schedule

    @property
    def schedule(self):
        self._schedules = self.get_from_database()
        return self._schedules

    def sync(self):
        for entry in self._schedules.values():
            entry.save()
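To feed this scheduler you create and save PeriodicCronTask documents; a hedged sketch (connection URI, task name and schedule are placeholders, and note that is_due()/_next_instance() above expect the commented-out enabled and total_run_count fields to be uncommented):

from mongoengine import connect
from your_lib.your_scheduler_file import PeriodicCronTask

# Must match the MONGO_CELERY_URI the MongoScheduler connects with (placeholder here).
connect("schedules", host="mongodb://localhost:27017/schedules")

PeriodicCronTask(
    name="add-every-minute",
    task="tasks.add",
    args=["1", "2"],  # args is a ListField of strings in this model
    crontab=PeriodicCronTask.Crontab(minute="*/1", hour="*"),
).save()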
Creating a simple realtime app with Celery, CherryPy and MongoDB - http://pawelmhm.github.io/python/2015/02/15/creating-realtime-scraper.html
Carrot
https://pypi.python.org/pypi/carrot/
Carrot has pluggable messaging back-ends, so it is possible to support several messaging systems. Currently, there is support for AMQP (py-amqplib, pika), STOMP (python-stomp). There’s also an in-memory backend for testing purposes, using the Python queue module.
Several AMQP message broker implementations exist, including RabbitMQ, ZeroMQ and Apache ActiveMQ. You'll need to have one of these installed; personally, we've been using RabbitMQ.
https://github.com/ask/carrot/
Carrot is an AMQP messaging queue framework.
AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security.
http://ask.github.io/carrot/introduction.html
http://pythonhosted.org//carrot/
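A hedged sketch of Carrot's publisher/consumer API, following its introductory docs (the broker credentials, exchange/queue names and feed URL are illustrative):

from carrot.connection import BrokerConnection
from carrot.messaging import Consumer, Publisher

# Connect to a local AMQP broker with default credentials (assumption).
conn = BrokerConnection(hostname="localhost", port=5672,
                        userid="guest", password="guest", virtual_host="/")

# Publish a message to the "feed" exchange.
publisher = Publisher(connection=conn, exchange="feed", routing_key="importer")
publisher.send({"import_feed": "http://cnn.com/rss/edition_world.rss"})
publisher.close()

# Consume messages from the bound "feed" queue.
consumer = Consumer(connection=conn, queue="feed",
                    exchange="feed", routing_key="importer")

def import_feed_callback(message_data, message):
    print(message_data)
    message.ack()

consumer.register_callback(import_feed_callback)
consumer.wait()  # blocking loop; consumer.fetch() gets a single message instead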
Pika
https://pypi.python.org/pypi/pika
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.
Pika was developed primarily for use with RabbitMQ, but should also work with other AMQP 0-9-1 brokers.
https://github.com/tonyg/pika
http://pika.readthedocs.org/en/latest/
Currently pika only supports Python 2.6 and 2.7. Work to support 3.3+ is underway.
If you have not developed with Pika or RabbitMQ before ...
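A minimal sketch against a local RabbitMQ broker, assuming the default guest/guest credentials; the queue name is illustrative:

import pika

# Connect to a local RabbitMQ broker (default credentials assumed).
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="hello")

# Publish to the default exchange, routed by queue name.
channel.basic_publish(exchange="", routing_key="hello", body="Hello World!")

# Poll the queue once for a message (no_ack was renamed auto_ack in pika 1.0+).
method, properties, body = channel.basic_get(queue="hello", no_ack=True)
print(body)

connection.close()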
py-amqplib
http://barryp.org/software/py-amqplib/
Python client for the Advanced Message Queuing Protocol (AMQP)
https://code.google.com/p/py-amqplib/
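A hedged sketch of py-amqplib (client_0_8) usage; the exchange, queue and routing key names are illustrative:

from amqplib import client_0_8 as amqp

# Connect to a local broker with default credentials (assumption).
conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/")
chan = conn.channel()

# Declare a durable queue and exchange, and bind them.
chan.queue_declare(queue="po_box", durable=True, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
                      auto_delete=False)
chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="jason")

# Publish a persistent message.
msg = amqp.Message("Hello from py-amqplib")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="sorting_room", routing_key="jason")

chan.close()
conn.close()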
Other
https://pypi.python.org/pypi/amqp
Low-level AMQP 0.9.1 client for Python (a fork of amqplib, maintained for use with Kombu/Celery).
https://pypi.python.org/pypi/txAMQP
Python library for communicating with AMQP peers and brokers using Twisted.
https://pypi.python.org/pypi/amqpy/
an AMQP 0.9.1 client library for Python >= 3.2.0
http://amqpy.readthedocs.org/en/latest/
Any AMQP 0.9.1-compliant server is supported, but RabbitMQ is our primary target. Apache Qpid is confirmed to work, but only with “anonymous” authentication. A CRAM-MD5 auth mechanism is currently being developed and will be released shortly.
https://pypi.python.org/pypi/librabbitmq
https://qpid.apache.org/releases/qpid-proton-0.8/messenger/python/examples/index.html
Python AMQP Messenger Examples
https://qpid.apache.org/releases/qpid-proton-0.8/protocol-engine/python/api/index.html
See JavaServers#Qpid
Nucleon
https://pypi.python.org/pypi/nucleon
A gevent-based microframework for building RESTful web services that provide AMQP interfaces ...
Nucleon largely consists of glue between existing Python components, primarily gevent, paste, psycopg2, and an AMQP library called Puka.
http://nucleon.readthedocs.org/en/latest/
http://pythonhosted.org/nucleon.amqp/
nucleon.amqp is an AMQP 0.9.1 library built from the ground up for integration with gevent.
Good documentation - inspired by Messaging#RabbitMQ.
http://pythonhosted.org/nucleon.amqp/#api-documentation
The documentation covers:
- Introduction to AMQP
- Routing a message
- Topic Exchange Patterns
- Routing Topologies
- Remote Procedure Call (RPC)
- Map-Reduce
- Task Distribution
- Publish-Subscribe
- Synchronous/Asynchronous Model
- Synchronous Methods
- Asynchronous Methods
- Error Handling
- Making AMQP Connections
- Performing actions upon connect or reconnect
- Channels
- Queues and Exchanges
- Exchange Management
- Queue Management
- Binding and Unbinding Queues
- Exchange-to-Exchange Binding (RabbitMQ Extension)
- Publishing Messages
- Basic Message Properties
- Returned Messages
- Publish Confirmation (RabbitMQ Extension)
- Consuming messages
- Message Object
- Acknowledging or Rejecting Messages
- Poll a queue
- RabbitMQ Extensions
- Transactions
- Glossary
https://devcenter.heroku.com/articles/cloudamqp#use-with-python
The recommended library for Python to access RabbitMQ servers is Pika.
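A hedged sketch of connecting with Pika from a Heroku dyno; CLOUDAMQP_URL is the config var the CloudAMQP add-on sets, and the local fallback URL is an assumption:

import os
import pika

# Read the broker URL from the environment (CloudAMQP sets CLOUDAMQP_URL on Heroku).
url = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672/%2f")
params = pika.URLParameters(url)

connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="hello")
channel.basic_publish(exchange="", routing_key="hello", body="Hello from CloudAMQP")
connection.close()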
Also See
https://pypi.python.org/pypi?%3Aaction=search&term=amqp&submit=search
Attachments
- kombu-mongodb-example.py (7.4 KB), added by billb - Example of Kombu using MongoDB as Transport