开源中文网

您的位置: 首页 > erlang > 正文

当stackless遭遇twisted

来源:  作者:

有时候,你会遇到将两个编程工具包协同工作的情况。无论如何,要使Twisted和stackless python这两个庞大的系统在一起工作还需要一些准备工作。Twisted需要其reactor运行在主“tasklet”中,但如果没有网络活动或待执行的deferred代码,reactor循环将会终止掉整个程序,这正是我们要使用stackless的目的。

在使其能正常工作之前我们需要做如下的设置。

  1. import stackless
  2. from twisted.internet import reactor, task
  3.  
  4. reactor_tasklet = None
  5.  
  6. def reactor_run( ):
  7.    # 每0.0001秒调用stackless.schedule
  8.    reactor_tasklet = stackless.getcurrent( )
  9.  
  10.    # 此处需要防止reactor将其他的tasklets阻塞掉
  11.    schedulingTask = task.LoopingCall( stackless.schedule )
  12.    schedulingTask.start( 0.0001 )
  13.    reactor.run( )
  14.  
  15. = stackless.tasklet( reactor_run )
  16. # 运行scheduler.stackless.run( )
  17. t.run( )
接下来,我们需要通过Python的函数修饰符来将上述简单的例子扩展成一个更加通用的解决方案(此处我们使用Python的decorator模块)。


  1. def __filter( d ):
  2.    if isinstance( d, failure.Failure ):
  3.        if isinstance( d.value, TaskletExit ):
  4.           print "ignore taskletexit"
  5.           return None
  6.        return d
  7.    return d
  8.  
  9. def __wrapper( d, f, *args, **kwargs ):
  10.  try:
  11.     rv = defer.maybeDeferred( f, *args, **kwargs )
  12.     rv.addCallback(__filter )
  13.     rv.addCallback( d.callback )
  14.     rv.addErrback( __filter )
  15.  except TaskletExit:
  16.     pass
  17.  except Exception, e:
  18.     print e, dir( e )
  19.     d.errback( e )

__filter负责过滤发送给Tasklets的TaskletExit异常,否则Twisted会将其封装成一个twisted.python.failure.Failure实例,此时会产生一个未捕获的“Deferred”异常。由于这也不是我们想发生的,因此需要对其进行过滤。当然,在真正的代码中,我们需要移除相关的打印信息。
__wrapper负责实际的函数调用。他使用maybeDeferred函数以确保在该函数调用之后我们只处理Deferred对象。

__wrapper使用Twisted的回调机制来确保作为函数参数的Deferred对象在实际函数调用结果可用的时候就立即被调用。该Deferred参数对于接下来描述的函数修饰符的工作至关重要。



  1. reactor_tasklet = None
  2.  
  3. @decorator
  4. def deferred_tasklet( f, *args, **kwargs ):
  5.    d = defer.Deferred( )
  6.    t = stackless.tasklet( __wrapper )
  7.    t( d, f, *args, **kwargs )
  8.    t.run( )
  9.    return d
  10.  
  11. @decorator
  12. def blocking_tasklet( f, *args, **kwargs ):
  13.    f2 = deferred_tasklet( f )
  14.    d = f2( *args, **kwargs )
  15.    if reactor_tasklet != stackless.getcurrent( ) and stackless.getcurrent( ) != stackless.getmain( ):
  16.  
  17.       return block_on( d )
  18.    raise RuntimeError( "Cannot block in reactor task" )
  19.  
  20. def block_on( d ):
  21.    chan = stackless.channel( )
  22.    d.addBoth( lambda x,y=chan: y.send( x ) )
  23.    return chan.receive( )

此处我们定义了两个主要的函数修饰符deferred_tasklet和blocking_tasklet,以及功能函数block_on。第一个函数仅仅是返回一个Deferred对象,和作为参数传递给__wrapper函数的Deferred对象一样;如果我们之前有注意到,该Deferred对象会在函数的结果可用的时候自动触发。我们在此处真正要做的是创建一个stackless.tasklet对象然后在该微线程中运行该__wrapper函数。

blocking_tasklet在此之上更进一步,他将我们之前传给其的Deferred对象转换成一个阻塞的函数调用。首先,他使用一些检测机制来确保其不会阻塞在Twisted reactor运行的同一个tasklet中。某些情况下,你或许需要在reactor的tasklet中保存stackless.getcurrent()值。我们有时候也需要知道我们当前运行的tasklet是否是主Stackless tasklet;虽然这不可能发生,但安全为上。

功能函数设置一个Stackless的channel,之后添加了一个小的lambda闭包,该闭包只负责将其参数发送给stackless的channel。该闭包存在于我们等待的Deferred对象的callback和errback链中。当一切都准备就绪,我们调用receive,该操作会阻塞该tasklet直到Deferred对象结束并产生callbacks或errback。从这一点来说,我们通过channel对象receive原函数的返回值并将其作为本函数的返回值返回给我们的函数。

只要我们不在Twisted reactor的同一个tasklet中,我们就可以使用block_on函数来将异步代码转换成顺序执行的同步代码。这些功能也可以通过使用Twisted内置的Callbacks修饰符,但这会将decorated function转变成一个生成器,而我们并不希望这样做。

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Twistless 就是通过上述而整理开发的twisted与stackless交互的桥梁类库。 

stackless 官方也有关于与twisted整合使用的demo。
猛戳这里或者这里可以得到

Tags:当stackless遭遇twisted
相关文章列表:
关于开源中文网 - 联系我们 - 广告服务 - 网站地图 - 版权声明