python消息队列celery_Python的分布式调度工具Celery--queue实现按机器执行
原文我发表于天善智能某些情况下,某些任务需要在指定的一些机器上运行,Celery是通过Exchanges, queues and routing keys这3个概念实现这是官方文档的解释Exchanges, queues and routing keys.Messages are sent to exchanges.An exchange routes messages to one or mor
原文我发表于天善智能
某些情况下,某些任务需要在指定的一些机器上运行,
Celery是通过Exchanges, queues and routing keys这3个概念实现
这是官方文档的解释
Exchanges, queues and routing keys.
Messages are sent to exchanges.
An exchange routes messages to one or more queues. Several exchange types exists, providing different ways to do routing, or implementing different messaging scenarios.
The message waits in the queue until someone consumes it.
The message is deleted from the queue when it has been acknowledged.
我的理解时,Celery的不同worker是可以处理不同的queue的(也可以是routing_keyg或者2者结合),而quene是可以分配到机器的:
task->queue->worker
celery的代码示例:
通过MyRouter定义了不同的task到queue的分配方式
from __future__ import absolute_import
from celery import Celery
from kombu import Exchange, Queue
app = Celery('proj',
broker='amqp://guest@bipython',
backend='amqp://',
include=['proj.tasks'])
#修改CELERY_QUEUES和MyRouter函数指定分配
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'proj.tasks.bipython_testsleep':
return {
'queue': 'bipython'
}
elif task == 'proj.tasks.sengtest_testsleep':
return {
"queue": "sengtest"
}
return None
default_exchange = Exchange('default', type='direct')
host_exchange = Exchange('host', type='direct')
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('bipython', host_exchange, routing_key='host.bipython'),
Queue('sengtest', host_exchange, routing_key='host.sengtest'),
),
CELERY_DEFAULT_QUEUE = 'default',
CELERY_DEFAULT_EXCHANGE = 'default',
CELERY_DEFAULT_ROUTING_KEY = 'default',
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ROUTES = (MyRouter(), ),
)
if __name__ == '__main__':
app.start()
服务启动的命令:
通过-Q参数定义对应的queue
#host:bipython
celery -A proj worker -l info -Q bipython
#host:sengtest
celery -A proj worker -l info -Q sengtest
测试代码:
测试了直接调用和并行功能:
from proj.tasks import testsleep
from proj.tasks import sengtest_testsleep
from proj.tasks import bipython_testsleep
result2 = bipython_testsleep.delay(11)
result1 = sengtest_testsleep.delay(9)
result2 = bipython_testsleep.delay(11)
result1 = sengtest_testsleep.delay(9)
from celery import group
groupresult = group(sengtest_testsleep.s(9),bipython_testsleep.s(11))()
groupresult.get()
具体代码截图
测试结果:
bipython结果:
sengtest结果:
消息队列的检查命令:
在最初的测试过程中,无法确认是否发到消息队列,使用了一下命令检查,必要时可以使用
celery -A proj amqp
#查看消息队列的数量
basic.get default
更多推荐
所有评论(0)