class documentation

Cooperative task scheduler.

A cooperative task is an iterator where each iteration represents an atomic unit of work. When the iterator yields, it allows the Cooperator to decide which of its tasks to execute next. If the iterator yields a Deferred then work will pause until the Deferred fires and completes its callback chain.

When a Cooperator has more than one task, it distributes work between all tasks.

There are two ways to add tasks to a Cooperator, cooperate and coiterate. cooperate is the more useful of the two, as it returns a CooperativeTask, which can be paused, resumed and waited on. coiterate has the same effect, but returns only a Deferred that fires when the task is done.

Cooperator can be used for many things, including but not limited to:

  • running one or more computationally intensive tasks without blocking
  • limiting parallelism by running a subset of the total tasks simultaneously
  • doing one thing, waiting for a Deferred to fire, doing the next thing, repeat (i.e. serializing a sequence of asynchronous tasks)

Multiple Cooperators do not cooperate with each other, so for most cases you should use the global cooperator.

Method __init__ Create a scheduler-like object to which iterators may be added.
Method coiterate Add an iterator to the list of iterators this Cooperator is currently running.
Method cooperate Start running the given iterator as a long-running cooperative task, by calling next() on it as a periodic timed event.
Method start Begin scheduling steps.
Method stop Stop scheduling steps. Errback the completion Deferreds of all iterators which have been added and forget about them.
Property running Is this Cooperator is currently running?
Method _addTask Add a CooperativeTask object to this Cooperator.
Method _removeTask Remove a CooperativeTask from this Cooperator.
Method _reschedule Undocumented
Method _tasksWhileNotStopped Yield all CooperativeTask objects in a loop as long as this Cooperator's termination condition has not been met.
Method _tick Run one scheduler tick.
Instance Variable _delayedCall Undocumented
Instance Variable _metarator Undocumented
Instance Variable _mustScheduleOnStart Undocumented
Instance Variable _scheduler Undocumented
Instance Variable _started Undocumented
Instance Variable _stopped Undocumented
Instance Variable _tasks Undocumented
Instance Variable _terminationPredicateFactory Undocumented
def __init__(self, terminationPredicateFactory: Callable[[], Callable[[], bool]] = _Timer, scheduler: Callable[[Callable[[], None]], IDelayedCall] = _defaultScheduler, started: bool = True): (source)

Create a scheduler-like object to which iterators may be added.

Parameters
terminationPredicateFactory:Callable[[], Callable[[], bool]]A no-argument callable which will be invoked at the beginning of each step and should return a no-argument callable which will return True when the step should be terminated. The default factory is time-based and allows iterators to run for 1/100th of a second at a time.
scheduler:Callable[[Callable[[], None]], IDelayedCall]A one-argument callable which takes a no-argument callable and should invoke it at some future point. This will be used to schedule each step of this Cooperator.
started:boolA boolean which indicates whether iterators should be stepped as soon as they are added, or if they will be queued up until Cooperator.start is called.

Add an iterator to the list of iterators this Cooperator is currently running.

Equivalent to cooperate, but returns a Deferred that will be fired when the task is done.

Parameters
iterator:Iterator[_TaskResultT]Undocumented
doneDeferred:Optional[Deferred[Iterator[_TaskResultT]]]If specified, this will be the Deferred used as the completion deferred. It is suggested that you use the default, which creates a new Deferred for you.
Returns
Deferred[Iterator[_TaskResultT]]a Deferred that will fire when the iterator finishes.
def cooperate(self, iterator: Iterator[_TaskResultT]) -> CooperativeTask: (source)

Start running the given iterator as a long-running cooperative task, by calling next() on it as a periodic timed event.

Parameters
iterator:Iterator[_TaskResultT]the iterator to invoke.
Returns
CooperativeTaska CooperativeTask object representing this task.
def start(self): (source)

Begin scheduling steps.

def stop(self): (source)

Stop scheduling steps. Errback the completion Deferreds of all iterators which have been added and forget about them.

@property
running: bool = (source)

Is this Cooperator is currently running?

def _addTask(self, task: CooperativeTask): (source)

Add a CooperativeTask object to this Cooperator.

def _removeTask(self, task: CooperativeTask): (source)

Remove a CooperativeTask from this Cooperator.

def _reschedule(self): (source)

Undocumented

def _tasksWhileNotStopped(self) -> Iterable[CooperativeTask]: (source)

Yield all CooperativeTask objects in a loop as long as this Cooperator's termination condition has not been met.

def _tick(self): (source)

Run one scheduler tick.

Undocumented

Undocumented

_mustScheduleOnStart: bool = (source)

Undocumented

_scheduler = (source)

Undocumented

_started: bool = (source)

Undocumented

_stopped: bool = (source)

Undocumented

_terminationPredicateFactory = (source)

Undocumented