澳门新葡亰平台官网Celery 进阶学习

by admin on 2020年2月6日

Node.js 13.0.3 已经发布。

windows server 2012 r2 iis8.5 部署asp.net mvc4/5程序小结

Celery 进阶学习

参考链接: Celery 4.1.0
documentation

显著变化

  • fs
    • 重做rmdir() 递归 (cjihrig)#30644
      • maxBusyTries选项被重命名为maxRetries,其默认值为0。该emfileWait选项已被删除,并且EMFILE的错误
        使用与其他错误相同的重试逻辑。retryDelay已经支持该选项。ENFILE现在错误会进行重试。
  • http
    • 使每个流或每个服务器的最大标头大小可配置(Anna
      Henningsen)#30570
  • http2
    • 澳门新葡亰平台官网 ,使最大允许的流可配置(Denys
      Otrishko)#30534
    • 允许配置最大容许的无效帧(Denys
      Otrishko)#30534
  • wasi
    • 引入初步的WASI支持(Cjihrig)#30258

其他 179 项更新: 

(文/开源中国)    

原文链接:

初始文件

安装部署celery相关的pip包,参考文档或Celery
部署小记

另外,本文使用ipython作为控制台的交互式解释器,pip install ipython

tasks.py例1

from celery import Celery


class CeleryConfig():
    broker_url = 'redis://localhost'
    result_backend = 'redis://localhost'
    timezone = 'Asia/Shanghai'


app = Celery()
app.config_from_object(CeleryConfig)


@app.task
def add(a, b):
    return a + b

以上文件可以正常通过以下命令启动

celery -A tasks worker --loglevel=info

例1中使用类的方式来加载配置,其他方式有:

Configuration

  • app.conf.timezone = 'Asia/Shanghai'
    app.conf.update(option1=True, option2='xxx', ...)
  • app.config_from_object(param)方法,参数可以是模块名的字符串形式、模块对象实体、配置的类或对象等
  • app.config_from_envvar(param)方法,参数是系统的环境变量名,而这个变量对应的值是模块的字符串。如:os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

基本配置

打开服务器管理器,点击添加角色和功能。

澳门新葡亰平台官网 1

选择基于角色或基于功能的安装。

澳门新葡亰平台官网 2

选择服务器。

澳门新葡亰平台官网 3

选择webserver(iis)

澳门新葡亰平台官网 4

 

选择角色。

澳门新葡亰平台官网 5

打开web服务器(IIS)选项。选择常见http功能。

澳门新葡亰平台官网 6

安全性选择中勾选常用的安全性选项。

澳门新葡亰平台官网 7

应用程序开发选项中,请确保选择.net 3.5,.net 4.5,ISAPI扩展。

澳门新葡亰平台官网 8

开始安装。

抽象任务(类)

所有task都必须使用@app.task装饰器来装饰,经过装饰器之后,这些任务会继承Task类。可以通过继承Task类,来创建一个抽象类,供task装饰

tasks.py例2

from celery import Celery


# 抽象tasks
from celery import Task
class DebugTask(Task):
    # 在调用之前打印一行字
    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)


class CeleryConfig():
    broker_url = 'redis://localhost'
    result_backend = 'redis://localhost'
    timezone = 'Asia/Shanghai'


app = Celery()
app.config_from_object(CeleryConfig)


@app.task(base=DebugTask)
def add(a, b):
    return a + b

ipython调试

In [1]: from tasks import add

In [2]: add.delay(2, 3)
Out[2]: <AsyncResult: d9e63190-0591-403d-a5be-8b59893fcb2d>

celery输出

[2017-11-20 15:52:05,660: INFO/MainProcess] Received task: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]  
[2017-11-20 15:52:05,662: WARNING/ForkPoolWorker-4] TASK STARTING: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]
[2017-11-20 15:52:05,666: INFO/ForkPoolWorker-4] Task tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d] succeeded in 0.00480927500029793s: 5

常见错误

TTP 错误 403.14 – Forbidden

Web 服务器被配置为不列出此目录的内容。

最可能的原因:

·        
没有为请求的 URL 配置默认文档,并且没有在服务器上启用目录浏览。

可尝试的操作:

·        
如果不希望启用目录浏览,请确保配置了默认文档并且该文件存在。

·        
使用 IIS 管理器启用目录浏览。

1.   
打开 IIS 管理器。

2.   
在“功能”视图中,双击“目录浏览”。

3.   
在“目录浏览”页上,在“操作”窗格中单击“启用”。

·        
确认站点或应用程序配置文件中的 configuration/system.webServer/directoryBrowse@enabled
特性被设置为 True。

详细错误信息:

模块

   DirectoryListingModule

通知

   ExecuteRequestHandler

处理程序

   StaticFile

错误代码

   0x00000000

请求的 URL

   

物理路径

   D:websiteyuanjing

登录方法

   匿名

登录用户

   匿名

 

 

 

 

 

澳门新葡亰平台官网 9

解决方案:

1)         发布时不要选择预编译,否则会出现这样的错误;

2)         站点所在的文件夹,保证iis有访问和控制权限。

 

 

acks_late选项

task在经过worker确认(acknowledge)之后,才会从worker的任务队列中移除。并且worker维护的任务队列可以保留相当大量的队列信息,即使这个worker被杀掉,任务信息仍然可以转移到其他的worker

  • broker在收到来自worker的确认之后,便不会发送任务信息给其他的worker

  • worker默认的确认时机:任务信息被worker接收

  • 设置了acks_late选项之后,worker的确认时机变为任务被实际执行

  • 在保证taskidempotent(幂等的),即任意次执行代码所产生的影响和一次执行的影响相同,可以使用acks_late选项

  • http://docs.celeryproject.org/en/master/userguide/tasks.html#Task.acks_late

  • http://docs.celeryproject.org/en/master/faq.html#faq-acks-late-vs-retry

HTTP 错误 500.19 – Internal Server Error

无限期阻塞的任务

由于网络传输等问题,导致任务无限期阻塞,会阻止此worker实例执行其他工作,解决方案是:

  • I/O任务:确保添加超时(可配合retry)。例如:使用requests

connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
  • time limits选项也可以很方便地规定任务遵循一个时间限制,但是这个时间过后,这个worker会被直接杀掉,所以该选项仅用于没有使用其他超时方案的情况
    (http://docs.celeryproject.org/en/master/userguide/workers.html#worker-time-limits)

无法访问请求的页面,因为该页的相关配置数据无效。

详细错误信息

Prefork池预取设置

prefork池默认将异步发送尽可能多的任务到进程中(进程预取任务)。对于延时短的任务,这样会加快速度,但是如果是高延时的任务,该进程后面的任务会长期处于等待。

默认设置: worker会发送任务给缓冲区可写的进程,例子如下

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3

使用-Ofair选项可以关闭预取设置,此时,worker会发送任务给真正可用于工作的进程,例子如下

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send T3 to process B
# B executes T3

<- T3 complete sent by process B
<- T1 complete sent by process A

:

模块

   IIS Web Core

通知

   BeginRequest

处理程序

   尚未确定

错误代码

   0x80070021

配置错误

   不能在此路径中使用此配置节。如果在父级别上锁定了该节,便会出现这种情况。锁定是默认设置的(overrideModeDefault=”Deny”),或者是通过包含
overrideMode=”Deny” 或旧有的
allowOverride=”false” 的位置标记明确设置的。

配置文件

   \?D:websiteyuanjingweb.config

请求的
URL

   

物理路径

   D:websiteyuanjing

登录方法

   尚未确定

登录用户

   尚未确定

 

 

 

 

配置源

task options

@app.task(options…)

link

:

   79:     <modules runAllManagedModulesForAllRequests="true" />

   80:     <handlers>

   81:       <remove name="ExtensionlessUrlHandler-ISAPI-4.0_32bit" />

澳门新葡亰平台官网 10

解决方案:

出现这个错误是因为从 IIS
7开始采用了更安全的
web.config 管理机制,默认情况下会锁住配置项不允许更改。要取消锁定可以运行命令行

 %windir%system32inetsrvappcmd unlock config
-section:system.webServer/handlers 。其中的
handlers 是错误信息中红字显示的节点名称。 
如果modules也被锁定。

logging

worker会自动建立log,当然你也可以自定义log,例子如下

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

HTTP 错误 500.19 – Internal Server Error

参数检测(typing)

>>> @app.task
... def add(x, y):
...     return x + y

# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

# typing 属性
>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

# Works locally, but the worker reciving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

无法访问请求的页面,因为该页的相关配置数据无效。

详细错误信息

隐藏敏感信息(避免进入log)

v4.0之后,且task_protocol为2或以上才有效(该值在4.0之后默认为2)

可以使用argsreprkwargsrepr调用参数来覆盖敏感信息,例子如下

>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

>>> charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

但实际上,只要可以从broker中读取数据,仍然可以获得这些“敏感信息”,所以如果需要高度保密的数据,要使用其他方法存储(加密等)

:

模块

   IIS Web Core

通知

   BeginRequest

处理程序

   尚未确定

错误代码

   0x80070021

配置错误

   不能在此路径中使用此配置节。如果在父级别上锁定了该节,便会出现这种情况。锁定是默认设置的(overrideModeDefault=”Deny”),或者是通过包含
overrideMode=”Deny” 或旧有的
allowOverride=”false” 的位置标记明确设置的。

配置文件

   \?D:websiteyuanjingweb.config

请求的
URL

   

物理路径

   D:websiteyuanjing

登录方法

   尚未确定

登录用户

   尚未确定

 

 

 

 

配置源

重试(Retrying)

当任务执行出现错误情况,可以通过设置retry来解决可恢复的错误。celeryretry机制会确保由相同的队列去执行此task-id的原始任务。简单的例子如下

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

特别注意,retryraise出来的,所以,即使后面有代码,也不会执行。另外,这个异常会被worker视为需要重试,以便在启用result backend时,可以存储正确的状态(RETRY)

重试的各种选项和应用:

以下是@app.task的参数

  • max_retries:
    当超过该参数设置的重试次数,会终止并报错。默认是3次,设置为None表示不重试
  • default_retry_delay:
    默认重试间隔时间。默认是180s,可以在retry调用中使用countdown参数覆盖
  • autoretry_for:
    对特定的异常自动重试,如autoretry_for=(Exception,),仅适用于v4.0
  • retry_kwargs: 后面接字典类型,如retry_kwargs={'max_retries': 5}

以下三个选项是v4.1版本支持的

  • retry_backoff:
    可以是布尔型或者数字,如果设置为True,则遵循“指数退避”,即第一次重试在1s后,第二次2s,第三次4s,第n次(2^(n-1))s。
    如果设置为数字m,则表示基数为m,即第一次3s,第二次6s,第n次(3×2^(n-1))s。
    如果设置为False,则表示不延迟
  • retry_backoff_max:
    如果retry_backoff打开,这个选项决定了两次自动重试之间最大的延时,超过了这个延时,则不再重试。默认600s
    (这里理解可能有问题?需要具体测试: If retry_backoff is enabled,
    this option will set a maximum delay in seconds between task
    autoretries. By default, this option is set to 600, which is 10
    minutes.)
  • retry_jitter:
    布尔型,jitter(抖动)。抖动用于将随机性引入指数退避延迟,以防队列中过多的任务同时被执行。如果设置为True,则retry_backoff计算的延迟值将被视为最大值,而实际延迟值将是介于0和最大值之间的随机数。该设置默认为True

:

   84:     <validation validateIntegratedModeConfiguration="false"/>

   85:     <modules runAllManagedModulesForAllRequests="true"/>

   86:     <handlers>

澳门新葡亰平台官网 11

可以在命令行运行

%windir%system32inetsrvappcmd unlock config
-section:system.webServer/modules

注意:cmd.exe要以管理员身份启动。

澳门新葡亰平台官网 12

 

 

task方法选项列表

类似@app.task(option1=xx, option2=yy),括号内的参数即选项
http://docs.celeryproject.org/en/master/userguide/tasks.html#general

一些选项(部分参考文档即可):

  • max_retries:
    只有在任务调用self.retry或者任务使用autoretry_for参数进行装饰时才适用
  • throws:
    值是tuple类型,里面的“异常”不会被视为错误而导致失败或重发,即使发生,也是成功的
  • default_retry_delay: 重试间隔时间
  • rate_limit:
    关联task_default_rate_limit这个设置,表示某个时间段执行的任务数目,如:"100/m"表示一分钟最多100条。默认无限制
  • time_limit / soft_time_limit:
    任务完成的时间限制。前者是经过时限之后worker被杀死,然后用新worker代替,后者是经过时限之后抛出SoftTimeLimitExceeded异常,供开发者处理。默认没有时限。
  • ignore_result / store_errors_even_if_ignored
  • name
  • request
  • serializer
  • compression
  • backend
  • ack_late
  • track_started

状态

link

Handlers

在任务返回、失败、重试、成功、超时等事件发生的时候,触发特定的方法:after_return,
on_failure, on_retry, on_success, on_timeout

可用于状态转移的监控,如发邮件提醒等

一个自定义请求的例子如下

import logging
from celery.worker.request import Request

logger = logging.getLogger('my.package')

class MyRequest(Request):
    'A minimal custom request to log failures and hard time limits.'

    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super(Request, self).on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )
        logger.warning(
            'Failure detected for task %s',
            self.task.name
        )

class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

@app.task(base=MyTask)
def some_longrunning_task():
    # use your imagination

最佳实践

Tips and Best
Practices

Optimizing

  1. 忽略不需要的结果,ignore_result=True
  2. 尽量避免使用同步子任务(task调用需要依赖其他task执行的结果,这样会造成互相等待,陷入死锁)
    Avoid launching synchronous
    subtasks
  3. 设置broker_pool_limit,默认为10,可以根据使用连接的线程的数目调整
    link
  4. worker_prefetch_multiplier表示一次prefetch多少条消息乘以并发进程数,默认值为4(每个进程4个消息)。对于长时间的任务,可以把这个值设置为1,其实就相当于关闭预取;对于短时任务,可以设置大一些,比如64,128等;对于长短不一的任务,可以通过Routing
    Tasks,即分队列的方式执行
  5. 针对长任务,许多人希望的是让当前执行的任务数与保留待确认的任务数目相同,且都等于当前并发数(如-c 10,此时在执行的任务是10个,等待的任务数也是10个)。满足这样的要求选项:task_acks_late = Trueworker_prefetch_multiplier = 1
  6. 在默认的prefork模式下,进程池中的进程可能处于空闲或忙碌的状态。-O是优化选项,如果是default,进程是预取来自worker中的任务的,可能造成长时间的等待;如果是fair,进程只在有空闲的时候,才会去取任务执行。设置fair对于耗时长的任务来说比较有利

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图