#-*- coding: utf-8 -*-
#+---------------------------------------------------------------------------+
#| 01001110 01100101 01110100 01111010 01101111 01100010 |
#| |
#| Netzob : Inferring communication protocols |
#+---------------------------------------------------------------------------+
#| Copyright (C) 2011-2017 Georges Bossert and Frédéric Guihéry |
#| This program is free software: you can redistribute it and/or modify |
#| it under the terms of the GNU General Public License as published by |
#| the Free Software Foundation, either version 3 of the License, or |
#| (at your option) any later version. |
#| |
#| This program is distributed in the hope that it will be useful, |
#| but WITHOUT ANY WARRANTY; without even the implied warranty of |
#| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
#| GNU General Public License for more details. |
#| |
#| You should have received a copy of the GNU General Public License |
#| along with this program. If not, see <http://www.gnu.org/licenses/>. |
#+---------------------------------------------------------------------------+
#| @url : http://www.netzob.org |
#| @contact : contact@netzob.org |
#| @sponsors : Amossys, http://www.amossys.fr |
#| Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/ |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| File contributors : |
#| - Georges Bossert <georges.bossert (a) supelec.fr> |
#| - Frédéric Guihéry <frederic.guihery (a) amossys.fr> |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| Standard library imports |
#+---------------------------------------------------------------------------+
import random
import socket
#+---------------------------------------------------------------------------+
#| Related third party imports |
#+---------------------------------------------------------------------------+
#+---------------------------------------------------------------------------+
#| Local application imports |
#+---------------------------------------------------------------------------+
from netzob.Common.Utils.Decorators import typeCheck, public_api, NetzobLogger
from netzob.Model.Grammar.Transitions.Transition import Transition
from netzob.Model.Grammar.Transitions.OpenChannelTransition import OpenChannelTransition
from netzob.Model.Grammar.States.AbstractState import AbstractState
from netzob.Model.Grammar.Transitions.AbstractTransition import AbstractTransition
from netzob.Model.Grammar.Transitions.CloseChannelTransition import CloseChannelTransition
from netzob.Model.Vocabulary.EmptySymbol import EmptySymbol
from netzob.Model.Vocabulary.UnknownSymbol import UnknownSymbol
from netzob.Simulator.AbstractionLayer import Operation
[docs]@NetzobLogger
class State(AbstractState):
"""This class represents a state in an automaton.
The State constructor expects some parameters:
:param name: The name of the state. If `None`, it is set to 'State'.
:type name: :class:`str`, optional
The State class provides the following public variables:
:var name: The name of the state. The default value is 'State'.
:var transitions: The list of outgoing transitions
:vartype name: :class:`str`
:vartype transitions: ~typing.List[~netzob.Model.Grammar.Transitions.Transition.Transition]
The following example shows the definition of an ``s0`` state and an ``s1`` state:
>>> from netzob.all import *
>>> s0 = State()
>>> s0.name
'State'
>>> s1 = State(name="S1")
>>> s1.name
'S1'
"""
@public_api
def __init__(self, name=None):
super(State, self).__init__(name=name)
self.__transitions = []
[docs] @public_api
def copy(self):
r"""Copy the current state.
:return: A new object of the same type.
:rtype: :class:`State <netzob.Model.Grammar.States.State.State>`
"""
state = State(name=self.name)
state.transitions = list(self.transitions)
state.active = self.active
state.cbk_modify_transition = list(self.cbk_modify_transition)
state.cbk_filter_transitions = list(self.cbk_filter_transitions)
return state
def execute(self, actor):
self._logger.debug(" [+] At state '{}'".format(self.name))
actor.visit_log.append(" [+] At state '{}'".format(self.name))
# If necessary, filter available transitions
available_transitions = self.__filter_available_transitions(actor, self.transitions)
# Check if the actor has received a message. If so, we execute the step as not an initiator
if actor.abstractionLayer.check_received():
# Check if we should consider reception (i.e. there exists at least one transition in inverseInitiator mode)
should_consider_reception = False
for transition in available_transitions:
if isinstance(transition, Transition):
is_transition_initiator = (actor.initiator and not transition.inverseInitiator) or (not actor.initiator and transition.inverseInitiator)
if is_transition_initiator is False:
should_consider_reception = True
break
if should_consider_reception:
actor.visit_log.append(" [+] At state '{}', received packet on communication channel. Switching to execution as not initiator.".format(self.name))
self._logger.debug("Data received on the communication channel. Switching to execution as not initiator to handle the received message.")
return self.executeAsNotInitiator(actor, available_transitions)
# Else, randomly pick a transition
actor.visit_log.append(" [+] Randomly choosing a transition to execute or to wait for an input symbol")
next_transition = self.__pick_next_transition(actor, available_transitions)
if next_transition is None:
return None
# If transition is in initiator mode
if (actor.initiator and not next_transition.inverseInitiator) or (not actor.initiator and next_transition.inverseInitiator):
# If necessary, modify the current transition
next_transition = self.__modify_current_transition(actor, next_transition, available_transitions)
# Execute next transition as initiator
nextState = self.executeAsInitiator(actor, next_transition)
else:
# Execute next transition as not initiator
nextState = self.executeAsNotInitiator(actor, available_transitions)
return nextState
def executeAsInitiator(self, actor, next_transition):
"""This method picks the next available transition and executes it.
"""
self._logger.debug("[actor='{}'] Execute state {} as an initiator".format(str(actor), self.name))
self.active = True
self._logger.debug("[actor='{}'] Next transition for state '{}': {}.".format(str(actor), self.name, next_transition))
# Execute picked transition as an initiator
try:
nextState = next_transition.executeAsInitiator(actor)
self._logger.debug("[actor='{}'] Transition '{}' leads to state: {}.".format(str(actor), str(next_transition), str(nextState)))
except Exception as e:
self.active = False
raise
if nextState is None:
self._logger.debug("[actor='{}'] The execution of transition '{}' on state '{}' did not return the next state".format(str(actor), str(next_transition), self.name))
self.active = False
return nextState
def executeAsNotInitiator(self, actor, available_transitions):
"""This method executes the current state as not an initiator.
This method will wait for a maximum amount of time the
reception of a symbol and will try to select the appropriate
transition which would be triggered by received symbol. At
the end, if no exception occurs, it returns the next state.
"""
self._logger.debug("[actor='{}'] Execute state {} as a non-initiator".format(str(actor), self.name))
self.active = True
# if no transition exists we quit
if len(self.transitions) == 0:
self._logger.debug("[actor='{}'] The current state '{}' has no transitions available".format(str(actor), self.name))
self.active = False
return None
next_transition = None
nextState = None
# Execute the first special transition (inputSymbolProbability equals 100.0)
for transition in self.transitions:
if transition.inputSymbolProbability == 100.0:
next_transition = transition
# Else, execute the closing transition, if it is the last one remaining
if next_transition is None:
if len(self.transitions) == 1 and self.transitions[
0].TYPE == CloseChannelTransition.TYPE:
next_transition = self.transitions[0]
if next_transition is not None:
actor.visit_log.append(" [+] Going to execute transition '{}'".format(str(next_transition)))
nextState = next_transition.executeAsNotInitiator(actor)
self._logger.debug("[actor='{}'] Transition '{}' leads to state: {}.".format(
str(actor), str(next_transition), str(nextState)))
if nextState is None:
self.active = False
raise Exception(
"The execution of transition '{}' on state '{}' did not return the next state.".
format(next_transition.name, self.name))
return nextState
# Else, we wait to receive a symbol
received_symbol = None
received_message = None
from netzob.Simulator.Actor import ActorStopException
try:
(received_symbol, received_message, received_structure) = actor.abstractionLayer.readSymbol()
if received_symbol is None:
raise Exception("The abstraction layer returned a None received symbol")
self._logger.debug("[actor='{}'] Input symbol: '{}'".format(str(actor), str(received_symbol)))
# Find the transition which accepts the received symbol as an input symbol, along with the correct input symbol preset
next_transition = None
for transition in self.transitions:
is_transition_initiator = (actor.initiator and not transition.inverseInitiator) or (not actor.initiator and transition.inverseInitiator)
if is_transition_initiator:
continue
if transition.type == Transition.TYPE and id(transition.inputSymbol) == id(received_symbol):
if transition.inputSymbolPreset is not None:
self._logger.debug("Checking input symbol preset")
# Check preset
if received_symbol.check_preset(received_structure, transition.inputSymbolPreset):
self._logger.debug("Receive good symbol with good preset setting")
actor.visit_log.append(" [+] Received one of the expected symbols ('{}'), with good preset settings ('{}')".format(received_symbol, transition.inputSymbolPreset))
next_transition = transition
break
else:
next_transition = transition
break
actor.visit_log.append(" [+] Input symbol '{}' corresponds to transition '{}'".format(str(received_symbol), str(next_transition)))
except ActorStopException:
raise
except socket.timeout:
self._logger.debug("[actor='{}'] In state '{}', timeout on abstractionLayer.readSymbol()".format(str(actor), self.name))
# Check if there is a transition with an EmptySymbol as input symbol
self._logger.debug("[actor='{}'] Check if a transition expects an EmptySymbol as input symbol".format(str(actor)))
next_transition = None
for transition in self.transitions:
if transition.type == Transition.TYPE and isinstance(transition.inputSymbol, EmptySymbol):
self._logger.debug("[actor='{}'] The transition '{}' expects an EmptySymbol as input symbol ".format(str(actor), str(transition)))
next_transition = transition
actor.visit_log.append(" [+] Receiving no symbol (EmptySymbol) corresponds to transition '{}'".format(str(next_transition)))
break
else:
self._logger.debug("[actor='{}'] No transition expects an EmptySymbol as input symbol".format(str(actor)))
self.active = False
if actor.automata.cbk_read_symbol_timeout is not None:
actor.automata.cbk_read_symbol_timeout(self, None)
# Returning None here will stop the actor
return
except OSError as e:
self._logger.debug("[actor='{}'] The underlying abstraction channel seems to be closed, so we stop the current actor".format(str(actor)))
return
except Exception as e:
self._logger.debug("[actor='{}'] An exception occured when waiting for a symbol at state '{}': '{}'".format(str(actor), self.name, e))
self.active = False
raise
# If a callback function is defined, we call it in order to execute an external program that may change the selected transition
next_transition = self.__modify_current_transition(actor, next_transition, available_transitions)
# Execute the retained transition
if next_transition is None:
self._logger.debug("[actor='{}'] The received symbol did not match any of the registered transition".format(str(actor)))
#nextState = self
# Handle case where received symbol is unknown
if isinstance(received_symbol, UnknownSymbol):
if actor.automata.cbk_read_unknown_symbol is not None:
actor.automata.cbk_read_unknown_symbol(self,
None,
received_message)
else:
raise Exception("The received message is unknown")
# Handle case where received symbol is known but unexpected
else:
if actor.automata.cbk_read_unexpected_symbol is not None:
actor.automata.cbk_read_unexpected_symbol(self,
None,
received_symbol,
received_message,
received_structure)
else:
raise Exception("The received symbol did not match any of expected symbols, for actor '{}'".format(actor))
else:
for cbk in next_transition.cbk_action:
self._logger.debug("[actor='{}'] A callback function is defined at the end of transition '{}'".format(str(actor), next_transition.name))
cbk(received_symbol, received_message, received_structure, Operation.ABSTRACT, self, actor.memory)
nextState = next_transition.executeAsNotInitiator(actor)
self._logger.debug("[actor='{}'] Transition '{}' leads to state: {}.".format(str(actor), str(next_transition), str(nextState)))
self.active = False
return nextState
def __pick_next_transition(self, actor, available_transitions):
"""Returns the next transition by considering the priority (inputSymbolProbability) of the transition and a random choice.
It can return None.
:return: the next transition or None if no transition available
:rtype: :class:`AbstractTransition <netzob.Model.Grammar.Transition.AbstractTransition.AbstractTransition>`
"""
# create a dictionary to host the available transition
prioritizedTransitions = dict()
for transition in available_transitions:
# Handle transition priority (inputSymbolProbability)
if transition.inputSymbolProbability in list(prioritizedTransitions.keys()):
prioritizedTransitions[transition.inputSymbolProbability].append(transition.copy())
else:
prioritizedTransitions[transition.inputSymbolProbability] = [transition.copy()]
if len(prioritizedTransitions) == 0:
return None
list_probabilities = sorted(prioritizedTransitions.keys())
list_probabilities = list_probabilities[::-1]
available_transitions = prioritizedTransitions[list_probabilities[0]]
# Randomly select the next transition
next_transition = random.choice(available_transitions)
# Log initiator mode
if isinstance(next_transition, Transition):
is_transition_initiator = (actor.initiator and not next_transition.inverseInitiator) or (not actor.initiator and next_transition.inverseInitiator)
if is_transition_initiator:
actor.visit_log.append(" [+] Picking transition '{}' (initiator)".format(next_transition))
else:
actor.visit_log.append(" [+] Waiting for an input symbol to decide the transition (not initiator)")
elif isinstance(next_transition, OpenChannelTransition):
initiator_mode = "open channel"
actor.visit_log.append(" [+] Picking transition '{}' ({})".format(next_transition, initiator_mode))
else:
initiator_mode = "close channel"
actor.visit_log.append(" [+] Picking transition '{}' ({})".format(next_transition, initiator_mode))
return next_transition
def __modify_current_transition(self, actor, current_transition, available_transitions):
r"""If a callback function is defined, we call it in order to execute
an external program that may change the selected transition.
"""
self._logger.debug("[actor='{}'] Test if a callback function is defined at state '{}'".format(actor, self.name))
for cbk in self.cbk_modify_transition:
self._logger.debug("[actor='{}'] A callback function is defined at state '{}'".format(actor, self.name))
available_transitions = [cloned_transition.copy() for cloned_transition in available_transitions]
current_transition = cbk(available_transitions,
current_transition,
self,
actor.abstractionLayer.last_sent_symbol,
actor.abstractionLayer.last_sent_message,
actor.abstractionLayer.last_sent_structure,
actor.abstractionLayer.last_received_symbol,
actor.abstractionLayer.last_received_message,
actor.abstractionLayer.last_received_structure,
actor.memory)
is_transition_initiator = (actor.initiator and not current_transition.inverseInitiator) or (not actor.initiator and current_transition.inverseInitiator)
if is_transition_initiator:
transition_mode = "initiator"
else:
transition_mode = "not initiator"
actor.visit_log.append(" [+] Changing transition to '{}' ({}), through callback".format(current_transition, transition_mode))
else:
self._logger.debug("[actor='{}'] No callback function is defined at state '{}'".format(actor, self.name))
return current_transition
def __filter_available_transitions(self, actor, available_transitions):
r"""If a callback function is defined, we call it in order to execute
an external program that may change the available transitions.
"""
self._logger.debug("[actor='{}'] Test if a callback function is defined at state '{}'".format(actor, self.name))
for cbk in self.cbk_filter_transitions:
self._logger.debug("[actor='{}'] A callback function is defined at state '{}'".format(actor, self.name))
available_transitions = [cloned_transition.copy() for cloned_transition in available_transitions]
available_transitions = cbk(available_transitions,
self,
actor.abstractionLayer.last_sent_symbol,
actor.abstractionLayer.last_sent_message,
actor.abstractionLayer.last_sent_structure,
actor.abstractionLayer.last_received_symbol,
actor.abstractionLayer.last_received_message,
actor.abstractionLayer.last_received_structure,
actor.memory)
actor.visit_log.append(" [+] Filtering available transitions through callback")
else:
self._logger.debug("[actor='{}'] No callback function is defined at state '{}'".format(actor, self.name))
return available_transitions
@typeCheck(AbstractTransition)
def removeTransition(self, transition):
"""remove the specified transition from the list
of transition which starts on the current state.
:param transition: the transition to remove
:type transition: :class:`Transition <netzob.Model.Grammar.Transitions.Transition.Transition>`
:raise: TypeError if param is not a Transition and a ValueError if the transition
is not registered
"""
if transition not in self.__transitions:
raise ValueError("The transition is not associated to the current state so cannot be removed.")
self.__transitions.remove(transition)
@public_api
@property
def transitions(self):
return self.__transitions
@transitions.setter # type: ignore
def transitions(self, transitions):
self.__transitions = transitions
def _test():
r"""
>>> from netzob.all import *
>>> s0 = State()
>>> s0.name
'State'
>>> s1 = State(name="S1")
>>> s1.name
'S1'
>>> t = Transition(s0, s1, None, None)
>>> t.startState.name
'State'
>>> t.endState.name
'S1'
>>> len(s0.transitions)
1
>>> s0.transitions[0].startState.name
'State'
>>> s0.transitions[0].endState.name
'S1'
# Test copy()
>>> from netzob.all import *
>>> s0 = State(name="s0")
>>> s1 = State(name="s1")
>>> t = CloseChannelTransition(s0, s1, name="transition")
>>> s0.copy()
s0
"""