2013年8月11日日曜日

celery事始め

このエントリーをブックマークに追加 このエントリーを含むはてなブックマーク

かなり久々のブログになる。

PythonにはCeleryというmessaging frameworkがある。 ここのmessagingというのはメールとかMMSとかSMSではなくて, RabbitMQとかZeroMQのmessagingである. 実際, RabbitMQを使って動かすことができる. 試してはいないが, 他にはMongoDBとかCouchDBも使うことができるらしい.

ちょろっとドキュメントを読んだ程度では使い方がわからない。 ベースになっているkombuに 変えようかとも思ったが、わざわざ低水準側にいって抽象度が下がるのもつまらないので, 少しコードを読んでみることにした.

環境はpython3.3.2である

pip freezeするとこんなものが入っていると言われた. scraping toolを作ろうとしていたから, lxmlとかrequestsとかが入っている.

SQLAlchemy==0.8.2
amqp==1.2.0
anyjson==0.3.3
billiard==2.7.3.31
celery==3.0.21
distribute==0.6.49
kombu==2.5.12
lxml==3.2.1
python-dateutil==2.1
pytz==2013b
requests==1.2.3
six==1.3.0

例によって迷子になること請け合いだが....とりあえず書いてみた.

in celery/__init__.py

    from celery.app.base import Celery                   # noqa

in celery/app/base.py

Celeryは、withで使える(いつなぜ使うのだろう?)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

肝となるデコレータ

    def task(self, *args, **opts):
        """Creates new task class from any callable."""
        if _EXECV and not opts.get('_force_evaluate'):
            # When using execv the task in the original module will point to a
            # different app, so doing things like 'add.request' will point to
            # a differnt task instance.  This makes sure it will always use
            # the task instance from the current app.
            # Really need a better solution for this :(
            from . import shared_task as proxies_to_curapp
            return proxies_to_curapp(*args, _force_evaluate=True, **opts)

        def inner_create_task_cls(shared=True, filter=None, **opts):
            _filt = filter  # stupid 2to3

            def _create_task_cls(fun):
                if shared:
                    cons = lambda app: app._task_from_fun(fun, **opts)
                    cons.__name__ = fun.__name__
                    shared_task(cons)
                if self.accept_magic_kwargs:  # compat mode
                    task = self._task_from_fun(fun, **opts)
                    if filter:
                        task = filter(task)
                    return task

                # return a proxy object that is only evaluated when first used
                promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
                self._pending.append(promise)
                if _filt:
                    return _filt(promise)
                return promise

            return _create_task_cls

        if len(args) == 1 and isinstance(args[0], Callable):
            return inner_create_task_cls(**opts)(*args)
        if args:
            raise TypeError(
                'task() takes no arguments (%s given)' % (len(args, )))
        return inner_create_task_cls(**opts)

_task_from_funが大事なようだ. 名前からして, functionからtask objectを作る関数である.

    def _task_from_fun(self, fun, **options):
        base = options.pop('base', None) or self.Task
        bind = options.pop('bind', False)

        T = type(fun.__name__, (base, ), dict({
            'app': self,
            'accept_magic_kwargs': False,
            'run': fun if bind else staticmethod(fun),
            '__doc__': fun.__doc__,
            '__module__': fun.__module__}, **options))()
        task = self._tasks[T.name]  # return global instance.
        task.bind(self)
        return task
PromiseProxyでwrapされるので、それを追う
from celery.local import PromiseProxy, maybe_evaluate
celery/local.py

ほとんどがProxyから来ている. 大事なのはこれのようだ.

PromiseProxy __thingがセットされなかったら評価する
    def _get_current_object(self):
        try:
            return object.__getattribute__(self, '__thing')
        except AttributeError:
            return self.__evaluate__()
評価するとは_get_current_objectしてそれをセットする こと なんかキャッシュされている値を消すらしい
    def __evaluate__(self,
                     _clean=('_Proxy__local',
                             '_Proxy__args',
                             '_Proxy__kwargs')):
        try:
            thing = Proxy._get_current_object(self)
            object.__setattr__(self, '__thing', thing)
            return thing
        finally:
            for attr in _clean:
                try:
                    object.__delattr__(self, attr)
                except AttributeError:  # pragma: no cover
                    # May mask errors so ignore
                    pass
Proxyの_get_current_objectを見ていく.
    def _get_current_object(self):
        """Return the current object.  This is useful if you want the real
        object behind the proxy at a time for performance reasons or because
        you want to pass the object into a different context.
        """
        loc = object.__getattribute__(self, '_Proxy__local')
        if not hasattr(loc, '__release_local__'):
            return loc(*self.__args, **self.__kwargs)
        try:
            return getattr(loc, self.__name__)
        except AttributeError:
            raise RuntimeError('no object bound to {0.__name__}'.format(self))

what's _Proxy__local

    def __init__(self, local, args=None, kwargs=None, name=None):
        object.__setattr__(self, '_Proxy__local', local)

localには何が渡っているのやら? _task_from_fun が作ったobject が入っている

つまり、decoratした関数が入っている

で、__name__って?

Proxy
    @_default_cls_attr('name', str, __name__)
    def __name__(self):
        try:
            return self.__custom_name__
        except AttributeError:
            return self._get_current_object().__name__

んじゃこりゃ?

initで

        if name is not None:
            object.__setattr__(self, '__custom_name__', name)

とかしているから、decoratorをみるのがよいだろう

def _default_cls_attr(name, type_, cls_value):
    # Proxy uses properties to forward the standard
    # class attributes __module__, __name__ and __doc__ to the real
    # object, but these needs to be a string when accessed from
    # the Proxy class directly.  This is a hack to make that work.
    # -- See Issue #1087.

    return type(name, (type_, ), {
        '__new__': __new__, '__get__': __get__,
    })

追いかけてもしょうがないなぁ、これ


しかたないので戻って_task_from_funを読む

        T = type(fun.__name__, (base, ), dict({
            'app': self,
            'accept_magic_kwargs': False,
            'run': fun if bind else staticmethod(fun),
            '__doc__': fun.__doc__,
            '__module__': fun.__module__}, **options))()

あれ, typeに複数引数を渡すとどうなるんだっけ?

from python 3.3 doc

3引数で呼び出した場合、新しい型オブジェクトを返します。 本質的には class 文の動的な形式です。 name 文字列はクラス名で、__name__ 属性になります。 bases タプルは基底クラスの羅列で、__bases__ 属性になります。 dict 辞書はクラス本体の定義を含む名前空間で、__dict__ 属性になります。

ということは、base大事

        base = options.pop('base', None) or self.Task

なので、Taskが知りたい

    @cached_property
    def Task(self):
        return self.create_task_cls()

ではcreate_task_clsはなにか,というと

    def create_task_cls(self):
        """Creates a base task class using default configuration
        taken from this app."""
        return self.subclass_with_self('celery.app.task:Task', name='Task',
                                       attribute='_app', abstract=True)

subclass_with_selfってなによ?

    def subclass_with_self(self, Class, name=None, attribute='app',
                           reverse=None, **kw):
        """Subclass an app-compatible class by setting its app attribute
        to be this app instance.

        App-compatible means that the class has a class attribute that
        provides the default app it should use, e.g.
        ``class Foo: app = None``.

        :param Class: The app-compatible class to subclass.
        :keyword name: Custom name for the target class.
        :keyword attribute: Name of the attribute holding the app,
                            default is 'app'.

        """
        Class = symbol_by_name(Class)
        reverse = reverse if reverse else Class.__name__

        def __reduce__(self):
            return _unpickle_appattr, (reverse, self.__reduce_args__())

        attrs = dict({attribute: self}, __module__=Class.__module__,
                     __doc__=Class.__doc__, __reduce__=__reduce__, **kw)

        return type(name or Class.__name__, (Class, ), attrs)

symbol_by_name てなによ?

        Class = symbol_by_name(Class)

がなんだかということになる

from kombu.utils import symbol_by_name

なので今度はkombuのドキュメントを調べる

kombu==2.5.12

http://www.nullege.com/codes/search/kombu.utils.symbol_by_name

がんばって名前からsymbolを取り出すらしい

これでcreate_task_clsに戻れる

celery.app.task:Taskが肝。

        return self.subclass_with_self('celery.app.task:Task', name='Task',
class TaskType(type):
    """Meta class for tasks.

    Automatically registers the task in the task registry (except
    if the :attr:`Task.abstract`` attribute is set).

    If no :attr:`Task.name` attribute is provided, then the name is generated
    from the module and class name.

    """

oh, No.

@with_metaclass(TaskType)
class Task(object):
    """Task base class.

...

    def __call__(self, *args, **kwargs):
        _task_stack.push(self)
        self.push_request()
        try:
            # add self if this is a bound task
            if self.__self__ is not None:
                return self.run(self.__self__, *args, **kwargs)
            return self.run(*args, **kwargs)
        finally:
            self.pop_request()
            _task_stack.pop()

基本的には実行するだけだが、まわりになんかくっついてる

継承して使う場合用

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')

これが一番大事

    def apply_async(self, args=None, kwargs=None,
                    task_id=None, producer=None, connection=None, router=None,
                    link=None, link_error=None, publisher=None,
                    add_to_parent=True, reply_to=None, **options):

引数の説明、長い

コア

    def apply_async(self, args=None, kwargs=None,
        with app.producer_or_acquire(producer) as P:
            self.backend.on_task_call(P, task_id)
            task_id = P.publish_task(self.name, args, kwargs,

producer_or_acquireがなんだか知りたい.

celery/app/base.py

class Celery(object):
    @contextmanager
    def producer_or_acquire(self, producer=None):
        if producer:
            yield producer
        else:
            with self.amqp.producer_pool.acquire(block=True) as producer:
                yield producer
    default_producer = producer_or_acquire  # XXX compat

applyの場合は

def build_tracer(name, task, loader=None, hostname=None, store_errors=True,

が構築する関数、

    def trace_task(uuid, args, kwargs, request=None):
        ...
                try:
                    R = retval = fun(*args, **kwargs)
                    state = SUCCESS

で実行されるようだ

publish_task

celery/app/amqp.py

class TaskProducer(Producer):
    ...
    def publish_task(self, task_name, task_args=None, task_kwargs=None,
        
        データを組み立てて
        self.publish(
        でなんか送信している
        self.publishは kombu/messaging.py Producerに実装されている

        signals.task_sent.send(sender=task_name, **body)

send

celery/utils/dispatch/signal.py Signal

    def send(self, sender, **named):
        """Send signal from sender to all connected receivers.

        If any receiver raises an error, the error propagates back through
        send, terminating the dispatch loop, so it is quite possible to not
        have all receivers called if a raises an error.

signarlとmessageが分離しているのはなぜ?

messageはqueueにつっこまれるだけで通知しない?

signalにconnectしている相手に通知らしい

    def connect(self, *args, **kwargs):
        """Connect receiver to sender for signal.

        :param receiver: A function or an instance method which is to
            receive signals. Receivers must be hashable objects.

おそらくは、通知した相手がmessageを取りに行くのだろう.

残念ながらメンタルモデルを十分に構築できなかった.

0 件のコメント: