原文我发表于天善智能

某些情况下,某些任务需要在指定的一些机器上运行,

Celery是通过Exchanges, queues and routing keys这3个概念实现

这是官方文档的解释

Exchanges, queues and routing keys.

Messages are sent to exchanges.

An exchange routes messages to one or more queues. Several exchange types exists, providing different ways to do routing, or implementing different messaging scenarios.

The message waits in the queue until someone consumes it.

The message is deleted from the queue when it has been acknowledged.

我的理解时,Celery的不同worker是可以处理不同的queue的(也可以是routing_keyg或者2者结合),而quene是可以分配到机器的:

task->queue->worker

celery的代码示例:

通过MyRouter定义了不同的task到queue的分配方式

from __future__ import absolute_import

from celery import Celery

from kombu import Exchange, Queue

app = Celery('proj',

broker='amqp://guest@bipython',

backend='amqp://',

include=['proj.tasks'])

#修改CELERY_QUEUES和MyRouter函数指定分配

class MyRouter(object):

def route_for_task(self, task, args=None, kwargs=None):

if task == 'proj.tasks.bipython_testsleep':

return {

'queue': 'bipython'

}

elif task == 'proj.tasks.sengtest_testsleep':

return {

"queue": "sengtest"

}

return None

default_exchange = Exchange('default', type='direct')

host_exchange = Exchange('host', type='direct')

# Optional configuration, see the application user guide.

app.conf.update(

CELERY_QUEUES = (

Queue('default', default_exchange, routing_key='default'),

Queue('bipython', host_exchange, routing_key='host.bipython'),

Queue('sengtest', host_exchange, routing_key='host.sengtest'),

),

CELERY_DEFAULT_QUEUE = 'default',

CELERY_DEFAULT_EXCHANGE = 'default',

CELERY_DEFAULT_ROUTING_KEY = 'default',

CELERY_TASK_SERIALIZER='json',

CELERY_ACCEPT_CONTENT=['json'], # Ignore other content

CELERY_RESULT_SERIALIZER='json',

CELERY_TASK_RESULT_EXPIRES=3600,

CELERY_TIMEZONE='Asia/Shanghai',

CELERY_ROUTES = (MyRouter(), ),

)

if __name__ == '__main__':

app.start()

服务启动的命令:

通过-Q参数定义对应的queue

#host:bipython

celery -A proj worker -l info -Q bipython

#host:sengtest

celery -A proj worker -l info -Q sengtest

测试代码:

测试了直接调用和并行功能:

from proj.tasks import testsleep

from proj.tasks import sengtest_testsleep

from proj.tasks import bipython_testsleep

result2 = bipython_testsleep.delay(11)

result1 = sengtest_testsleep.delay(9)

result2 = bipython_testsleep.delay(11)

result1 = sengtest_testsleep.delay(9)

from celery import group

groupresult = group(sengtest_testsleep.s(9),bipython_testsleep.s(11))()

groupresult.get()

具体代码截图

sg_trans.gif

测试结果:

bipython结果:

sg_trans.gif

sengtest结果:

sg_trans.gif

消息队列的检查命令:

在最初的测试过程中,无法确认是否发到消息队列,使用了一下命令检查,必要时可以使用

celery -A proj amqp

#查看消息队列的数量

basic.get default

Logo

鸿蒙生态一站式服务平台。

更多推荐