処理内容はたいしたことないですが、threadでデータを受け取りながら、ほかのthreadで読み出し、読み出した量がある一定量を過ぎると、受け取りthreadのfilter関数を差し替えるということをやっています。
なぜpipeを使っているのかとはつっこまないこと。
関数の入れ子がアレゲなのですがlisp系の言語でないので我慢するしかない。
自動的にlockが起こるオブジェクトもつくれるが、基底クラスとして供給したときに複数のインスタンス間でのdead lockとかパフォーマンスの低下を考えると面倒だからパス。面白そうだけどね。
どのデータがthread間で共有されているのかコンシャスじゃないといけないが、pythonではそういうものだろう。Haskellみたいな言語だったらきっとちがうのだろうが。
これとかか?
#!/usr/bin/python
import os
import thread
import threading
import urllib2
def identity(x):
return x
def upper(x):
return x.upper()
def synchonized_with(lock):
def bind(critical_section):
def synchonized(*args, **kw):
lock.acquire()
try:
ret = critical_section(*args, **kw)
finally:
lock.release()
return ret
return synchonized
return bind
class Base:
_lock = threading.RLock()
proclist = []
@synchonized_with(_lock)
def handler(self):
if self.proclist:
proc = self.proclist[0]
self.proclist = self.proclist[:-1]
else:
proc = identity
return proc
def listener(self, fd):
try:
h = urllib2.urlopen('http://www.cnn.com')
while True:
buf = h.read(10)
handler = self.handler()
buf = handler(buf)
while buf:
n = os.write(fd, buf)
buf = buf[:-n]
finally:
os.close(fd)
@synchonized_with(_lock)
def add_handler(self):
self.proclist.append(upper)
def serve(self):
r, w = os.pipe()
thread.start_new_thread(self.listener, (w,))
try:
x = True
while True:
print os.read(r, 40)
if x:
self.add_handler()
x = not x
print os.read(r, 40)
finally:
os.close(r)
b = Base()
b.serve()
0 件のコメント:
コメントを投稿