1
0
mirror of https://github.com/nmap/nmap.git synced 2025-12-15 20:29:03 +00:00

Enforce PEP 8 style on Ndiff

Issues fixed:

1       E111 indentation is not a multiple of four
1       E201 whitespace after '['
14      E251 no spaces around keyword / parameter equals
7       E301 expected 1 blank line, found 0
55      E302 expected 2 blank lines, found 1
69      E501 line too long (80 characters)
3       W291 trailing whitespace
4       W601 .has_key() is deprecated, use 'in'
This commit is contained in:
dmiller
2014-01-10 20:43:32 +00:00
parent 393b4b21ee
commit da0c947004
4 changed files with 249 additions and 118 deletions

View File

@@ -7,8 +7,8 @@
#
# Copyright 2008 Insecure.Com LLC
# Ndiff is distributed under the same license as Nmap. See the file COPYING or
# http://nmap.org/data/COPYING. See http://nmap.org/book/man-legal.html for more
# details.
# http://nmap.org/data/COPYING. See http://nmap.org/book/man-legal.html for
# more details.
#
# David Fifield
# based on a design by Michael Pattrick
@@ -26,9 +26,10 @@ verbose = False
NDIFF_XML_VERSION = u"1"
class Scan(object):
"""A single Nmap scan, corresponding to a single invocation of Nmap. It is a
container for a list of hosts. It also has utility methods to load itself
"""A single Nmap scan, corresponding to a single invocation of Nmap. It is
a container for a list of hosts. It also has utility methods to load itself
from an Nmap XML file."""
def __init__(self):
self.scanner = None
@@ -41,7 +42,7 @@ class Scan(object):
self.post_script_results = []
def sort_hosts(self):
self.hosts.sort(key = lambda h: h.get_id())
self.hosts.sort(key=lambda h: h.get_id())
def load(self, f):
"""Load a scan from the Nmap XML in the file-like object f."""
@@ -66,7 +67,8 @@ class Scan(object):
attrs[u"args"] = self.args
if self.start_date is not None:
attrs[u"start"] = "%d" % time.mktime(self.start_date.timetuple())
attrs[u"startstr"] = self.start_date.strftime("%a %b %d %H:%M:%S %Y")
attrs[u"startstr"] = self.start_date.strftime(
"%a %b %d %H:%M:%S %Y")
if self.version is not None:
attrs[u"version"] = self.version
writer.startElement(u"nmaprun", attrs)
@@ -82,13 +84,17 @@ class Scan(object):
if self.args is not None:
elem.setAttribute(u"args", self.args)
if self.start_date is not None:
elem.setAttribute(u"start", "%d" % time.mktime(self.start_date.timetuple()))
elem.setAttribute(u"startstr", self.start_date.strftime("%a %b %d %H:%M:%S %Y"))
elem.setAttribute(
u"start", "%d" % time.mktime(self.start_date.timetuple()))
elem.setAttribute(
u"startstr",
self.start_date.strftime("%a %b %d %H:%M:%S %Y"))
if self.version is not None:
elem.setAttribute(u"version", self.version)
frag.appendChild(elem)
return frag
class Host(object):
"""A single host, with a state, addresses, host names, a dict mapping port
specs to Ports, and a list of OS matches. Host states are strings, or None
@@ -103,8 +109,8 @@ class Host(object):
self.script_results = []
def get_id(self):
"""Return an id that is used to determine if hosts are "the same" across
scans."""
"""Return an id that is used to determine if hosts are "the same"
across scans."""
if len(self.addresses) > 0:
return str(sorted(self.addresses)[0])
if len(self.hostnames) > 0:
@@ -142,8 +148,9 @@ class Host(object):
def extraports_string(self):
list = [(count, state) for (state, count) in self.extraports.items()]
# Reverse-sort by count.
list.sort(reverse = True)
return u", ".join([u"%d %s ports" % (count, state) for (count, state) in list])
list.sort(reverse=True)
return u", ".join(
[u"%d %s ports" % (count, state) for (count, state) in list])
def state_to_dom_fragment(self, document):
frag = document.createDocumentFragment()
@@ -189,7 +196,8 @@ class Host(object):
if len(self.hostnames) > 0:
hostnames_elem = document.createElement(u"hostnames")
for hostname in self.hostnames:
hostnames_elem.appendChild(self.hostname_to_dom_fragment(document, hostname))
hostnames_elem.appendChild(
self.hostname_to_dom_fragment(document, hostname))
elem.appendChild(hostnames_elem)
ports_elem = document.createElement(u"ports")
@@ -215,6 +223,7 @@ class Host(object):
frag.appendChild(elem)
return frag
class Address(object):
def __init__(self, s):
self.s = s
@@ -259,27 +268,34 @@ class Address(object):
# The sort_key method in the Address subclasses determines the order in which
# addresses are displayed. We do IPv4, then IPv6, then MAC.
class IPv4Address(Address):
type = property(lambda self: u"ipv4")
def sort_key(self):
return (0, self.s)
class IPv6Address(Address):
type = property(lambda self: u"ipv6")
def sort_key(self):
return (1, self.s)
class MACAddress(Address):
type = property(lambda self: u"mac")
def sort_key(self):
return (2, self.s)
class Port(object):
"""A single port, consisting of a port specification, a state, and a service
version. A specification, or "spec," is the 2-tuple (number, protocol). So
(10, "tcp") corresponds to the port 10/tcp. Port states are strings, or None
for "unknown"."""
def __init__(self, spec, state = None):
"""A single port, consisting of a port specification, a state, and a
service version. A specification, or "spec," is the 2-tuple (number,
protocol). So (10, "tcp") corresponds to the port 10/tcp. Port states are
strings, or None for "unknown"."""
def __init__(self, spec, state=None):
self.spec = spec
self.state = state
self.service = Service()
@@ -316,6 +332,7 @@ class Port(object):
frag.appendChild(elem)
return frag
class Service(object):
"""A service version as determined by -sV scan. Also contains the looked-up
port name if -sV wasn't used."""
@@ -331,6 +348,7 @@ class Service(object):
# self.devicetype = None
__hash__ = None
def __eq__(self, other):
return self.name == other.name \
and self.product == other.product \
@@ -379,12 +397,14 @@ class Service(object):
frag.appendChild(elem)
return frag
class ScriptResult(object):
def __init__(self):
self.id = None
self.output = None
__hash__ = None
def __eq__(self, other):
return self.id == other.id and self.output == other.output
@@ -413,23 +433,26 @@ class ScriptResult(object):
frag.appendChild(elem)
return frag
def format_banner(scan):
"""Format a startup banner more or less like Nmap does."""
scanner = u"Nmap"
if scan.scanner is not None and scan.scanner != u"nmap":
scanner = scan.scanner
parts = [ scanner ]
parts = [scanner]
if scan.version is not None:
parts.append(scan.version)
parts.append(u"scan")
if scan.start_date is not None:
parts.append(u"initiated %s" % scan.start_date.strftime("%a %b %d %H:%M:%S %Y"))
parts.append(u"initiated %s" % scan.start_date.strftime(
"%a %b %d %H:%M:%S %Y"))
if scan.args is not None:
parts.append(u"as: %s" % scan.args)
return u" ".join(parts)
def print_script_result_diffs_text(title, script_results_a, script_results_b,
script_result_diffs, f = sys.stdout):
script_result_diffs, f=sys.stdout):
table = Table(u"*")
for sr_diff in script_result_diffs:
sr_diff.append_to_port_table(table)
@@ -443,6 +466,7 @@ def print_script_result_diffs_text(title, script_results_a, script_results_b,
print >> f, u" %s:" % title
print >> f, table
def script_result_diffs_to_dom_fragment(elem, script_results_a,
script_results_b, script_result_diffs, document):
if len(script_results_a) == 0 and len(script_results_b) == 0:
@@ -464,12 +488,13 @@ def script_result_diffs_to_dom_fragment(elem, script_results_a,
elem.appendChild(sr_diff.to_dom_fragment(document))
return elem
def host_pairs(a, b):
"""Take hosts lists a and b, which must be sorted by id, and return pairs.
When the heads of both lists have the same ids, they are returned together.
Otherwise the one with the smaller id is returned, with an empty host as its
counterpart, and the one with the higher id will remain in its list for a
later iteration."""
Otherwise the one with the smaller id is returned, with an empty host as
its counterpart, and the one with the higher id will remain in its list for
a later iteration."""
i = 0
j = 0
while i < len(a) and j < len(b):
@@ -490,11 +515,13 @@ def host_pairs(a, b):
yield Host(), b[j]
j += 1
class ScanDiff(object):
"""An abtract class for different diff output types. Subclasses must define
various output methods."""
def __init__(self, scan_a, scan_b, f = sys.stdout):
"""Create a ScanDiff from the "before" scan_a and the "after" scan_b."""
def __init__(self, scan_a, scan_b, f=sys.stdout):
"""Create a ScanDiff from the "before" scan_a and the "after"
scan_b."""
self.scan_a = scan_a
self.scan_b = scan_b
self.f = f
@@ -505,7 +532,8 @@ class ScanDiff(object):
self.output_beginning()
pre_script_result_diffs = ScriptResultDiff.diff_lists(self.scan_a.pre_script_results, self.scan_b.pre_script_results)
pre_script_result_diffs = ScriptResultDiff.diff_lists(
self.scan_a.pre_script_results, self.scan_b.pre_script_results)
self.output_pre_scripts(pre_script_result_diffs)
cost = 0
@@ -518,15 +546,18 @@ class ScanDiff(object):
host = host_a or host_b
self.output_host_diff(h_diff)
post_script_result_diffs = ScriptResultDiff.diff_lists(self.scan_a.post_script_results, self.scan_b.post_script_results)
post_script_result_diffs = ScriptResultDiff.diff_lists(
self.scan_a.post_script_results,
self.scan_b.post_script_results)
self.output_post_scripts(post_script_result_diffs)
self.output_ending()
return cost
class ScanDiffText(ScanDiff):
def __init__(self, scan_a, scan_b, f = sys.stdout):
def __init__(self, scan_a, scan_b, f=sys.stdout):
ScanDiff.__init__(self, scan_a, scan_b, f)
def output_beginning(self):
@@ -555,8 +586,9 @@ class ScanDiffText(ScanDiff):
def output_ending(self):
pass
class ScanDiffXML(ScanDiff):
def __init__(self, scan_a, scan_b, f = sys.stdout):
def __init__(self, scan_a, scan_b, f=sys.stdout):
ScanDiff.__init__(self, scan_a, scan_b, f)
impl = xml.dom.minidom.getDOMImplementation()
@@ -566,7 +598,8 @@ class ScanDiffXML(ScanDiff):
def nmaprun_differs(self):
for attr in ("scanner", "version", "args", "start_date", "end_date"):
if getattr(self.scan_a, attr, None) != getattr(self.scan_b, attr, None):
if getattr(self.scan_a, attr, None) !=\
getattr(self.scan_b, attr, None):
return True
return False
@@ -576,10 +609,13 @@ class ScanDiffXML(ScanDiff):
self.writer.startElement(u"scandiff", {})
if self.nmaprun_differs():
self.writer.frag_a(self.scan_a.nmaprun_to_dom_fragment(self.document))
self.writer.frag_b(self.scan_b.nmaprun_to_dom_fragment(self.document))
self.writer.frag_a(
self.scan_a.nmaprun_to_dom_fragment(self.document))
self.writer.frag_b(
self.scan_b.nmaprun_to_dom_fragment(self.document))
elif verbose:
self.writer.frag(self.scan_a.nmaprun_to_dom_fragment(self.document))
self.writer.frag(
self.scan_a.nmaprun_to_dom_fragment(self.document))
def output_pre_scripts(self, pre_script_result_diffs):
if len(pre_script_result_diffs) > 0 or verbose:
@@ -611,9 +647,10 @@ class ScanDiffXML(ScanDiff):
self.writer.endElement(u"nmapdiff")
self.writer.endDocument()
class HostDiff(object):
"""A diff of two Hosts. It contains the two hosts, variables describing what
changed, and a list of PortDiffs and OS differences."""
"""A diff of two Hosts. It contains the two hosts, variables describing
what changed, and a list of PortDiffs and OS differences."""
def __init__(self, host_a, host_b):
self.host_a = host_a
self.host_b = host_b
@@ -639,12 +676,14 @@ class HostDiff(object):
self.id_changed = True
self.cost += 1
all_specs = list(set(self.host_a.ports.keys()).union(set(self.host_b.ports.keys())))
all_specs = list(
set(self.host_a.ports.keys()).union(
set(self.host_b.ports.keys())))
all_specs.sort()
for spec in all_specs:
# Currently we only compare ports with the same spec. This ignores
# the possibility that a service is moved lock, stock, and barrel to
# another port.
# the possibility that a service is moved lock, stock, and barrel
# to another port.
port_a = self.host_a.ports.get(spec)
port_b = self.host_b.ports.get(spec)
diff = PortDiff(port_a or Port(spec), port_b or Port(spec))
@@ -654,20 +693,24 @@ class HostDiff(object):
self.port_diffs[port] = diff
self.cost += diff.cost
os_diffs = difflib.SequenceMatcher(None, self.host_a.os, self.host_b.os)
os_diffs = difflib.SequenceMatcher(
None, self.host_a.os, self.host_b.os)
self.os_diffs = os_diffs.get_opcodes()
os_cost = len([x for x in self.os_diffs if x[0] != "equal"])
if os_cost > 0:
self.os_changed = True
self.cost += os_cost
extraports_a = tuple((count, state) for (state, count) in self.host_a.extraports.items())
extraports_b = tuple((count, state) for (state, count) in self.host_b.extraports.items())
extraports_a = tuple((count, state)
for (state, count) in self.host_a.extraports.items())
extraports_b = tuple((count, state)
for (state, count) in self.host_b.extraports.items())
if extraports_a != extraports_b:
self.extraports_changed = True
self.cost += 1
self.script_result_diffs = ScriptResultDiff.diff_lists(self.host_a.script_results, self.host_b.script_results)
self.script_result_diffs = ScriptResultDiff.diff_lists(
self.host_a.script_results, self.host_b.script_results)
self.cost += len(self.script_result_diffs)
def include_diff(self, diff):
@@ -680,7 +723,7 @@ class HostDiff(object):
return True
return diff.cost > 0
def print_text(self, f = sys.stdout):
def print_text(self, f=sys.stdout):
host_a = self.host_a
host_b = self.host_b
@@ -738,7 +781,8 @@ class HostDiff(object):
print >> f, u"-OS details:"
elif len(host_b.os) > 0:
print >> f, u"+OS details:"
# os_diffs is a list of 5-tuples returned by difflib.SequenceMatcher.
# os_diffs is a list of 5-tuples returned by
# difflib.SequenceMatcher.
for op, i1, i2, j1, j2 in self.os_diffs:
if op == "replace" or op == "delete":
for i in range(i1, i2):
@@ -809,15 +853,18 @@ class HostDiff(object):
hostnameset_a = set(host_a.hostnames)
hostnameset_b = set(host_b.hostnames)
for hostname in sorted(hostnameset_a.intersection(hostnameset_b)):
hostnames_elem.appendChild(host_a.hostname_to_dom_fragment(document, hostname))
hostnames_elem.appendChild(
host_a.hostname_to_dom_fragment(document, hostname))
a_elem = document.createElement(u"a")
for hostname in sorted(hostnameset_a - hostnameset_b):
a_elem.appendChild(host_a.hostname_to_dom_fragment(document, hostname))
a_elem.appendChild(
host_a.hostname_to_dom_fragment(document, hostname))
if a_elem.hasChildNodes():
hostnames_elem.appendChild(a_elem)
b_elem = document.createElement(u"b")
for hostname in sorted(hostnameset_b - hostnameset_a):
b_elem.appendChild(host_b.hostname_to_dom_fragment(document, hostname))
b_elem.appendChild(
host_b.hostname_to_dom_fragment(document, hostname))
if b_elem.hasChildNodes():
hostnames_elem.appendChild(b_elem)
if hostnames_elem.hasChildNodes():
@@ -848,21 +895,25 @@ class HostDiff(object):
# OS changes.
if self.os_changed or verbose:
os_elem = document.createElement(u"os")
# os_diffs is a list of 5-tuples returned by difflib.SequenceMatcher.
# os_diffs is a list of 5-tuples returned by
# difflib.SequenceMatcher.
for op, i1, i2, j1, j2 in self.os_diffs:
if op == "replace" or op == "delete":
a_elem = document.createElement(u"a")
for i in range(i1, i2):
a_elem.appendChild(host_a.os_to_dom_fragment(document, host_a.os[i]))
a_elem.appendChild(host_a.os_to_dom_fragment(
document, host_a.os[i]))
os_elem.appendChild(a_elem)
if op == "replace" or op == "insert":
b_elem = document.createElement(u"b")
for i in range(j1, j2):
b_elem.appendChild(host_b.os_to_dom_fragment(document, host_b.os[i]))
b_elem.appendChild(host_b.os_to_dom_fragment(
document, host_b.os[i]))
os_elem.appendChild(b_elem)
if op == "equal":
for i in range(i1, i2):
os_elem.appendChild(host_a.os_to_dom_fragment(document, host_a.os[i]))
os_elem.appendChild(host_a.os_to_dom_fragment(
document, host_a.os[i]))
if os_elem.hasChildNodes():
host_elem.appendChild(os_elem)
@@ -878,6 +929,7 @@ class HostDiff(object):
return frag
class PortDiff(object):
"""A diff of two Ports. It contains the two ports and the cost of changing
one into the other. If the cost is 0 then the two ports are the same."""
@@ -899,7 +951,8 @@ class PortDiff(object):
if self.port_a.service != self.port_b.service:
self.cost += 1
self.script_result_diffs = ScriptResultDiff.diff_lists(self.port_a.script_results, self.port_b.script_results)
self.script_result_diffs = ScriptResultDiff.diff_lists(
self.port_a.script_results, self.port_b.script_results)
self.cost += len(self.script_result_diffs)
# PortDiffs are inserted into a Table and then printed, not printed out
@@ -933,7 +986,8 @@ class PortDiff(object):
frag = document.createDocumentFragment()
portdiff_elem = document.createElement(u"portdiff")
frag.appendChild(portdiff_elem)
if self.port_a.spec == self.port_b.spec and self.port_a.state == self.port_b.state:
if (self.port_a.spec == self.port_b.spec and
self.port_a.state == self.port_b.state):
port_elem = document.createElement(u"port")
port_elem.setAttribute(u"portid", unicode(self.port_a.spec[0]))
port_elem.setAttribute(u"protocol", self.port_a.spec[1])
@@ -942,13 +996,16 @@ class PortDiff(object):
state_elem.setAttribute(u"state", self.port_a.state)
port_elem.appendChild(state_elem)
if self.port_a.service == self.port_b.service:
port_elem.appendChild(self.port_a.service.to_dom_fragment(document))
port_elem.appendChild(
self.port_a.service.to_dom_fragment(document))
else:
a_elem = document.createElement(u"a")
a_elem.appendChild(self.port_a.service.to_dom_fragment(document))
a_elem.appendChild(
self.port_a.service.to_dom_fragment(document))
port_elem.appendChild(a_elem)
b_elem = document.createElement(u"b")
b_elem.appendChild(self.port_b.service.to_dom_fragment(document))
b_elem.appendChild(
self.port_b.service.to_dom_fragment(document))
port_elem.appendChild(b_elem)
for sr_diff in self.script_result_diffs:
port_elem.appendChild(sr_diff.to_dom_fragment(document))
@@ -963,6 +1020,7 @@ class PortDiff(object):
return frag
class ScriptResultDiff(object):
def __init__(self, sr_a, sr_b):
"""One of sr_a and sr_b may be None."""
@@ -997,8 +1055,8 @@ class ScriptResultDiff(object):
return diffs
diff_lists = staticmethod(diff_lists)
# Script result diffs are appended to a port table rather than being printed
# directly, so append_to_port_table exists instead of print_text.
# Script result diffs are appended to a port table rather than being
# printed directly, so append_to_port_table exists instead of print_text.
def append_to_port_table(self, table):
a_lines = []
b_lines = []
@@ -1021,7 +1079,9 @@ class ScriptResultDiff(object):
def to_dom_fragment(self, document):
frag = document.createDocumentFragment()
if self.sr_a is not None and self.sr_b is not None and self.sr_a == self.sr_b:
if (self.sr_a is not None and
self.sr_b is not None and
self.sr_a == self.sr_b):
frag.appendChild(self.sr_a.to_dom_fragment(document))
else:
if self.sr_a is not None:
@@ -1034,12 +1094,13 @@ class ScriptResultDiff(object):
frag.appendChild(b_elem)
return frag
class Table(object):
"""A table of character data, like NmapOutputTable."""
def __init__(self, template):
"""template is a string consisting of "*" and other characters. Each "*"
is a left-justified space-padded field. All other characters are copied
to the output."""
"""template is a string consisting of "*" and other characters. Each
"*" is a left-justified space-padded field. All other characters are
copied to the output."""
self.widths = []
self.rows = []
self.prefix = u""
@@ -1101,10 +1162,12 @@ class Table(object):
lines.append(u"".join(parts).rstrip())
return u"\n".join(lines)
def warn(str):
"""Print a warning to stderr."""
print >> sys.stderr, str
class NmapContentHandler(xml.sax.handler.ContentHandler):
"""The xml.sax ContentHandler for the XML parser. It contains a Scan object
that is filled in and can be read back again once the parse method is
@@ -1139,8 +1202,8 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
}
def parent_element(self):
"""Return the name of the element containing the current one, or None if
this is the root element."""
"""Return the name of the element containing the current one, or None
if this is the root element."""
if len(self.element_stack) == 0:
return None
return self.element_stack[-1]
@@ -1164,9 +1227,10 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
def _start_nmaprun(self, name, attrs):
assert self.parent_element() == None
if attrs.has_key(u"start"):
if "start" in attrs:
start_timestamp = int(attrs.get(u"start"))
self.scan.start_date = datetime.datetime.fromtimestamp(start_timestamp)
self.scan.start_date = datetime.datetime.fromtimestamp(
start_timestamp)
self.scan.scanner = attrs.get(u"scanner")
self.scan.args = attrs.get(u"args")
self.scan.version = attrs.get(u"version")
@@ -1181,7 +1245,9 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
assert self.current_host is not None
state = attrs.get(u"state")
if state is None:
warn(u"%s element of host %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_host.format_name()))
warn(u'%s element of host %s is missing the "state" attribute; '
'assuming \unknown\.' % (
name, self.current_host.format_name()))
return
self.current_host.state = state
@@ -1190,7 +1256,9 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
assert self.current_host is not None
addr = attrs.get(u"addr")
if addr is None:
warn(u"%s element of host %s is missing the \"addr\" attribute; skipping." % (name, self.current_host.format_name()))
warn(u'%s element of host %s is missing the "addr" '
'attribute; skipping.' % (
name, self.current_host.format_name()))
return
addrtype = attrs.get(u"addrtype", u"ipv4")
self.current_host.add_address(Address.new(addrtype, addr))
@@ -1200,7 +1268,9 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
assert self.current_host is not None
hostname = attrs.get(u"name")
if hostname is None:
warn(u"%s element of host %s is missing the \"name\" attribute; skipping." % (name, self.current_host.format_name()))
warn(u'%s element of host %s is missing the "name" '
'attribute; skipping.' % (
name, self.current_host.format_name()))
return
self.current_host.add_hostname(hostname)
@@ -1209,20 +1279,27 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
assert self.current_host is not None
state = attrs.get(u"state")
if state is None:
warn(u"%s element of host %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_host.format_name()))
warn(u'%s element of host %s is missing the "state" '
'attribute; assuming "unknown".' % (
name, self.current_host.format_name()))
state = None
if state in self.current_host.extraports:
warn(u"Duplicate extraports state \"%s\" in host %s." % (state, self.current_host.format_name()))
warn(u'Duplicate extraports state "%s" in host %s.' % (
state, self.current_host.format_name()))
count = attrs.get(u"count")
if count is None:
warn(u"%s element of host %s is missing the \"count\" attribute; assuming 0." % (name, self.current_host.format_name()))
warn(u'%s element of host %s is missing the "count" '
'attribute; assuming 0.' % (
name, self.current_host.format_name()))
count = 0
else:
try:
count = int(count)
except ValueError:
warn(u"Can't convert extraports count \"%s\" to an integer in host %s; assuming 0." % (attrs[u"count"], self.current_host.format_name()))
warn(u"Can't convert extraports count \"%s\" "
"to an integer in host %s; assuming 0." % (
attrs[u"count"], self.current_host.format_name()))
count = 0
self.current_host.extraports[state] = count
@@ -1231,16 +1308,22 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
assert self.current_host is not None
portid_str = attrs.get(u"portid")
if portid_str is None:
warn(u"%s element of host %s missing the \"portid\" attribute; skipping." % (name, self.current_host.format_name()))
warn(u'%s element of host %s missing the "portid" '
'attribute; skipping.' % (
name, self.current_host.format_name()))
return
try:
portid = int(portid_str)
except ValueError:
warn(u"Can't convert portid \"%s\" to an integer in host %s; skipping port." % (portid_str, self.current_host.format_name()))
warn(u"Can't convert portid \"%s\" to an integer "
"in host %s; skipping port." % (
portid_str, self.current_host.format_name()))
return
protocol = attrs.get(u"protocol")
if protocol is None:
warn(u"%s element of host %s missing the \"protocol\" attribute; skipping." % (name, self.current_host.format_name()))
warn(u'%s element of host %s missing the "protocol" '
'attribute; skipping.' % (
name, self.current_host.format_name()))
return
self.current_port = Port((portid, protocol))
@@ -1249,8 +1332,10 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
assert self.current_host is not None
if self.current_port is None:
return
if not attrs.has_key(u"state"):
warn(u"%s element of port %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_port.spec_string()))
if "state" not in attrs:
warn(u'%s element of port %s is missing the "state" '
'attribute; assuming "unknown".' % (
name, self.current_port.spec_string()))
return
self.current_port.state = attrs[u"state"]
self.current_host.add_port(self.current_port)
@@ -1270,12 +1355,13 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
result = ScriptResult()
result.id = attrs.get(u"id")
if result.id is None:
warn(u"%s element missing the \"id\" attribute; skipping." % name)
warn(u'%s element missing the "id" attribute; skipping.' % name)
return
result.output = attrs.get(u"output")
if result.output is None:
warn(u"%s element missing the \"output\" attribute; skipping." % name)
warn(u'%s element missing the "output" attribute; skipping.'
% name)
return
if self.parent_element() == u"prescript":
self.scan.pre_script_results.append(result)
@@ -1286,20 +1372,23 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
elif self.parent_element() == u"port":
self.current_port.script_results.append(result)
else:
warn(u"%s element not inside prescript, postscript, hostscript, or port element; ignoring." % name)
warn(u"%s element not inside prescript, postscript, hostscript, "
"or port element; ignoring." % name)
return
def _start_osmatch(self, name, attrs):
assert self.parent_element() == u"os"
assert self.current_host is not None
if not attrs.has_key(u"name"):
warn(u"%s element of host %s is missing the \"name\" attribute; skipping." % (name, self.current_host.format_name()))
if "name" not in attrs:
warn(u'%s element of host %s is missing the "name" '
'attribute; skipping.' % (
name, self.current_host.format_name()))
return
self.current_host.os.append(attrs[u"name"])
def _start_finished(self, name, attrs):
assert self.parent_element() == u"runstats"
if attrs.has_key(u"time"):
if "time" in attrs:
end_timestamp = int(attrs.get(u"time"))
self.scan.end_date = datetime.datetime.fromtimestamp(end_timestamp)
@@ -1311,6 +1400,7 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
self.current_port.script_results.sort()
self.current_port = None
class XMLWriter (xml.sax.saxutils.XMLGenerator):
def __init__(self, f):
xml.sax.saxutils.XMLGenerator.__init__(self, f, "utf-8")
@@ -1318,20 +1408,21 @@ class XMLWriter (xml.sax.saxutils.XMLGenerator):
def frag(self, frag):
for node in frag.childNodes:
node.writexml(self.f, newl = u"\n")
node.writexml(self.f, newl=u"\n")
def frag_a(self, frag):
self.startElement(u"a", {})
for node in frag.childNodes:
node.writexml(self.f, newl = u"\n")
node.writexml(self.f, newl=u"\n")
self.endElement(u"a")
def frag_b(self, frag):
self.startElement(u"b", {})
for node in frag.childNodes:
node.writexml(self.f, newl = u"\n")
node.writexml(self.f, newl=u"\n")
self.endElement(u"b")
def usage():
print u"""\
Usage: %s [option] FILE1 FILE2
@@ -1349,17 +1440,20 @@ EXIT_EQUAL = 0
EXIT_DIFFERENT = 1
EXIT_ERROR = 2
def usage_error(msg):
print >> sys.stderr, u"%s: %s" % (sys.argv[0], msg)
print >> sys.stderr, u"Try '%s -h' for help." % sys.argv[0]
sys.exit(EXIT_ERROR)
def main():
global verbose
output_format = None
try:
opts, input_filenames = getopt.gnu_getopt(sys.argv[1:], "hv", ["help", "text", "verbose", "xml"])
opts, input_filenames = getopt.gnu_getopt(
sys.argv[1:], "hv", ["help", "text", "verbose", "xml"])
except getopt.GetoptError, e:
usage_error(e.msg)
for o, a in opts:
@@ -1406,6 +1500,7 @@ def main():
else:
return EXIT_DIFFERENT
# Catch uncaught exceptions so they can produce an exit code of 2 (EXIT_ERROR),
# not 1 like they would by default.
def excepthook(type, value, tb):

View File

@@ -18,6 +18,7 @@ for x in dir(ndiff):
sys.dont_write_bytecode = dont_write_bytecode
del dont_write_bytecode
class scan_test(unittest.TestCase):
"""Test the Scan class."""
def test_empty(self):
@@ -54,7 +55,8 @@ class scan_test(unittest.TestCase):
scan.load_from_file("test-scans/complex.xml")
host = scan.hosts[0]
self.assertEqual(len(host.ports), 6)
self.assertEqual(set(host.extraports.items()), set([("filtered", 95), ("open|filtered", 99)]))
self.assertEqual(set(host.extraports.items()),
set([("filtered", 95), ("open|filtered", 99)]))
def test_nmaprun(self):
"""Test that nmaprun information is recorded."""
@@ -94,18 +96,20 @@ class scan_test(unittest.TestCase):
self.assertTrue(len(host.ports[(22, u"tcp")].script_results) > 0)
# This test is commented out because Nmap XML doesn't store any information
# about down hosts, not even the fact that they are down. Recovering the list of
# scanned hosts to infer which ones are down would involve parsing the targets
# out of the /nmaprun/@args attribute (which is non-trivial) and possibly
# looking up their addresses.
# about down hosts, not even the fact that they are down. Recovering the list
# of scanned hosts to infer which ones are down would involve parsing the
# targets out of the /nmaprun/@args attribute (which is non-trivial) and
# possibly looking up their addresses.
# def test_down_state(self):
# """Test that hosts that are not marked "up" are in the "down" state."""
# """Test that hosts that are not marked "up" are in the "down"
# state."""
# scan = Scan()
# scan.load_from_file("test-scans/down.xml")
# self.assertTrue(len(scan.hosts) == 1)
# host = scan.hosts[0]
# self.assertTrue(host.state == "down")
class host_test(unittest.TestCase):
"""Test the Host class."""
def test_empty(self):
@@ -191,6 +195,7 @@ class host_test(unittest.TestCase):
self.assertEqual(h.extraports.values()[0], 95)
self.assertEqual(h.state, "up")
class address_test(unittest.TestCase):
"""Test the Address class."""
def test_ipv4_new(self):
@@ -225,6 +230,7 @@ class address_test(unittest.TestCase):
self.assertEqual(e, e)
self.assertNotEqual(a, e)
class port_test(unittest.TestCase):
"""Test the Port class."""
def test_spec_string(self):
@@ -237,6 +243,7 @@ class port_test(unittest.TestCase):
p = Port((10, "tcp"))
self.assertEqual(p.state_string(), u"unknown")
class service_test(unittest.TestCase):
"""Test the Service class."""
def test_compare(self):
@@ -278,13 +285,16 @@ class service_test(unittest.TestCase):
serv.product = u"FooBar"
serv.version = u"1.2.3"
# Must match Nmap output.
self.assertEqual(serv.version_string(), u"%s %s" % (serv.product, serv.version))
self.assertEqual(serv.version_string(),
u"%s %s" % (serv.product, serv.version))
serv.extrainfo = u"misconfigured"
self.assertEqual(serv.version_string(), u"%s %s (%s)" % (serv.product, serv.version, serv.extrainfo))
self.assertEqual(serv.version_string(),
u"%s %s (%s)" % (serv.product, serv.version, serv.extrainfo))
class ScanDiffSub(ScanDiff):
"""A subclass of ScanDiff that counts diffs for testing."""
def __init__(self, scan_a, scan_b, f = sys.stdout):
def __init__(self, scan_a, scan_b, f=sys.stdout):
ScanDiff.__init__(self, scan_a, scan_b, f)
self.pre_script_result_diffs = []
self.post_script_result_diffs = []
@@ -305,6 +315,7 @@ class ScanDiffSub(ScanDiff):
def output_ending(self):
pass
class scan_diff_test(unittest.TestCase):
"""Test the ScanDiff class."""
def setUp(self):
@@ -374,6 +385,7 @@ class scan_diff_test(unittest.TestCase):
diff = ScanDiffSub(a, b)
self.assertEqual(diff.host_diffs, [])
class host_diff_test(unittest.TestCase):
"""Test the HostDiff class."""
def test_empty(self):
@@ -531,8 +543,8 @@ class host_diff_test(unittest.TestCase):
def test_diff_is_effective(self):
"""Test that a host diff is effective.
This means that if the recommended changes are applied to the first host
the hosts become the same."""
This means that if the recommended changes are applied to the first
host the hosts become the same."""
a = Host()
b = Host()
@@ -569,6 +581,7 @@ class host_diff_test(unittest.TestCase):
self.assertFalse(diff.extraports_changed)
self.assertEqual(diff.cost, 0)
class port_diff_test(unittest.TestCase):
"""Test the PortDiff class."""
def test_equal(self):
@@ -604,6 +617,7 @@ class port_diff_test(unittest.TestCase):
self.assertEqual(PortDiff(a, diff.port_a).cost, 0)
self.assertEqual(PortDiff(b, diff.port_b).cost, 0)
class table_test(unittest.TestCase):
"""Test the table class."""
def test_empty(self):
@@ -676,6 +690,7 @@ class table_test(unittest.TestCase):
t.append(("b"))
self.assertFalse(str(t).endswith("\n"))
class scan_diff_xml_test(unittest.TestCase):
def setUp(self):
a = Scan()
@@ -692,7 +707,9 @@ class scan_diff_xml_test(unittest.TestCase):
try:
document = xml.dom.minidom.parseString(self.xml)
except Exception, e:
self.fail(u"Parsing XML diff output caused the exception: %s" % str(e))
self.fail(u"Parsing XML diff output caused the exception: %s"
% str(e))
def scan_apply_diff(scan, diff):
"""Apply a scan diff to the given scan."""
@@ -702,6 +719,7 @@ def scan_apply_diff(scan, diff):
scan.hosts.append(host)
host_apply_diff(host, h_diff)
def host_apply_diff(host, diff):
"""Apply a host diff to the given host."""
if diff.state_changed:
@@ -739,10 +757,12 @@ def host_apply_diff(host, diff):
host.script_results[host.script_results.index(sr_a)] = sr_b
host.script_results.sort()
def call_quiet(args, **kwargs):
"""Run a command with subprocess.call and hide its output."""
return subprocess.call(args, stdout = subprocess.PIPE,
stderr = subprocess.STDOUT, env = {'PYTHONPATH': "."}, **kwargs)
return subprocess.call(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env={'PYTHONPATH': "."}, **kwargs)
class exit_code_test(unittest.TestCase):
NDIFF = "./scripts/ndiff"

View File

@@ -6,29 +6,34 @@ import distutils.core
import distutils.cmd
import distutils.errors
class null_command(distutils.cmd.Command):
"""This is a dummy distutils command that does nothing. We use it to replace
the install_egg_info and avoid installing a .egg-info file, because there's
no option to disable that."""
"""This is a dummy distutils command that does nothing. We use it to
replace the install_egg_info and avoid installing a .egg-info file, because
there's no option to disable that."""
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
pass
class checked_install(distutils.command.install.install):
"""This is a wrapper around the install command that checks for an error
caused by not having the python-dev package installed. By default, distutils
gives a misleading error message: "invalid Python installation." """
caused by not having the python-dev package installed. By default,
distutils gives a misleading error message: "invalid Python installation."
"""
def finalize_options(self):
try:
distutils.command.install.install.finalize_options(self)
except distutils.errors.DistutilsPlatformError, e:
raise distutils.errors.DistutilsPlatformError(str(e) + "\n"
+ "Installing your distribution's python-dev package may solve this problem.")
raise distutils.errors.DistutilsPlatformError(str(e) + """
Installing your distribution's python-dev package may solve this problem.""")
distutils.core.setup(name = u"ndiff", scripts = [u"scripts/ndiff"],
py_modules = [u"ndiff"],
data_files = [(u"share/man/man1", [u"docs/ndiff.1"])],
cmdclass = {"install_egg_info": null_command, "install": checked_install})
distutils.core.setup(name=u"ndiff", scripts=[u"scripts/ndiff"],
py_modules=[u"ndiff"],
data_files=[(u"share/man/man1", [u"docs/ndiff.1"])],
cmdclass={"install_egg_info": null_command, "install": checked_install})

View File

@@ -18,20 +18,24 @@ VERBOSE = True
r = random.Random()
def hash(s):
digest = hashlib.sha512(s).hexdigest()
return int(digest, 16)
def anonymize_mac_address(addr):
r.seed(hash(addr))
nums = (0, 0, 0) + tuple(r.randrange(256) for i in range(3))
return u":".join(u"%02X" % x for x in nums)
def anonymize_ipv4_address(addr):
r.seed(hash(addr))
nums = (10,) + tuple(r.randrange(256) for i in range(3))
return u".".join(unicode(x) for x in nums)
def anonymize_ipv6_address(addr):
r.seed(hash(addr))
# RFC 4193.
@@ -43,6 +47,7 @@ def anonymize_ipv6_address(addr):
hostname_map = {}
address_map = {}
def anonymize_hostname(name):
if name in hostname_map:
return hostname_map[name]
@@ -60,6 +65,7 @@ mac_re = re.compile(r'\b([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\b')
ipv4_re = re.compile(r'\b([0-9]{1,3}\.){3}[0-9]{1,3}\b')
ipv6_re = re.compile(r'\b([0-9a-fA-F]{1,4}::?){3,}[0-9a-fA-F]{1,4}\b')
def anonymize_address(addr):
if addr in address_map:
return address_map[addr]
@@ -75,21 +81,25 @@ def anonymize_address(addr):
print >> sys.stderr, "Replace %s with %s" % (addr, address_map[addr])
return address_map[addr]
def repl_addr(match):
addr = match.group(0)
anon_addr = anonymize_address(addr)
return anon_addr
def repl_hostname_name(match):
name = match.group(1)
anon_name = anonymize_hostname(name)
return r'<hostname name="%s"' % anon_name
def repl_hostname(match):
name = match.group(1)
anon_name = anonymize_hostname(name)
return r'hostname="%s"' % anon_name
def anonymize_file(f):
for line in f:
repls = []
@@ -101,6 +111,7 @@ def anonymize_file(f):
line = re.sub(r' *\bservicefp="([^"]*)"', r'', line)
yield line
def main():
filename = sys.argv[1]
f = open(filename, "r")