かなり久々のブログになる。
PythonにはCeleryというmessaging frameworkがある。
ここのmessagingというのはメールとかMMSとかSMSではなくて,
RabbitMQとかZeroMQのmessagingである.
実際, RabbitMQを使って動かすことができる.
試してはいないが, 他にはMongoDBとかCouchDBも使うことができるらしい.
ちょろっとドキュメントを読んだ程度では使い方がわからない。
ベースになっているkombuに
変えようかとも思ったが、わざわざ低水準側にいって抽象度が下がるのもつまらないので,
少しコードを読んでみることにした.
環境はpython3.3.2である
pip freezeするとこんなものが入っていると言われた. scraping
toolを作ろうとしていたから, lxmlとかrequestsとかが入っている.
SQLAlchemy==0.8.2
amqp==1.2.0
anyjson==0.3.3
billiard==2.7.3.31
celery==3.0.21
distribute==0.6.49
kombu==2.5.12
lxml==3.2.1
python-dateutil==2.1
pytz==2013b
requests==1.2.3
six==1.3.0
例によって迷子になること請け合いだが....とりあえず書いてみた.
in celery/__init__.py
from celery.app.base import Celery # noqa
in celery/app/base.py
Celeryは、withで使える(いつなぜ使うのだろう?)
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
肝となるデコレータ
def task(self, *args, **opts):
"""Creates new task class from any callable."""
if _EXECV and not opts.get('_force_evaluate'):
# When using execv the task in the original module will point to a
# different app, so doing things like 'add.request' will point to
# a differnt task instance. This makes sure it will always use
# the task instance from the current app.
# Really need a better solution for this :(
from . import shared_task as proxies_to_curapp
return proxies_to_curapp(*args, _force_evaluate=True, **opts)
def inner_create_task_cls(shared=True, filter=None, **opts):
_filt = filter # stupid 2to3
def _create_task_cls(fun):
if shared:
cons = lambda app: app._task_from_fun(fun, **opts)
cons.__name__ = fun.__name__
shared_task(cons)
if self.accept_magic_kwargs: # compat mode
task = self._task_from_fun(fun, **opts)
if filter:
task = filter(task)
return task
# return a proxy object that is only evaluated when first used
promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
self._pending.append(promise)
if _filt:
return _filt(promise)
return promise
return _create_task_cls
if len(args) == 1 and isinstance(args[0], Callable):
return inner_create_task_cls(**opts)(*args)
if args:
raise TypeError(
'task() takes no arguments (%s given)' % (len(args, )))
return inner_create_task_cls(**opts)
_task_from_funが大事なようだ.
名前からして, functionからtask objectを作る関数である.
def _task_from_fun(self, fun, **options):
base = options.pop('base', None) or self.Task
bind = options.pop('bind', False)
T = type(fun.__name__, (base, ), dict({
'app': self,
'accept_magic_kwargs': False,
'run': fun if bind else staticmethod(fun),
'__doc__': fun.__doc__,
'__module__': fun.__module__}, **options))()
task = self._tasks[T.name] # return global instance.
task.bind(self)
return task
PromiseProxyでwrapされるので、それを追う
from celery.local import PromiseProxy, maybe_evaluate
celery/local.py
ほとんどがProxyから来ている. 大事なのはこれのようだ.
PromiseProxy
__thingがセットされなかったら評価する
def _get_current_object(self):
try:
return object.__getattribute__(self, '__thing')
except AttributeError:
return self.__evaluate__()
評価するとは_get_current_objectしてそれをセットする こと
なんかキャッシュされている値を消すらしい
def __evaluate__(self,
_clean=('_Proxy__local',
'_Proxy__args',
'_Proxy__kwargs')):
try:
thing = Proxy._get_current_object(self)
object.__setattr__(self, '__thing', thing)
return thing
finally:
for attr in _clean:
try:
object.__delattr__(self, attr)
except AttributeError: # pragma: no cover
# May mask errors so ignore
pass
Proxyの_get_current_objectを見ていく.
def _get_current_object(self):
"""Return the current object. This is useful if you want the real
object behind the proxy at a time for performance reasons or because
you want to pass the object into a different context.
"""
loc = object.__getattribute__(self, '_Proxy__local')
if not hasattr(loc, '__release_local__'):
return loc(*self.__args, **self.__kwargs)
try:
return getattr(loc, self.__name__)
except AttributeError:
raise RuntimeError('no object bound to {0.__name__}'.format(self))
what's _Proxy__local
def __init__(self, local, args=None, kwargs=None, name=None):
object.__setattr__(self, '_Proxy__local', local)
localには何が渡っているのやら? _task_from_fun が作ったobject が入っている
つまり、decoratした関数が入っている
で、__name__って?
Proxy
@_default_cls_attr('name', str, __name__)
def __name__(self):
try:
return self.__custom_name__
except AttributeError:
return self._get_current_object().__name__
んじゃこりゃ?
initで
if name is not None:
object.__setattr__(self, '__custom_name__', name)
とかしているから、decoratorをみるのがよいだろう
def _default_cls_attr(name, type_, cls_value):
# Proxy uses properties to forward the standard
# class attributes __module__, __name__ and __doc__ to the real
# object, but these needs to be a string when accessed from
# the Proxy class directly. This is a hack to make that work.
# -- See Issue #1087.
return type(name, (type_, ), {
'__new__': __new__, '__get__': __get__,
})
追いかけてもしょうがないなぁ、これ
しかたないので戻って_task_from_funを読む
T = type(fun.__name__, (base, ), dict({
'app': self,
'accept_magic_kwargs': False,
'run': fun if bind else staticmethod(fun),
'__doc__': fun.__doc__,
'__module__': fun.__module__}, **options))()
あれ, typeに複数引数を渡すとどうなるんだっけ?
from python 3.3 doc
3引数で呼び出した場合、新しい型オブジェクトを返します。
本質的には class 文の動的な形式です。
name 文字列はクラス名で、__name__ 属性になります。
bases タプルは基底クラスの羅列で、__bases__ 属性になります。
dict 辞書はクラス本体の定義を含む名前空間で、__dict__ 属性になります。
ということは、base大事
base = options.pop('base', None) or self.Task
なので、Taskが知りたい
@cached_property
def Task(self):
return self.create_task_cls()
ではcreate_task_clsはなにか,というと
def create_task_cls(self):
"""Creates a base task class using default configuration
taken from this app."""
return self.subclass_with_self('celery.app.task:Task', name='Task',
attribute='_app', abstract=True)
subclass_with_selfってなによ?
def subclass_with_self(self, Class, name=None, attribute='app',
reverse=None, **kw):
"""Subclass an app-compatible class by setting its app attribute
to be this app instance.
App-compatible means that the class has a class attribute that
provides the default app it should use, e.g.
``class Foo: app = None``.
:param Class: The app-compatible class to subclass.
:keyword name: Custom name for the target class.
:keyword attribute: Name of the attribute holding the app,
default is 'app'.
"""
Class = symbol_by_name(Class)
reverse = reverse if reverse else Class.__name__
def __reduce__(self):
return _unpickle_appattr, (reverse, self.__reduce_args__())
attrs = dict({attribute: self}, __module__=Class.__module__,
__doc__=Class.__doc__, __reduce__=__reduce__, **kw)
return type(name or Class.__name__, (Class, ), attrs)
symbol_by_name てなによ?
Class = symbol_by_name(Class)
がなんだかということになる
from kombu.utils import symbol_by_name
なので今度はkombuのドキュメントを調べる
kombu==2.5.12
http://www.nullege.com/codes/search/kombu.utils.symbol_by_name
がんばって名前からsymbolを取り出すらしい
これでcreate_task_clsに戻れる
celery.app.task:Taskが肝。
return self.subclass_with_self('celery.app.task:Task', name='Task',
class TaskType(type):
"""Meta class for tasks.
Automatically registers the task in the task registry (except
if the :attr:`Task.abstract`` attribute is set).
If no :attr:`Task.name` attribute is provided, then the name is generated
from the module and class name.
"""
oh, No.
@with_metaclass(TaskType)
class Task(object):
"""Task base class.
...
def __call__(self, *args, **kwargs):
_task_stack.push(self)
self.push_request()
try:
# add self if this is a bound task
if self.__self__ is not None:
return self.run(self.__self__, *args, **kwargs)
return self.run(*args, **kwargs)
finally:
self.pop_request()
_task_stack.pop()
基本的には実行するだけだが、まわりになんかくっついてる
継承して使う場合用
def run(self, *args, **kwargs):
"""The body of the task executed by workers."""
raise NotImplementedError('Tasks must define the run method.')
これが一番大事
def apply_async(self, args=None, kwargs=None,
task_id=None, producer=None, connection=None, router=None,
link=None, link_error=None, publisher=None,
add_to_parent=True, reply_to=None, **options):
引数の説明、長い
コア
def apply_async(self, args=None, kwargs=None,
with app.producer_or_acquire(producer) as P:
self.backend.on_task_call(P, task_id)
task_id = P.publish_task(self.name, args, kwargs,
producer_or_acquireがなんだか知りたい.
celery/app/base.py
class Celery(object):
@contextmanager
def producer_or_acquire(self, producer=None):
if producer:
yield producer
else:
with self.amqp.producer_pool.acquire(block=True) as producer:
yield producer
default_producer = producer_or_acquire # XXX compat
applyの場合は
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
が構築する関数、
def trace_task(uuid, args, kwargs, request=None):
...
try:
R = retval = fun(*args, **kwargs)
state = SUCCESS
で実行されるようだ
publish_task
celery/app/amqp.py
class TaskProducer(Producer):
...
def publish_task(self, task_name, task_args=None, task_kwargs=None,
データを組み立てて
self.publish(
でなんか送信している
self.publishは kombu/messaging.py Producerに実装されている
signals.task_sent.send(sender=task_name, **body)
send
celery/utils/dispatch/signal.py Signal
def send(self, sender, **named):
"""Send signal from sender to all connected receivers.
If any receiver raises an error, the error propagates back through
send, terminating the dispatch loop, so it is quite possible to not
have all receivers called if a raises an error.
signarlとmessageが分離しているのはなぜ?
messageはqueueにつっこまれるだけで通知しない?
signalにconnectしている相手に通知らしい
def connect(self, *args, **kwargs):
"""Connect receiver to sender for signal.
:param receiver: A function or an instance method which is to
receive signals. Receivers must be hashable objects.
おそらくは、通知した相手がmessageを取りに行くのだろう.
残念ながらメンタルモデルを十分に構築できなかった.