Module stmpy.spin

Expand source code
import textwrap

class Promela:

    def __init__(self, machines):
        self.machines = machines
        # all triggers used in all machines
        self.machine_to_triggers = {}
        self.machine_to_timers = {}
        self.machine_to_defers = {}
        self.machines_that_defer = []
        self.machines_with_final_state = []
        self.uses_timers = False
        self.uses_defer = False
        for machine in self.machines:
            triggers = []
            timers = []
            for transition in machine._table.values():
                if transition.trigger is not None:
                    triggers.append(transition.trigger)
                    # TODO: convention that timers start with 't'
                    if transition.trigger.startswith('t'):
                        self.uses_timers = True
                        timers.append(transition.trigger)
                if transition.target == 'final':
                    self.machines_with_final_state.append(machine)
            self.machine_to_triggers[machine._id] = set(triggers)
            self.machine_to_timers[machine._id] = set(timers)
            defers = []
            for state in machine._states.values():
                if len(state.defer) > 0 :
                    defers.extend(state.defer)
                    self.machines_that_defer.append(machine)
                    self.uses_defer = True
            self.machine_to_defers[machine._id] = set(defers)
        self.all_triggers = []
        for triggers in self.machine_to_triggers.values():
            self.all_triggers.extend(triggers)
        self.all_triggers = set(self.all_triggers)
        self.machines_that_defer = set(self.machines_that_defer)
        self.machines_with_final_state = set(self.machines_with_final_state)

    def to_promela(self):
        s = []
        mtypes = self.all_triggers.copy()
        mtypes.add('stop')
        s.append('mtype {{{}}};'.format(', '.join(mtypes)))
        
        timer_process = textwrap.dedent("""\
            proctype Timer (chan queue, tc; mtype name) {
              if
              /* the timer expires */
              :: skip -> queue ! name;
              /* the timer is stopped */
              :: tc ? stop -> skip;
              fi;
            }""")
        
        inline_defer = textwrap.dedent('''\
            inline save(msg) {
              atomic {
                queue ? msg ->
                printf("SAVE %e\n", msg);
                queue_temp ! msg;
              }
            }''')
        # queue ? msg -> printf(\"DISCARD %e\n\", msg);
        inline_discard = textwrap.dedent("""\
            inline discard(msg) {
              atomic {
                queue ? msg -> skip;
              }
            }""")
        inline_requeue = textwrap.dedent("""\
            inline requeue() {
              atomic { 
                do
                :: queue ? msg -> queue_temp ! msg; 
                :: empty(queue) -> break; 
                od;
                do
                :: queue_temp ? msg -> queue ! msg;
                :: empty(queue_temp) -> break;
                od;  
              }
            }""")
        if self.uses_timers:
            s.append('')
            s.append(timer_process)
        if self.uses_defer:
            s.append('')
            s.append(inline_defer)
            s.append('')
            s.append(inline_requeue)
        s.append(inline_discard)
        s.append('')
        for machine in self.machines:
            self.to_p_stm(machine, s)
        s.append('')
        s.append('init {')
        for machine in self.machines:
            s.append('  chan to_{} = [10] of {{mtype}};'.format(machine._id))
            s.append('  run {}(to_{});'.format(machine._id, machine._id))
        s.append('}')
        return '\n'.join(s)

    def to_p_stm(self, machine, s):
        requeue = machine in self.machines_that_defer
        s.append('proctype {} (chan queue) {{'.format(machine._id))
        s.append('\n')
        if requeue:
            s.append('  chan queue_temp = [16] of {mtype};')
        for timer in self.machine_to_timers[machine._id]:
            s.append('  chan {}_c = [1] of {{mtype}};'.format(timer))
        s.append('  mtype msg;')
        s.append('\n')
        # initial transition
        s.append('  initial:')
        s.append('    atomic {')
        self._effect(machine._initial_transition, s)
        self._next_state(machine._initial_transition, s)
        s.append('    }')
        s.append('\n')
        
        # complex states, defined as dicts
        complex_states = []
        for state in machine._states:
            complex_states.append(state.name)
            _state(self, state, machine, s, requeue)
            s.append('')
        # simple states, only defined by transitions
        simple_states = []
        for transition in machine._table.values():
            if transition.source not in complex_states:
                simple_states.append(transition.source)
        simple_states = set(simple_states)
        for state in simple_states:
            self._state_simple(state, machine, s, requeue)
            s.append('')
        if machine in self.machines_with_final_state:
            s.append('  end:');
            s.append('    skip;')
        s.append('}')

    def _next_state(self, transition, s, indent='      '):
        if transition.target == 'final':
            s.append(indent + 'goto end;')
        else:
            s.append(indent + 'goto {};'.format(transition.target))

    def _effect(self, transition, s):
        for action in transition.effect:
            if action['name'] == 'start_timer':
                s.append('           run Timer(queue, {}_c, {});'.format(action['args'][0], action['args'][0]))
            elif action['name'] == 'stop_timer':
                s.append('           {}_c ! stop;'.format(action['args'][0]))
            else:
                s.append('           /* {}() */'.format(action['name']))
    
    def _state_simple(self, state, machine, s, requeue):
        outgoing = []
        consumed = []
        for transition in machine._table.values():
            if transition.source == state:
                outgoing.append(transition)
                consumed.append(transition.trigger)
        consumed = set(consumed)
        discard = self.machine_to_triggers[machine._id] - consumed
        
        s.append('  {}:'.format(state))
        s.append('    if :: (queue ?? [{}]) ->'.format('] || queue ?? ['.join(consumed)))
        s.append('       do')

        for drop in discard:
            s.append('        :: discard({});'.format(drop))
        for transition in outgoing:
            self._transition(transition, s, requeue)
        s.append('      od;')
        s.append('    fi;')

    def _state(self, state, machine, s, requeue):
        outgoing = []
        consumed = []
        for transition in machine.transitions:
            if transition.source == state:
                outgoing.append(transition)
                consumed.append(transition.trigger)
        consumed = set(consumed)
        discard = self.machine_to_triggers[machine._id] - consumed - deferred
        
        s.append('  {}:'.format(state.name))
        s.append('    if :: (queue ?? [{}]) ->'.format('] || queue ?? ['.join(consumed)))
        s.append('       do')
        for defer in state.defer:
            s.append('        :: save({});'.format(defer))
        for drop in discard:
            s.append('        :: discard({});'.format(drop))
        for transition in outgoing:
            self._transition(transition, s, requeue)
        s.append('      od;')
        s.append('    fi;')

    def _transition(self, transition, s, requeue):
        s.append('        :: queue ? {} -> atomic {{'.format(transition.trigger))
        if requeue:
            s.append('           requeue();')
        self._effect(transition,s)
        self._next_state(transition, s)
        s.append('        }')

def to_promela(machines):
    """ Experimental feature to export machines as Promela processes 
    for model checking in Spin.
    """
    p = Promela(machines)
    return p.to_promela()

Functions

def to_promela(machines)

Experimental feature to export machines as Promela processes for model checking in Spin.

Expand source code
def to_promela(machines):
    """ Experimental feature to export machines as Promela processes 
    for model checking in Spin.
    """
    p = Promela(machines)
    return p.to_promela()

Classes

class Promela (machines)
Expand source code
class Promela:

    def __init__(self, machines):
        self.machines = machines
        # all triggers used in all machines
        self.machine_to_triggers = {}
        self.machine_to_timers = {}
        self.machine_to_defers = {}
        self.machines_that_defer = []
        self.machines_with_final_state = []
        self.uses_timers = False
        self.uses_defer = False
        for machine in self.machines:
            triggers = []
            timers = []
            for transition in machine._table.values():
                if transition.trigger is not None:
                    triggers.append(transition.trigger)
                    # TODO: convention that timers start with 't'
                    if transition.trigger.startswith('t'):
                        self.uses_timers = True
                        timers.append(transition.trigger)
                if transition.target == 'final':
                    self.machines_with_final_state.append(machine)
            self.machine_to_triggers[machine._id] = set(triggers)
            self.machine_to_timers[machine._id] = set(timers)
            defers = []
            for state in machine._states.values():
                if len(state.defer) > 0 :
                    defers.extend(state.defer)
                    self.machines_that_defer.append(machine)
                    self.uses_defer = True
            self.machine_to_defers[machine._id] = set(defers)
        self.all_triggers = []
        for triggers in self.machine_to_triggers.values():
            self.all_triggers.extend(triggers)
        self.all_triggers = set(self.all_triggers)
        self.machines_that_defer = set(self.machines_that_defer)
        self.machines_with_final_state = set(self.machines_with_final_state)

    def to_promela(self):
        s = []
        mtypes = self.all_triggers.copy()
        mtypes.add('stop')
        s.append('mtype {{{}}};'.format(', '.join(mtypes)))
        
        timer_process = textwrap.dedent("""\
            proctype Timer (chan queue, tc; mtype name) {
              if
              /* the timer expires */
              :: skip -> queue ! name;
              /* the timer is stopped */
              :: tc ? stop -> skip;
              fi;
            }""")
        
        inline_defer = textwrap.dedent('''\
            inline save(msg) {
              atomic {
                queue ? msg ->
                printf("SAVE %e\n", msg);
                queue_temp ! msg;
              }
            }''')
        # queue ? msg -> printf(\"DISCARD %e\n\", msg);
        inline_discard = textwrap.dedent("""\
            inline discard(msg) {
              atomic {
                queue ? msg -> skip;
              }
            }""")
        inline_requeue = textwrap.dedent("""\
            inline requeue() {
              atomic { 
                do
                :: queue ? msg -> queue_temp ! msg; 
                :: empty(queue) -> break; 
                od;
                do
                :: queue_temp ? msg -> queue ! msg;
                :: empty(queue_temp) -> break;
                od;  
              }
            }""")
        if self.uses_timers:
            s.append('')
            s.append(timer_process)
        if self.uses_defer:
            s.append('')
            s.append(inline_defer)
            s.append('')
            s.append(inline_requeue)
        s.append(inline_discard)
        s.append('')
        for machine in self.machines:
            self.to_p_stm(machine, s)
        s.append('')
        s.append('init {')
        for machine in self.machines:
            s.append('  chan to_{} = [10] of {{mtype}};'.format(machine._id))
            s.append('  run {}(to_{});'.format(machine._id, machine._id))
        s.append('}')
        return '\n'.join(s)

    def to_p_stm(self, machine, s):
        requeue = machine in self.machines_that_defer
        s.append('proctype {} (chan queue) {{'.format(machine._id))
        s.append('\n')
        if requeue:
            s.append('  chan queue_temp = [16] of {mtype};')
        for timer in self.machine_to_timers[machine._id]:
            s.append('  chan {}_c = [1] of {{mtype}};'.format(timer))
        s.append('  mtype msg;')
        s.append('\n')
        # initial transition
        s.append('  initial:')
        s.append('    atomic {')
        self._effect(machine._initial_transition, s)
        self._next_state(machine._initial_transition, s)
        s.append('    }')
        s.append('\n')
        
        # complex states, defined as dicts
        complex_states = []
        for state in machine._states:
            complex_states.append(state.name)
            _state(self, state, machine, s, requeue)
            s.append('')
        # simple states, only defined by transitions
        simple_states = []
        for transition in machine._table.values():
            if transition.source not in complex_states:
                simple_states.append(transition.source)
        simple_states = set(simple_states)
        for state in simple_states:
            self._state_simple(state, machine, s, requeue)
            s.append('')
        if machine in self.machines_with_final_state:
            s.append('  end:');
            s.append('    skip;')
        s.append('}')

    def _next_state(self, transition, s, indent='      '):
        if transition.target == 'final':
            s.append(indent + 'goto end;')
        else:
            s.append(indent + 'goto {};'.format(transition.target))

    def _effect(self, transition, s):
        for action in transition.effect:
            if action['name'] == 'start_timer':
                s.append('           run Timer(queue, {}_c, {});'.format(action['args'][0], action['args'][0]))
            elif action['name'] == 'stop_timer':
                s.append('           {}_c ! stop;'.format(action['args'][0]))
            else:
                s.append('           /* {}() */'.format(action['name']))
    
    def _state_simple(self, state, machine, s, requeue):
        outgoing = []
        consumed = []
        for transition in machine._table.values():
            if transition.source == state:
                outgoing.append(transition)
                consumed.append(transition.trigger)
        consumed = set(consumed)
        discard = self.machine_to_triggers[machine._id] - consumed
        
        s.append('  {}:'.format(state))
        s.append('    if :: (queue ?? [{}]) ->'.format('] || queue ?? ['.join(consumed)))
        s.append('       do')

        for drop in discard:
            s.append('        :: discard({});'.format(drop))
        for transition in outgoing:
            self._transition(transition, s, requeue)
        s.append('      od;')
        s.append('    fi;')

    def _state(self, state, machine, s, requeue):
        outgoing = []
        consumed = []
        for transition in machine.transitions:
            if transition.source == state:
                outgoing.append(transition)
                consumed.append(transition.trigger)
        consumed = set(consumed)
        discard = self.machine_to_triggers[machine._id] - consumed - deferred
        
        s.append('  {}:'.format(state.name))
        s.append('    if :: (queue ?? [{}]) ->'.format('] || queue ?? ['.join(consumed)))
        s.append('       do')
        for defer in state.defer:
            s.append('        :: save({});'.format(defer))
        for drop in discard:
            s.append('        :: discard({});'.format(drop))
        for transition in outgoing:
            self._transition(transition, s, requeue)
        s.append('      od;')
        s.append('    fi;')

    def _transition(self, transition, s, requeue):
        s.append('        :: queue ? {} -> atomic {{'.format(transition.trigger))
        if requeue:
            s.append('           requeue();')
        self._effect(transition,s)
        self._next_state(transition, s)
        s.append('        }')

Methods

def to_p_stm(self, machine, s)
Expand source code
def to_p_stm(self, machine, s):
    requeue = machine in self.machines_that_defer
    s.append('proctype {} (chan queue) {{'.format(machine._id))
    s.append('\n')
    if requeue:
        s.append('  chan queue_temp = [16] of {mtype};')
    for timer in self.machine_to_timers[machine._id]:
        s.append('  chan {}_c = [1] of {{mtype}};'.format(timer))
    s.append('  mtype msg;')
    s.append('\n')
    # initial transition
    s.append('  initial:')
    s.append('    atomic {')
    self._effect(machine._initial_transition, s)
    self._next_state(machine._initial_transition, s)
    s.append('    }')
    s.append('\n')
    
    # complex states, defined as dicts
    complex_states = []
    for state in machine._states:
        complex_states.append(state.name)
        _state(self, state, machine, s, requeue)
        s.append('')
    # simple states, only defined by transitions
    simple_states = []
    for transition in machine._table.values():
        if transition.source not in complex_states:
            simple_states.append(transition.source)
    simple_states = set(simple_states)
    for state in simple_states:
        self._state_simple(state, machine, s, requeue)
        s.append('')
    if machine in self.machines_with_final_state:
        s.append('  end:');
        s.append('    skip;')
    s.append('}')
def to_promela(self)
Expand source code
def to_promela(self):
    s = []
    mtypes = self.all_triggers.copy()
    mtypes.add('stop')
    s.append('mtype {{{}}};'.format(', '.join(mtypes)))
    
    timer_process = textwrap.dedent("""\
        proctype Timer (chan queue, tc; mtype name) {
          if
          /* the timer expires */
          :: skip -> queue ! name;
          /* the timer is stopped */
          :: tc ? stop -> skip;
          fi;
        }""")
    
    inline_defer = textwrap.dedent('''\
        inline save(msg) {
          atomic {
            queue ? msg ->
            printf("SAVE %e\n", msg);
            queue_temp ! msg;
          }
        }''')
    # queue ? msg -> printf(\"DISCARD %e\n\", msg);
    inline_discard = textwrap.dedent("""\
        inline discard(msg) {
          atomic {
            queue ? msg -> skip;
          }
        }""")
    inline_requeue = textwrap.dedent("""\
        inline requeue() {
          atomic { 
            do
            :: queue ? msg -> queue_temp ! msg; 
            :: empty(queue) -> break; 
            od;
            do
            :: queue_temp ? msg -> queue ! msg;
            :: empty(queue_temp) -> break;
            od;  
          }
        }""")
    if self.uses_timers:
        s.append('')
        s.append(timer_process)
    if self.uses_defer:
        s.append('')
        s.append(inline_defer)
        s.append('')
        s.append(inline_requeue)
    s.append(inline_discard)
    s.append('')
    for machine in self.machines:
        self.to_p_stm(machine, s)
    s.append('')
    s.append('init {')
    for machine in self.machines:
        s.append('  chan to_{} = [10] of {{mtype}};'.format(machine._id))
        s.append('  run {}(to_{});'.format(machine._id, machine._id))
    s.append('}')
    return '\n'.join(s)