Source code for netzob.Model.Vocabulary.Domain.Variables.Nodes.Agg

# -*- coding: utf-8 -*-

# +---------------------------------------------------------------------------+
# |          01001110 01100101 01110100 01111010 01101111 01100010            |
# |                                                                           |
# |               Netzob : Inferring communication protocols                  |
# +---------------------------------------------------------------------------+
# | Copyright (C) 2011-2017 Georges Bossert and Frédéric Guihéry              |
# | This program is free software: you can redistribute it and/or modify      |
# | it under the terms of the GNU General Public License as published by      |
# | the Free Software Foundation, either version 3 of the License, or         |
# | (at your option) any later version.                                       |
# |                                                                           |
# | This program is distributed in the hope that it will be useful,           |
# | but WITHOUT ANY WARRANTY; without even the implied warranty of            |
# | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the              |
# | GNU General Public License for more details.                              |
# |                                                                           |
# | You should have received a copy of the GNU General Public License         |
# | along with this program. If not, see <http://www.gnu.org/licenses/>.      |
# +---------------------------------------------------------------------------+
# | @url      : http://www.netzob.org                                         |
# | @contact  : contact@netzob.org                                            |
# | @sponsors : Amossys, http://www.amossys.fr                                |
# |             Supélec, http://www.rennes.supelec.fr/ren/rd/cidre/           |
# +---------------------------------------------------------------------------+

# +---------------------------------------------------------------------------+
# | File contributors :                                                       |
# |       - Georges Bossert <georges.bossert (a) supelec.fr>                  |
# |       - Frédéric Guihéry <frederic.guihery (a) amossys.fr>                |
# +---------------------------------------------------------------------------+

# +---------------------------------------------------------------------------+
# | Standard library imports                                                  |
# +---------------------------------------------------------------------------+
import random
from bitarray import bitarray

# +---------------------------------------------------------------------------+
# | Related third party imports                                               |
# +---------------------------------------------------------------------------+

# +---------------------------------------------------------------------------+
# | Local application imports                                                 |
# +---------------------------------------------------------------------------+
from netzob.Common.Utils.Decorators import typeCheck, NetzobLogger, public_api
from netzob.Model.Vocabulary.Domain.Variables.Nodes.AbstractVariableNode import AbstractVariableNode
from netzob.Model.Vocabulary.Domain.Parser.ParsingPath import ParsingPath, ParsingException
from netzob.Model.Vocabulary.Domain.Specializer.SpecializingPath import SpecializingPath
from netzob.Fuzzing.Mutators.DomainMutator import FuzzingMode


# Class used to denote current variable, in order to handle self recursivity
class SELF(object):
    pass


[docs]@NetzobLogger class Agg(AbstractVariableNode): r"""The Agg class is a node variable that represents a concatenation of variables. An aggregate node concatenates the values that are accepted by its children nodes. It can be used to specify a succession of tokens. The Agg constructor expects some parameters: :param children: The sequence of variable elements contained in the aggregate. The default value is None. :param last_optional: A flag indicating if the last element of the children is optional or not. The default value is False. :param name: The name of the variable (if None, the name will be generated). :type children: a :class:`list` of :class:`Variable <netzob.Model.Vocabulary.Domain.Variables.AbstractVariable.AbstractVariable>`, optional :type last_optional: :class:`bool`, optional :type name: :class:`str`, optional The Agg class supports modeling of direct recursions on the right. To do so, the flag ``SELF`` is available, and should only be used in the last position of the aggregate (see example below). The Agg class provides the following public variables: :var children: The sorted typed list of children attached to the variable node. :vartype children: a list of :class:`Variable <netzob.Model.Vocabulary.Domain.Variables.AbstractVariable.AbstractVariable>` **Aggregate examples** For example, the following code represents a field that accepts values that are made of a String of 3 to 20 random characters followed by a ".txt" extension: >>> from netzob.all import * >>> t1 = String(nbChars=(3,20)) >>> t2 = String(".txt") >>> f = Field(Agg([t1, t2])) The following example shows an aggregate between BitArray variables: >>> from netzob.all import * >>> f = Field(Agg([BitArray('01101001'), BitArray(nbBits=3), BitArray(nbBits=5)])) >>> t = next(f.specialize()) >>> len(t) 2 **Examples of Agg internal attribute access** >>> from netzob.all import * >>> domain = Agg([Raw(), String()]) >>> print(domain.children[0].dataType) Raw(nbBytes=(0,8192)) >>> print(domain.children[1].dataType) String(nbChars=(0,8192)) >>> domain.children.append(Agg([10, 20, 30])) >>> len(domain.children) 3 >>> domain.children.remove(domain.children[0]) >>> len(domain.children) 2 **Abstraction of aggregate variables** This example shows the abstraction process of an Aggregate variable: >>> from netzob.all import * >>> v1 = String(nbChars=(1, 10)) >>> v2 = String(".txt") >>> f0 = Field(Agg([v1, v2]), name="f0") >>> f1 = Field(String("!"), name="f1") >>> f = Field([f0, f1]) >>> data = "john.txt!" >>> f.abstract(data) OrderedDict([('f0', b'john.txt'), ('f1', b'!')]) In the following example, an Aggregate variable is defined. A message that does not correspond to the expected model is then parsed, thus an exception is returned: >>> from netzob.all import * >>> v1 = String(nbChars=(1, 10)) >>> v2 = String(".txt") >>> f0 = Field(Agg([v1, v2]), name="f0") >>> f1 = Field(String("!"), name="f1") >>> f = Field([f0, f1]) >>> data = "johntxt!" >>> f.abstract(data) Traceback (most recent call last): ... netzob.Model.Vocabulary.AbstractField.AbstractionException: With the symbol/field 'Field', cannot abstract the data: 'johntxt!'. Error: 'No parsing path returned while parsing 'b'johntxt!''' **Specialization of aggregate variables** This example shows the specialization process of an Aggregate variable: >>> from netzob.all import * >>> d1 = String("hello") >>> d2 = String(" john") >>> f = Field(Agg([d1, d2])) >>> next(f.specialize()) b'hello john' **Optional last variable** This example shows the specialization and parsing of an aggregate with an optional last variable: >>> from netzob.all import * >>> a = Agg([int8(2), int8(3)], last_optional=True) >>> f = Field(a) >>> res = next(f.specialize()) >>> res == b'\x02' or res == b'\x02\x03' True >>> d = b'\x02\x03' >>> f.abstract(d) OrderedDict([('Field', b'\x02\x03')]) >>> d = b'\x02' >>> f.abstract(d) OrderedDict([('Field', b'\x02')]) **Modeling indirect imbrication** The following example shows how to specify a field with a structure (``v2``) that can contain another structure (``v0``), through a tierce structure (``v1``). The flag ``last_optional`` is used to indicate that the specialization or parsing of the last element of the aggregates ``v1`` and ``v2`` is optional. >>> from netzob.all import * >>> v0 = Agg(["?", int8(4)]) >>> v1 = Agg(["!", int8(3), v0], last_optional=True) >>> v2 = Agg([int8(2), v1], last_optional=True) >>> f = Field(v2) >>> >>> # Test specialization >>> res = next(f.specialize()) >>> res == b'\x02' or res == b'\x02!\x03' or res == b'\x02!\x03?\x04' True >>> >>> # Test parsing >>> f.abstract(res) OrderedDict([('Field', b'\x02')]) .. warning:: **Important note about recursion** The library can handle both direct and indirect recursion. However, there is a limitation requiring the use of a recursing variable on the **right side of a statement**. Any other behavior could lead to infinite recursion during the loading of the model. To help understand what syntax should be preferred, here is a list of annotated BNF syntaxes. *invalid syntaxes:* .. productionlist:: A: [A] integer : <recursion on the left side> B: ( "(" B ) | ( "." ")" ) : <recursion on the middle> *valid adaptations from above examples*: .. productionlist:: A: integer+ : <recursion is replaced by a repeat approach> B: B' ")" : <split the statement ...> B': ( "(" B ) | "." : <direct recursion converted in an indirect one : on the right> *valid recursion examples*: .. productionlist:: C: "." C* : <a string with one or more dot characters> D: ( D | "." )* : <a string with zero or more dot characters> **Modeling direct recursion, simple example** The following example shows how to specify a field with a structure (``v``) that can optionally contain itself. To model such recursive structure, the ``SELF`` flag has to be used in the last position of the aggregate. >>> from netzob.all import * >>> v = Agg([int8(interval=(1, 5)), SELF], last_optional=True) >>> f = Field(v) >>> >>> # Test specialization >>> res = next(f.specialize()) >>> res # doctest: +SKIP b'\x02\x04\x01' >>> >>> # Test parsing >>> res_data = f.abstract(res) # doctest: +SKIP True **Modeling direct recursion, more complex example** This example introduces a recursion in the middle of an expression by modeling a pair group of parentheses (``'('`` and ``')'``), around a single character (``'+'``). The BNF syntax of this model would be: .. productionlist:: parentheses: ( "(" parentheses ) | ( "+" ")" ) This syntax introduces a recursivity in the middle of the `left` statement, which **is not supported**. Instead, this syntax could be adapted to move the recursivity to the right. .. productionlist:: parentheses: left right left: ( "(" parentheses ) | "+" right: ")" The following models describe this issue and provide a workaround. **BAD way** >>> from netzob.all import * >>> parentheses = Agg(["(", Alt([SELF, "+"]), ")"]) Traceback (most recent call last): ValueError: SELF can only be set at the last position of an Agg **GOOD way** >>> from netzob.all import * >>> parentheses = Agg([]) >>> left = Agg(["(", Alt([parentheses, "+"])]) >>> right = ")" >>> parentheses.children += [left, right] >>> >>> symbol = Symbol([Field(parentheses)]) >>> next(symbol.specialize()) b'((+))' **Modeling indirect recursion, simple example** The following example shows how to specify a field with a structure (``v2``) that contains another structure (``v1``), which can itself contain the first structure (``v2``). The flag ``last_optional`` is used to indicate that the specialization or parsing of the last element of the aggregate ``v2`` is optional. >>> from netzob.all import * >>> v1 = Agg([]) >>> v2 = Agg([int8(interval=(1, 3)), v1], last_optional=True) >>> v1.children = ["!", v2] >>> f = Field(v2) >>> res = next(f.specialize()) >>> res # doctest: +SKIP b'\x03!\x03!\x03!\x03' >>> >>> # Test parsing >>> f.abstract(res) # doctest: +SKIP OrderedDict([('Field', b'\x01!\x01')]) **Modeling indirect recursion, more complex example** The following syntax provides a way to parse and specialize a subset of mathematical expressions including pair group of parentheses, digits from 0 to 9 and two arithmetic operators ('+' and '*'). .. productionlist:: num: "0" | "1" | "2" | "3" | "4" | "5" | "6" | "7" | "8" | "9" operator: "+" | "*" operation: left [right] left: num | subop right: operator operation subop: "(" operation ")" The following examples **should** be compatible with these expressions:: 1 + 2 1 + 2 + 3 1 + (2 + 3) (1 + 2) + 3 (1 + 2) + 3 + 4 1 + (2 * 3) + (4 * 5) 1 + (2 * (3 + 4)) + 5 1 + ((2 * 3) * 4) * 5 These last expressions **should not** be compatible with these expressions:: 1 1 ** 2 1 * (2 * 3 1 * This example of indirect recursion introduces a recursion of the `operation` statement, called in the `subop` statement. >>> from netzob.all import * >>> num = Alt(["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]) >>> operator = Alt([" + ", " * "]) >>> operation = Agg([], last_optional=True) >>> subop = Agg(["(", operation, ")"]) >>> left = Alt([num, subop]) >>> right = Agg([operator, operation]) >>> operation.children += [left, right] >>> sym = Symbol([Field(operation)]) >>> next(sym.specialize()) # doctest: +SKIP b'((((4 * 8 * 4) + 5 + 9 + 0) * 7 * 0 + (4 + 9 + (3 * 4 + 2) * 0) * 9) + 4 * 7)' """ @public_api def __init__(self, children=None, last_optional=False, name=None): super(Agg, self).__init__(self.__class__.__name__, children=children, name=name) self._last_optional = last_optional
[docs] @public_api def copy(self, map_objects=None): """Copy the current object as well as all its dependencies. :return: A new object of the same type. :rtype: :class:`Agg <netzob.Model.Vocabulary.Domain.Variables.Nodes.Agg.Agg>` """ if map_objects is None: map_objects = {} if self in map_objects: return map_objects[self] new_agg = Agg([], last_optional=self._last_optional) map_objects[self] = new_agg new_children = [] for child in self.children: if child in map_objects.keys(): new_children.append(map_objects[child]) else: new_child = child.copy(map_objects) new_children.append(new_child) new_agg.children = new_children return new_agg
@typeCheck(ParsingPath) def parse(self, parsingPath, acceptCallBack=True, carnivorous=False, triggered=False): """Parse the content with the definition domain of the aggregate. """ dataToParse = parsingPath.getData(self).copy() self._logger.debug("Parse '{}' as {} with parser path '{}'".format(dataToParse.tobytes(), self, parsingPath)) # Clean parsed data associated to children (needed if we are in a iteration of a Repeat) for child in self.children: if parsingPath.hasData(child): parsingPath.removeData(child) # initialy, there is a unique path to test (the provided one) parsingPath.assignData(dataToParse.copy(), self.children[0]) try: for path in self._inner_parse(parsingPath, 0, False, carnivorous): parsedData = None for child in self.children: if path.hasData(child): child_data = path.getData(child).copy() if parsedData is None: parsedData = child_data else: parsedData += child_data if parsedData is not None: self._logger.debug("Agg data successfuly parsed with {}: '{}'".format(self, parsedData.tobytes())) path.addResult(self, parsedData) yield path except Exception as e: pass def _inner_parse(self, parsingPath, i_child, all_parsed, carnivorous): # we parse all the children with the parserPaths produced by previous children # Handle optional field situation, where all data may have already been parsed before the last field if all_parsed is True: yield parsingPath current_child = self.children[i_child] if i_child < len(self.children) - 1: next_child = self.children[i_child + 1] else: next_child = None self._logger.debug("Parse {} (child {}/{}) with {}".format(current_child, i_child + 1, len(self.children), parsingPath)) value_before_parsing = parsingPath.getData(current_child).copy() childParsingPaths = current_child.parse(parsingPath, carnivorous=carnivorous) for childParsingPath in childParsingPaths: value_after_parsing = childParsingPath.getData(current_child).copy() remainingValue = value_before_parsing[len(value_after_parsing):].copy() self._logger.debug("Children {} succesfuly applied with the parsingPath {}".format(current_child, childParsingPath)) if next_child is not None: # Handle optional field if len(remainingValue) == 0 and i_child == len(self.children) - 2 and self._last_optional: all_parsed = True # Else send the remaining data to the last field else: childParsingPath.assignData(remainingValue, next_child) # Recursive call to parse next child try: yield from self._inner_parse(childParsingPath, i_child + 1, all_parsed, carnivorous) except Exception as e: pass else: # Final child has been parsed yield childParsingPath else: raise ParsingException() @typeCheck(SpecializingPath) def specialize(self, specializingPath, acceptCallBack=True, preset=None, triggered=False): """Specializes an Agg""" from netzob.Fuzzing.Mutator import MaxFuzzingException # If we are in a fuzzing mode if preset is not None and preset.get(self) is not None: # Retrieve the mutator mutator = preset.get(self) # As the current node variable is preset, we set its children to be inaccessible when targeted by another field/variable for child in self.children: specializingPath.setInaccessibleVariableRecursively(child) if mutator.mode == FuzzingMode.FIXED: while True: generated_value = mutator.generate() if isinstance(generated_value, bitarray): value = generated_value else: value = bitarray(endian='big') value.frombytes(generated_value) specializingPath.addResult(self, value) yield specializingPath for path in self._inner_specialize(specializingPath, 0, preset): try: # Just call the generate() method to increment the counter of mutation mutator.generate() except MaxFuzzingException: self._logger.debug("Maximum mutation counter reached") break yield path else: yield from self._inner_specialize(specializingPath, 0, preset) def _inner_specialize(self, specializingPath, idx, preset): # Select the child to specialize child = self.children[idx] self._logger.debug("Specialize {0} child with {1}".format(child, specializingPath)) specialize_last_child = True if len(self.children) - 1 == idx and self._last_optional: self._logger.debug("Last child is optional") # Randomely select if we are going to specialize the last child specialize_last_child = random.choice([True, False]) if specialize_last_child: self._logger.debug("Last child is optional, and this option is taken") else: self._logger.debug("Last child is optional, and this option is not taken") self._produce_data(specializingPath, specialize_last_child) yield specializingPath return # Handle self recursivity if type(child) == type and child == SELF: # Nothing to specialize in this case (the recursive specialization is done later) childSpecializingPaths = (specializingPath, ) else: if not specializingPath.hasData(child): childSpecializingPaths = child.specialize(specializingPath, preset=preset) else: self._logger.debug("Not specializing the AGG.child as it has already a data") childSpecializingPaths = (specializingPath, ) for path in childSpecializingPaths: # Handle recursive mode if type(child) == type and child == SELF and specialize_last_child: if path.hasData(self): newResult = path.getData(self) else: newResult = bitarray('') for inner_path in self._inner_specialize(path, idx, preset): if inner_path.hasData(self): current_value = inner_path.getData(self) newResult = newResult + current_value self._logger.debug("Cumulative generated value for {}: {}".format(self, newResult.tobytes())) if idx == len(self.children) - 1: self._produce_data(path, specialize_last_child) self._logger.debug("End of specialization for AGG '{}'".format(self)) yield path else: yield from self._inner_specialize(path, idx + 1, preset) def _produce_data(self, path, specialize_last_child): data = bitarray() for idx, child in enumerate(self.children): if len(self.children) - 1 == idx and not specialize_last_child: pass elif type(child) == type and child == SELF: pass else: if path.hasData(child): data += path.getData(child) else: self._logger.debug("At least one AGG child ('{}') has no content, therefore we don't produce content for the AGG".format(child)) self._logger.debug("Callback registered on ancestor node: '{}'".format(self)) self._logger.debug("Callback registered due to absence of content in target: '{}'".format(child)) path.registerVariablesCallBack( [child], self, parsingCB=False) return self._logger.debug("Generated value for {}: {}".format(self, data.tobytes())) path.addResult(self, data)
def _test_agg(): r""" >>> from netzob.all import * >>> Conf.apply() ## Size field on the right Size field targeting a field containing a agg variable, with size field on the right: >>> f1 = Field(Agg(["A", "B", "C"]), name='f1') >>> f2 = Field(Size(f1, dataType=uint8()), name='f2') >>> s = Symbol([f2, f1]) >>> d = next(s.specialize()) >>> d b'\x03ABC' >>> s.abstract(d) OrderedDict([('f2', b'\x03'), ('f1', b'ABC')]) Size field targeting a agg variable, with size field on the right: >>> v1 = Agg(["A", "B", "C"]) >>> v2 = Size(v1, dataType=uint8()) >>> s = Symbol([Field(v2, name='f1'), Field(v1, name='f2')]) >>> d = next(s.specialize()) >>> d b'\x03ABC' >>> s.abstract(d) OrderedDict([('f1', b'\x03'), ('f2', b'ABC')]) ## Size field on the left Size field targeting a field containing a agg variable, with size field on the left: >>> f1 = Field(Agg(["A", "B", "C"]), name='f1') >>> f2 = Field(Size(f1, dataType=uint8()), name='f2') >>> s = Symbol([f1, f2]) >>> d = next(s.specialize()) >>> d b'ABC\x03' >>> s.abstract(d) OrderedDict([('f1', b'ABC'), ('f2', b'\x03')]) Size field targeting a agg variable, with size field on the left: >>> v1 = Agg(["A", "B", "C"]) >>> v2 = Size(v1, dataType=uint8()) >>> s = Symbol([Field(v1, name='f1'), Field(v2, name='f2')]) >>> d = next(s.specialize()) >>> d b'ABC\x03' >>> s.abstract(d) OrderedDict([('f1', b'ABC'), ('f2', b'\x03')]) ## Value field on the right Value field targeting a field containing a agg variable, with value field on the right: >>> f1 = Field(Agg(["A", "B", "C"]), name='f1') >>> f2 = Field(Value(f1), name='f2') >>> s = Symbol([f2, f1]) >>> d = next(s.specialize()) >>> d b'ABCABC' >>> s.abstract(d) OrderedDict([('f2', b'ABC'), ('f1', b'ABC')]) Value field targeting a agg variable, with value field on the right: >>> v1 = Agg(["A", "B", "C"]) >>> v2 = Value(v1) >>> s = Symbol([Field(v2, name='f2'), Field(v1, name='f1')]) >>> d = next(s.specialize()) >>> d b'ABCABC' >>> s.abstract(d) OrderedDict([('f2', b'ABC'), ('f1', b'ABC')]) ## Value field on the left Value field targeting a field containing a agg variable, with value field on the left: >>> f1 = Field(Agg(["A", "B", "C"]), name='f1') >>> f2 = Field(Value(f1), name='f2') >>> s = Symbol([f1, f2]) >>> d = next(s.specialize()) >>> d b'ABCABC' >>> s.abstract(d) OrderedDict([('f1', b'ABC'), ('f2', b'ABC')]) Value field targeting a agg variable, with value field on the left: >>> v1 = Agg(["A", "B", "C"]) >>> v2 = Value(v1) >>> s = Symbol([Field(v1, name='f1'), Field(v2, name='f2')]) >>> d = next(s.specialize()) >>> d b'ABCABC' >>> s.abstract(d) OrderedDict([('f1', b'ABC'), ('f2', b'ABC')]) """ def _test_agg_complex(): r""" ## Create a complex structure that contains multiple Aggregates with a Checksum >>> from netzob.all import * >>> v1 = Data(Raw(b'1'), name="V1") >>> v2 = Data(Raw(b'2'), name="V2") >>> icmp_ext_checksum = InternetChecksum([], dataType=Raw(nbBytes=2, unitSize=UnitSize.SIZE_16)) >>> extension_header = Agg([v1, v2, icmp_ext_checksum], name='AGG extension_header') >>> v4 = Data(Raw(b'AAAA'), name="V4") >>> icmp_extension = Field(Agg([extension_header, v4], name='AGG icmp_extension'), "icmp.ext") >>> icmp_ext_checksum.targets = [v1, v2, icmp_ext_checksum, v4] >>> s = Symbol([icmp_extension]) >>> data = next(s.specialize()) >>> data b'12LKAAAA' >>> s.abstract(data) OrderedDict([('icmp.ext', b'12LKAAAA')]) """