Cyme is the Celery instance manager, a distributed application that manages clusters of Celery worker instances. Every machine (physical or virtual) runs the cyme-branch service.
Note
An updated list of requirements can always be found in the requirements/ directory of the Cyme distribution. This directory contains pip requirements files for different scenarios.
You can run as many branches as needed. It is also possible to run multiple branches on the same machine by specifying a custom HTTP port and root directory for each branch.
Start one or more branches:
$ cyme-branch :8001 -i branch1 -D cyme/branch1/
$ cyme-branch :8002 -i branch2 -D cyme/branch2/
The default HTTP port is 8000, and the default root directory is instances/. The root directory must be writable by the user the cyme-branch application is running as. The logs and pid files of every worker instance will be stored in this directory.
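When several branches share a machine, each one needs its own port, id and root directory. The following is a minimal sketch that generates the two command lines shown above. The helper name branch_commands and its parameters are illustrative and not part of Cyme; only the cyme-branch arguments themselves come from this document.

```python
import shlex


def branch_commands(count, first_port=8001, root="cyme"):
    """Return one cyme-branch argument list per branch (hypothetical helper)."""
    commands = []
    for n in range(1, count + 1):
        port = first_port + n - 1
        commands.append([
            "cyme-branch", f":{port}",   # custom HTTP port
            "-i", f"branch{n}",          # value passed with -i, as in the example
            "-D", f"{root}/branch{n}/",  # root directory for logs and pid files
        ])
    return commands


for argv in branch_commands(2):
    print(shlex.join(argv))
```

Each root directory must still be writable by the user running the branch.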
Create a new application named foo:
$ curl -X POST -i http://localhost:8001/foo/
HTTP/1.1 201 CREATED
Content-Type: application/json
Date: Mon, 15 Aug 2011 22:06:43 GMT
Transfer-Encoding: chunked
{"name": "foo", "broker": {"password": "guest",
"hostname": "127.0.0.1",
"userid": "guest",
"port": 5672,
"virtual_host": "/"}}
Note that we can edit the broker connection details here by using a POST request:
$ curl -X POST -i http://localhost:8001/bar/ -d \
'hostname=w1&userid=me&password=me&virtual_host=/'
Note
For convenience and full client support, PUT can be replaced with POST, and the same action will be performed.
Also, for POST, PUT and DELETE the query part of the URL can be used instead of actual post data.
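As a rough illustration of passing parameters in the query part, the sketch below builds (but does not send) such a request with the Python standard library. The function name app_request is hypothetical; the parameter names are the ones listed in the API reference later in this document.

```python
from urllib.parse import urlencode
from urllib.request import Request


def app_request(base_url, app, method="POST", **broker):
    """Build (but do not send) a request that creates or updates an app."""
    url = f"{base_url.rstrip('/')}/{app}/"
    if broker:
        # Broker settings travel in the query part instead of a request body.
        url += "?" + urlencode(broker)
    return Request(url, method=method)


req = app_request("http://localhost:8001", "bar",
                  hostname="w1", userid="me", password="me", virtual_host="/")
print(req.get_method(), req.full_url)
```

Note that urlencode percent-encodes the "/" of the virtual host as %2F.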
Create a new Celery worker instance:
$ curl -X PUT -i http://localhost:8001/foo/instances/
HTTP/1.1 201 CREATED
Content-Type: application/json
Date: Mon, 15 Aug 2011 15:25:11 GMT
Transfer-Encoding: chunked
{"is_enabled": true,
"name": "a35f2518-13bb-4403-bbdf-dd8751077712",
"queues": [],
"broker": {"password": "guest",
"userid": "guest",
"hostname": "127.0.0.1",
"virtual_host": "/",
"port": 5672},
"max_concurrency": 1,
"min_concurrency": 1}
Note that this instance is created on a random branch, not necessarily the branch that you are currently speaking to over HTTP. If you want to edit the data on a specific branch, please do so by using the admin interface of the branch, at http://localhost:8001/admin/.
In the logs of the affected branch you should now see something like this:
{582161d7-1187-4242-9874-32cd7186ba91} --> Instance.add(name=None)
{Supervisor} wake-up
{Supervisor} a35f2518-13bb-4403-bbdf-dd8751077712 instance.restart
celeryd-multi restart --suffix="" --no-color a35f2518-13bb-4403-bbdf-dd8751077712
-Q 'dq.a35f2518-13bb-4403-bbdf-dd8751077712'
--workdir=cyme/branch1/
--pidfile=cyme/branch1/a35f2518-13bb-4403-bbdf-dd8751077712/worker.pid
--logfile=cyme/branch1/a35f2518-13bb-4403-bbdf-dd8751077712/worker.log
--loglevel=DEBUG --autoscale=1,1
--broker=amqp://guest:guest@localhost:5672//
celeryd-multi v2.3.1
> a35f2518-13bb-4403-bbdf-dd8751077712: DOWN
> Restarting instance a35f2518-13bb-4403-bbdf-dd8751077712: OK
{Supervisor} a35f2518-13bb-4403-bbdf-dd8751077712 pingWithTimeout: 0.1
{Supervisor} a35f2518-13bb-4403-bbdf-dd8751077712 pingWithTimeout: 0.5
{Supervisor} a35f2518-13bb-4403-bbdf-dd8751077712 pingWithTimeout: 0.9
{Supervisor} a35f2518-13bb-4403-bbdf-dd8751077712 successfully restarted
{Supervisor} wake-up
{582161d7-1187-4242-9874-32cd7186ba91} <-- ok={
'is_enabled': True,
'name': 'a35f2518-13bb-4403-bbdf-dd8751077712',
'queues': [],
'broker': {'password': u'guest',
'hostname': u'127.0.0.1',
'userid': u'guest',
'port': 5672,
'virtual_host': u'/'},
'max_concurrency': 1,
'min_concurrency': 1}
Now that we have created an instance we can list the available instances:
$ curl -X GET -i http://localhost:8001/foo/instances/
HTTP/1.1 200 OK
Content-Type: application/json
Date: Mon, 15 Aug 2011 15:28:33 GMT
Transfer-Encoding: chunked
["a35f2518-13bb-4403-bbdf-dd8751077712"]
Note that this will list instances for every branch, not just the branch you are currently speaking to over HTTP.
Let’s create a queue declaration for a queue named tasks. This queue binds to the exchange tasks with the routing key tasks. (Note that the queue name will be used as both exchange name and routing key if these are not provided.)
Create the queue by performing the following request:
$ curl -X POST -d 'exchange=tasks&routing_key=tasks' \
-i http://localhost:8001/foo/queues/tasks/
HTTP/1.1 201 CREATED
Content-Type: application/json
Date: Mon, 15 Aug 2011 16:03:07 GMT
Transfer-Encoding: chunked
{"exchange": "tasks",
"routing_key": "tasks",
"options": null,
"name": "tasks",
"exchange_type": null}
The queue declaration should now have been stored inside one of the branches, and we can verify that by retrieving a list of all queues defined on all branches:
$ curl -X GET -i http://localhost:8001/foo/queues/
HTTP/1.1 200 OK
Content-Type: application/json
Date: Mon, 15 Aug 2011 16:08:37 GMT
Transfer-Encoding: chunked
["tasks"]
Now we can make our worker instance consume from the tasks queue to process tasks sent to it:
$ curl -X PUT -i \
http://localhost:8001/foo/instances/a35f2518-13bb-4403-bbdf-dd8751077712/queues/tasks/
HTTP/1.1 201 CREATED
Content-Type: application/json
Date: Mon, 15 Aug 2011 16:06:32 GMT
Transfer-Encoding: chunked
{"ok": "ok"}
In the logs for the branch that this instance is a member of you should now see:
[2011-08-15 16:06:32,226: WARNING/MainProcess]
{Supervisor} a35f2518-13bb-4403-bbdf-dd8751077712: instance.consume_from: tasks
If the test was successful you can clean up after yourself by:
Cancelling consuming from the tasks queue:
$ curl -X DELETE -i \
http://localhost:8001/foo/instances/a35f2518-13bb-4403-bbdf-dd8751077712/queues/tasks/
Deleting the tasks queue:
$ curl -X DELETE -i http://localhost:8001/foo/queues/tasks/
and finally, deleting the worker instance:
$ curl -X DELETE -i http://localhost:8001/foo/instances/a35f2518-13bb-4403-bbdf-dd8751077712/
The worker instance should now be shut down by the branch supervisor.
[PUT|POST] http://branch:port/<name>/?hostname=str
?port=int
?userid=str
?password=str
?virtual_host=str
If hostname is not provided, then any other broker parameters will be ignored and the default broker will be used.
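The hostname rule can be sketched as follows. This is not Cyme's implementation: resolve_broker is a hypothetical helper, the default values are copied from the example responses in this document, and the fallback for individual missing fields is an assumption.

```python
BROKER_FIELDS = ("hostname", "port", "userid", "password", "virtual_host")

# Default values as they appear in the example responses in this document.
DEFAULT_BROKER = {
    "hostname": "127.0.0.1",
    "port": 5672,
    "userid": "guest",
    "password": "guest",
    "virtual_host": "/",
}


def resolve_broker(params, default=DEFAULT_BROKER):
    """Return the broker settings an app would end up with (illustrative)."""
    if not params.get("hostname"):
        # No hostname: every other broker parameter is ignored.
        return dict(default)
    broker = dict(default)
    # Assumption: fields left out of the request fall back to the default.
    broker.update({k: params[k] for k in BROKER_FIELDS if k in params})
    return broker


print(resolve_broker({"userid": "me", "password": "me"}))
print(resolve_broker({"hostname": "w1", "userid": "me"}))
```

The first call returns the default broker unchanged, because userid and password are discarded when no hostname accompanies them.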
GET http://branch:port/
GET http://branch:port/<name>/
[PUT|POST] http://branch:port/<app>/instances/
This will return the details of the new instance, including the instance name (which for anonymous instances is a UUID).
[PUT|POST] http://branch:port/<app>/instances/<name>/
GET http://branch:port/<app>/instances/
GET http://branch:port/<app>/instances/<name>/
DELETE http://branch:port/<app>/instances/<name>/
[PUT|POST] http://branch:port/<app>/queues/<name>/?exchange=str
?exchange_type=str
?routing_key=str
?options=json dict
exchange and routing_key will default to the queue name if not provided, and exchange_type will default to direct. options is a JSON-encoded mapping of additional queue, exchange and binding options. For a full list of supported options see kombu.compat.entry_to_queue().
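The defaulting rules for a queue declaration can be illustrated with a small sketch. The function queue_params is hypothetical and only mirrors the rules stated here; it does not call Cyme or kombu, and the option key used in the example is a placeholder.

```python
import json


def queue_params(name, exchange=None, exchange_type=None,
                 routing_key=None, options=None):
    """Return the effective declaration for a queue (illustrative helper)."""
    return {
        "name": name,
        "exchange": exchange or name,        # defaults to the queue name
        "routing_key": routing_key or name,  # defaults to the queue name
        "exchange_type": exchange_type or "direct",
        # options is sent as a JSON-encoded mapping
        "options": json.dumps(options) if options is not None else None,
    }


print(queue_params("tasks"))
# "durable" is only an example key; see kombu for the supported options.
print(queue_params("images", exchange="media", options={"durable": True}))
```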
GET http://branch:port/<app>/queues/<name>/
GET http://branch:port/<app>/queues/
Every instance can consume from one or more queues. Queues are referred to by name, and there must exist a full declaration for that name.
[PUT|POST] http://branch:port/<app>/instances/<instance>/queues/<queue>/
DELETE http://branch:port/<app>/instances/<instance>/queues/<queue>/
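A minimal sketch of building these two requests with the Python standard library follows. The helper consumer_request is hypothetical, and the requests are constructed but not sent.

```python
from urllib.request import Request


def consumer_request(base_url, app, instance, queue, cancel=False):
    """Build (but do not send) the request that adds or cancels a consumer."""
    url = f"{base_url.rstrip('/')}/{app}/instances/{instance}/queues/{queue}/"
    # PUT (or POST) starts consuming; DELETE cancels it.
    return Request(url, method="DELETE" if cancel else "PUT")


instance = "a35f2518-13bb-4403-bbdf-dd8751077712"
base = "http://localhost:8001"
for req in (consumer_request(base, "foo", instance, "tasks"),
            consumer_request(base, "foo", instance, "tasks", cancel=True)):
    print(req.get_method(), req.full_url)
```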
Queueing a URL results in one of the worker instances executing that request as soon as possible.
[verb] http://branch:port/<app>/queue/<queue>/<url>?get_data
post_data
The verb can be any supported HTTP verb, such as HEAD, GET, POST, PUT, DELETE, TRACE, OPTIONS, CONNECT, and PATCH. The worker will then use the same verb when performing the request. Any get and post data provided will also be forwarded.
When you queue a URL, a unique identifier (a UUID) is returned. You can use this identifier to query the status of the task or collect the return value. The return value of the task is the HTTP response of the actual request performed by the worker.
Examples:
GET http://branch:port/<app>/queue/tasks/http://m/import_contacts?user=133
POST http://branch:port/<app>/queue/tasks/http://m/import_user
username=George Costanza
company=Vandelay Industries
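The URL layout used in these examples can be assembled as in the sketch below. The helper queue_url is hypothetical, localhost:8001 and the app name foo are taken from the tutorial earlier in this document, and nothing is actually sent.

```python
from urllib.parse import urlencode


def queue_url(base_url, app, queue, target_url, **get_data):
    """Return the URL that queues a request for target_url (illustrative)."""
    url = f"{base_url.rstrip('/')}/{app}/queue/{queue}/{target_url}"
    if get_data:
        # GET data is appended as the query part and forwarded by the worker.
        url += "?" + urlencode(get_data)
    return url


print(queue_url("http://localhost:8001", "foo", "tasks",
                "http://m/import_contacts", user=133))
# POST data would be sent as an ordinary form-encoded body.
print(urlencode({"username": "George Costanza",
                 "company": "Vandelay Industries"}))
```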
GET http://branch:port/<app>/query/<uuid>/state/
GET http://branch:port/<app>/query/<uuid>/result/
GET http://branch:port/<app>/query/<uuid>/wait/
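As a sketch, the three query URLs for a task can be derived from its identifier as shown below. The helper query_urls is hypothetical and the UUID in the example is made up; the response format of these endpoints is not described here, so the sketch stops at building the URLs.

```python
import uuid


def query_urls(base_url, app, task_id):
    """Return the state, result and wait URLs for a queued task."""
    task_id = str(uuid.UUID(str(task_id)))  # raises ValueError if malformed
    prefix = f"{base_url.rstrip('/')}/{app}/query/{task_id}"
    return {name: f"{prefix}/{name}/" for name in ("state", "result", "wait")}


# Hypothetical task identifier, for illustration only.
urls = query_urls("http://localhost:8001", "foo",
                  "0f1e2d3c-4b5a-4978-8695-a4b3c2d1e0f9")
for name, url in urls.items():
    print(name, url)
```

Validating the identifier up front means a mistyped UUID fails locally instead of producing a request for a task that cannot exist.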
To get configuration details and statistics for a particular instance:
GET http://branch:port/<app>/instance/<name>/stats/
cyme (see cyme.management.commands.cyme)
This is the management application, speaking HTTP with the clients. See cyme --help for a full description and command line arguments.
cyme-branch
Creates a new branch and starts the service to manage it. See cyme-branch --help for a full description and command line arguments.
The branch manager uses an SQLite database to store state, but this can also be another database system (MySQL, PostgreSQL, Oracle, DB2).
see: cyme.models.App.
Every instance belongs to an application, and the application contains the default broker configuration.
see: cyme.models.Broker.
The connection parameters for a specific broker (hostname, port, userid, password, virtual_host).
see: cyme.models.Instance.
This describes a Celery worker instance that is a member of this branch, along with the queues it should consume from and its max/min concurrency settings. It also describes which broker the instance should connect to (if not specified, this defaults to the broker of the app the instance belongs to).
see: cyme.models.Queue.
A queue declaration: name, exchange, exchange type, routing key, and options. Options is a JSON-encoded mapping of queue, exchange and binding options supported by kombu.compat.entry_to_queue().
see: cyme.supervisor.
The supervisor wakes up at intervals to monitor for changes in the model. It can also be requested to perform specific operations, e.g. restart an instance or add queues to an instance, and these operations can be either async or sync.
It is responsible for:
The supervisor is resilient to intermittent connection failures, and will auto-retry any operation that is dependent on a broker.
Since workers cannot respond to broadcast commands while the broker is off-line, the supervisor will not restart affected instances until the instance has had a chance to reconnect (decided by the wait_after_broker_revived attribute).
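The grace period can be pictured with the sketch below. It is not the supervisor's actual code: may_restart is a hypothetical function, only the name wait_after_broker_revived comes from this document, and the numbers are invented for the example.

```python
def may_restart(now, broker_revived_at, wait_after_broker_revived):
    """Return True once a restart is allowed again (illustrative only)."""
    if broker_revived_at is None:
        # The broker connection was never lost, so there is nothing to wait for.
        return True
    # Give the instance time to reconnect before treating it as unresponsive.
    return now - broker_revived_at >= wait_after_broker_revived


# Hypothetical numbers: the broker came back at t=100s, grace period of 35s.
print(may_restart(now=110.0, broker_revived_at=100.0,
                  wait_after_broker_revived=35.0))
print(may_restart(now=140.0, broker_revived_at=100.0,
                  wait_after_broker_revived=35.0))
```

Without such a delay, an instance that is merely reconnecting would fail to answer a ping and be restarted needlessly.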
see: cyme.controller.
The controller is a series of cl actors to control applications, instances and queues. It is used by the HTTP interface, but can also be used directly.