From c701b9559b649aa96fd138000ea93689b435df63 Mon Sep 17 00:00:00 2001 From: david Date: Fri, 10 Apr 2009 19:07:39 +0000 Subject: [PATCH] Merge from /nmap-exp/david/ndiff-mkii and /nmap-exp/david/zenmap-ndiff-alt. This is the new Ndiff output format described in http://seclists.org/nmap-dev/2009/q1/0825.html and http://seclists.org/nmap-dev/2009/q2/0127.html. --- CHANGELOG | 5 + ndiff/README | 104 ++-- ndiff/docs/ndiff.1 | 88 ++-- ndiff/docs/ndiff.dtd | 229 ++++---- ndiff/docs/ndiff.xml | 47 +- ndiff/ndiff | 1181 ++++++++++++++++++++++++------------------ ndiff/ndifftest.py | 1086 +++++++++++++++++++++----------------- 7 files changed, 1449 insertions(+), 1291 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index b2d940906..c4473e009 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,10 @@ # Nmap Changelog ($Id$); -*-text-*- +o [Ndiff] The output has been changed to look like Nmap's output and + be easier to read. See the Ndiff README file for an example. The XML + output is now based on Nmap's XML output as well. Zenmap's diff + viewer shows the new output with syntax highlighting. [David] + o [Zenmap] The "Cancel" has been restored to the main screen. It will cancel the scan that is currently being displayed. diff --git a/ndiff/README b/ndiff/README index 26715fba9..4b71c6523 100644 --- a/ndiff/README +++ b/ndiff/README @@ -2,80 +2,56 @@ Ndiff Ndiff is a tool to aid in the comparison of Nmap scans. Specifically, it takes two Nmap XML output files and prints the differences between them: -hosts coming up and down, ports becoming open or closed, and things like -that. +hosts coming up and down, ports becoming open or closed, service and OS +changes. To install, run (as root) python setup.py install It's also possible to run the program from within the distribution without installing it. -Use "ndiff --help" for usage instructions. Output can be in -human-readable text format ("ndiff --text") or machine-readable XML -format ("ndiff --xml"). +Use "ndiff --help" for usage instructions. -Here is a sample of the text output: +Here is a sample of the output: - $ ./ndiff test-scans/random-1.xml test-scans/random-2.xml - Thu Sep 11 11:39:32 2008 -> Tue Sep 16 13:59:22 2008 - cuvtdnray-504.example.com (10.214.143.33): - Host is up, was unknown. - Add ipv4 address 10.214.143.33. - Add hostname cuvtdnray-504.example.com. - +3389/tcp open microsoft-rdp Microsoft Terminal Service - 999 tcp ports are filtered. - scnqxez-842.example.com (10.189.71.117): - Remove hostname scnqxez-842.example.com. - 10.226.19.80: - -21/tcp filtered - +21/tcp open ftp Netgear broadband router ftpd 1.0 - -23/tcp filtered - +23/tcp open telnet Netgear broadband router admin telnetd - -80/tcp filtered - +80/tcp open http Embedded Allegro RomPager webserver 4.07 UPnP/1.0 (ZyXEL ZyWALL 2) - -8701/tcp open unknown - +8701/tcp filtered - ywnleu-108.example.com (10.242.160.155): - Host is up, was unknown. - Add ipv4 address 10.242.160.155. - Add hostname ywnleu-108.example.com. - 1000 tcp ports are filtered. - fiyrownc-307.example.com (10.65.53.252): - Host is unknown, was up. - Remove ipv4 address 10.65.53.252. - Remove hostname fiyrownc-307.example.com. - -8089/tcp open upnp Microsoft Windows UPnP - 999 tcp ports changed state from filtered to unknown. +$ ./ndiff test-scans/random-1.xml test-scans/random-2.xml +-Nmap 4.75 at 2008-09-11 11:39 ++Nmap 4.76 at 2008-09-16 13:59 -Here is an abbreviated sample of the XML output: +-scnqxez-842.example.com (10.189.71.117): ++10.189.71.117: + Host appears to be up. + Not shown: 995 filtered ports + PORT STATE SERVICE VERSION + 20/tcp closed ftp-data + 21/tcp open ftp ProFTPD 1.3.1 + 80/tcp open http Apache httpd + 443/tcp open http Apache httpd + 873/tcp closed rsync - $ ./ndiff --xml test-scans/random-1.xml test-scans/random-2.xml - - - - -
- - - -
- - - - - - - - - -
- - - - - - - ++cuvtdnray-504.example.com (10.214.143.33): ++Host appears to be up. ++Not shown: 999 filtered ports ++PORT STATE SERVICE VERSION ++3389/tcp open microsoft-rdp Microsoft Terminal Service + + 10.226.19.80: + Host appears to be up. +-Not shown: 999 filtered ports ++Not shown: 997 filtered ports + PORT STATE SERVICE VERSION +-21/tcp filtered ++21/tcp open ftp Netgear broadband router ftpd 1.0 +-23/tcp filtered ++23/tcp open telnet Netgear broadband router admin telnetd +-80/tcp filtered ++80/tcp open http Embedded Allegro RomPager webserver 4.07 UPnP/1.0 (ZyXEL ZyWALL 2) +-8701/tcp open unknown ++8701/tcp filtered + + +Use -v or --verbose to see all hosts and ports, not just those that have +changed. Ndiff started as a project by Michael Pattrick during the 2008 Google Summer of Code. Michael designed the program and diff --git a/ndiff/docs/ndiff.1 b/ndiff/docs/ndiff.1 index abfe11ac7..05a5ddb9d 100644 --- a/ndiff/docs/ndiff.1 +++ b/ndiff/docs/ndiff.1 @@ -1,103 +1,73 @@ +'\" t .\" Title: ndiff -.\" Author: -.\" Generator: DocBook XSL Stylesheets v1.73.2 -.\" Date: 09/18/2008 -.\" Manual: -.\" Source: +.\" Author: [see the "Authors" section] +.\" Generator: DocBook XSL Stylesheets v1.74.3 +.\" Date: 03/25/2009 +.\" Manual: [FIXME: manual] +.\" Source: [FIXME: source] +.\" Language: English .\" -.TH "NDIFF" "1" "09/18/2008" "" "" +.TH "NDIFF" "1" "03/25/2009" "[FIXME: source]" "[FIXME: manual]" +.\" ----------------------------------------------------------------- +.\" * set default formatting +.\" ----------------------------------------------------------------- .\" disable hyphenation .nh .\" disable justification (adjust text to left margin only) .ad l +.\" ----------------------------------------------------------------- +.\" * MAIN CONTENT STARTS HERE * +.\" ----------------------------------------------------------------- .SH "NAME" -ndiff - Utility to compare the results of Nmap scans +ndiff \- Utility to compare the results of Nmap scans .SH "SYNOPSIS" -.HP 6 -\fBndiff\fR [\fIoptions\fR] {\fI\fIa\.xml\fR\fR} {\fI\fIb\.xml\fR\fR} +.HP \w'\fBndiff\fR\ 'u +\fBndiff\fR [\fIoptions\fR] {\fIa\&.xml\fR} {\fIb\&.xml\fR} .SH "DESCRIPTION" .PP -Ndiff is a tool to aid in the comparison of Nmap scans\. Specifically, it takes two Nmap XML output files and prints the differences between them: hosts coming up and down, ports becoming open or closed, and things like that\. +Ndiff is a tool to aid in the comparison of Nmap scans\&. It takes two Nmap XML output files and prints the differences between them: hosts coming up and down, ports becoming open or closed, etc\&. .PP -Ndiff compares two scans at a time\. The -\(lqbefore\(rq -scan is called the A scan and the -\(lqafter\(rq -scan is the B scan\. The letters A and B are used to avoid giving the impression that scans must be given in time order\. They do not; it\'s possible to get a -\(lqbackward\(rq -diff from a newer scan to an older scan\. -.PP -Ndiff can produce output in human\-readable text or machine\-readable XML formats\. Use the -\fB\-\-text\fR -and -\fB\-\-xml\fR -options to control which\. Output goes to standard output\. +Ndiff compares two scans at a time\&. The first scan is called the A scan and the second scan is called the B scan\&. .SH "OPTIONS SUMMARY" .PP \fB\-h\fR, \fB\-\-help\fR .RS 4 -Show a help message and exit\. +Show a help message and exit\&. .RE .PP \fB\-v\fR, \fB\-\-verbose\fR .RS 4 -Do not consolidate long port lists into a simple count\. When a host is up in the B scan that was not present in the A scan, commonly most of its ports will change from the state "unknown" to "closed" or "filtered"\. If the port list is very long, it will be consolidated into a line like -.sp -.RS 4 -.nf -994 tcp ports changed state from unknown to filtered\. -.fi -.RE -.sp -With -\fB\-\-verbose\fR, all 994 ports will be listed: -.sp -.RS 4 -.nf -The following tcp ports changed state from unknown to filtered: - 1,3,4,6,7,9,13,17,19\-21,23,24,26,30,32, -33,37,42,43,49,79,81\-85,88\-90,99,100,106,109\-11 -1,119,125,135,139,143,144,146,161,163,179,199,2 -.fi -.RE -.sp -and so on\. -.sp -In XML output, every port is always listed explictly\. -\fB\-\-verbose\fR -has no effect\. +Include all hosts and ports in the output, not only those that have changed\&. .RE .PP \fB\-\-text\fR .RS 4 -Write output in human\-readable text format\. +Write output in human\-readable text format\&. .RE .PP \fB\-\-xml\fR .RS 4 -Write output in machine\-readable text format\. For a description of the XML format see the -\fInmap\.dtd\fR -file in the Ndiff distribution\. +This option is not currently supported\&. .RE .PP -Any other arguments are taken to be the names of Nmap XML output files\. There must be exactly two\. The first one listed is the A scan and the second is the B scan\. +Any other arguments are taken to be the names of Nmap XML output files\&. There must be exactly two\&. The first one listed is the A scan and the second is the B scan\&. .SH "BUGS" .PP Report bugs to the nmap\-dev mailing list at -\. +nmap\-dev@insecure\&.org\&. .SH "HISTORY" .PP -Ndiff started as a project by Michael Pattrick during the 2008 Google Summer of Code\. Michael designed the program and led the discussion of its output formats\. He wrote versions of the program in Perl and C++, but the summer ended shortly after it was decided to rewrite the program in Python for the sake of Windows compatibility\. This Python version is written by David Fifield\. +Ndiff started as a project by Michael Pattrick during the 2008 Google Summer of Code\&. Michael designed the program and led the discussion of its output formats\&. He wrote versions of the program in Perl and C++, but the summer ended shortly after it was decided to rewrite the program in Python for the sake of Windows (and Zenmap) compatibility\&. This Python version was written by David Fifield\&. .SH "AUTHORS" .PP David Fifield - +david@bamsoftware\&.com .PP Michael Pattrick - +mpattrick@rhinovirus\&.org .SH "WEB SITE" .PP -\fI\%http://nmap.org/ndiff/\fR +\m[blue]\fB\%http://nmap.org/ndiff/\fR\m[] diff --git a/ndiff/docs/ndiff.dtd b/ndiff/docs/ndiff.dtd index 069d9539e..93c816afe 100644 --- a/ndiff/docs/ndiff.dtd +++ b/ndiff/docs/ndiff.dtd @@ -2,170 +2,123 @@ DTD for the Ndiff XML output format. David Fifield -Ndiff compares two scans at a time. The "before" and "after" scans are -called the A and B scans, respectively. Some of the XML output uses this -convention, for example the a-start and b-start attributes of the -scandiff element. +Ndiff XML output is similar to Nmap XML output. Inside of the root +nmapdiff element, the scandiff element represents a single diff of two +scans. Inside scandiff, host differences are within hostdiff elements +and port differences are within portdiff elements. These may appear +anywhere where host and port elements would appear, respectively. -The scandiff element represents a single diff of an A and B scan. Within -it are zero or more host elements. At the beginning of each host element -is any number of address and hostname elements, used to identify it. The -addresses and hostnames are taken from the A scan, unless the host was -not present in the A scan, in which case they come from the B scan. -Therefore they may not represent the final status of the host "after" -the diff; the addresses and hostnames may have changed between the A and -B scans. +Within a hostdiff or portdiff, differences are shown with a and b +elements, which show the state of things in the A and B +scan, respectively. These elements can appear most places in the output. -Following the address and hostname elements is an ordered list of -elements, each representing one diff "hunk." A hunk is an atomic -difference operation. For example, the host-state-change element -represents a host changing its state, perhaps from "unknown" to "up". -See the comments above each diff hunk element for a precise description -of what they mean. +The output may contain hosts and ports that haven't changed. These are +stored in normal host and port elements. -The order of diff hunks can matter. For example, - - -is different than the opposite order - - -The first order means, "Change the state of port 100/tcp from open to -closed, then swap ports 100/tcp and 200/tcp." If port 200/tcp was -initially filtered, this results in - PORT STATE - 100/tcp filtered - 200/tcp closed -The second order means, "Swap ports 100/tcp and 200/tcp, then change the -state of port 100/tcp from open to closed." In this case, port 200/tcp -must have originally been open. If port 100/tcp was initially filtered, -this results in - PORT STATE - 100/tcp closed - 200/tcp filtered +Examples: +A host in the A scan but not in the B scan. + + + ... + + +A host in the B scan but not in the A scan. + + + ... + + +A host that gained a hostname and had a port change state. Port 22 is +unchanged. + + + + + + + + + + + + + + + + + + + + + + + + + --> - + + - - - + + - - - + - + + + + + + + + + - + - + - - - - - + - - + + - - + + - - + + - - - + + - - - - - - - - - - - - - - - - - + + + diff --git a/ndiff/docs/ndiff.xml b/ndiff/docs/ndiff.xml index 19c1f9c64..d98884568 100644 --- a/ndiff/docs/ndiff.xml +++ b/ndiff/docs/ndiff.xml @@ -30,25 +30,14 @@ Description - Ndiff is a tool to aid in the comparison of Nmap scans. Specifically, it - takes two Nmap XML output files and prints the differences between them: - hosts coming up and down, ports becoming open or closed, etc. + Ndiff is a tool to aid in the comparison of Nmap scans. It takes two + Nmap XML output files and prints the differences between them: hosts + coming up and down, ports becoming open or closed, etc. - Ndiff compares two scans at a time. The before scan - is called the A scan and the after scan is the B - scan. The letters A and B are used to avoid giving the impression - that scans must be given in time order. They do not; it's possible - to get a backward diff from a newer scan to an older - scan. - - - - Ndiff can produce output in human-readable text or machine-readable - XML formats. Use the and - options to control which. Output goes to - standard output. + Ndiff compares two scans at a time. The first scan is called the A + scan and the second scan is called the B scan. @@ -60,7 +49,7 @@ - + Show a help message and exit. @@ -70,24 +59,8 @@ - Do not consolidate long port lists into a simple count. When - a host is up in the B scan that was not present in the A scan, - commonly most of its ports will change from the state - "unknown" to "closed" or "filtered". If the port list is very - long, it will be consolidated into a line like -994 tcp ports changed state from unknown to filtered. - - With , all 994 ports will be listed: -The following tcp ports changed state from unknown to filtered: - 1,3,4,6,7,9,13,17,19-21,23,24,26,30,32, -33,37,42,43,49,79,81-85,88-90,99,100,106,109-11 -1,119,125,135,139,143,144,146,161,163,179,199,2 - - and so on. - - - In XML output, every port is always listed explictly. - has no effect. + Include all hosts and ports in the output, not only those that + have changed. @@ -103,9 +76,7 @@ - Write output in machine-readable text format. For a - description of the XML format see the - nmap.dtd file in the Ndiff distribution. + This option is not currently supported. diff --git a/ndiff/ndiff b/ndiff/ndiff index 85323fcd5..73c558605 100755 --- a/ndiff/ndiff +++ b/ndiff/ndiff @@ -14,191 +14,34 @@ # based on a design by Michael Pattrick import datetime +import difflib import getopt import sys import time import xml.sax import xml.dom.minidom -# Port state transitions with more than this many occurrences are consolidated -# into one text output entry. -PORT_STATE_CHANGE_CONSOLIDATION_THRESHOLD = 10 -# Consolidated port state ranges whose *character length* is greater than this -# will be doubly consolidated into just a count of ports in text output. -PORT_STATE_CHANGE_DOUBLE_CONSOLIDATION_CHAR_THRESHOLD = 80 +verbose = False -class Port(object): - """A single port, consisting of a port specification, a state, and a service - version. A specification, or "spec," is the 2-tuple (number, protocol). So - (10, "tcp") corresponds to the port 10/tcp. Port states are strings.""" - # This represents an "unknown" port state, the state a port is in when it - # has not been scanned. It must not compare equal with any real Nmap port - # state like "open", "closed", etc. For future compatibility's sake, always - # compare against Port.UNKNOWN, not the literal string "unknown". - UNKNOWN = "unknown" - - def __init__(self, spec, state = None): - self.spec = spec - self.state = state or Port.UNKNOWN - self.service = Service() - - def get_state_string(self): - return Port.state_to_string(self.state) - - def state_to_string(state): - return unicode(state) - state_to_string = staticmethod(state_to_string) - - def spec_to_string(spec): - return u"%d/%s" % spec - spec_to_string = staticmethod(spec_to_string) - -class PortDict(dict): - """This is a dict that creates and inserts a new Port on demand when a - looked-up value isn't found.""" - def __getitem__(self, key): - try: - port = dict.__getitem__(self, key) - except KeyError: - port = Port(key) - self[key] = port - return port - - def __len__(self): - raise ValueError(u"__len__ is not defined for objects of type PortDict.") - -class Service(object): - """A service version as determined by -sV scan. Also contains the looked-up - port name if -sV wasn't used.""" - def __init__(self): - self.name = None - self.product = None - self.version = None - self.extrainfo = None - # self.hostname = None - # self.ostype = None - # self.devicetype = None - # self.tunnel = None - - def __eq__(self, other): - return self.name == other.name \ - and self.product == other.product \ - and self.version == other.version \ - and self.extrainfo == other.extrainfo - - def to_string(self): - """Get a string like in the SERVICE column of Nmap output.""" - if self.name is None: - return u"" - else: - return self.name - - def version_to_string(self): - """Get a string like in the VERSION column of Nmap output.""" - parts = [] - if self.product is not None: - parts.append(self.product) - if self.version is not None: - parts.append(self.version) - if self.extrainfo is not None: - parts.append(u"(%s)" % self.extrainfo) - return u" ".join(parts) - -class Host(object): - """A single host, with a state (unknown, up, or down), addresses, and a - dict mapping port specs to Ports.""" - # This represents an "unknown" host state, the state a host is in when it - # hasn't been scanned. It must not compare equal with the real Nmap host - # states "up" and "down". For future compatibility's sake, always compare - # against Host.UNKNOWN, not the literal string "unknown". - UNKNOWN = "unknown" - - def __init__(self): - self.state = self.UNKNOWN - # Addresses are represented as (address_type, address) tuples. - self.addresses = [] - self.hostnames = [] - self.ports = PortDict() - self.os = [] - - def get_id(self): - """Return an id that is used to determine if hosts are "the same" across - scans.""" - if len(self.addresses) > 0: - return sorted(self.addresses)[0] - if len(self.hostnames) > 0: - return sorted(self.hostnames)[0] - return id(self) - - def format_name(self): - """Return a human-readable identifier for this host.""" - address = None - for address_type in (u"ipv4", u"ipv6", u"mac"): - addrs = [addr for addr in self.addresses if addr[0] == address_type] - if len(addrs) > 0: - address = addrs[0][1] - break - hostname = None - if len(self.hostnames) > 0: - hostname = self.hostnames[0] - if hostname is not None: - if address is not None: - return u"%s (%s)" % (hostname, address) - else: - return hostname - elif address is not None: - return address - - return unicode(id(self)) - - def add_port(self, port): - self.ports[port.spec] = port - - def swap_ports(self, spec_a, spec_b): - """Swap the ports given by the two specs. This is used when a service is - moved from one port to another.""" - port_a = self.ports[spec_a] - port_b = self.ports[spec_b] - assert port_a.spec == spec_a - assert port_b.spec == spec_b - port_a.spec, port_b.spec = port_b.spec, port_a.spec - self.ports[spec_a], self.ports[spec_b] = \ - self.ports[spec_b], self.ports[spec_a] - - def get_known_ports(self): - """Return a list of all this host's Ports that are not in the unknown - state.""" - return [p for p in self.ports.values() if p.state != Port.UNKNOWN] - - def add_address(self, address_type, address): - if address not in self.addresses: - self.addresses.append((address_type, address)) - - def remove_address(self, address_type, address): - try: - self.addresses.remove((address_type, address)) - except ValueError: - pass - - def add_hostname(self, hostname): - if hostname not in self.hostnames: - self.hostnames.append(hostname) - - def remove_hostname(self, hostname): - try: - self.hostnames.remove(hostname) - except ValueError: - pass +NDIFF_XML_VERSION = u"1" class Scan(object): """A single Nmap scan, corresponding to a single invocation of Nmap. It is a container for a list of hosts. It also has utility methods to load itself from an Nmap XML file.""" def __init__(self): + self.version = None self.start_date = None self.end_date = None self.hosts = [] + def find_host(self, id): + for host in self.hosts: + if host.get_id() == id: + return host + else: + return None + def load(self, f): """Load a scan from the Nmap XML in the file-like object f.""" parser = xml.sax.make_parser() @@ -214,385 +57,708 @@ class Scan(object): finally: f.close() +class Host(object): + """A single host, with a state, addresses, host names, a dict mapping port + specs to Ports, and a list of OS matches. Host states are strings, or None + for "unknown".""" + def __init__(self): + self.state = None + self.addresses = [] + self.hostnames = [] + self.ports = {} + self.extraports = {} + self.os = [] -# What follows are all the possible diff hunk types. Each is capable of redering -# itself to a string or to an XML DOM fragment. -class DiffHunk(object): - """A DiffHunk is a single unit of a diff. Each one represents one atomic - change: a host state change, port state change, and so on.""" + def get_id(self): + """Return an id that is used to determine if hosts are "the same" across + scans.""" + if len(self.addresses) > 0: + return str(sorted(self.addresses)[0]) + if len(self.hostnames) > 0: + return str(sorted(self.hostnames)[0]) + return id(self) - def to_string(self): - """Return a human-readable string representation of this hunk.""" - raise Exception(u"The to_string method must be overridden.") + def format_name(self): + """Return a human-readable identifier for this host.""" + address_s = u", ".join(a.s for a in sorted(self.addresses)) + hostname_s = u", ".join(sorted(self.hostnames)) + if len(hostname_s) > 0: + if len(address_s) > 0: + return u"%s (%s)" % (hostname_s, address_s) + else: + return hostname_s + elif len(address_s) > 0: + return address_s + else: + return u"" - def to_dom_fragment(self, document): - """Return an XML DocumentFragment representation of this hunk.""" - raise Exception(u"The to_dom_fragment method must be overridden.") + def add_port(self, port): + self.ports[port.spec] = port -class HostStateChangeHunk(DiffHunk): - def __init__(self, a_state, b_state): - self.a_state = a_state - self.b_state = b_state + def add_address(self, address): + if address not in self.addresses: + self.addresses.append(address) - def to_string(self): - return u"Host is %s, was %s." % (self.b_state, self.a_state) + def add_hostname(self, hostname): + if hostname not in self.hostnames: + self.hostnames.append(hostname) - def to_dom_fragment(self, document): + def is_extraports(self, state): + return state is None or state in self.extraports + + def extraports_string(self): + list = [(count, state) for (state, count) in self.extraports.items()] + # Reverse-sort by count. + list.sort(reverse = True) + return u", ".join([u"%d %s ports" % (count, state) for (count, state) in list]) + + def state_to_dom_fragment(self, document): frag = document.createDocumentFragment() - elem = document.createElement(u"host-state-change") - elem.setAttribute(u"a-state", self.a_state) - elem.setAttribute(u"b-state", self.b_state) + if self.state is not None: + elem = document.createElement(u"status") + elem.setAttribute(u"state", self.state) + frag.appendChild(elem) + return frag + + def hostname_to_dom_fragment(self, document, hostname): + frag = document.createDocumentFragment() + elem = document.createElement(u"hostname") + elem.setAttribute(u"name", hostname) frag.appendChild(elem) return frag -class HostAddressAddHunk(DiffHunk): - def __init__(self, address_type, address): - self.address_type = address_type - self.address = address - - def to_string(self): - return u"Add %s address %s." % (self.address_type, self.address) - - def to_dom_fragment(self, document): + def extraports_to_dom_fragment(self, document): frag = document.createDocumentFragment() - elem = document.createElement(u"host-address-add") - frag.appendChild(elem) - address_elem = document.createElement(u"address") - address_elem.setAttribute(u"addrtype", self.address_type) - address_elem.setAttribute(u"addr", self.address) - elem.appendChild(address_elem) + for state, count in self.extraports.items(): + elem = document.createElement(u"extraports") + elem.setAttribute(u"state", state) + elem.setAttribute(u"count", unicode(count)) + frag.appendChild(elem) return frag -class HostAddressRemoveHunk(DiffHunk): - def __init__(self, address_type, address): - self.address_type = address_type - self.address = address - - def to_string(self): - return u"Remove %s address %s." % (self.address_type, self.address) - - def to_dom_fragment(self, document): + def os_to_dom_fragment(self, document, os): frag = document.createDocumentFragment() - elem = document.createElement(u"host-address-remove") - frag.appendChild(elem) - address_elem = document.createElement(u"address") - address_elem.setAttribute(u"addrtype", self.address_type) - address_elem.setAttribute(u"addr", self.address) - elem.appendChild(address_elem) - return frag - -class HostHostnameAddHunk(DiffHunk): - def __init__(self, hostname): - self.hostname = hostname - - def to_string(self): - return u"Add hostname %s." % self.hostname - - def to_dom_fragment(self, document): - frag = document.createDocumentFragment() - elem = document.createElement(u"host-hostname-add") - frag.appendChild(elem) - hostname_elem = document.createElement(u"hostname") - hostname_elem.setAttribute(u"name", self.hostname) - elem.appendChild(hostname_elem) - return frag - -class HostHostnameRemoveHunk(DiffHunk): - def __init__(self, hostname): - self.hostname = hostname - - def to_string(self): - return u"Remove hostname %s." % self.hostname - - def to_dom_fragment(self, document): - frag = document.createDocumentFragment() - elem = document.createElement(u"host-hostname-remove") - frag.appendChild(elem) - hostname_elem = document.createElement("hostname") - hostname_elem.setAttribute(u"name", self.hostname) - elem.appendChild(hostname_elem) - return frag - -class HostOsAddHunk(DiffHunk): - def __init__(self, os): - self.os = os - - def to_string(self): - return u"Add OS \"%s\"." % self.os - - def to_dom_fragment(self, document): - frag = document.createDocumentFragment() - elem = document.createElement(u"host-os-add") - elem.setAttribute(u"name", self.os) + elem = document.createElement(u"osmatch") + elem.setAttribute(u"name", os) frag.appendChild(elem) return frag -class HostOsRemoveHunk(DiffHunk): - def __init__(self, os): - self.os = os - - def to_string(self): - return u"Remove OS \"%s\"." % self.os - def to_dom_fragment(self, document): frag = document.createDocumentFragment() - elem = document.createElement(u"host-os-remove") - elem.setAttribute(u"name", self.os) + elem = document.createElement(u"host") + + if self.state is not None: + elem.appendChild(self.state_to_dom_fragment(document)) + + for addr in self.addresses: + elem.appendChild(addr.to_dom_fragment(document)) + + if len(self.hostnames) > 0: + hostnames_elem = document.createElement(u"hostnames") + for hostname in self.hostnames: + hostnames_elem.appendChild(self.hostname_to_dom_fragment(document, hostname)) + elem.appendChild(hostnames_elem) + + ports_elem = document.createElement(u"ports") + ports_elem.appendChild(self.extraports_to_dom_fragment(document)) + for port in sorted(self.ports.values()): + if not self.is_extraports(port.state): + ports_elem.appendChild(port.to_dom_fragment(document)) + if ports_elem.hasChildNodes(): + elem.appendChild(ports_elem) + + if len(self.os) > 0: + os_elem = document.createElement(u"os") + for os in self.os: + os_elem.appendChild(self.os_to_dom_fragment(document, os)) + elem.appendChild(os_elem) + frag.appendChild(elem) return frag -class PortIdChangeHunk(DiffHunk): - def __init__(self, a_spec, b_spec): - self.a_spec = a_spec - self.b_spec = b_spec +class Address(object): + def __init__(self, s): + self.s = s - def to_string(self): - return u"Service on %s is now running on %s." % (Port.spec_to_string(self.a_spec), Port.spec_to_string(self.b_spec)) + def __eq__(self, other): + return self.__cmp__(other) == 0 + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self.sort_key()) + + def __cmp__(self, other): + return cmp(self.sort_key(), other.sort_key()) + + def __str__(self): + return str(self.s) + + def __unicode__(self): + return self.s + + def new(type, s): + if type == u"ipv4": + return IPv4Address(s) + elif type == u"ipv6": + return IPv6Address(s) + elif type == u"mac": + return MACAddress(s) + else: + raise ValueError(u"Unknown address type %s." % type) + new = staticmethod(new) def to_dom_fragment(self, document): frag = document.createDocumentFragment() - elem = document.createElement(u"port-id-change") - elem.setAttribute(u"a-portid", unicode(self.a_spec[0])) - elem.setAttribute(u"a-protocol", self.a_spec[1]) - elem.setAttribute(u"b-portid", unicode(self.b_spec[0])) - elem.setAttribute(u"b-protocol", self.b_spec[1]) + elem = document.createElement(u"address") + elem.setAttribute(u"addr", self.s) + elem.setAttribute(u"addrtype", self.type) frag.appendChild(elem) return frag -class PortStateChangeHunk(DiffHunk): - def __init__(self, spec, a_port, b_port): +# The sort_key method in the Address subclasses determines the order in which +# addresses are displayed. We do IPv4, then IPV6, then MAC. + +class IPv4Address(Address): + type = property(lambda self: u"ipv4") + def sort_key(self): + return (0, self.s) + +class IPv6Address(Address): + type = property(lambda self: u"ipv6") + def sort_key(self): + return (1, self.s) + +class MACAddress(Address): + type = property(lambda self: u"mac") + def sort_key(self): + return (2, self.s) + +class Port(object): + """A single port, consisting of a port specification, a state, and a service + version. A specification, or "spec," is the 2-tuple (number, protocol). So + (10, "tcp") corresponds to the port 10/tcp. Port states are strings, or None + for "unknown".""" + def __init__(self, spec, state = None): self.spec = spec - self.a_port = a_port - self.b_port = b_port + self.state = state + self.service = Service() - def to_string(self): - lines = [] - a_str = u"%s %s" % (self.a_port.service.to_string(), self.a_port.service.version_to_string()) - b_str = u"%s %s" % (self.b_port.service.to_string(), self.b_port.service.version_to_string()) - if self.a_port.state != Port.UNKNOWN: - lines.append("-%s %s %s" % (Port.spec_to_string(self.a_port.spec), self.a_port.state, a_str)) - if self.b_port.state != Port.UNKNOWN: - lines.append("+%s %s %s" % (Port.spec_to_string(self.b_port.spec), self.b_port.state, b_str)) - return u"\n".join(lines) + def state_string(self): + if self.state is None: + return u"unknown" + else: + return unicode(self.state) - def service_elem(service, document, name): - """Create a service element.""" - elem = document.createElement(name) - if service.name is not None: - elem.setAttribute(u"name", service.name) - if service.product is not None: - elem.setAttribute(u"product", service.product) - if service.version is not None: - elem.setAttribute(u"version", service.version) - if service.extrainfo is not None: - elem.setAttribute(u"extrainfo", service.extrainfo) - return elem - service_elem = staticmethod(service_elem) + def spec_string(self): + return u"%d/%s" % self.spec + + def __cmp__(self, other): + d = cmp(self.spec, other.spec) + if d != 0: + return d + return cmp((self.spec, self.service), (other.spec, other.service)) def to_dom_fragment(self, document): frag = document.createDocumentFragment() - elem = document.createElement(u"port-state-change") + elem = document.createElement(u"port") elem.setAttribute(u"portid", unicode(self.spec[0])) elem.setAttribute(u"protocol", self.spec[1]) - elem.setAttribute(u"a-state", self.a_port.state) - elem.setAttribute(u"b-state", self.b_port.state) + if self.state is not None: + state_elem = document.createElement(u"state") + state_elem.setAttribute(u"state", self.state) + elem.appendChild(state_elem) + elem.appendChild(self.service.to_dom_fragment(document)) frag.appendChild(elem) - if not self.a_port.service == self.b_port.service: - elem.appendChild(self.service_elem(self.a_port.service, document, u"a-service")) - elem.appendChild(self.service_elem(self.b_port.service, document, u"b-service")) return frag -def partition_port_state_changes(diff): - """Partition a list of PortStateChangeHunks into equivalence classes - based on the tuple (protocol, a_state, b_state). The partition is returned - as a list of lists of hunks.""" - transitions = {} - for hunk in diff: - if not isinstance(hunk, PortStateChangeHunk): - continue - a_state = hunk.a_port.state - b_state = hunk.b_port.state - protocol = hunk.spec[1] - transitions.setdefault((protocol, a_state, b_state), []).append(hunk) - return transitions.values() +class Service(object): + """A service version as determined by -sV scan. Also contains the looked-up + port name if -sV wasn't used.""" + def __init__(self): + self.name = None + self.product = None + self.version = None + self.extrainfo = None + self.tunnel = None -def consolidate_port_state_changes(diff, threshold = 0): - """Return a list of list of PortStateChangeHunks, where each list - contains hunks with the same partition and state change. A group of hunks is - returned in the list of lists only when its length exceeds threshold. Any - hunks moved to the list of lists are removed from diff in place. This is to - avoid overwhelming the output with a flood of "is filtered, was unknown" - messages.""" - partition = partition_port_state_changes(diff) - consolidated = [] - for group in partition: - if len(group) > threshold: - for hunk in group: - diff.remove(hunk) - consolidated.append(group) - return consolidated + # self.hostname = None + # self.ostype = None + # self.devicetype = None + + def __eq__(self, other): + return self.name == other.name \ + and self.product == other.product \ + and self.version == other.version \ + and self.extrainfo == other.extrainfo + + def __ne__(self, other): + return not self.__eq__(other) + + def name_string(self): + parts = [] + if self.tunnel is not None: + parts.append(self.tunnel) + if self.name is not None: + parts.append(self.name) + + if len(parts) == 0: + return None + else: + return u"/".join(parts) + + def version_string(self): + """Get a string like in the VERSION column of Nmap output.""" + parts = [] + if self.product is not None: + parts.append(self.product) + if self.version is not None: + parts.append(self.version) + if self.extrainfo is not None: + parts.append(u"(%s)" % self.extrainfo) + + if len(parts) == 0: + return None + else: + return u" ".join(parts) + + def to_dom_fragment(self, document): + frag = document.createDocumentFragment() + elem = document.createElement(u"service") + for attr in (u"name", u"product", u"version", u"extrainfo", u"tunnel"): + v = getattr(self, attr) + if v is None: + continue + elem.setAttribute(attr, v) + if len(elem.attributes) > 0: + frag.appendChild(elem) + return frag + +def format_banner(scan): + """Format a startup banner more or less like Nmap does.""" + parts = [u"Nmap"] + if scan.version is not None: + parts.append(scan.version) + if scan.start_date is not None: + parts.append(u"at %s" % scan.start_date.strftime("%Y-%m-%d %H:%M")) + return u" ".join(parts) class ScanDiff(object): - """A complete diff of two scans. It is a container for two scans and the - diff between them, which is a list of (scan, host_diff) tuples as returned - by scan_diff.""" + """A complete diff of two scans. It is a container for two scans and a dict + mapping hosts to HostDiffs.""" def __init__(self, scan_a, scan_b): """Create a ScanDiff from the "before" scan_a and the "after" scan_b.""" self.scan_a = scan_a self.scan_b = scan_b - self.diff = scan_diff(scan_a, scan_b) + self.hosts = [] + self.host_diffs = {} - def print_text(self, f = sys.stdout, verbose = False): + self.diff() + + def diff(self): + a_ids = [h.get_id() for h in self.scan_a.hosts] + b_ids = [h.get_id() for h in self.scan_b.hosts] + for id in sorted(set(a_ids).union(set(b_ids))): + # Currently we never consider diffing hosts with a different id + # (address or host name), which could lead to better diffs. + host_a = self.scan_a.find_host(id) + host_b = self.scan_b.find_host(id) + h_diff = HostDiff(host_a or Host(), host_b or Host()) + if h_diff.cost > 0 or verbose: + host = host_a or host_b + self.hosts.append(host) + self.host_diffs[host] = h_diff + + def print_text(self, f = sys.stdout): """Print this diff in a human-readable text form.""" - if self.scan_a.start_date is not None: - start_date_a_str = self.scan_a.start_date.ctime() + banner_a = format_banner(self.scan_a) + banner_b = format_banner(self.scan_b) + if banner_a != banner_b: + print >> f, u"-%s" % banner_a + print >> f, u"+%s" % banner_b else: - start_date_a_str = u"" - if self.scan_b.start_date is not None: - start_date_b_str = self.scan_b.start_date.ctime() - else: - start_date_b_str = u"" - print >> f, "%s -> %s" % (start_date_a_str, start_date_b_str) - for host, h_diff in self.diff: - print >> f, "%s:" % host.format_name() - h_diff_copy = h_diff[:] - cons_port_state_changes = consolidate_port_state_changes(h_diff_copy, PORT_STATE_CHANGE_CONSOLIDATION_THRESHOLD) - for hunk in h_diff_copy: - print >> f, u"\n".join(u"\t" + s for s in hunk.to_string().split(u"\n")); - for group in cons_port_state_changes: - a_state = group[0].a_port.state - b_state = group[0].b_port.state - protocol = group[0].spec[1] - port_list = [hunk.spec[0] for hunk in group] - port_list_string = render_port_list(port_list) - if verbose or len(port_list_string) <= \ - PORT_STATE_CHANGE_DOUBLE_CONSOLIDATION_CHAR_THRESHOLD: - if a_state == Port.UNKNOWN: - print >> f, u"\tThe following %d %s ports are %s:" % (len(port_list), protocol, b_state) - else: - print >> f, u"\tThe following %d %s ports changed state from %s to %s:" % (len(port_list), protocol, a_state, b_state) - print >> f, u"\t\t" + port_list_string - else: - if a_state == Port.UNKNOWN: - print >> f, u"\t%d %s ports are %s." % (len(port_list), protocol, b_state) - else: - print >> f, u"\t%d %s ports changed state from %s to %s." % (len(port_list), protocol, a_state, b_state) + print >> f, u" %s" % banner_a + + for host in self.hosts: + print + + h_diff = self.host_diffs[host] + h_diff.print_text(f) def print_xml(self, f = sys.stdout): impl = xml.dom.minidom.getDOMImplementation() document = impl.createDocument(None, u"nmapdiff", None) root = document.documentElement + root.setAttribute(u"version", NDIFF_XML_VERSION) scandiff_elem = document.createElement(u"scandiff") - if self.scan_a.start_date is not None: - a_start_date_str = unicode(int(time.mktime(self.scan_a.start_date.timetuple()))) - scandiff_elem.setAttribute(u"a-start", a_start_date_str) - if self.scan_b.start_date is not None: - b_start_date_str = unicode(int(time.mktime(self.scan_b.start_date.timetuple()))) - scandiff_elem.setAttribute(u"b-start", b_start_date_str) root.appendChild(scandiff_elem) - for host, h_diff in self.diff: - host_elem = document.createElement(u"host") - scandiff_elem.appendChild(host_elem) - for (address_type, address) in host.addresses: - address_elem = document.createElement(u"address") - address_elem.setAttribute(u"addrtype", address_type) - address_elem.setAttribute(u"addr", address) - host_elem.appendChild(address_elem) - for hostname in host.hostnames: - hostname_elem = document.createElement(u"hostname") - hostname_elem.setAttribute(u"name", hostname) - host_elem.appendChild(hostname_elem) - for hunk in h_diff: - host_elem.appendChild(hunk.to_dom_fragment(document)) + + for host in self.hosts: + h_diff = self.host_diffs[host] + frag = h_diff.to_dom_fragment(document) + scandiff_elem.appendChild(frag) + document.writexml(f, addindent = u" ", newl = u"\n", encoding = "UTF-8") document.unlink() -def port_diff(a, b): - """Diff two Ports. The return value is a list of DiffHunks.""" - diff = [] - if a.spec != b.spec: - hunk = PortIdChangeHunk(a.spec, b.spec) - diff.append(hunk) - if not (a.state == b.state and a.service == b.service): - hunk = PortStateChangeHunk(b.spec, a, b) - diff.append(hunk) - return diff +class HostDiff(object): + """A diff of two Hosts. It contains the two hosts, variables describing what + changed, and a list of PortDiffs and OS differences.""" + def __init__(self, host_a, host_b): + self.host_a = host_a + self.host_b = host_b + self.state_changed = False + self.id_changed = False + self.os_changed = False + self.extraports_changed = False + self.ports = [] + self.port_diffs = {} + self.os_diffs = [] + self.cost = 0 -def addresses_diff(a, b): - """Diff two lists of addresses. The return value is a list of DiffHunks.""" - diff = [] - a_addresses = set(a) - b_addresses = set(b) - for addrtype, addr in a_addresses - b_addresses: - hunk = HostAddressRemoveHunk(addrtype, addr) - diff.append(hunk) - for addrtype, addr in b_addresses - a_addresses: - hunk = HostAddressAddHunk(addrtype, addr) - diff.append(hunk) - return diff + self.diff() -def hostnames_diff(a, b): - """Diff two lists of hostnames. The return value is a list of DiffHunks.""" - diff = [] - a_hostnames = set(a) - b_hostnames = set(b) + def diff(self): + if self.host_a.state != self.host_b.state: + self.state_changed = True + self.cost += 1 - for hostname in a_hostnames - b_hostnames: - hunk = HostHostnameRemoveHunk(hostname) - diff.append(hunk) - for hostname in b_hostnames - a_hostnames: - hunk = HostHostnameAddHunk(hostname) - diff.append(hunk) - return diff + if set(self.host_a.addresses) != set(self.host_b.addresses) \ + or set(self.host_a.hostnames) != set(self.host_b.hostnames): + self.id_changed = True + self.cost += 1 -def host_diff(a, b): - """Diff two Hosts. The return value is a list of DiffHunks.""" - diff = [] - if a.state != b.state: - hunk = HostStateChangeHunk(a.state, b.state) - diff.append(hunk) + all_specs = list(set(self.host_a.ports.keys()).union(set(self.host_b.ports.keys()))) + all_specs.sort() + for spec in all_specs: + # Currently we only compare ports with the same spec. This ignores + # the possibility that a service is moved lock, stock, and barrel to + # another port. + port_a = self.host_a.ports.get(spec) + port_b = self.host_b.ports.get(spec) + diff = PortDiff(port_a or Port(spec), port_b or Port(spec)) + if self.include_diff(diff): + port = port_a or port_b + self.ports.append(port) + self.port_diffs[port] = diff + self.cost += diff.cost - diff.extend(addresses_diff(a.addresses, b.addresses)) - diff.extend(hostnames_diff(a.hostnames, b.hostnames)) + os_diffs = difflib.SequenceMatcher(None, self.host_a.os, self.host_b.os) + self.os_diffs = os_diffs.get_opcodes() + os_cost = len([x for x in self.os_diffs if x[0] != "equal"]) + if os_cost > 0: + self.os_changed = True + self.cost += os_cost - all_specs = list(set(a.ports.keys()).union(set(b.ports.keys()))) - all_specs.sort() - for spec in all_specs: - diff.extend(port_diff(a.ports[spec], b.ports[spec])) + extraports_a = tuple((count, state) for (state, count) in self.host_a.extraports.items()) + extraports_b = tuple((count, state) for (state, count) in self.host_b.extraports.items()) + if extraports_a != extraports_b: + self.extraports_changed = True + self.cost += 1 - for os in set(a.os) - set(b.os): - diff.append(HostOsRemoveHunk(os)) - for os in set(b.os) - set(a.os): - diff.append(HostOsAddHunk(os)) + def include_diff(self, diff): + # Don't include the diff if the states are only extraports. Include all + # diffs, even those with cost == 0, in verbose mode. + if self.host_a.is_extraports(diff.port_a.state) and \ + self.host_b.is_extraports(diff.port_b.state): + return False + elif verbose: + return True + return diff.cost > 0 - return diff + def print_text(self, f = sys.stdout): + host_a = self.host_a + host_b = self.host_b -def scan_diff(a, b): - """Diff two scans. The return value is a list of tuples. Each tuple has the - form (scan, host_diff), where scan is either a or b, whichever is present in - one of the scans (preferring a if both are present), and host_diff is the - host diff as returned by host_diff.""" - diff = [] - a_hosts = dict((host.get_id(), host) for host in a.hosts) - b_hosts = dict((host.get_id(), host) for host in b.hosts) - all_host_ids = set(a_hosts.keys()).union(set(b_hosts.keys())) - for id in all_host_ids: - host_a = a_hosts.get(id) - host_b = b_hosts.get(id) - h_diff = host_diff(host_a or Host(), host_b or Host()) - if len(h_diff) > 0: - diff.append((host_a or host_b, h_diff)) - return diff + # Names and addresses. + if self.id_changed: + if host_a.state is not None: + print >> f, u"-%s:" % host_a.format_name() + if self.host_b.state is not None: + print >> f, u"+%s:" % host_b.format_name() + else: + print >> f, u" %s:" % host_a.format_name() -def warn(str): - """Print a warning to stderr.""" - print >> sys.stderr, str + # State. + if self.state_changed: + if host_a.state is not None: + print >> f, u"-Host appears to be %s." % host_a.state + if host_b.state is not None: + print >> f, u"+Host appears to be %s." % host_b.state + elif verbose: + print >> f, u" Host appears to be %s." % host_b.state + + # Extraports. + if self.extraports_changed: + if host_a.state is not None: + print >> f, u"-Not shown: %s" % host_a.extraports_string() + if host_b.state is not None: + print >> f, u"+Not shown: %s" % host_b.extraports_string() + elif verbose: + print >> f, u" Not shown: %s" % host_a.extraports_string() + + # Port table. + port_table = Table(u"** * * *") + if host_a.state is None: + mark = u"+" + elif host_b.state is None: + mark = u"-" + else: + mark = u" " + port_table.append((mark, u"PORT", u"STATE", u"SERVICE", u"VERSION")) + + for port in self.ports: + diff = self.port_diffs[port] + port_a = diff.port_a + port_b = diff.port_b + if diff.cost == 0: + if verbose: + port_table.append((u" ", port.spec_string(), port.state_string(), port.service.name_string(), port.service.version_string())) + else: + if not host_a.is_extraports(port_a.state): + port_table.append((u"-", port_a.spec_string(), port_a.state_string(), port_a.service.name_string(), port_a.service.version_string())) + if not host_a.is_extraports(port_b.state): + port_table.append((u"+", port_b.spec_string(), port_b.state_string(), port_b.service.name_string(), port_b.service.version_string())) + + if len(port_table) > 1: + print >> f, port_table + + # OS changes. + if self.os_changed or verbose: + if len(host_a.os) > 0: + if len(host_b.os) > 0: + print >> f, u" OS details:" + else: + print >> f, u"-OS details:" + elif len(host_b.os) > 0: + print >> f, u"+OS details:" + # os_diffs is a list of 5-tuples returned by difflib.SequenceMatcher. + for op, i1, i2, j1, j2 in self.os_diffs: + if op == "replace" or op == "delete": + for i in range(i1, i2): + print >> f, "- %s" % host_a.os[i] + if op == "replace" or op == "insert": + for i in range(j1, j2): + print >> f, "+ %s" % host_b.os[i] + if op == "equal": + for i in range(i1, i2): + print >> f, " %s" % host_a.os[i] + + def to_dom_fragment(self, document): + host_a = self.host_a + host_b = self.host_b + + frag = document.createDocumentFragment() + hostdiff_elem = document.createElement(u"hostdiff") + frag.appendChild(hostdiff_elem) + + if host_a.state is None or host_b.state is None: + # The host is missing in one scan. Output the whole thing. + if host_a.state is not None: + a_elem = document.createElement(u"a") + a_elem.appendChild(host_a.to_dom_fragment(document)) + hostdiff_elem.appendChild(a_elem) + elif host_b.state is not None: + b_elem = document.createElement(u"b") + b_elem.appendChild(host_b.to_dom_fragment(document)) + hostdiff_elem.appendChild(b_elem) + return frag + + host_elem = document.createElement(u"host") + + # State. + if host_a.state == host_b.state: + if verbose: + host_elem.appendChild(host_a.state_to_dom_fragment(document)) + else: + a_elem = document.createElement(u"a") + a_elem.appendChild(host_a.state_to_dom_fragment(document)) + host_elem.appendChild(a_elem) + b_elem = document.createElement(u"b") + b_elem.appendChild(host_b.state_to_dom_fragment(document)) + host_elem.appendChild(b_elem) + + # Addresses. + addrset_a = set(host_a.addresses) + addrset_b = set(host_b.addresses) + for addr in sorted(addrset_a.intersection(addrset_b)): + host_elem.appendChild(addr.to_dom_fragment(document)) + a_elem = document.createElement(u"a") + for addr in sorted(addrset_a - addrset_b): + a_elem.appendChild(addr.to_dom_fragment(document)) + if a_elem.hasChildNodes(): + host_elem.appendChild(a_elem) + b_elem = document.createElement(u"b") + for addr in sorted(addrset_b - addrset_a): + b_elem.appendChild(addr.to_dom_fragment(document)) + if b_elem.hasChildNodes(): + host_elem.appendChild(b_elem) + + # Host names. + hostnames_elem = document.createElement(u"hostnames") + hostnameset_a = set(host_a.hostnames) + hostnameset_b = set(host_b.hostnames) + for hostname in sorted(hostnameset_a.intersection(hostnameset_b)): + hostnames_elem.appendChild(host_a.hostname_to_dom_fragment(document, hostname)) + a_elem = document.createElement(u"a") + for hostname in sorted(hostnameset_a - hostnameset_b): + a_elem.appendChild(host_a.hostname_to_dom_fragment(document, hostname)) + if a_elem.hasChildNodes(): + hostnames_elem.appendChild(a_elem) + b_elem = document.createElement(u"b") + for hostname in sorted(hostnameset_b - hostnameset_a): + b_elem.appendChild(host_b.hostname_to_dom_fragment(document, hostname)) + if b_elem.hasChildNodes(): + hostnames_elem.appendChild(b_elem) + if hostnames_elem.hasChildNodes(): + host_elem.appendChild(hostnames_elem) + + ports_elem = document.createElement(u"ports") + # Extraports. + if host_a.extraports == host_b.extraports: + ports_elem.appendChild(host_a.extraports_to_dom_fragment(document)) + else: + a_elem = document.createElement(u"a") + a_elem.appendChild(host_a.extraports_to_dom_fragment(document)) + ports_elem.appendChild(a_elem) + b_elem = document.createElement(u"b") + b_elem.appendChild(host_b.extraports_to_dom_fragment(document)) + ports_elem.appendChild(b_elem) + # Port list. + for port in self.ports: + p_diff = self.port_diffs[port] + if p_diff.cost == 0: + if verbose: + ports_elem.appendChild(port.to_dom_fragment(document)) + else: + ports_elem.appendChild(p_diff.to_dom_fragment(document)) + if ports_elem.hasChildNodes(): + host_elem.appendChild(ports_elem) + + # OS changes. + if self.os_changed or verbose: + os_elem = document.createElement(u"os") + # os_diffs is a list of 5-tuples returned by difflib.SequenceMatcher. + for op, i1, i2, j1, j2 in self.os_diffs: + if op == "replace" or op == "delete": + a_elem = document.createElement(u"a") + for i in range(i1, i2): + a_elem.appendChild(host_a.os_to_dom_fragment(document, host_a.os[i])) + os_elem.appendChild(a_elem) + if op == "replace" or op == "insert": + b_elem = document.createElement(u"b") + for i in range(j1, j2): + b_elem.appendChild(host_b.os_to_dom_fragment(document, host_b.os[i])) + os_elem.appendChild(b_elem) + if op == "equal": + for i in range(i1, i2): + os_elem.appendChild(host_a.os_to_dom_fragment(document, host_a.os[i])) + if os_elem.hasChildNodes(): + host_elem.appendChild(os_elem) + + hostdiff_elem.appendChild(host_elem) + + return frag + +class PortDiff(object): + """A diff of two Ports. It contains the two ports and the cost of changing + one into the other. If the cost is 0 then the two ports are the same.""" + def __init__(self, port_a, port_b): + self.port_a = port_a + self.port_b = port_b + self.cost = 0 + + self.diff() + + def diff(self): + if self.port_a.spec != self.port_b.spec: + self.cost += 1 + + if self.port_a.state != self.port_b.state: + self.cost += 1 + + if self.port_a.service != self.port_b.service: + self.cost += 1 + + def to_dom_fragment(self, document): + frag = document.createDocumentFragment() + portdiff_elem = document.createElement(u"portdiff") + frag.appendChild(portdiff_elem) + a_elem = document.createElement(u"a") + b_elem = document.createElement(u"b") + portdiff_elem.appendChild(a_elem) + portdiff_elem.appendChild(b_elem) + + a_elem.appendChild(self.port_a.to_dom_fragment(document)) + b_elem.appendChild(self.port_b.to_dom_fragment(document)) + + return frag + +class Table(object): + """A table of character data, like NmapOutputTable.""" + def __init__(self, template): + """template is a string consisting of "*" and other characters. Each "*" + is a left-justified space-padded field. All other characters are copied + to the output.""" + self.widths = [] + self.rows = [] + self.prefix = u"" + self.padding = [] + j = 0 + while j < len(template) and template[j] != "*": + j += 1 + self.prefix = template[:j] + j += 1 + i = j + while j < len(template): + while j < len(template) and template[j] != "*": + j += 1 + self.padding.append(template[i:j]) + j += 1 + i = j + + def append(self, row): + strings = [] + + row = list(row) + # Remove trailing Nones. + while len(row) > 0 and row[-1] is None: + row.pop() + + for i in range(len(row)): + if row[i] is None: + s = u"" + else: + s = str(row[i]) + if i == len(self.widths): + self.widths.append(len(s)) + elif len(s) > self.widths[i]: + self.widths[i] = len(s) + strings.append(s) + self.rows.append(strings) + + def __len__(self): + return len(self.rows) + + def __str__(self): + lines = [] + for row in self.rows: + parts = [self.prefix] + i = 0 + while i < len(row): + parts.append(row[i].ljust(self.widths[i])) + if i < len(self.padding): + parts.append(self.padding[i]) + i += 1 + lines.append(u"".join(parts).rstrip()) + return u"\n".join(lines) def parse_port_list(port_list): """Parse a port list like "1-1027,1029-1033,1040,1043,1050,1058-1059,1067-1068,1076,1080,1083-1084" into a list of numbers. Raises ValueError if the port list is somehow - invalid. This function and parse_port_list are rough inverses.""" + invalid.""" result = set() if port_list == u"": return list(result) @@ -613,27 +779,9 @@ def parse_port_list(port_list): result.sort() return result -def render_port_list(ports): - """Render a list of numbers into a string like - "1-1027,1029-1033,1040,1043,1050,1058-1059,1067-1068,1076,1080,1083-1084". - This function and parse_port_list are rough inverses.""" - ports = ports[:] - ports.sort() - chunks = [] - i = 0 - while i < len(ports): - start = i - i += 1 - while i < len(ports) and i - start == ports[i] - ports[start]: - i += 1 - if i - start == 1: - chunks.append(u"%d" % ports[start]) - elif i - start == 1: - chunks.append(u"%d" % ports[start]) - chunks.append(u"%d" % ports[i - 1]) - else: - chunks.append(u"%d-%d" % (ports[start], ports[i - 1])) - return u",".join(chunks) +def warn(str): + """Print a warning to stderr.""" + print >> sys.stderr, str class NmapContentHandler(xml.sax.handler.ContentHandler): """The xml.sax ContentHandler for the XML parser. It contains a Scan object @@ -648,7 +796,6 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): self.scanned_ports = {} self.current_host = None - self.current_extraports = [] self.current_port = None def parent_element(self): @@ -679,6 +826,7 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): if attrs.has_key(u"start"): start_timestamp = int(attrs.get(u"start")) self.scan.start_date = datetime.datetime.fromtimestamp(start_timestamp) + self.scan.version = attrs.get(u"version") elif name == u"scaninfo": assert self.parent_element() == u"nmaprun" try: @@ -720,7 +868,7 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): warn(u"%s element of host %s is missing the \"addr\" attribute; skipping." % (name, self.current_host.format_name())) return addrtype = attrs.get(u"addrtype", u"ipv4") - self.current_host.add_address(addrtype, addr) + self.current_host.add_address(Address.new(addrtype, addr)) elif name == u"hostname": assert self.parent_element() == u"hostnames" assert self.current_host is not None @@ -737,11 +885,18 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): state = attrs[u"state"] except KeyError: warn(u"%s element of host %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_host.format_name())) - state = Port.UNKNOWN - if state in self.current_extraports: + state = None + if state in self.current_host.extraports: warn(u"Duplicate extraports state \"%s\" in host %s." % (state, self.current_host.format_name())) - # Perhaps check for count == 0. - self.current_extraports.append(state) + try: + count = int(attrs[u"count"]) + except KeyError: + warn(u"%s element of host %s is missing the \"count\" attribute; assuming 0." % (name, self.current_host.format_name())) + count = 0 + except ValueError: + warn(u"Can't convert extraports count \"%s\" to an integer in host %s; assuming 0." % (attrs[u"count"], self.current_host.format_name())) + count = 0 + self.current_host.extraports[state] = count elif name == u"port": assert self.parent_element() == u"ports" assert self.current_host is not None @@ -767,7 +922,7 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): if self.current_port is None: return if not attrs.has_key(u"state"): - warn(u"%s element of port %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, Port.spec_to_string(self.current_port.spec))) + warn(u"%s element of port %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_port.spec_string())) return self.current_port.state = attrs[u"state"] self.current_host.add_port(self.current_port) @@ -780,6 +935,7 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): self.current_port.service.product = attrs.get(u"product") self.current_port.service.version = attrs.get(u"version") self.current_port.service.extrainfo = attrs.get(u"extrainfo") + self.current_port.service.tunnel = attrs.get(u"tunnel") elif name == u"osmatch": assert self.parent_element() == u"os" assert self.current_host is not None @@ -797,20 +953,19 @@ class NmapContentHandler(xml.sax.handler.ContentHandler): if name == u"nmaprun": self.scanned_ports = None elif name == u"host": - if len(self.current_extraports) == 1: - extraports_state = self.current_extraports[0] - known_ports = self.current_host.get_known_ports() - known_specs = set(port.spec for port in known_ports) + if len(self.current_host.extraports) == 1: + # We can infer the state of unlisted ports. + extraports_state = self.current_host.extraports.keys()[0] + known_specs = self.current_host.ports.keys() for protocol in self.scanned_ports: for portid in self.scanned_ports[protocol]: spec = portid, protocol if spec in known_specs: continue - assert self.current_host.ports[spec].state == Port.UNKNOWN + assert spec not in self.current_host.ports self.current_host.add_port(Port(spec, state = extraports_state)) self.current_host = None - self.current_extraports = [] elif name == u"port": self.current_port = None @@ -822,9 +977,9 @@ Differences include host state changes, port state changes, and changes to service and OS detection. -h, --help display this help - -v, --verbose don't consolidate long port lists into just a count + -v, --verbose also show hosts and ports that haven't changed. --text display output in text format (default) - --xml display output in XML format\ + --xml display output in XML format (not supported)\ """ % sys.argv[0] def usage_error(msg): @@ -833,8 +988,8 @@ def usage_error(msg): sys.exit(1) def main(): + global verbose output_format = None - verbose = False try: opts, input_filenames = getopt.gnu_getopt(sys.argv[1:], "hv", ["help", "text", "verbose", "xml"]) @@ -849,15 +1004,15 @@ def main(): verbose = True elif o == "--text": if output_format is not None and output_format != "text": - usage_error("contradictory output format options.") + usage_error(u"contradictory output format options.") output_format = "text" elif o == "--xml": if output_format is not None and output_format != "xml": - usage_error("contradictory output format options.") + usage_error(u"contradictory output format options.") output_format = "xml" if len(input_filenames) != 2: - usage_error("need exactly two input filenames.") + usage_error(u"need exactly two input filenames.") if output_format is None: output_format = "text" @@ -873,7 +1028,7 @@ def main(): diff = ScanDiff(scan_a, scan_b) if output_format == "text": - diff.print_text(verbose = verbose) + diff.print_text() elif output_format == "xml": diff.print_xml() diff --git a/ndiff/ndifftest.py b/ndiff/ndifftest.py index 6af98be93..6aea52dd0 100755 --- a/ndiff/ndifftest.py +++ b/ndiff/ndifftest.py @@ -6,444 +6,22 @@ import unittest import xml.dom.minidom import StringIO +# The ndiff.py symlink exists so we can do this. from ndiff import * -class parse_port_list_test(unittest.TestCase): - """Test the parse_port_list function.""" - def test_empty(self): - ports = parse_port_list(u"") - self.assertTrue(len(ports) == 0) - - def test_single(self): - ports = parse_port_list(u"1,10,100") - self.assertTrue(len(ports) == 3) - self.assertTrue(set(ports) == set([1, 10, 100])) - - def test_range(self): - ports = parse_port_list(u"10-20") - self.assertTrue(len(ports) == 11) - self.assertTrue(set(ports) == set(range(10, 21))) - - def test_combo(self): - ports = parse_port_list(u"1,10,100-102,150") - self.assertTrue(set(ports) == set([1, 10, 100, 101, 102, 150])) - - def test_dups(self): - ports = parse_port_list(u"5,1-10") - self.assertTrue(len(ports) == 10) - self.assertTrue(set(ports) == set(range(1, 11))) - - def test_invalid(self): - self.assertRaises(ValueError, parse_port_list, u"a") - self.assertRaises(ValueError, parse_port_list, u",1") - self.assertRaises(ValueError, parse_port_list, u"1,,2") - self.assertRaises(ValueError, parse_port_list, u"1,") - self.assertRaises(ValueError, parse_port_list, u"1-2-3") - self.assertRaises(ValueError, parse_port_list, u"10-1") - -class render_port_list_test(unittest.TestCase): - """Test the render_port_list function.""" - def test_roundtrip(self): - TESTS = ([], - [1], - [1,1], - [1,2,3,4,10,11,12], - [1,2,3,4,5,9,8,7,6], - [1,2,3,4,5,3] - ) - - for test in TESTS: - s = render_port_list(test) - result = parse_port_list(s) - self.assertTrue(list(set(test)) == result, u"Expected %s, got %s." % (list(set(test)), result)) - -class partition_port_state_changes_test(unittest.TestCase): - """Test the partition_port_state_changes function.""" - def setUp(self): - a = Scan() - a.load_from_file("test-scans/empty.xml") - b = Scan() - b.load_from_file("test-scans/simple.xml") - self.diff = scan_diff(a, b) - - def test_port_state_change_only(self): - for host, h_diff in self.diff: - partition = partition_port_state_changes(h_diff) - for group in partition: - for hunk in group: - self.assertTrue(isinstance(hunk, PortStateChangeHunk)) - - def test_equivalence(self): - for host, h_diff in self.diff: - partition = partition_port_state_changes(h_diff) - for group in partition: - key = (group[0].spec[1], group[0].a_port.state, group[0].b_port.state) - for hunk in group: - self.assertTrue(key == (hunk.spec[1], hunk.a_port.state, hunk.b_port.state)) - -class consolidate_port_state_changes_test(unittest.TestCase): - """Test the consolidate_port_state_changes function.""" - def setUp(self): - a = Scan() - a.load_from_file("test-scans/empty.xml") - b = Scan() - b.load_from_file("test-scans/simple.xml") - self.diff = scan_diff(a, b) - - def test_removal(self): - for host, h_diff in self.diff: - consolidated = consolidate_port_state_changes(h_diff, 0) - for hunk in h_diff: - self.assertTrue(not isinstance(hunk, PortStateChangeHunk)) - - def test_conservation(self): - pre_length = 0 - for host, h_diff in self.diff: - pre_length = len(h_diff) - consolidated = consolidate_port_state_changes(h_diff, 0) - post_length = len(h_diff) + sum(len(group) for group in consolidated) - self.assertTrue(pre_length == post_length) - - def test_threshold(self): - for host, h_diff in self.diff: - for threshold in (0, 1, 2, 4, 8): - h_diff_copy = h_diff[:] - consolidated = consolidate_port_state_changes(h_diff_copy, threshold) - for group in consolidated: - self.assertTrue(len(group) > threshold, u"Length is %d, should be > %d." % (len(group), threshold)) - -class port_diff_test(unittest.TestCase): - """Test the port_diff function.""" - def test_equal(self): - spec = (10, "tcp") - a = Port(spec) - b = Port(spec) - diff = port_diff(a, b) - self.assertTrue(len(diff) == 0) - - def test_self(self): - p = Port((10, "tcp")) - diff = port_diff(p, p) - self.assertTrue(len(diff) == 0) - - def test_id_change(self): - a = Port((10, "tcp")) - b = Port((20, "tcp")) - diff = port_diff(a, b) - self.assertTrue(len(diff) == 1) - self.assertTrue(isinstance(diff[0], PortIdChangeHunk)) - - def test_state_change(self): - spec = (10, "tcp") - a = Port(spec) - a.state = "open" - b = Port(spec) - b.state = "closed" - diff = port_diff(a, b) - self.assertTrue(len(diff) == 1) - self.assertTrue(isinstance(diff[0], PortStateChangeHunk)) - - def test_id_state_change(self): - a = Port((10, "tcp")) - a.state = "open" - b = Port((20, "tcp")) - b.state = "closed" - diff = port_diff(a, b) - self.assertTrue(len(diff) > 1) - -class service_test(unittest.TestCase): - """Test the Service class.""" - def test_to_string(self): - serv = Service() - self.assertTrue(serv.to_string() == u"") - serv.name = u"ftp" - self.assertTrue(serv.to_string() == serv.name) - - def test_version_to_string(self): - serv = Service() - self.assertTrue(serv.version_to_string() == u"") - serv = Service() - serv.product = u"FooBar" - self.assertTrue(len(serv.version_to_string()) > 0) - serv = Service() - serv.version = u"1.2.3" - self.assertTrue(len(serv.version_to_string()) > 0) - serv = Service() - serv.extrainfo = u"misconfigured" - self.assertTrue(len(serv.version_to_string()) > 0) - serv = Service() - serv.product = u"FooBar" - serv.version = u"1.2.3" - # Must match Nmap output. - self.assertTrue(serv.version_to_string() == u"%s %s" % (serv.product, serv.version)) - serv.extrainfo = u"misconfigured" - self.assertTrue(serv.version_to_string() == u"%s %s (%s)" % (serv.product, serv.version, serv.extrainfo)) - -class host_test(unittest.TestCase): - """Test the Host class.""" - def test_empty(self): - h = Host() - self.assertTrue(len(h.get_known_ports()) == 0) - - def test_format_name(self): - h = Host() - self.assertTrue(isinstance(h.format_name(), basestring)) - h.add_address("ipv4", "127.0.0.1") - self.assertTrue(isinstance(h.format_name(), basestring)) - h.add_hostname("localhost") - self.assertTrue(isinstance(h.format_name(), basestring)) - h.remove_address("ipv4", "127.0.0.1") - - def test_empty_get_port(self): - h = Host() - for num in 10, 100, 1000, 10000: - for proto in ("tcp", "udp", "ip"): - port = h.ports[(num, proto)] - self.assertTrue(port.state == Port.UNKNOWN) - - def test_add_port(self): - h = Host() - spec = (10, "tcp") - port = h.ports[spec] - self.assertTrue(port.state == Port.UNKNOWN, "Port state is %s, expected %s." % (port.get_state_string(), "unknown")) - h.add_port(Port(spec, "open")) - self.assertTrue(len(h.get_known_ports()) == 1) - port = h.ports[spec] - self.assertTrue(port.state == "open", "Port state is %s, expected %s." % (port.get_state_string(), "open")) - h.add_port(Port(spec, "closed")) - self.assertTrue(len(h.get_known_ports()) == 1) - port = h.ports[spec] - self.assertTrue(port.state == "closed", "Port state is %s, expected %s." % (port.get_state_string(), "closed")) - - spec = (22, "tcp") - port = h.ports[spec] - self.assertTrue(port.state == Port.UNKNOWN, "Port state is %s, expected %s." % (port.get_state_string(), "unknown")) - port = Port(spec) - port.state = "open" - port.service.name = "ssh" - h.add_port(port) - self.assertTrue(len(h.get_known_ports()) == 2) - port = h.ports[spec] - self.assertTrue(port.state == "open", "Port state is %s, expected %s." % (port.get_state_string(), "open")) - self.assertTrue(port.service.name == "ssh", "Port service.name is %s, expected %s." % (port.service.name, "ssh")) - - def test_swap_ports(self): - h = Host() - spec_a = (10, "tcp") - spec_b = (20, "tcp") - h.swap_ports(spec_a, spec_b) - self.assertTrue(h.ports[spec_a].state == Port.UNKNOWN) - self.assertTrue(h.ports[spec_b].state == Port.UNKNOWN) - self.assertTrue(h.ports[spec_a].spec == spec_a) - self.assertTrue(h.ports[spec_b].spec == spec_b) - h.add_port(Port(spec_a, "open")) - h.swap_ports(spec_a, spec_b) - self.assertTrue(h.ports[spec_a].state == Port.UNKNOWN) - self.assertTrue(h.ports[spec_b].state == "open") - self.assertTrue(h.ports[spec_a].spec == spec_a) - self.assertTrue(h.ports[spec_b].spec == spec_b) - h.add_port(Port(spec_a, "closed")) - h.swap_ports(spec_a, spec_b) - self.assertTrue(h.ports[spec_a].state == "open") - self.assertTrue(h.ports[spec_b].state == "closed") - self.assertTrue(h.ports[spec_a].spec == spec_a) - self.assertTrue(h.ports[spec_b].spec == spec_b) - -def host_apply_diff(host, diff): - """Apply a host diff to the given host.""" - for hunk in diff: - if isinstance(hunk, HostStateChangeHunk): - assert host.state == hunk.a_state - host.state = hunk.b_state - elif isinstance(hunk, HostAddressAddHunk): - host.add_address(hunk.address_type, hunk.address) - elif isinstance(hunk, HostAddressRemoveHunk): - host.remove_address(hunk.address_type, hunk.address) - elif isinstance(hunk, HostHostnameAddHunk): - host.add_hostname(hunk.hostname) - elif isinstance(hunk, HostHostnameRemoveHunk): - host.remove_hostname(hunk.hostname) - elif isinstance(hunk, PortIdChangeHunk): - host.swap_ports(hunk.a_spec, hunk.b_spec) - elif isinstance(hunk, PortStateChangeHunk): - port = host.ports[hunk.spec] - assert port.state == hunk.a_port.state - host.add_port(Port(hunk.spec, hunk.b_port.state)) - host.ports[hunk.spec].service = hunk.b_port.service - else: - assert False - -class host_diff_test(unittest.TestCase): - """Test the host_diff function.""" - PORT_DIFF_HUNK_TYPES = (PortIdChangeHunk, PortStateChangeHunk) - HOST_DIFF_HUNK_TYPES = (HostStateChangeHunk,) + PORT_DIFF_HUNK_TYPES - - def test_empty(self): - a = Host() - b = Host() - diff = host_diff(a, b) - self.assertTrue(len(diff) == 0) - - def test_self(self): - h = Host() - h.add_port(Port((10, "tcp"), "open")) - h.add_port(Port((22, "tcp"), "closed")) - diff = host_diff(h, h) - self.assertTrue(len(diff) == 0) - - def test_state_change(self): - a = Host() - b = Host() - a.state = "up" - b.state = "down" - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.HOST_DIFF_HUNK_TYPES)) - - def test_state_change_unknown(self): - a = Host() - b = Host() - a.state = "up" - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.HOST_DIFF_HUNK_TYPES)) - diff = host_diff(b, a) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.HOST_DIFF_HUNK_TYPES)) - - def test_port_state_change(self): - a = Host() - b = Host() - spec = (10, "tcp") - a.add_port(Port(spec, "open")) - b.add_port(Port(spec, "closed")) - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.PORT_DIFF_HUNK_TYPES)) - - def test_port_state_change_unknown(self): - a = Host() - b = Host() - b.add_port(Port((10, "tcp"), "open")) - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.PORT_DIFF_HUNK_TYPES)) - diff = host_diff(b, a) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.PORT_DIFF_HUNK_TYPES)) - - def test_port_state_change_multi(self): - a = Host() - b = Host() - a.add_port(Port((10, "tcp"), "open")) - a.add_port(Port((20, "tcp"), "closed")) - a.add_port(Port((30, "tcp"), "open")) - b.add_port(Port((10, "tcp"), "open")) - b.add_port(Port((20, "tcp"), "open")) - b.add_port(Port((30, "tcp"), "open")) - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, self.PORT_DIFF_HUNK_TYPES)) - - def test_address_add(self): - a = Host() - b = Host() - a.addresses = [] - b.addresses = [("ipv4", "127.0.0.2")] - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, HostAddressAddHunk)) - - def test_address_add(self): - a = Host() - b = Host() - a.addresses = [("ipv4", "127.0.0.1")] - b.addresses = [] - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, HostAddressRemoveHunk)) - - def test_address_add(self): - a = Host() - b = Host() - a.addresses = [("ipv4", "127.0.0.1")] - b.addresses = [("ipv4", "127.0.0.2")] - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, (HostAddressAddHunk, HostAddressRemoveHunk))) - - def test_hostname_add(self): - a = Host() - b = Host() - a.hostnames = [] - b.hostnames = ["b"] - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, HostHostnameAddHunk)) - - def test_hostname_remove(self): - a = Host() - b = Host() - a.hostnames = ["a"] - b.hostnames = [] - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, HostHostnameRemoveHunk)) - - def test_hostname_change(self): - a = Host() - b = Host() - a.hostnames = ["a"] - b.hostnames = ["b"] - diff = host_diff(a, b) - self.assertTrue(len(diff) > 0) - for hunk in diff: - self.assertTrue(isinstance(hunk, (HostHostnameAddHunk, HostHostnameRemoveHunk))) - - def test_diff_is_effective(self): - """Test that a host diff is effective. - This means that if the recommended changes are applied to the first host - the hosts become the same.""" - a = Host() - b = Host() - a.add_port(Port((10, "tcp"), "open")) - a.add_port(Port((20, "tcp"), "closed")) - a.add_port(Port((40, "udp"), "open|filtered")) - b.add_port(Port((10, "tcp"), "open")) - b.add_port(Port((30, "tcp"), "open")) - a.add_port(Port((40, "udp"), "open")) - a.hostnames = ["a", "localhost"] - a.hostnames = ["b", "localhost", "b.example.com"] - diff = host_diff(a, b) - host_apply_diff(a, diff) - diff = host_diff(a, b) - self.assertTrue(len(diff) == 0) - class scan_test(unittest.TestCase): """Test the Scan class.""" def test_empty(self): scan = Scan() scan.load_from_file("test-scans/empty.xml") - self.assertTrue(len(scan.hosts) == 0) - self.assertTrue(scan.start_date is not None) - self.assertTrue(scan.end_date is not None) + self.assertEqual(len(scan.hosts), 0) + self.assertNotEqual(scan.start_date, None) + self.assertNotEqual(scan.end_date, None) def test_single(self): scan = Scan() scan.load_from_file("test-scans/single.xml") - self.assertTrue(len(scan.hosts) == 1) + self.assertEqual(len(scan.hosts), 1) def test_simple(self): """Test that the correct number of known ports is returned when there @@ -451,8 +29,7 @@ class scan_test(unittest.TestCase): scan = Scan() scan.load_from_file("test-scans/simple.xml") host = scan.hosts[0] - self.assertTrue(len(host.get_known_ports()) == 2, - u"Expected %d known ports, got %d." % (2, len(host.get_known_ports()))) + self.assertEqual(len(host.ports), 2) def test_extraports(self): """Test that the correct number of known ports is returned when there @@ -460,8 +37,8 @@ class scan_test(unittest.TestCase): scan = Scan() scan.load_from_file("test-scans/single.xml") host = scan.hosts[0] - self.assertTrue(len(host.get_known_ports()) == 100, - u"Expected %d known ports, got %d." % (100, len(host.get_known_ports()))) + self.assertEqual(len(host.ports), 100) + self.assertEqual(host.extraports.items(), [("filtered", 95)]) def test_extraports_multi(self): """Test that the correct number of known ports is returned when there @@ -469,23 +46,22 @@ class scan_test(unittest.TestCase): scan = Scan() scan.load_from_file("test-scans/complex.xml") host = scan.hosts[0] - self.assertTrue(len(host.get_known_ports()) == 5, - u"Expected %d known ports, got %d." % (5, len(host.get_known_ports()))) + self.assertEqual(len(host.ports), 5) + self.assertEqual(set(host.extraports.items()), set([("filtered", 95), ("open|filtered", 100)])) def test_addresses(self): """Test that addresses are recorded.""" scan = Scan() scan.load_from_file("test-scans/simple.xml") host = scan.hosts[0] - self.assertTrue(len(host.addresses) == 1) + self.assertEqual(host.addresses, [IPv4Address("64.13.134.52")]) def test_hostname(self): """Test that hostnames are recorded.""" scan = Scan() scan.load_from_file("test-scans/simple.xml") host = scan.hosts[0] - self.assertTrue(len(host.hostnames) == 1) - self.assertTrue(host.hostnames[0] == u"scanme.nmap.org") + self.assertEqual(host.hostnames, [u"scanme.nmap.org"]) def test_os(self): """Test that OS information is recorded.""" @@ -507,68 +83,587 @@ class scan_test(unittest.TestCase): # host = scan.hosts[0] # self.assertTrue(host.state == "down") -def scan_apply_diff(scan, diff): - """Apply a scan diff to the given scan.""" - for host, h_diff in diff: - for h in scan.hosts: - if h == host: - break - else: - h = Host() - scan.hosts.append(h) - host_apply_diff(h, h_diff) +class host_test(unittest.TestCase): + """Test the Host class.""" + def test_empty(self): + h = Host() + self.assertEqual(len(h.addresses), 0) + self.assertEqual(len(h.hostnames), 0) + self.assertEqual(len(h.ports), 0) + self.assertEqual(len(h.extraports), 0) + self.assertEqual(len(h.os), 0) + + def test_format_name(self): + h = Host() + self.assertTrue(isinstance(h.format_name(), basestring)) + h.add_address(IPv4Address(u"127.0.0.1")) + self.assertTrue(u"127.0.0.1" in h.format_name()) + h.add_address(IPv6Address("::1")) + self.assertTrue(u"127.0.0.1" in h.format_name()) + self.assertTrue(u"::1" in h.format_name()) + h.add_hostname(u"localhost") + self.assertTrue(u"127.0.0.1" in h.format_name()) + self.assertTrue(u"::1" in h.format_name()) + self.assertTrue(u"localhost" in h.format_name()) + + def test_empty_get_port(self): + h = Host() + for num in 10, 100, 1000, 10000: + for proto in ("tcp", "udp", "ip"): + port = h.ports.get((num, proto)) + self.assertEqual(port, None) + + def test_add_port(self): + h = Host() + spec = (10, "tcp") + port = h.ports.get(spec) + self.assertEqual(port, None) + h.add_port(Port(spec, "open")) + self.assertEqual(len(h.ports), 1) + port = h.ports[spec] + self.assertEqual(port.state, "open") + h.add_port(Port(spec, "closed")) + self.assertEqual(len(h.ports), 1) + port = h.ports[spec] + self.assertEqual(port.state, "closed") + + spec = (22, "tcp") + port = h.ports.get(spec) + self.assertEqual(port, None) + port = Port(spec) + port.state = "open" + port.service.name = "ssh" + h.add_port(port) + self.assertEqual(len(h.ports), 2) + port = h.ports[spec] + self.assertEqual(port.state, "open") + self.assertEqual(port.service.name, "ssh") + + def test_extraports(self): + h = Host() + self.assertFalse(h.is_extraports("open")) + self.assertFalse(h.is_extraports("closed")) + self.assertFalse(h.is_extraports("filtered")) + h.extraports["closed"] = 10 + self.assertFalse(h.is_extraports("open")) + self.assertTrue(h.is_extraports("closed")) + self.assertFalse(h.is_extraports("filtered")) + h.extraports["filtered"] = 10 + self.assertFalse(h.is_extraports("open")) + self.assertTrue(h.is_extraports("closed")) + self.assertTrue(h.is_extraports("filtered")) + del h.extraports["closed"] + del h.extraports["filtered"] + self.assertFalse(h.is_extraports("open")) + self.assertFalse(h.is_extraports("closed")) + self.assertFalse(h.is_extraports("filtered")) + + def test_parse(self): + s = Scan() + s.load_from_file("test-scans/single.xml") + h = s.hosts[0] + self.assertEqual(len(h.ports), 100) + self.assertEqual(len(h.extraports), 1) + self.assertEqual(h.extraports.keys()[0], u"filtered") + self.assertEqual(h.extraports.values()[0], 95) + self.assertEqual(h.state, "up") + +class address_test(unittest.TestCase): + """Test the Address class.""" + def test_ipv4_new(self): + a = Address.new("ipv4", "127.0.0.1") + self.assertEqual(a.type, "ipv4") + + def test_ipv6_new(self): + a = Address.new("ipv6", "::1") + self.assertEqual(a.type, "ipv6") + + def test_mac_new(self): + a = Address.new("mac", "00:00:00:00:00:00") + self.assertEqual(a.type, "mac") + + def test_unknown_new(self): + self.assertRaises(ValueError, Address.new, "aaa", "") + + def test_compare(self): + """Test that addresses with the same contents compare equal.""" + a = IPv4Address("127.0.0.1") + self.assertEqual(a, a) + b = IPv4Address("127.0.0.1") + self.assertEqual(a, b) + c = Address.new("ipv4", "127.0.0.1") + self.assertEqual(a, c) + self.assertEqual(b, c) + + d = IPv4Address("1.1.1.1") + self.assertNotEqual(a, d) + + e = IPv6Address("::1") + self.assertEqual(e, e) + self.assertNotEqual(a, e) + + def test_sort(self): + """Test the sort order of addresses.""" + l = [MACAddress("00:00:00:00:00:00"), IPv4Address("127.0.0.1"), IPv6Address("::1")] + l.sort() + self.assertEqual(["ipv4", "ipv6", "mac"], [a.type for a in l]) + + strings = ["3.0.0.0", "20.0.0.0", "100.0.0.2", "100.0.0.10"] + l = [IPv4Address(s) for s in strings] + l.sort() + self.assertEqual(strings, [a.s for a in l]) + + strings = ["3::", "20::", "100::2", "100::10"] + l = [IPv6Address(s) for s in strings] + l.sort() + self.assertEqual(strings, [a.s for a in l]) + + strings = ["20:00:00:00:00:00", "a0:00:00:00:00:20", "a0:00:00:00:00:a0"] + l = [MACAddress(s) for s in strings] + l.sort() + self.assertEqual(strings, [a.s for a in l]) + +class port_test(unittest.TestCase): + """Test the Port class.""" + def test_spec_string(self): + p = Port((10, "tcp")) + self.assertEqual(p.spec_string(), u"10/tcp") + p = Port((100, "ip")) + self.assertEqual(p.spec_string(), u"100/ip") + + def test_state_string(self): + p = Port((10, "tcp")) + self.assertEqual(p.state_string(), u"unknown") + +class service_test(unittest.TestCase): + """Test the Service class.""" + def test_compare(self): + """Test that services with the same contents compare equal.""" + a = Service() + a.name = u"ftp" + a.product = u"FooBar FTP" + a.version = u"1.1.1" + a.tunnel = u"ssl" + self.assertEqual(a, a) + b = Service() + b.name = u"ftp" + b.product = u"FooBar FTP" + b.version = u"1.1.1" + b.tunnel = u"ssl" + self.assertEqual(a, b) + b.name = u"http" + self.assertNotEqual(a, b) + c = Service() + self.assertNotEqual(a, c) + + def test_tunnel(self): + serv = Service() + serv.name = u"http" + serv.tunnel = u"ssl" + self.assertEqual(serv.name_string(), u"ssl/http") + + def test_version_string(self): + serv = Service() + serv.product = u"FooBar" + self.assertTrue(len(serv.version_string()) > 0) + serv = Service() + serv.version = u"1.2.3" + self.assertTrue(len(serv.version_string()) > 0) + serv = Service() + serv.extrainfo = u"misconfigured" + self.assertTrue(len(serv.version_string()) > 0) + serv = Service() + serv.product = u"FooBar" + serv.version = u"1.2.3" + # Must match Nmap output. + self.assertEqual(serv.version_string(), u"%s %s" % (serv.product, serv.version)) + serv.extrainfo = u"misconfigured" + self.assertEqual(serv.version_string(), u"%s %s (%s)" % (serv.product, serv.version, serv.extrainfo)) class scan_diff_test(unittest.TestCase): - """Test the scan_diff function.""" + """Test the ScanDiff class.""" def test_self(self): scan = Scan() scan.load_from_file("test-scans/complex.xml") - diff = scan_diff(scan, scan) - self.assertTrue(len(diff) == 0) + diff = ScanDiff(scan, scan) + self.assertEqual(len(diff.host_diffs), 0) + self.assertEqual(set(diff.hosts), set(diff.host_diffs.keys())) def test_unknown_up(self): a = Scan() a.load_from_file("test-scans/empty.xml") b = Scan() b.load_from_file("test-scans/simple.xml") - diff = scan_diff(a, b) - for host, h_diff in diff: - for hunk in h_diff: - if isinstance(hunk, HostStateChangeHunk): - self.assertTrue(hunk.a_state == Host.UNKNOWN) - self.assertTrue(hunk.b_state == u"up") - break - else: - fail("No host state change found.") + diff = ScanDiff(a, b) + self.assertTrue(len(diff.hosts) >= 1) + self.assertEqual(len(diff.host_diffs), 1) + self.assertEqual(set(diff.hosts), set(diff.host_diffs.keys())) + h_diff = diff.host_diffs.values()[0] + self.assertEqual(h_diff.host_a.state, None) + self.assertEqual(h_diff.host_b.state, "up") def test_up_unknown(self): a = Scan() a.load_from_file("test-scans/simple.xml") b = Scan() b.load_from_file("test-scans/empty.xml") - diff = scan_diff(a, b) - for host, h_diff in diff: - for hunk in h_diff: - if isinstance(hunk, HostStateChangeHunk): - self.assertTrue(hunk.a_state == u"up") - self.assertTrue(hunk.b_state == Port.UNKNOWN) - break - else: - fail("No host state change found.") + diff = ScanDiff(a, b) + self.assertTrue(len(diff.hosts) >= 1) + self.assertEqual(len(diff.host_diffs), 1) + self.assertEqual(set(diff.hosts), set(diff.host_diffs.keys())) + h_diff = diff.host_diffs.values()[0] + self.assertEqual(h_diff.host_a.state, "up") + self.assertEqual(h_diff.host_b.state, None) def test_diff_is_effective(self): - """Test that a scan diff is effective. - This means that if the recommended changes are applied to the first scan - the scans become the same.""" - a = Scan() - a.load_from_file("test-scans/empty.xml") - b = Scan() - b.load_from_file("test-scans/simple.xml") - diff = scan_diff(a, b) - self.assertTrue(len(diff) > 0) - scan_apply_diff(a, diff) - diff = scan_diff(a, b) - self.assertTrue(len(diff) == 0) + """Test that a scan diff is effective. This means that if the + recommended changes are applied to the first scan the scans become the + same.""" + PAIRS = ( + ("empty", "empty"), + ("simple", "complex"), + ("complex", "simple"), + ("single", "os"), + ("os", "single"), + ("random-1", "simple"), + ("simple", "random-1"), + ) + for pair in PAIRS: + a = Scan() + a.load_from_file("test-scans/%s.xml" % pair[0]) + b = Scan() + b.load_from_file("test-scans/%s.xml" % pair[1]) + diff = ScanDiff(a, b) + scan_apply_diff(a, diff) + diff = ScanDiff(a, b) + self.assertEqual(len(diff.host_diffs), 0, "%d != 0 in pair %s" % (len(diff.host_diffs), str(pair))) + self.assertEqual(set(diff.hosts), set(diff.host_diffs.keys())) + +class parse_port_list_test(unittest.TestCase): + """Test the parse_port_list function.""" + def test_empty(self): + ports = parse_port_list(u"") + self.assertEqual(len(ports), 0) + + def test_single(self): + ports = parse_port_list(u"1,10,100") + self.assertEqual(len(ports), 3) + self.assertEqual(set(ports), set([1, 10, 100])) + + def test_range(self): + ports = parse_port_list(u"10-20") + self.assertEqual(len(ports), 11) + self.assertEqual(set(ports), set(range(10, 21))) + + def test_combo(self): + ports = parse_port_list(u"1,10,100-102,150") + self.assertEqual(set(ports), set([1, 10, 100, 101, 102, 150])) + + def test_dups(self): + ports = parse_port_list(u"5,1-10") + self.assertEqual(len(ports), 10) + self.assertEqual(set(ports), set(range(1, 11))) + + def test_invalid(self): + self.assertRaises(ValueError, parse_port_list, u"a") + self.assertRaises(ValueError, parse_port_list, u",1") + self.assertRaises(ValueError, parse_port_list, u"1,,2") + self.assertRaises(ValueError, parse_port_list, u"1,") + self.assertRaises(ValueError, parse_port_list, u"1-2-3") + self.assertRaises(ValueError, parse_port_list, u"10-1") + +class host_diff_test(unittest.TestCase): + """Test the HostDiff class.""" + def test_empty(self): + a = Host() + b = Host() + diff = HostDiff(a, b) + self.assertFalse(diff.id_changed) + self.assertFalse(diff.state_changed) + self.assertFalse(diff.os_changed) + self.assertFalse(diff.extraports_changed) + self.assertEqual(diff.cost, 0) + + def test_self(self): + h = Host() + h.add_port(Port((10, "tcp"), "open")) + h.add_port(Port((22, "tcp"), "closed")) + diff = HostDiff(h, h) + self.assertFalse(diff.id_changed) + self.assertFalse(diff.state_changed) + self.assertFalse(diff.os_changed) + self.assertFalse(diff.extraports_changed) + self.assertEqual(diff.cost, 0) + + def test_state_change(self): + a = Host() + b = Host() + a.state = "up" + b.state = "down" + diff = HostDiff(a, b) + self.assertTrue(diff.state_changed) + self.assertTrue(diff.cost > 0) + + def test_state_change_unknown(self): + a = Host() + b = Host() + a.state = "up" + diff = HostDiff(a, b) + self.assertTrue(diff.state_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.state_changed) + self.assertTrue(diff.cost > 0) + + def test_address_change(self): + a = Host() + b = Host() + b.add_address(Address.new("ipv4", "127.0.0.1")) + diff = HostDiff(a, b) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + a.add_address(Address.new("ipv4", "1.1.1.1")) + diff = HostDiff(a, b) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + + def test_hostname_change(self): + a = Host() + b = Host() + b.add_hostname("host-1") + diff = HostDiff(a, b) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + a.add_address("host-2") + diff = HostDiff(a, b) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.id_changed) + self.assertTrue(diff.cost > 0) + + def test_port_state_change(self): + a = Host() + b = Host() + spec = (10, "tcp") + a.add_port(Port(spec, "open")) + b.add_port(Port(spec, "closed")) + diff = HostDiff(a, b) + self.assertTrue(len(diff.ports) > 0) + self.assertEqual(set(diff.ports), set(diff.port_diffs.keys())) + self.assertTrue(diff.cost > 0) + + def test_port_state_change_unknown(self): + a = Host() + b = Host() + b.add_port(Port((10, "tcp"), "open")) + diff = HostDiff(a, b) + self.assertTrue(len(diff.ports) > 0) + self.assertEqual(set(diff.ports), set(diff.port_diffs.keys())) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(len(diff.ports) > 0) + self.assertEqual(set(diff.ports), set(diff.port_diffs.keys())) + self.assertTrue(diff.cost > 0) + + def test_port_state_change_multi(self): + a = Host() + b = Host() + a.add_port(Port((10, "tcp"), "open")) + a.add_port(Port((20, "tcp"), "closed")) + a.add_port(Port((30, "tcp"), "open")) + b.add_port(Port((10, "tcp"), "open")) + b.add_port(Port((20, "tcp"), "open")) + b.add_port(Port((30, "tcp"), "open")) + diff = HostDiff(a, b) + self.assertTrue(diff.cost > 0) + + def test_os_change(self): + a = Host() + b = Host() + a.os.append("os-1") + diff = HostDiff(a, b) + self.assertTrue(diff.os_changed) + self.assertTrue(len(diff.os_diffs) > 0) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.os_changed) + self.assertTrue(len(diff.os_diffs) > 0) + self.assertTrue(diff.cost > 0) + b.os.append("os-2") + diff = HostDiff(a, b) + self.assertTrue(diff.os_changed) + self.assertTrue(len(diff.os_diffs) > 0) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.os_changed) + self.assertTrue(len(diff.os_diffs) > 0) + self.assertTrue(diff.cost > 0) + + def test_extraports_change(self): + a = Host() + b = Host() + a.extraports = {"open": 100} + diff = HostDiff(a, b) + self.assertTrue(diff.extraports_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.extraports_changed) + self.assertTrue(diff.cost > 0) + b.extraports = {"closed": 100} + diff = HostDiff(a, b) + self.assertTrue(diff.extraports_changed) + self.assertTrue(diff.cost > 0) + diff = HostDiff(b, a) + self.assertTrue(diff.extraports_changed) + self.assertTrue(diff.cost > 0) + + def test_diff_is_effective(self): + """Test that a host diff is effective. + This means that if the recommended changes are applied to the first host + the hosts become the same.""" + a = Host() + b = Host() + + a.state = "up" + b.state = "down" + + a.add_port(Port((10, "tcp"), "open")) + a.add_port(Port((20, "tcp"), "closed")) + a.add_port(Port((40, "udp"), "open|filtered")) + b.add_port(Port((10, "tcp"), "open")) + b.add_port(Port((30, "tcp"), "open")) + a.add_port(Port((40, "udp"), "open")) + + a.add_hostname("a") + a.add_hostname("localhost") + b.add_hostname("b") + b.add_hostname("localhost") + b.add_hostname("b.example.com") + + b.add_address(Address.new("ipv4", "1.2.3.4")) + + a.os = ["os-1", "os-2"] + b.os = ["os-2", "os-3"] + + a.extraports = {"filtered": 99} + + diff = HostDiff(a, b) + host_apply_diff(a, diff) + diff = HostDiff(a, b) + + self.assertFalse(diff.id_changed) + self.assertFalse(diff.state_changed) + self.assertFalse(diff.os_changed) + self.assertFalse(diff.extraports_changed) + self.assertEqual(diff.cost, 0) + +class port_diff_test(unittest.TestCase): + """Test the PortDiff class.""" + def test_equal(self): + spec = (10, "tcp") + a = Port(spec) + b = Port(spec) + diff = PortDiff(a, b) + self.assertEqual(diff.cost, 0) + + def test_self(self): + p = Port((10, "tcp")) + diff = PortDiff(p, p) + self.assertEqual(diff.cost, 0) + + def test_state_change(self): + spec = (10, "tcp") + a = Port(spec) + a.state = "open" + b = Port(spec) + b.state = "closed" + diff = PortDiff(a, b) + self.assertTrue(diff.cost > 0) + self.assertEqual(PortDiff(a, diff.port_a).cost, 0) + self.assertEqual(PortDiff(b, diff.port_b).cost, 0) + + def test_id_change(self): + a = Port((10, "tcp")) + a.state = "open" + b = Port((20, "tcp")) + b.state = "open" + diff = PortDiff(a, b) + self.assertTrue(diff.cost > 0) + self.assertEqual(PortDiff(a, diff.port_a).cost, 0) + self.assertEqual(PortDiff(b, diff.port_b).cost, 0) + +class table_test(unittest.TestCase): + """Test the table class.""" + def test_empty(self): + t = Table("") + self.assertEqual(str(t), "") + t = Table("***") + self.assertEqual(str(t), "") + t = Table("* * *") + self.assertEqual(str(t), "") + + def test_none(self): + """Test that None is treated like an empty string when it is not at the + end of a row.""" + t = Table("* * *") + t.append((None, "a", "b")) + self.assertEqual(str(t), " a b") + t = Table("* * *") + t.append(("a", None, "b")) + self.assertEqual(str(t), "a b") + t = Table("* * *") + t.append((None, None, "a")) + self.assertEqual(str(t), " a") + + def test_prefix(self): + t = Table("<<<") + t.append(("a", "b", "c")) + self.assertEqual(str(t), "<<>>*!!!") + t.append(()) + self.assertEqual(str(t), "<<<") + t = Table("<<<*>>>*!!!") + t.append(("a")) + self.assertEqual(str(t), "<<>>") + t = Table("<<<*>>>*!!!") + t.append(("a", "b", "c", "d")) + self.assertEqual(str(t), "<<>>b!!!cd") + + def test_strip(self): + """Test that trailing whitespace is stripped.""" + t = Table("* * * ") + t.append(("a", "b", None)) + self.assertEqual(str(t), "a b") + t = Table("* * *") + t.append(("a", None, None)) + self.assertEqual(str(t), "a") + t = Table("* * *") + t.append(("a", "b", "")) + self.assertEqual(str(t), "a b") + t = Table("* * *") + t.append(("a", "", "")) + self.assertEqual(str(t), "a") + + def test_newline(self): + """Test that there is no trailing newline in a table.""" + t = Table("*") + self.assertFalse(str(t).endswith("\n")) + t.append(("a")) + self.assertFalse(str(t).endswith("\n")) + t.append(("b")) + self.assertFalse(str(t).endswith("\n")) class scan_diff_xml_test(unittest.TestCase): def setUp(self): @@ -588,4 +683,37 @@ class scan_diff_xml_test(unittest.TestCase): except Exception, e: fail(u"Parsing XML diff output caused the exception: %s" % str(e)) +def scan_apply_diff(scan, diff): + """Apply a scan diff to the given scan.""" + for host in diff.hosts: + if host not in scan.hosts: + scan.hosts.append(host) + host_apply_diff(host, diff.host_diffs[host]) + +def host_apply_diff(host, diff): + """Apply a host diff to the given host.""" + if diff.state_changed: + host.state = diff.host_b.state + + if diff.id_changed: + host.addresses = diff.host_b.addresses[:] + host.hostnames = diff.host_b.hostnames[:] + + if diff.os_changed: + host.os = diff.host_b.os[:] + + if diff.extraports_changed: + for state in host.extraports.keys(): + for port in host.ports.values(): + if port.state == state: + del host.ports[port.spec] + host.extraports = diff.host_b.extraports.copy() + + for port in diff.ports: + port_b = diff.port_diffs[port].port_b + if port_b.state is None: + del host.ports[port.spec] + else: + host.ports[port.spec] = diff.port_diffs[port].port_b + unittest.main()