1
0
mirror of https://github.com/nmap/nmap.git synced 2026-01-18 20:29:02 +00:00

Improved XML parsing speed.

Replaced long if/elif/else sequence of checks by a direct access
data structure based on an element_name -> callback mapping.
This commit is contained in:
henri
2012-12-23 08:35:28 +00:00
parent 73e6e9a2d9
commit 36f8adf2a6

View File

@@ -1117,6 +1117,25 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
self.current_host = None
self.current_port = None
self._start_elem_handlers = {
u"nmaprun": self._start_nmaprun,
u"host": self._start_host,
u"status": self._start_status,
u"address": self._start_address,
u"hostname": self._start_hostname,
u"extraports": self._start_extraports,
u"port": self._start_port,
u"state": self._start_state,
u"service": self._start_service,
u"script": self._start_script,
u"osmatch": self._start_osmatch,
u"finished": self._start_finished,
}
self._end_elem_handlers = {
u'host': self._end_host,
u'port': self._end_port,
}
def parent_element(self):
"""Return the name of the element containing the current one, or None if
this is the root element."""
@@ -1126,161 +1145,173 @@ class NmapContentHandler(xml.sax.handler.ContentHandler):
def startElement(self, name, attrs):
    """Handle an opening tag and keep element_stack up to date.

    The real parsing work is done in the _start_*() handlers registered in
    self._start_elem_handlers. Dispatching from here makes it easy for a
    handler to bail out on error with a plain return: the element name is
    still pushed onto element_stack afterwards. Elements that have no
    registered handler are only tracked on the stack."""
    handler = self._start_elem_handlers.get(name)
    if handler is not None:
        # Dispatch exactly once. startElementAux covers the same element
        # kinds as the handlers, so calling it as well would process every
        # element twice (for example, append each host to the scan twice).
        handler(name, attrs)
    self.element_stack.append(name)
def endElement(self, name):
    """Handle a closing tag and keep element_stack up to date.

    The element is popped from element_stack first; the real parsing work
    is then done by the _end_*() handler registered for name in
    self._end_elem_handlers, if there is one."""
    self.element_stack.pop()
    handler = self._end_elem_handlers.get(name)
    if handler is not None:
        # Dispatch exactly once. The handlers reset current_host /
        # current_port to None, so running endElementAux after them would
        # dereference None.
        handler(name)
def _start_nmaprun(self, name, attrs):
assert self.parent_element() == None
if attrs.has_key(u"start"):
start_timestamp = int(attrs.get(u"start"))
self.scan.start_date = datetime.datetime.fromtimestamp(start_timestamp)
self.scan.scanner = attrs.get(u"scanner")
self.scan.args = attrs.get(u"args")
self.scan.version = attrs.get(u"version")
def startElementAux(self, name, attrs):
    """Process an opening tag without touching element_stack.

    Kept so that code which still calls startElementAux keeps working. It
    forwards to the _start_*() handler registered for name in
    self._start_elem_handlers rather than repeating each handler's body in
    an if/elif chain, so the per-element logic lives in one place only.
    Elements without a registered handler are ignored."""
    handler = self._start_elem_handlers.get(name)
    if handler is not None:
        handler(name, attrs)
def _start_host(self, name, attrs):
    """Open a new <host>: create a Host, make it the current host and add
    it to the scan's host list."""
    assert self.parent_element() == u"nmaprun"
    new_host = self.current_host = Host()
    self.scan.hosts.append(new_host)
def endElementAux(self, name):
    """Process a closing tag without touching element_stack.

    Kept so that code which still calls endElementAux keeps working. It
    forwards to the _end_*() handler registered for name in
    self._end_elem_handlers rather than duplicating the handler bodies.
    Elements without a registered handler are ignored."""
    handler = self._end_elem_handlers.get(name)
    if handler is not None:
        handler(name)
def _start_status(self, name, attrs):
    """Copy the "state" attribute of a <status> element onto the current
    host.

    A <status> without a "state" attribute produces a warning and leaves
    the host's state as it was."""
    assert self.parent_element() == u"host"
    assert self.current_host is not None
    try:
        host_state = attrs[u"state"]
    except KeyError:
        warn(u"%s element of host %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_host.format_name()))
    else:
        self.current_host.state = host_state
def _start_address(self, name, attrs):
    """Add the address described by an <address> element to the current
    host.

    The "addr" attribute is required; without it a warning is issued and
    the element is skipped. "addrtype" defaults to "ipv4"."""
    assert self.parent_element() == u"host"
    assert self.current_host is not None
    try:
        addr_text = attrs[u"addr"]
    except KeyError:
        warn(u"%s element of host %s is missing the \"addr\" attribute; skipping." % (name, self.current_host.format_name()))
    else:
        addr_kind = attrs.get(u"addrtype", u"ipv4")
        self.current_host.add_address(Address.new(addr_kind, addr_text))
def _start_hostname(self, name, attrs):
    """Add the "name" attribute of a <hostname> element to the current
    host.

    A <hostname> without a "name" attribute produces a warning and is
    skipped."""
    assert self.parent_element() == u"hostnames"
    assert self.current_host is not None
    try:
        found_name = attrs[u"name"]
    except KeyError:
        warn(u"%s element of host %s is missing the \"name\" attribute; skipping." % (name, self.current_host.format_name()))
    else:
        self.current_host.add_hostname(found_name)
def _start_extraports(self, name, attrs):
    """Record an <extraports> summary (state -> port count) on the current
    host.

    A missing "state" is stored under the key None, and a missing or
    non-integer "count" is stored as 0; each case produces a warning. A
    state that was already recorded also produces a warning and is then
    overwritten."""
    assert self.parent_element() == u"ports"
    assert self.current_host is not None
    host = self.current_host

    try:
        state = attrs[u"state"]
    except KeyError:
        warn(u"%s element of host %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, host.format_name()))
        state = None
    if state in host.extraports:
        warn(u"Duplicate extraports state \"%s\" in host %s." % (state, host.format_name()))

    # Look the attribute up and convert it in two separate steps so each
    # failure mode has its own handler.
    try:
        count_text = attrs[u"count"]
    except KeyError:
        warn(u"%s element of host %s is missing the \"count\" attribute; assuming 0." % (name, host.format_name()))
        count = 0
    else:
        try:
            count = int(count_text)
        except ValueError:
            warn(u"Can't convert extraports count \"%s\" to an integer in host %s; assuming 0." % (count_text, host.format_name()))
            count = 0

    host.extraports[state] = count
def _start_port(self, name, attrs):
    """Start a new <port> and make it the current port.

    The element needs an integer "portid" and a "protocol". If either is
    missing or the port number cannot be converted, a warning is issued
    and self.current_port is left unchanged."""
    assert self.parent_element() == u"ports"
    assert self.current_host is not None

    if u"portid" not in attrs:
        warn(u"%s element of host %s missing the \"portid\" attribute; skipping." % (name, self.current_host.format_name()))
        return
    portid_text = attrs[u"portid"]

    try:
        port_number = int(portid_text)
    except ValueError:
        warn(u"Can't convert portid \"%s\" to an integer in host %s; skipping port." % (portid_text, self.current_host.format_name()))
        return

    if u"protocol" not in attrs:
        warn(u"%s element of host %s missing the \"protocol\" attribute; skipping." % (name, self.current_host.format_name()))
        return

    self.current_port = Port((port_number, attrs[u"protocol"]))
def _start_state(self, name, attrs):
assert self.parent_element() == u"port"
assert self.current_host is not None
if self.current_port is None:
return
if not attrs.has_key(u"state"):
warn(u"%s element of port %s is missing the \"state\" attribute; assuming \"unknown\"." % (name, self.current_port.spec_string()))
return
self.current_port.state = attrs[u"state"]
self.current_host.add_port(self.current_port)
def _start_service(self, name, attrs):
assert self.parent_element() == u"port"
assert self.current_host is not None
if self.current_port is None:
return
self.current_port.service.name = attrs.get(u"name")
self.current_port.service.product = attrs.get(u"product")
self.current_port.service.version = attrs.get(u"version")
self.current_port.service.extrainfo = attrs.get(u"extrainfo")
self.current_port.service.tunnel = attrs.get(u"tunnel")
def _start_script(self, name, attrs):
    """Attach the result of a <script> element to the scan, host or port
    it belongs to.

    Both the "id" and "output" attributes are required; if either is
    missing a warning is issued and the element is skipped. The parent
    element decides where the result is stored: prescript and postscript
    results go on self.scan, hostscript results on the current host, and
    results inside a <port> on the current port. Any other parent produces
    a warning and the result is dropped."""
    result = ScriptResult()
    try:
        result.id = attrs[u"id"]
    except KeyError:
        warn(u"%s element missing the \"id\" attribute; skipping." % name)
        return
    try:
        result.output = attrs[u"output"]
    except KeyError:
        warn(u"%s element missing the \"output\" attribute; skipping." % name)
        return

    parent = self.parent_element()
    if parent == u"prescript":
        self.scan.pre_script_results.append(result)
    elif parent == u"postscript":
        self.scan.post_script_results.append(result)
    elif parent == u"hostscript":
        self.current_host.script_results.append(result)
    elif parent == u"port":
        if self.current_port is None:
            # The enclosing <port> was skipped by _start_port, which has
            # already warned about it. Drop the result quietly, as
            # _start_state and _start_service do, instead of failing on
            # None.
            return
        self.current_port.script_results.append(result)
    else:
        warn(u"%s element not inside prescript, postscript, hostscript, or port element; ignoring." % name)
def _start_osmatch(self, name, attrs):
assert self.parent_element() == u"os"
assert self.current_host is not None
if not attrs.has_key(u"name"):
warn(u"%s element of host %s is missing the \"name\" attribute; skipping." % (name, self.current_host.format_name()))
return
self.current_host.os.append(attrs[u"name"])
def _start_finished(self, name, attrs):
assert self.parent_element() == u"runstats"
if attrs.has_key(u"time"):
end_timestamp = int(attrs.get(u"time"))
self.scan.end_date = datetime.datetime.fromtimestamp(end_timestamp)
def _end_host(self, name):
self.current_host.script_results.sort()
self.current_host = None
def _end_port(self, name):
self.current_port.script_results.sort()
self.current_port = None
class XMLWriter (xml.sax.saxutils.XMLGenerator):
def __init__(self, f):