我使用celery来更新新闻聚合站点中的RSS feed。我为每个提要使用一个@task,看起来一切正常。
有一个细节我不确定如何处理:所有提要每分钟都使用@periodic_task更新一次,但是如果提要在开始新任务时仍从上一个定期任务更新,该怎么办?(例如,如果Feed确实很慢或离线,并且任务在重试循环中进行)
目前,我存储任务结果并按以下方式检查其状态:
import socket from datetime import timedelta from celery.decorators import task, periodic_task from aggregator.models import Feed _results = {} @periodic_task(run_every=timedelta(minutes=1)) def fetch_articles(): for feed in Feed.objects.all(): if feed.pk in _results: if not _results[feed.pk].ready(): # The task is not finished yet continue _results[feed.pk] = update_feed.delay(feed) @task() def update_feed(feed): try: feed.fetch_articles() except socket.error, exc: update_feed.retry(args=[feed], exc=exc)
也许我错过了一些使用celery机制来实现相同结果的更复杂/更可靠的方法?
可以使用这样的装饰器:
def single_instance_task(timeout): def task_exc(func): @functools.wraps(func) def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func(*args, **kwargs) finally: release_lock() return wrapper return task_exc
然后,像这样使用它…
@periodic_task(run_every=timedelta(minutes=1)) @single_instance_task(60*10) def fetch_articles() yada yada...