かなり久々のブログになる。
PythonにはCeleryというmessaging frameworkがある。 ここのmessagingというのはメールとかMMSとかSMSではなくて, RabbitMQとかZeroMQのmessagingである. 実際, RabbitMQを使って動かすことができる. 試してはいないが, 他にはMongoDBとかCouchDBも使うことができるらしい.
ちょろっとドキュメントを読んだ程度では使い方がわからない。 ベースになっているkombuに 変えようかとも思ったが、わざわざ低水準側にいって抽象度が下がるのもつまらないので, 少しコードを読んでみることにした.
環境はpython3.3.2である
pip freezeするとこんなものが入っていると言われた. scraping toolを作ろうとしていたから, lxmlとかrequestsとかが入っている.
SQLAlchemy==0.8.2 amqp==1.2.0 anyjson==0.3.3 billiard==2.7.3.31 celery==3.0.21 distribute==0.6.49 kombu==2.5.12 lxml==3.2.1 python-dateutil==2.1 pytz==2013b requests==1.2.3 six==1.3.0
例によって迷子になること請け合いだが....とりあえず書いてみた.
in celery/__init__.py
from celery.app.base import Celery # noqa
in celery/app/base.py
Celeryは、withで使える(いつなぜ使うのだろう?)
def __enter__(self): return self def __exit__(self, *exc_info): self.close()
肝となるデコレータ
def task(self, *args, **opts): """Creates new task class from any callable.""" if _EXECV and not opts.get('_force_evaluate'): # When using execv the task in the original module will point to a # different app, so doing things like 'add.request' will point to # a differnt task instance. This makes sure it will always use # the task instance from the current app. # Really need a better solution for this :( from . import shared_task as proxies_to_curapp return proxies_to_curapp(*args, _force_evaluate=True, **opts) def inner_create_task_cls(shared=True, filter=None, **opts): _filt = filter # stupid 2to3 def _create_task_cls(fun): if shared: cons = lambda app: app._task_from_fun(fun, **opts) cons.__name__ = fun.__name__ shared_task(cons) if self.accept_magic_kwargs: # compat mode task = self._task_from_fun(fun, **opts) if filter: task = filter(task) return task # return a proxy object that is only evaluated when first used promise = PromiseProxy(self._task_from_fun, (fun, ), opts) self._pending.append(promise) if _filt: return _filt(promise) return promise return _create_task_cls if len(args) == 1 and isinstance(args[0], Callable): return inner_create_task_cls(**opts)(*args) if args: raise TypeError( 'task() takes no arguments (%s given)' % (len(args, ))) return inner_create_task_cls(**opts)
_task_from_funが大事なようだ. 名前からして, functionからtask objectを作る関数である.
def _task_from_fun(self, fun, **options): base = options.pop('base', None) or self.Task bind = options.pop('bind', False) T = type(fun.__name__, (base, ), dict({ 'app': self, 'accept_magic_kwargs': False, 'run': fun if bind else staticmethod(fun), '__doc__': fun.__doc__, '__module__': fun.__module__}, **options))() task = self._tasks[T.name] # return global instance. task.bind(self) return taskPromiseProxyでwrapされるので、それを追う
from celery.local import PromiseProxy, maybe_evaluatecelery/local.py
ほとんどがProxyから来ている. 大事なのはこれのようだ.
PromiseProxy __thingがセットされなかったら評価するdef _get_current_object(self): try: return object.__getattribute__(self, '__thing') except AttributeError: return self.__evaluate__()評価するとは_get_current_objectしてそれをセットする こと なんかキャッシュされている値を消すらしい
def __evaluate__(self, _clean=('_Proxy__local', '_Proxy__args', '_Proxy__kwargs')): try: thing = Proxy._get_current_object(self) object.__setattr__(self, '__thing', thing) return thing finally: for attr in _clean: try: object.__delattr__(self, attr) except AttributeError: # pragma: no cover # May mask errors so ignore passProxyの_get_current_objectを見ていく.
def _get_current_object(self): """Return the current object. This is useful if you want the real object behind the proxy at a time for performance reasons or because you want to pass the object into a different context. """ loc = object.__getattribute__(self, '_Proxy__local') if not hasattr(loc, '__release_local__'): return loc(*self.__args, **self.__kwargs) try: return getattr(loc, self.__name__) except AttributeError: raise RuntimeError('no object bound to {0.__name__}'.format(self))
what's _Proxy__local
def __init__(self, local, args=None, kwargs=None, name=None): object.__setattr__(self, '_Proxy__local', local)
localには何が渡っているのやら? _task_from_fun が作ったobject が入っている
つまり、decoratした関数が入っている
で、__name__って?
Proxy@_default_cls_attr('name', str, __name__) def __name__(self): try: return self.__custom_name__ except AttributeError: return self._get_current_object().__name__
んじゃこりゃ?
initで
if name is not None: object.__setattr__(self, '__custom_name__', name)
とかしているから、decoratorをみるのがよいだろう
def _default_cls_attr(name, type_, cls_value): # Proxy uses properties to forward the standard # class attributes __module__, __name__ and __doc__ to the real # object, but these needs to be a string when accessed from # the Proxy class directly. This is a hack to make that work. # -- See Issue #1087. return type(name, (type_, ), { '__new__': __new__, '__get__': __get__, })
追いかけてもしょうがないなぁ、これ
しかたないので戻って_task_from_funを読む
T = type(fun.__name__, (base, ), dict({ 'app': self, 'accept_magic_kwargs': False, 'run': fun if bind else staticmethod(fun), '__doc__': fun.__doc__, '__module__': fun.__module__}, **options))()
あれ, typeに複数引数を渡すとどうなるんだっけ?
from python 3.3 doc
ということは、base大事
base = options.pop('base', None) or self.Task
なので、Taskが知りたい
@cached_property def Task(self): return self.create_task_cls()
ではcreate_task_clsはなにか,というと
def create_task_cls(self): """Creates a base task class using default configuration taken from this app.""" return self.subclass_with_self('celery.app.task:Task', name='Task', attribute='_app', abstract=True)
subclass_with_selfってなによ?
def subclass_with_self(self, Class, name=None, attribute='app', reverse=None, **kw): """Subclass an app-compatible class by setting its app attribute to be this app instance. App-compatible means that the class has a class attribute that provides the default app it should use, e.g. ``class Foo: app = None``. :param Class: The app-compatible class to subclass. :keyword name: Custom name for the target class. :keyword attribute: Name of the attribute holding the app, default is 'app'. """ Class = symbol_by_name(Class) reverse = reverse if reverse else Class.__name__ def __reduce__(self): return _unpickle_appattr, (reverse, self.__reduce_args__()) attrs = dict({attribute: self}, __module__=Class.__module__, __doc__=Class.__doc__, __reduce__=__reduce__, **kw) return type(name or Class.__name__, (Class, ), attrs)
symbol_by_name てなによ?
Class = symbol_by_name(Class)
がなんだかということになる
from kombu.utils import symbol_by_name
なので今度はkombuのドキュメントを調べる
kombu==2.5.12
http://www.nullege.com/codes/search/kombu.utils.symbol_by_name
がんばって名前からsymbolを取り出すらしい
これでcreate_task_clsに戻れる
celery.app.task:Taskが肝。
return self.subclass_with_self('celery.app.task:Task', name='Task',
class TaskType(type): """Meta class for tasks. Automatically registers the task in the task registry (except if the :attr:`Task.abstract`` attribute is set). If no :attr:`Task.name` attribute is provided, then the name is generated from the module and class name. """
oh, No.
@with_metaclass(TaskType) class Task(object): """Task base class. ... def __call__(self, *args, **kwargs): _task_stack.push(self) self.push_request() try: # add self if this is a bound task if self.__self__ is not None: return self.run(self.__self__, *args, **kwargs) return self.run(*args, **kwargs) finally: self.pop_request() _task_stack.pop()
基本的には実行するだけだが、まわりになんかくっついてる
継承して使う場合用
def run(self, *args, **kwargs): """The body of the task executed by workers.""" raise NotImplementedError('Tasks must define the run method.')
これが一番大事
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, connection=None, router=None, link=None, link_error=None, publisher=None, add_to_parent=True, reply_to=None, **options):
引数の説明、長い
コア
def apply_async(self, args=None, kwargs=None, with app.producer_or_acquire(producer) as P: self.backend.on_task_call(P, task_id) task_id = P.publish_task(self.name, args, kwargs,
producer_or_acquireがなんだか知りたい.
celery/app/base.py
class Celery(object): @contextmanager def producer_or_acquire(self, producer=None): if producer: yield producer else: with self.amqp.producer_pool.acquire(block=True) as producer: yield producer default_producer = producer_or_acquire # XXX compat
applyの場合は
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
が構築する関数、
def trace_task(uuid, args, kwargs, request=None): ... try: R = retval = fun(*args, **kwargs) state = SUCCESS
で実行されるようだ
publish_task
celery/app/amqp.py
class TaskProducer(Producer): ... def publish_task(self, task_name, task_args=None, task_kwargs=None, データを組み立てて self.publish( でなんか送信している self.publishは kombu/messaging.py Producerに実装されている signals.task_sent.send(sender=task_name, **body)
send
celery/utils/dispatch/signal.py Signal
def send(self, sender, **named): """Send signal from sender to all connected receivers. If any receiver raises an error, the error propagates back through send, terminating the dispatch loop, so it is quite possible to not have all receivers called if a raises an error.
signarlとmessageが分離しているのはなぜ?
messageはqueueにつっこまれるだけで通知しない?
signalにconnectしている相手に通知らしい
def connect(self, *args, **kwargs): """Connect receiver to sender for signal. :param receiver: A function or an instance method which is to receive signals. Receivers must be hashable objects.
おそらくは、通知した相手がmessageを取りに行くのだろう.
残念ながらメンタルモデルを十分に構築できなかった.
0 件のコメント:
コメントを投稿