Package stmpy

Module stmpy provides support for state machines in Python.

Contributing

stmpy is on GitHub. Pull requests and bug reports are welcome.

Expand source code
"""Module `stmpy` provides support for state machines in Python.

## Contributing

`stmpy` [is on GitHub](https://github.com/falkr/stmpy). Pull
requests and bug reports are welcome.
"""

import time
import logging
from queue import Queue
from queue import Empty
from threading import Thread

from .machine import Machine
from .driver import Driver
from .spin import to_promela
from .graphviz import to_graphviz

__all__ = ['Machine', 'Driver', 'to_promela', 'to_graphviz']

def get_graphviz_dot(machine):
    """ For compatibility. Use to_graphviz() instead. """
    return to_graphviz(machine)

__version__ = '0.7.6'
"""
The current version of stmpy.
"""

Sub-modules

stmpy.driver
stmpy.graphviz
stmpy.machine
stmpy.spin

Functions

def to_graphviz(machine)

Return the graph of the state machine.

The format is the dot format for Graphviz, and can be directly used as input to Graphviz.

To learn more about Graphviz, visit https://graphviz.gitlab.io.

Display in Jupyter Notebook Install Python support for Graphviz via pip install stmpy.graphviz. Install Graphviz. In a notebook, build a stmpy.Machine. Then, declare a cell with the following content:

from graphviz import Source
src = Source(stmpy.get_graphviz_dot(stm))
src

Using Graphviz on the Command Line

Write the graph file with the following code:

with open("graph.gv", "w") as file:
print(stmpy.get_graphviz_dot(stm), file=file)

You can now use the command line tools from Graphviz to create a graphic file with the graph. For instance:

dot -Tsvg graph.gv -o graph.svg
Expand source code
def to_graphviz(machine):
    """
    Return the graph of the state machine.

    The format is the dot format for Graphviz, and can be directly used as input
    to Graphviz.

    To learn more about Graphviz, visit https://graphviz.gitlab.io.

    **Display in Jupyter Notebook**
    Install Python support for Graphviz via `pip install graphviz`.
    Install Graphviz.
    In a notebook, build a stmpy.Machine. Then, declare a cell with the
    following content:

        from graphviz import Source
        src = Source(stmpy.get_graphviz_dot(stm))
        src

    **Using Graphviz on the Command Line**

    Write the graph file with the following code:

        with open("graph.gv", "w") as file:
        print(stmpy.get_graphviz_dot(stm), file=file)

    You can now use the command line tools from Graphviz to create a graphic
    file with the graph. For instance:

        dot -Tsvg graph.gv -o graph.svg

    """
    s = []
    s.append('digraph G {\n')
    s.append('node [shape=box style=rounded fontname=Helvetica];\n')
    s.append('edge [ fontname=Helvetica ];\n')
    # initial state
    s.append('initial [shape=point width=0.2];\n')
    # final states
    counter = 1
    for t_id in machine._table:
        transition = machine._table[t_id]
        if not transition.internal:
            if transition.target == 'final':
                s.append('f{} [shape=doublecircle width=0.1 label="" style=filled fillcolor=black];\n'.format(counter))
            counter = counter + 1

    for state_name in machine._states:
        s.append(_print_state(machine._states[state_name]))
    # initial transition
    counter = 0
    s.append(_print_transition(machine._initial_transition, counter))
    counter = 1
    for t_id in machine._table:
        transition = machine._table[t_id]
        if not transition.internal:
            s.append(_print_transition(transition, counter))
            counter = counter + 1
    s.append('}')
    return ''.join(s)
def to_promela(machines)

Experimental feature to export machines as Promela processes for model checking in Spin.

Expand source code
def to_promela(machines):
    """ Experimental feature to export machines as Promela processes 
    for model checking in Spin.
    """
    p = Promela(machines)
    return p.to_promela()

Classes

class Driver

A driver can run several machines.

Run-to-completion: One driver contains one thread. Machines assigned to a driver are executed within this single thread. This provides a strict temporal ordering of behavior for state machines assigned to the same driver. A driver only executes one transition at a time, and always executes this transition to completion. This means that the action within a transition can access shared variables without interleaving behavior. One transition is always executed separate from all other transitions.

Create a new driver.

Expand source code
class Driver:
    """
    A driver can run several machines.

    **Run-to-completion:**
    One driver contains one thread. Machines assigned to a driver are executed
    within this single thread. This provides a strict temporal ordering of
    behavior for state machines assigned to the same driver. A driver only
    executes one transition at a time, and always executes this transition to
    completion. This means that the action within a transition can access
    shared variables without interleaving behavior. One transition is always
    executed separate from all other transitions.
    """

    _stms_by_id = {}

    def __init__(self):
        """Create a new driver."""
        self._logger = logging.getLogger(__name__)
        self._logger.debug('Logging works')
        self._active = False
        self._event_queue = Queue()
        self._timer_queue = []
        self._next_timeout = None
        # TODO need clarity if this should be a class variable
        Driver._stms_by_id = {}

    def _wake_queue(self):
        # Sends a None event to wake up the queue.
        self._event_queue.put(None)

    def print_status(self):
        """Provide a snapshot of the current status."""
        s = []
        s.append('=== State Machines: ===\n')
        for stm_id in Driver._stms_by_id:
            stm = Driver._stms_by_id[stm_id]
            s.append('    - {} in state {}\n'.format(stm.id, stm.state))
        s.append('=== Events in Queue: ===\n')
        for event in self._event_queue.queue:
            if event is not None:
                s.append('    - {} for {} with args:{} kwargs:{}\n'.format(
                    event['id'], event['stm'].id,
                    event['args'], event['kwargs']))
        s.append('=== Active Timers: {} ===\n'.format(len(self._timer_queue)))
        for timer in self._timer_queue:
            s.append('    - {} for {} with timeout {}\n'.format(
                timer['id'], timer['stm'].id, timer['timeout']))
        s.append('=== ================ ===\n')
        return ''.join(s)

    def status(self):
        """Provide a snapshot of the current status.
        +------ Remaining steps: 023 ---------+
        | timers  -----> t1 (for stm_tick) 3000 ms
        |                t2 (for stm_tick) 3000 ms
        | (+ 3 more)     t3 (for stm_tick) 3000 ms
        +---------------------------------------
        | stm1: A     in state  s_01
        | Queue head --> A (saved)
        |                B
        |                C
        |                ... (+ 3 more)
        +---------------------------------------"""
        s = []
        s.append('=== State Machines: ===\n')
        for stm_id in Driver._stms_by_id:
            stm = Driver._stms_by_id[stm_id]
            s.append('    - {} in state {}\n'.format(stm.id, stm.state))
        s.append('=== Events in Queue: ===\n')
        for event in self._event_queue.queue:
            if event is not None:
                s.append('    - {} for {} with args:{} kwargs:{}\n'.format(
                    event['id'], event['stm'].id,
                    event['args'], event['kwargs']))
        s.append('=== Active Timers: {} ===\n'.format(len(self._timer_queue)))
        for timer in self._timer_queue:
            s.append('    - {} for {} with timeout {}\n'.format(
                timer['id'], timer['stm'].id, timer['timeout']))
        s.append('=== ================ ===\n')
        return ''.join(s)

    def add_machine(self, machine):
        """Add the state machine to this driver."""
        self._logger.debug('Adding machine {} to driver'.format(machine.id))
        machine._driver = self
        machine._reset()
        if machine.id is not None:
            # TODO warning when STM already registered
            Driver._stms_by_id[machine.id] = machine
            self._add_event(event_id=None, args=[], kwargs={}, stm=machine)

    def start(self, max_transitions=None, keep_active=False):
        """
        Start the driver.

        This method creates a thread which runs the event loop.
        The method returns immediately. To wait until the driver
        finishes, use `stmpy.Driver.wait_until_finished`.

        `max_transitions`: execute only this number of transitions, then stop
        `keep_active`: When true, keep the driver running even when all state
        machines terminated
        """
        self._active = True
        self._max_transitions = max_transitions
        self._keep_active = keep_active
        self.thread = Thread(target=self._start_loop)
        self.thread.start()

    def step(self, steps=1):
        """Execute a single step."""
        self.start(max_transitions=steps)
        self.wait_until_finished()

    def stop(self):
        """Stop the driver."""
        self._active = False
        self._wake_queue()

    def wait_until_finished(self):
        """Blocking method to wait until the driver finished its execution."""
        try:
            self.thread.join()
        except KeyboardInterrupt:
            self._logger.debug('Keyboard interrupt detected, stopping driver.')
            self._active = False
            self._wake_queue()

    def _sort_timer_queue(self):
        self._timer_queue = sorted(
            self._timer_queue, key=lambda timer: timer['timeout_abs'])

    def _start_timer(self, name, timeout, stm):
        self._logger.debug('Start timer with name={} from stm={}'
                           .format(name, stm.id))
        timeout_abs = _current_time_millis() + int(timeout)
        self._stop_timer(name, stm, log=False)
        self._timer_queue.append(
            {'id': name, 'timeout': timeout, 'timeout_abs': timeout_abs,
             'stm': stm, 'tid': stm.id + '_' + name})
        self._sort_timer_queue()
        self._wake_queue()

    def _stop_timer(self, name, stm, log=True):
        if log: self._logger.debug('Stopping timer with name={} from stm={}'
                                   .format(name, stm.id))
        index = 0
        index_to_delete = None
        tid = stm.id + '_' + name
        for timer in self._timer_queue:
            if timer['tid'] == tid:
                index_to_delete = index
            index = index + 1
        if index_to_delete is not None:
            self._timer_queue.pop(index_to_delete)

    def _get_timer(self, name, stm):
        tid = stm.id + '_' + name
        for timer in self._timer_queue:
            if timer['tid'] == tid:
                return timer['timeout_abs'] - _current_time_millis()
        return None

    def _check_timers(self):
        """
        Check for expired timers.

        If there are any timers that expired, place them in the event
        queue.
        """
        if self._timer_queue:
            timer = self._timer_queue[0]
            if timer['timeout_abs'] < _current_time_millis():
                # the timer is expired, remove first element in queue
                self._timer_queue.pop(0)
                # put into the event queue
                self._logger.debug('Timer {} expired for stm {}, adding it to event queue.'.format(timer['id'], timer['stm'].id))
                self._add_event(timer['id'], [], {}, timer['stm'], front=True)
                # not necessary to set next timeout,
                # complete check timers will be called again
            else:
                self._next_timeout = (
                    timer['timeout_abs'] - _current_time_millis()) / 1000
                if self._next_timeout < 0:
                    self._next_timeout = 0
        else:
            self._next_timeout = None

    def _add_event(self, event_id, args, kwargs, stm, front=False):
        if front:
            self._event_queue.queue.appendleft({'id': event_id, 'args': args, 'kwargs': kwargs, 'stm': stm})
        else:
            self._event_queue.put({'id': event_id, 'args': args, 'kwargs': kwargs, 'stm': stm})

    def send(self, message_id, stm_id, args=None, kwargs=None):
        """
        Send a message to a state machine handled by this driver.

        If you have a reference to the state machine, you can also send it
        directly to it by using `stmpy.Machine.send`.

        `stm_id` must be the id of a state machine earlier added to the driver.
        """
        if args == None: args = []
        if kwargs == None: kwargs = {}
        if stm_id not in Driver._stms_by_id:
            self._logger.warn('Machine with name {} cannot be found. '
                              'Ignoring message {}.'.format(stm_id, message_id))
        else:
            stm = Driver._stms_by_id[stm_id]
            self._add_event(message_id, args, kwargs, stm)

    def _terminate_stm(self, stm_id):
        self._logger.debug('Terminating machine {}.'.format(stm_id))
        # removing it from the table of machines
        Driver._stms_by_id.pop(stm_id, None)
        if not self._keep_active and not Driver._stms_by_id:
            self._logger.debug('No machines anymore, stopping driver.')
            self._active = False
            self._wake_queue()

    def _execute_transition(self, stm, event_id, args, kwargs, event):
        if stm._defers_event(event_id):
            stm._add_to_defer_queue(event)
            self._logger.debug('Machine {} defers event {} in state {}'.format(stm._id, event_id, stm._state))
            return
        stm._execute_transition(event_id, args, kwargs)
        if self._max_transitions is not None:
            self._max_transitions = self._max_transitions-1
            if self._max_transitions == 0:
                self._logger.debug('Stopping driver because max_transitions reached.')
                self._active = False

    def _start_loop(self):
        self._logger.debug('Starting loop of the driver.')
        while self._active:
            self._check_timers()
            try:
                event = self._event_queue.get(block=True,
                                              timeout=(self._next_timeout))
                if event is not None:
                    # (None events are just used to wake up the queue.)
                    self._execute_transition(stm=event['stm'],
                                             event_id=event['id'],
                                             args=event['args'],
                                             kwargs=event['kwargs'], event=event)
            except Empty:
                # timeout has occured
                self._logger.debug('Timer expired, driver loop active again.')
            except KeyboardInterrupt:
                self.active = False
                self._logger.debug('Keyboard interrupt. Stopping the driver.')
        self._logger.debug('Driver loop is finished.')

Methods

def add_machine(self, machine)

Add the state machine to this driver.

Expand source code
def add_machine(self, machine):
    """Add the state machine to this driver."""
    self._logger.debug('Adding machine {} to driver'.format(machine.id))
    machine._driver = self
    machine._reset()
    if machine.id is not None:
        # TODO warning when STM already registered
        Driver._stms_by_id[machine.id] = machine
        self._add_event(event_id=None, args=[], kwargs={}, stm=machine)
def print_status(self)

Provide a snapshot of the current status.

Expand source code
def print_status(self):
    """Provide a snapshot of the current status."""
    s = []
    s.append('=== State Machines: ===\n')
    for stm_id in Driver._stms_by_id:
        stm = Driver._stms_by_id[stm_id]
        s.append('    - {} in state {}\n'.format(stm.id, stm.state))
    s.append('=== Events in Queue: ===\n')
    for event in self._event_queue.queue:
        if event is not None:
            s.append('    - {} for {} with args:{} kwargs:{}\n'.format(
                event['id'], event['stm'].id,
                event['args'], event['kwargs']))
    s.append('=== Active Timers: {} ===\n'.format(len(self._timer_queue)))
    for timer in self._timer_queue:
        s.append('    - {} for {} with timeout {}\n'.format(
            timer['id'], timer['stm'].id, timer['timeout']))
    s.append('=== ================ ===\n')
    return ''.join(s)
def send(self, message_id, stm_id, args=None, kwargs=None)

Send a message to a state machine handled by this driver.

If you have a reference to the state machine, you can also send it directly to it by using Machine.send().

stm_id must be the id of a state machine earlier added to the driver.

Expand source code
def send(self, message_id, stm_id, args=None, kwargs=None):
    """
    Send a message to a state machine handled by this driver.

    If you have a reference to the state machine, you can also send it
    directly to it by using `stmpy.Machine.send`.

    `stm_id` must be the id of a state machine earlier added to the driver.
    """
    if args == None: args = []
    if kwargs == None: kwargs = {}
    if stm_id not in Driver._stms_by_id:
        self._logger.warn('Machine with name {} cannot be found. '
                          'Ignoring message {}.'.format(stm_id, message_id))
    else:
        stm = Driver._stms_by_id[stm_id]
        self._add_event(message_id, args, kwargs, stm)
def start(self, max_transitions=None, keep_active=False)

Start the driver.

This method creates a thread which runs the event loop. The method returns immediately. To wait until the driver finishes, use Driver.wait_until_finished().

max_transitions: execute only this number of transitions, then stop keep_active: When true, keep the driver running even when all state machines terminated

Expand source code
def start(self, max_transitions=None, keep_active=False):
    """
    Start the driver.

    This method creates a thread which runs the event loop.
    The method returns immediately. To wait until the driver
    finishes, use `stmpy.Driver.wait_until_finished`.

    `max_transitions`: execute only this number of transitions, then stop
    `keep_active`: When true, keep the driver running even when all state
    machines terminated
    """
    self._active = True
    self._max_transitions = max_transitions
    self._keep_active = keep_active
    self.thread = Thread(target=self._start_loop)
    self.thread.start()
def status(self)

Provide a snapshot of the current status. +------ Remaining steps: 023 ---------+ | timers -----> t1 (for stm_tick) 3000 ms | t2 (for stm_tick) 3000 ms | (+ 3 more) t3 (for stm_tick) 3000 ms +--------------------------------------- | stm1: A in state s_01 | Queue head –> A (saved) | B | C | … (+ 3 more) +---------------------------------------

Expand source code
def status(self):
    """Provide a snapshot of the current status.
    +------ Remaining steps: 023 ---------+
    | timers  -----> t1 (for stm_tick) 3000 ms
    |                t2 (for stm_tick) 3000 ms
    | (+ 3 more)     t3 (for stm_tick) 3000 ms
    +---------------------------------------
    | stm1: A     in state  s_01
    | Queue head --> A (saved)
    |                B
    |                C
    |                ... (+ 3 more)
    +---------------------------------------"""
    s = []
    s.append('=== State Machines: ===\n')
    for stm_id in Driver._stms_by_id:
        stm = Driver._stms_by_id[stm_id]
        s.append('    - {} in state {}\n'.format(stm.id, stm.state))
    s.append('=== Events in Queue: ===\n')
    for event in self._event_queue.queue:
        if event is not None:
            s.append('    - {} for {} with args:{} kwargs:{}\n'.format(
                event['id'], event['stm'].id,
                event['args'], event['kwargs']))
    s.append('=== Active Timers: {} ===\n'.format(len(self._timer_queue)))
    for timer in self._timer_queue:
        s.append('    - {} for {} with timeout {}\n'.format(
            timer['id'], timer['stm'].id, timer['timeout']))
    s.append('=== ================ ===\n')
    return ''.join(s)
def step(self, steps=1)

Execute a single step.

Expand source code
def step(self, steps=1):
    """Execute a single step."""
    self.start(max_transitions=steps)
    self.wait_until_finished()
def stop(self)

Stop the driver.

Expand source code
def stop(self):
    """Stop the driver."""
    self._active = False
    self._wake_queue()
def wait_until_finished(self)

Blocking method to wait until the driver finished its execution.

Expand source code
def wait_until_finished(self):
    """Blocking method to wait until the driver finished its execution."""
    try:
        self.thread.join()
    except KeyboardInterrupt:
        self._logger.debug('Keyboard interrupt detected, stopping driver.')
        self._active = False
        self._wake_queue()
class Machine (name, transitions, obj, states=None)

Implements a state machine.

A machine must be added to a driver to execute it.

Create a new state machine.

Throws an exception if the state machine is not well-formed.

Transitions: Transitions are specified as a dictionary with the following key / value pairs:

  • trigger: string with the name of a trigger, either a message to receive or the name of a timer.
  • source: string with the name of a state.
  • target: string with the name of a state.
  • effect: (optional) a set of strings that refers to method name of the object passed to the state machine via obj. Several effects can be separated with a ;.

    !python

    t_1 = {'trigger': 'tick', 'source': 's_tick', 'target': 's_tock', 'effect': 'on_tick'}

Initial Transition: A state machine must have a single initial transition. This is a normal transition that has a source state with name 'initial', and no trigger.

#!python
t_0 = {'source': 'initial',
       'target': 's_tick',
       'effect': 'on_init'}

Compound Transitions: A compound transition is used to declare a transition that can contain decisions. A compound transition can decide upon the target state at run-time, for instance based on data in variables. It is declared like a normal transition, but does not declare any effect or target. Instead, it refers to a function that is executed. The function must return a string that determines the target state. The key 'targets' (notice the plural 's') allows to specify the potential target states. This has no influence on the behavior of the state machine, but is just used when the data structure is also used to generate a state machine graph.

#!python
def transition_1(args, kwargs):
    # do something
    if ... :
        return 's1'
    else:
        return 's2'

t_3 = {'source': 's_0',
       'trigger': 't',
       'targets': 's1 s2',
       'function': transition_1}

States: States are specified as sources and targets as part of the transitions. This is done by simple strings. The name initial refers to the initial state of the state machine. (An initial transition is necessary, see above.) The name final refers to the final state of the machine. Once a machine executes a transition with target state final, it terminates.

States can declare internal transitions. These are transitions that have the same source and target state, similar to self-transitions. However, they don't ever leave the state, so that any entry or exit actions declared in the state are not executed. An internal transition is declared as part of the extended state definition. It simply lists the name of the trigger (here a) as key and the list of actions it executes as value.

#!python
s_0 = {'name': 's_0',
'a': 'action1(); action2()'}

Deferred Events

A state can defer an event. In this case, the event, if it happens, does not trigger a transition, but is ignored in the input queue until the state machine switches into another state that does not defer the event anymore. This is useful to handle events that can arrive in states when they are not useful yet. To declare a deferred event, simply add the event with its name as key in the extended state description, and use the keyword defer as value:

#!python
s1 = {'name': 's1',
    'a': 'defer'}

Actions and Effects: The value of the attributes for transition effects and for state entry and exit actions can list several actions that are called on the object provided to the state machine.

This list of actions can look in the following way:

#!python
effect='m1; m2(); m3(1, True, "a"); m4(*)'

This is a semicolon-separated list of actions that are called, here as part of a transition's effect. Method m1 has no arguments, and neither does m2. This means the empty brackets are optional. Method m3 has three literal arguments, here the integer 1, the boolean True and the string 'a'. Note how the string is surrounded by double quotation marks, since the entire effect is coded in single quotation marks. Vice-versa is also possible. The last method, m4, declares an asterisk as argument. This means that the state machine uses the args and kwargs of the incoming event and offers them to the method.

The actions can also directly refer to the state machine actions Machine.start_timer() and Machine.stop_timer(). A transition can for instance declare the following effects:

#!python
effect='start_timer("t1", 100); stop_timer("t2");'

Entry-, Exit-, and Do-Actions

States also declare entry and exit actions that are called when they are entered or exited. To declare these actions, declare a dictionary for the state. The name key refers to the name of the state that is also used in the transition declaration.

#!python
s_0 = {'name': 's_0',
       'entry': 'op1; op2',
       'exit': 'op3'}

A state can also declare a do-action. This action is started once the state is entered, after any entry actions, if there are any. Do-actions can refer to code that takes a long time to run, and are executed in their own thread, so that they don't block the execution of other behavior. Once the do-action finishes, the state machine automatically dispatches an event with name done. This implies that a state with a do-action has only one outgoing transition, and this transition must be triggered by the event done.

#!python
s1 = {'name': 's1',
      'do': 'do_action("a")'}

name: Name of the state machine. This name is used to send messages to it, and show its state during debugging.

transitions: A set of transitions, as explained above. There must be at least one initial transition.

obj: An object that encapsulates any actions called from states or transitions.

states: Optional state declarations to add entry and exit actions to them.

Expand source code
class Machine:
    """
    Implements a state machine.

    A machine must be added to a driver to execute it.
    """

    def _parse_transitions(self, transitions, states):
        self._initial_transition = None
        for transition_string in transitions:
            t_dict = transition_string  # ast.literal_eval(transition_string)
            # TODO error handling: string may be written in a wrong way
            source = t_dict['source']
            if source == 'initial':
                self._initial_transition = _Transition(transition_string)
            else:
                trigger = t_dict['trigger']
                t_id = _tid(source, trigger)
                transition = _Transition(transition_string)
                # TODO error handling: what if several transition with same
                # id start from same source state?
                self._table[t_id] = transition
        if self._initial_transition is None:
            raise Exception('The machine has no initial transition')
        # parse states for internal transitions
        for s_dict in states:
            source = s_dict['name']
            for key in s_dict.keys():
                if key not in ['name', 'entry', 'exit']:
                    t_id = _tid(source, key)
                    transition = _Transition({'source': source, 'target': source, 'effect': s_dict[key], 'internal': True})
                    self._table[t_id] = transition

    def _parse_states(self, states):
        for s_dict in states:
            name = s_dict['name']
            # TODO check that state name is given
            # initial state cannot be detailed
            self._states[name] = _State(s_dict)

    def __init__(self, name, transitions, obj, states=None):
        """Create a new state machine.

        Throws an exception if the state machine is not well-formed.

        **Transitions:**
        Transitions are specified as a dictionary with the following key / value
        pairs:

          * trigger: string with the name of a trigger, either a message to receive or the name of a timer.
          * source: string with the name of a state.
          * target: string with the name of a state.
          * effect: (optional) a set of strings that refers to method name of the object passed to the state machine via `obj`. Several effects can be separated with a `;`.

            #!python
            t_1 = {'trigger': 'tick',
                   'source': 's_tick',
                   'target': 's_tock',
                   'effect': 'on_tick'}

        **Initial Transition:**
        A state machine must have a single initial transition. This is a normal
        transition that has a source state with name `'initial'`, and no
        trigger.

            #!python
            t_0 = {'source': 'initial',
                   'target': 's_tick',
                   'effect': 'on_init'}

        **Compound Transitions:**
        A compound transition is used to declare a transition that can contain decisions.
        A compound transition can decide upon the target state at run-time, for instance based on data in variables.
        It is declared like a normal transition, but does not declare any effect or target.
        Instead, it refers to a function that is executed. The function must return a string that determines the target state.
        The key 'targets' (notice the plural 's') allows to specify the potential target states.
        This has no influence on the behavior of the state machine, but is just used when the data structure is also
        used to generate a state machine graph.

            #!python
            def transition_1(args, kwargs):
                # do something
                if ... :
                    return 's1'
                else:
                    return 's2'

            t_3 = {'source': 's_0',
                   'trigger': 't',
                   'targets': 's1 s2',
                   'function': transition_1}

        **States:**
        States are specified as sources and targets as part of the transitions.
        This is done by simple strings. The name `initial` refers to the initial state
        of the state machine. (An initial transition is necessary, see above.)
        The name `final` refers to the final state of the machine.
        Once a machine executes a transition with target state `final`, it terminates.

        States can declare internal transitions. These are transitions that have the
        same source and target state, similar to self-transitions. However, they don't ever
        leave the state, so that any entry or exit actions declared in the state are not executed.
        An internal transition is declared as part of the extended state definition.
        It simply lists the name of the trigger (here `a`) as key and the list of actions it executes
        as value.

            #!python
            s_0 = {'name': 's_0',
            'a': 'action1(); action2()'}

        **Deferred Events**

        A state can defer an event. In this case, the event, if it happens, does not trigger a transition,
        but is ignored in the input queue until the state machine switches into another state
        that does not defer the event anymore.
        This is useful to handle events that can arrive in states when they are not useful yet.
        To declare a deferred event, simply add the event with its name as key in the
        extended state description, and use the keyword `defer` as value:

            #!python
            s1 = {'name': 's1',
                'a': 'defer'}

        **Actions and Effects:**
        The value of the attributes for transition effects and for state entry
        and exit actions can list several actions that are called on the object
        provided to the state machine.

        This list of actions can look in the following way:

            #!python
            effect='m1; m2(); m3(1, True, "a"); m4(*)'

        This is a semicolon-separated list of actions that are called, here as
        part of a transition's effect. Method m1 has no arguments, and neither
        does m2. This means the empty brackets are optional. Method m3 has three
        literal arguments, here the integer 1, the boolean True and the string
        'a'. Note how the string is surrounded by double quotation marks, since
        the entire effect is coded in single quotation marks. Vice-versa is also
        possible. The last method, m4, declares an asterisk as argument. This
        means that the state machine uses the args and kwargs of the incoming
        event and offers them to the method.

        The actions can also directly refer to the state machine actions
        `stmpy.Machine.start_timer` and `stmpy.Machine.stop_timer`.
        A transition can for instance declare the following effects:

            #!python
            effect='start_timer("t1", 100); stop_timer("t2");'

        **Entry-, Exit-, and Do-Actions**

        States also declare entry and exit actions that are called when they are entered or exited.
        To declare these actions, declare a dictionary for the state. The name key refers to
        the name of the state that is also used in the transition declaration.

            #!python
            s_0 = {'name': 's_0',
                   'entry': 'op1; op2',
                   'exit': 'op3'}

        A state can also declare a do-action. This action is started once the state is entered,
        after any entry actions, if there are any. Do-actions can refer to code that takes a long time
        to run, and are executed in their own thread, so that they don't block the execution of other
        behavior. Once the do-action finishes, the state machine automatically dispatches an event
        with name `done`. This implies that a state with a do-action has only one outgoing transition, and this
        transition must be triggered by the event `done`.

            #!python
            s1 = {'name': 's1',
                  'do': 'do_action("a")'}

        `name`: Name of the state machine. This name is used to send messages to it, and show its state during debugging.

        `transitions`: A set of transitions, as explained above. There must be at least one initial transition.

        `obj`: An object that encapsulates any actions called from states or transitions.

        `states`: Optional state declarations to add entry and exit actions to them.
        """
        self._logger = logging.getLogger(__name__)
        self._state = 'initial'
        self._obj = obj
        self._id = name
        self._table = {}
        self._states = {}
        if states == None: states = []
        self._parse_states(states)
        self._parse_transitions(transitions, states)
        self._defer_queue = None

    @property
    def state(self):
        """Return the current control state of the machine.

        This property can be accessed for debugging only.
        """
        return self._state

    @property
    def id(self):
        """Return the name of this machine."""
        return self._id

    @property
    def driver(self):
        """Return the driver this machine is attached to."""
        return self._driver

    def _reset(self):
        self._state = 'initial'

    def _run_function(self, obj, function_name, args, kwargs, asynchronous=False):
        function_name = function_name.strip()
        self._logger.debug('Running function {}.'.format(function_name))
        func = getattr(obj, function_name)
        if asynchronous:
            def running(function, args, kwargs):
                try:
                    function(*args, **kwargs)
                except AttributeError as error:
                    self._logger.error('Error when running function {} from machine.'.format(function_name), exc_info=True)
                # dispatch completion event
                self._logger.debug('Do action complete, sending completion action after done.'.format())
                self._driver._add_event(event_id='done', args=args, kwargs=kwargs, stm=self)
            function = getattr(obj, function_name.strip())
            thread = Thread(target=running, args=[function, args, kwargs])
            thread.start()
            self._logger.debug('Started do action.'.format())
        else:
            try:
                func(*args, **kwargs)
            except AttributeError as error:
                self._logger.error('Error when running function {} from machine.'.format(function_name), exc_info=True)

    def _run_state_machine_function(self, name, args, kwargs):
        if name == 'start_timer':
            if len(args) != 2:
                self._logger.error('Method {} expects 2 args.'.format(name))
            self.start_timer(args[0], args[1])
        elif name == 'stop_timer':
            if len(args) != 1:
                self._logger.error('Method {} expects 1 arg.'.format(name))
            self.stop_timer(args[0])
        elif name == 'terminate':
            self.terminate()
        else:
            self._logger.error('Action {} is not a built-in method.'.format(name))

    def _initialize(self, driver):
        self._driver = driver

    def _run_actions(self, actions, args=None, kwargs=None):
        if args == None: args = []
        if kwargs == None: kwargs = {}
        for action in actions:
            if action['event_args']: # use the arguments provided by the event
                args, kwargs = args, kwargs
            else: # use the arguments provided in the declaration
                args, kwargs = action['args'], {}
            if _is_state_machine_method(action['name']):
                self._run_state_machine_function(action['name'], args, kwargs)
            else:
                self._run_function(self._obj, action['name'], args, kwargs)

    def _defers_event(self, event_id):
        if self._state in self._states:
            return event_id in self._states[self._state].defer
        return False

    def _add_to_defer_queue(self, event):
        if self._defer_queue is None:
            self._defer_queue = []
        # add at beginning, because we reverse when putting back
        self._defer_queue.insert(0, event)

    def _enter_state(self, state, args, kwargs):
        self._logger.debug('Machine {} enters state {}'.format(self.id, state))
        if self._state!=state and self._defer_queue!=None and len(self._defer_queue)>0:
            self._logger.debug('Machine {} transfers back {} deferred events into event queue.'.format(self.id, len(self._defer_queue)))
            self._driver._event_queue.queue.extendleft(self._defer_queue)
            self._defer_queue.clear()
        if state in self._states:
            # execute any entry actions
            self._run_actions(self._states[state].entry)
            # execute any do actions
            if self._states[state].do:
                do_action = self._states[state].do[0]
                if do_action['event_args']:
                    self._run_function(self._obj, do_action['name'], args, kwargs, asynchronous=True)
                else:
                    self._run_function(self._obj, do_action['name'], do_action['args'], {}, asynchronous=True)
        self._state = state

    def _exit_state(self, state):
        self._logger.debug('Machine {} exits state {}'.format(self.id, state))
        # execute any exit actions
        if state in self._states:
            self._run_actions(self._states[state].exit)

    def _execute_transition(self, event_id, args, kwargs):
        previous_state = self._state
        if self._state == 'initial':
            transition = self._initial_transition
        else:
            t_id = _tid(self._state, event_id)
            if t_id not in self._table:
                self._logger.warning(
                    'Machine {} is in state {} and received '
                    'event {}, but no transition with this event is declared!'
                    .format(self.id, self._state, event_id))
                return
            else:
                transition = self._table[t_id]
                if not transition.internal:
                    self._exit_state(self._state)
        # execute all effects
        self._run_actions(transition.effect, args, kwargs)
        if transition.internal:
            self._logger.debug('Internal transition in {} state {} triggered by {}'.format(self.id, previous_state, event_id))
        else:
            if transition.target:
                # simple transition
                target = transition.target
            else:
                # compound transitions defined in code
                target = transition.function(*args, **kwargs)
            # go into the next state
            if target == 'final':
                self.terminate()
                self._logger.debug('Transition in {} from {} to final state triggered by {}'.format(self.id, previous_state, event_id))
            else:
                self._enter_state(target, args, kwargs)
                self._logger.debug('Transition in {} from {} to {} triggered by {}'.format(self.id, previous_state, target, event_id))

    def start_timer(self, timer_id, timeout):
        """
        Start a timer or restart an active one.

        The timeout is given in milliseconds. If a timer with the
        same name already exists, it is restarted with the specified timeout.
        Note that the timeout is intended as the minimum time until the timer's
        expiration, but may vary due to the state of the event queue and the
        load of the system.
        """
        self._logger.debug('Start timer {} in stm {}'.format(timer_id, self.id))
        self._driver._start_timer(timer_id, timeout, self)

    def stop_timer(self, timer_id):
        """
        Stop a timer.

        If the timer is not active, nothing happens.
        """
        self._logger.debug('Stop timer {} in stm {}'.format(timer_id, self.id))
        self._driver._stop_timer(timer_id, self)

    def get_timer(self, timer_id):
        """
        Gets the remaining time for the timer.

        If the timer is not active, `None` is returned.
        """
        return self._driver._get_timer(timer_id, self)

    def send(self, message_id, args=None, kwargs=None):
        """
        Send a message to this state machine.

        To send a message to a state machine by its name, use
        `stmpy.Driver.send` instead.
        """
        if args == None: args = []
        if kwargs == None: kwargs = {}
        self._logger.debug('Send {} in stm {}'.format(message_id, self.id))
        self._driver._add_event(
            event_id=message_id, args=args, kwargs=kwargs, stm=self)

    def terminate(self):
        """
        Terminate this state machine.

        This removes it from the driver.
        If this is the last state machine of the driver and the driver is
        not configured to stay active, this will also terminate the driver.
        """
        self._driver._terminate_stm(self.id)

Instance variables

var driver

Return the driver this machine is attached to.

Expand source code
@property
def driver(self):
    """Return the driver this machine is attached to."""
    return self._driver
var id

Return the name of this machine.

Expand source code
@property
def id(self):
    """Return the name of this machine."""
    return self._id
var state

Return the current control state of the machine.

This property can be accessed for debugging only.

Expand source code
@property
def state(self):
    """Return the current control state of the machine.

    This property can be accessed for debugging only.
    """
    return self._state

Methods

def get_timer(self, timer_id)

Gets the remaining time for the timer.

If the timer is not active, None is returned.

Expand source code
def get_timer(self, timer_id):
    """
    Gets the remaining time for the timer.

    If the timer is not active, `None` is returned.
    """
    return self._driver._get_timer(timer_id, self)
def send(self, message_id, args=None, kwargs=None)

Send a message to this state machine.

To send a message to a state machine by its name, use Driver.send() instead.

Expand source code
def send(self, message_id, args=None, kwargs=None):
    """
    Send a message to this state machine.

    To send a message to a state machine by its name, use
    `stmpy.Driver.send` instead.
    """
    if args == None: args = []
    if kwargs == None: kwargs = {}
    self._logger.debug('Send {} in stm {}'.format(message_id, self.id))
    self._driver._add_event(
        event_id=message_id, args=args, kwargs=kwargs, stm=self)
def start_timer(self, timer_id, timeout)

Start a timer or restart an active one.

The timeout is given in milliseconds. If a timer with the same name already exists, it is restarted with the specified timeout. Note that the timeout is intended as the minimum time until the timer's expiration, but may vary due to the state of the event queue and the load of the system.

Expand source code
def start_timer(self, timer_id, timeout):
    """
    Start a timer or restart an active one.

    The timeout is given in milliseconds. If a timer with the
    same name already exists, it is restarted with the specified timeout.
    Note that the timeout is intended as the minimum time until the timer's
    expiration, but may vary due to the state of the event queue and the
    load of the system.
    """
    self._logger.debug('Start timer {} in stm {}'.format(timer_id, self.id))
    self._driver._start_timer(timer_id, timeout, self)
def stop_timer(self, timer_id)

Stop a timer.

If the timer is not active, nothing happens.

Expand source code
def stop_timer(self, timer_id):
    """
    Stop a timer.

    If the timer is not active, nothing happens.
    """
    self._logger.debug('Stop timer {} in stm {}'.format(timer_id, self.id))
    self._driver._stop_timer(timer_id, self)
def terminate(self)

Terminate this state machine.

This removes it from the driver. If this is the last state machine of the driver and the driver is not configured to stay active, this will also terminate the driver.

Expand source code
def terminate(self):
    """
    Terminate this state machine.

    This removes it from the driver.
    If this is the last state machine of the driver and the driver is
    not configured to stay active, this will also terminate the driver.
    """
    self._driver._terminate_stm(self.id)