mirror of
https://github.com/nmap/nmap.git
synced 2025-12-07 05:01:29 +00:00
for file in `grep "* including the terms and conditions of this license text as well. \*" * -r --files-with-match `; do sed "s/\* including the terms and conditions of this license text as well. \*/* including the terms and conditions of this license text as well. */g" -i $file; done
693 lines
31 KiB
Python
693 lines
31 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# ***********************IMPORTANT NMAP LICENSE TERMS************************
|
|
# * *
|
|
# * The Nmap Security Scanner is (C) 1996-2013 Insecure.Com LLC. Nmap is *
|
|
# * also a registered trademark of Insecure.Com LLC. This program is free *
|
|
# * software; you may redistribute and/or modify it under the terms of the *
|
|
# * GNU General Public License as published by the Free Software *
|
|
# * Foundation; Version 2 ("GPL"), BUT ONLY WITH ALL OF THE CLARIFICATIONS *
|
|
# * AND EXCEPTIONS DESCRIBED HEREIN. This guarantees your right to use, *
|
|
# * modify, and redistribute this software under certain conditions. If *
|
|
# * you wish to embed Nmap technology into proprietary software, we sell *
|
|
# * alternative licenses (contact sales@nmap.com). Dozens of software *
|
|
# * vendors already license Nmap technology such as host discovery, port *
|
|
# * scanning, OS detection, version detection, and the Nmap Scripting *
|
|
# * Engine. *
|
|
# * *
|
|
# * Note that the GPL places important restrictions on "derivative works", *
|
|
# * yet it does not provide a detailed definition of that term. To avoid *
|
|
# * misunderstandings, we interpret that term as broadly as copyright law *
|
|
# * allows. For example, we consider an application to constitute a *
|
|
# * derivative work for the purpose of this license if it does any of the *
|
|
# * following with any software or content covered by this license *
|
|
# * ("Covered Software"): *
|
|
# * *
|
|
# * o Integrates source code from Covered Software. *
|
|
# * *
|
|
# * o Reads or includes copyrighted data files, such as Nmap's nmap-os-db *
|
|
# * or nmap-service-probes. *
|
|
# * *
|
|
# * o Is designed specifically to execute Covered Software and parse the *
|
|
# * results (as opposed to typical shell or execution-menu apps, which will *
|
|
# * execute anything you tell them to). *
|
|
# * *
|
|
# * o Includes Covered Software in a proprietary executable installer. The *
|
|
# * installers produced by InstallShield are an example of this. Including *
|
|
# * Nmap with other software in compressed or archival form does not *
|
|
# * trigger this provision, provided appropriate open source decompression *
|
|
# * or de-archiving software is widely available for no charge. For the *
|
|
# * purposes of this license, an installer is considered to include Covered *
|
|
# * Software even if it actually retrieves a copy of Covered Software from *
|
|
# * another source during runtime (such as by downloading it from the *
|
|
# * Internet). *
|
|
# * *
|
|
# * o Links (statically or dynamically) to a library which does any of the *
|
|
# * above. *
|
|
# * *
|
|
# * o Executes a helper program, module, or script to do any of the above. *
|
|
# * *
|
|
# * This list is not exclusive, but is meant to clarify our interpretation *
|
|
# * of derived works with some common examples. Other people may interpret *
|
|
# * the plain GPL differently, so we consider this a special exception to *
|
|
# * the GPL that we apply to Covered Software. Works which meet any of *
|
|
# * these conditions must conform to all of the terms of this license, *
|
|
# * particularly including the GPL Section 3 requirements of providing *
|
|
# * source code and allowing free redistribution of the work as a whole. *
|
|
# * *
|
|
# * As another special exception to the GPL terms, Insecure.Com LLC grants *
|
|
# * permission to link the code of this program with any version of the *
|
|
# * OpenSSL library which is distributed under a license identical to that *
|
|
# * listed in the included docs/licenses/OpenSSL.txt file, and distribute *
|
|
# * linked combinations including the two. *
|
|
# * *
|
|
# * Any redistribution of Covered Software, including any derived works, *
|
|
# * must obey and carry forward all of the terms of this license, including *
|
|
# * obeying all GPL rules and restrictions. For example, source code of *
|
|
# * the whole work must be provided and free redistribution must be *
|
|
# * allowed. All GPL references to "this License", are to be treated as *
|
|
# * including the terms and conditions of this license text as well. *
|
|
# * *
|
|
# * Because this license imposes special exceptions to the GPL, Covered *
|
|
# * Work may not be combined (even as part of a larger work) with plain GPL *
|
|
# * software. The terms, conditions, and exceptions of this license must *
|
|
# * be included as well. This license is incompatible with some other open *
|
|
# * source licenses as well. In some cases we can relicense portions of *
|
|
# * Nmap or grant special permissions to use it in other open source *
|
|
# * software. Please contact fyodor@nmap.org with any such requests. *
|
|
# * Similarly, we don't incorporate incompatible open source software into *
|
|
# * Covered Software without special permission from the copyright holders. *
|
|
# * *
|
|
# * If you have any questions about the licensing restrictions on using *
|
|
# * Nmap in other works, are happy to help. As mentioned above, we also *
|
|
# * offer alternative license to integrate Nmap into proprietary *
|
|
# * applications and appliances. These contracts have been sold to dozens *
|
|
# * of software vendors, and generally include a perpetual license as well *
|
|
# * as providing for priority support and updates. They also fund the *
|
|
# * continued development of Nmap. Please email sales@nmap.com for further *
|
|
# * information. *
|
|
# * *
|
|
# * If you have received a written license agreement or contract for *
|
|
# * Covered Software stating terms other than these, you may choose to use *
|
|
# * and redistribute Covered Software under those terms instead of these. *
|
|
# * *
|
|
# * Source is provided to this software because we believe users have a *
|
|
# * right to know exactly what a program is going to do before they run it. *
|
|
# * This also allows you to audit the software for security holes (none *
|
|
# * have been found so far). *
|
|
# * *
|
|
# * Source code also allows you to port Nmap to new platforms, fix bugs, *
|
|
# * and add new features. You are highly encouraged to send your changes *
|
|
# * to the dev@nmap.org mailing list for possible incorporation into the *
|
|
# * main distribution. By sending these changes to Fyodor or one of the *
|
|
# * Insecure.Org development mailing lists, or checking them into the Nmap *
|
|
# * source code repository, it is understood (unless you specify otherwise) *
|
|
# * that you are offering the Nmap Project (Insecure.Com LLC) the *
|
|
# * unlimited, non-exclusive right to reuse, modify, and relicense the *
|
|
# * code. Nmap will always be available Open Source, but this is important *
|
|
# * because the inability to relicense code has caused devastating problems *
|
|
# * for other Free Software projects (such as KDE and NASM). We also *
|
|
# * occasionally relicense the code to third parties as discussed above. *
|
|
# * If you wish to specify special license conditions of your *
|
|
# * contributions, just say so when you send them. *
|
|
# * *
|
|
# * This program is distributed in the hope that it will be useful, but *
|
|
# * WITHOUT ANY WARRANTY; without even the implied warranty of *
|
|
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Nmap *
|
|
# * license file for more details (it's in a COPYING file included with *
|
|
# * Nmap, and also available from https://svn.nmap.org/nmap/COPYING *
|
|
# * *
|
|
# ***************************************************************************/
|
|
|
|
import os
|
|
import re
|
|
import unittest
|
|
import zenmapCore
|
|
import zenmapCore.NmapParser
|
|
from zenmapGUI.SearchGUI import SearchParser
|
|
from SearchResult import HostSearch
|
|
|
|
class NetworkInventory(object):
|
|
"""This class acts as a container for aggregated scans. It is also
|
|
responsible for opening/saving the aggregation from/to persistent storage."""
|
|
def __init__(self, filename=None):
|
|
# A list of all scans that make up this inventory
|
|
self.scans = []
|
|
|
|
# A dictionary mapping parsed scans to filenames they were loaded from
|
|
self.filenames = {}
|
|
|
|
# A dictionary mapping IP addresses into HostInfo objects
|
|
self.hosts = {}
|
|
|
|
if filename != None:
|
|
self.open_from_file(filename)
|
|
|
|
def add_scan(self, scan, filename=None):
|
|
"""Adds a scan to the list of scans. The object passed as an argument
|
|
should be a parsed nmap result."""
|
|
from time import localtime
|
|
|
|
for host in scan.get_hosts():
|
|
addr = ""
|
|
if host.ipv6 is not None:
|
|
# This is an IPv6 host, so we add the IPv6 address to the map
|
|
addr = host.ipv6["addr"]
|
|
elif host.ip is not None:
|
|
# IPv4
|
|
addr = host.ip["addr"]
|
|
|
|
if addr not in self.hosts:
|
|
# Add this host to the hosts dictionary, mapped by IP address
|
|
self.hosts[addr] = host.make_clone()
|
|
else:
|
|
# This host is already present in the host list, so we need to update its
|
|
# info with the info held in the current host object
|
|
old_host = self.hosts[addr]
|
|
# We need to find old_host's scan date
|
|
old_date = localtime(0)
|
|
for old_scan in self.scans:
|
|
if old_host in old_scan.get_hosts():
|
|
old_date = old_scan.get_date()
|
|
new_date = scan.get_date()
|
|
self._update_host_info(old_host, host, old_date, new_date, scan)
|
|
|
|
self.scans.append(scan)
|
|
|
|
if filename != None:
|
|
basename = os.path.basename(filename)
|
|
|
|
if basename in self.filenames.values():
|
|
# We need to generate a new filename, since this basename already exists
|
|
base = basename
|
|
ext = "xml"
|
|
try:
|
|
base, ext = basename.rsplit(".", 1)
|
|
except ValueError:
|
|
pass
|
|
|
|
counter = 2
|
|
while basename in self.filenames.values():
|
|
basename = "%s %s.%s" % (base, counter, ext)
|
|
counter += 1
|
|
|
|
self.filenames[scan] = basename
|
|
|
|
def remove_scan(self, scan):
|
|
"""Removes a scan and any host information it contained from the inventory."""
|
|
# Note: If a scan is passed in that isn't in the inventory then this
|
|
# method will throw a ValueError Exception and will not finish
|
|
# Remove the scan from our scan list
|
|
self.scans.remove(scan)
|
|
|
|
# Clear the host dictionary
|
|
self.hosts = {}
|
|
|
|
# Remember the scan list
|
|
scans = self.scans
|
|
|
|
# Empty it
|
|
self.scans = []
|
|
|
|
# Delete the filename entry, if any
|
|
if scan in self.filenames:
|
|
del self.filenames[scan]
|
|
|
|
# For each scan in the remembered list, append it to the scan list and update
|
|
# the host list accordingly
|
|
for scan in scans:
|
|
self.add_scan(scan)
|
|
|
|
def _update_host_info(self, old_host, new_host, old_date, new_date, new_scan):
|
|
"""This function is called when a host needs to be added to the hosts
|
|
dictionary, but another HostInfo object for that host already exists
|
|
in the dictionary (from a previous scan). In that case, we need to
|
|
update the original HostInfo object so that it holds information from
|
|
both scans."""
|
|
|
|
# Ports
|
|
old_list = []
|
|
old_list.extend(old_host.ports)
|
|
for new_port in new_host.ports:
|
|
# Check if new_port is already present in old_host's ports
|
|
for old_port in old_host.ports:
|
|
if old_port["portid"] == new_port["portid"] and old_port["protocol"] == new_port["protocol"]:
|
|
old_list.remove(old_port)
|
|
# We update old_host's port information to reflect the latest known port state
|
|
if old_date < new_date:
|
|
index = old_host.ports.index(old_port)
|
|
old_host.ports[index] = new_port
|
|
# Finished processing this new_port, we jump to the next one
|
|
break
|
|
else:
|
|
# This new_port isn't present in old_host, so we simply append it to
|
|
# old_host's port info
|
|
old_host.ports.append(new_port)
|
|
|
|
ports = new_scan.get_port_protocol_dict()
|
|
|
|
#remove ports which are no longer up
|
|
if old_date < new_date:
|
|
for defunct_port in old_list:
|
|
#Check if defunt_port is in ports and that the protocol matches
|
|
port_number = int(defunct_port['portid'])
|
|
if port_number in ports:
|
|
if defunct_port['protocol'] in ports[port_number]:
|
|
old_host.ports.remove(defunct_port)
|
|
|
|
# extraports, ipidsequence, state, tcpsequence, tcptssequence, uptime
|
|
if old_date < new_date:
|
|
old_host.extraports = new_host.extraports
|
|
old_host.ipidsequence = new_host.ipidsequence
|
|
old_host.state = new_host.state
|
|
old_host.tcpsequence = new_host.tcpsequence
|
|
old_host.tcptssequence = new_host.tcptssequence
|
|
old_host.uptime = new_host.uptime
|
|
|
|
# Comment
|
|
if old_host.comment == "":
|
|
old_host.comment = new_host.comment
|
|
elif new_host.comment != "":
|
|
old_host.comment = "%s\n\n%s" % (old_host.comment, new_host.comment)
|
|
|
|
# Hostnames
|
|
# Replace old_host's hostname with new_host's if old_host has no
|
|
# hostname or new_host's is newer.
|
|
if len(new_host.hostnames) > 0 and \
|
|
(len(old_host.hostnames) == 0 or old_date < new_date):
|
|
old_host.hostnames = new_host.hostnames
|
|
|
|
# MAC address
|
|
# If there was no MAC address set in old_host, set it to whatever is in new_host.mac.
|
|
# Do the same if both hosts have a MAC address set, but new_host's address is newer.
|
|
if old_host.mac is None or \
|
|
(old_host.mac is not None and new_host.mac is not None and old_date < new_date):
|
|
old_host.mac = new_host.mac
|
|
|
|
# OS detection fields
|
|
# Replace old_host's OS detection fields with new_host's if old_host has no
|
|
# OS detection info or new_host's info is newer.
|
|
if len(new_host.osmatches) > 0 and (len(old_host.osmatches) == 0 or old_date < new_date):
|
|
old_host.osmatches = new_host.osmatches
|
|
old_host.ports_used = new_host.ports_used
|
|
|
|
# Traceroute information
|
|
if len(new_host.trace) > 0 and (len(old_host.trace) == 0 or old_date < new_date):
|
|
old_host.trace = new_host.trace
|
|
|
|
def get_scans(self):
|
|
return self.scans
|
|
|
|
def get_hosts(self):
|
|
return self.hosts.values()
|
|
|
|
def get_hosts_up(self):
|
|
return filter(lambda h: h.get_state() == 'up', self.hosts.values())
|
|
|
|
def get_hosts_down(self):
|
|
return filter(lambda h: h.get_state() == 'down', self.hosts.values())
|
|
|
|
def open_from_file(self, path):
|
|
"""Loads a scan from the given file."""
|
|
from zenmapCore.NmapParser import NmapParser
|
|
|
|
parsed = NmapParser(path)
|
|
parsed.parse()
|
|
self.add_scan(parsed, path)
|
|
|
|
def open_from_dir(self, path):
|
|
"""Loads all scans from the given directory into the network inventory."""
|
|
from zenmapCore.NmapParser import NmapParser
|
|
|
|
for filename in os.listdir(path):
|
|
fullpath = os.path.join(path, filename)
|
|
if os.path.isdir(fullpath):
|
|
continue
|
|
parsed = NmapParser(fullpath)
|
|
parsed.parse()
|
|
self.add_scan(parsed, filename=fullpath)
|
|
|
|
def save_to_file(self, path, index, format = "xml"):
|
|
"""Saves the scan with the given list index into a file with a given
|
|
path. With format = "xml", saves Nmap XML; otherwise saves plain text
|
|
output."""
|
|
f = open(path, 'w')
|
|
if format == "xml":
|
|
self.get_scans()[index].write_xml(f)
|
|
self.filenames[self.get_scans()[index]] = f
|
|
else:
|
|
self.get_scans()[index].write_text(f)
|
|
f.close()
|
|
|
|
def _generate_filenames(self, path):
|
|
"""Generates filenames for all scans that don't already have a filename."""
|
|
# The directory must not contain filenames other than those in the self.filenames dictionary
|
|
for filename in os.listdir(path):
|
|
if os.path.basename(filename) not in self.filenames.values():
|
|
raise Exception("The destination directory contains a file (%s) that's not a part "
|
|
"of the current inventory. The inventory will not be saved." %
|
|
os.path.basename(filename))
|
|
|
|
for scan in self.scans:
|
|
if scan in self.filenames:
|
|
# This scan already has a filename
|
|
continue
|
|
|
|
date = "%04d%02d%02d%02d%02d" % (scan.date[0], scan.date[1], scan.date[2],
|
|
scan.date[3], scan.date[4])
|
|
filename = scan.get_scan_name()
|
|
|
|
# Prepend the date
|
|
filename = "%s %s" % (date, filename)
|
|
|
|
# Sanitize the filename
|
|
for char in ["\"", "'", "/", "\\", "?", "*", ":", ";"]:
|
|
if char in filename:
|
|
filename = filename.replace(char, "_")
|
|
|
|
# Filename length check
|
|
# (http://en.wikipedia.org/wiki/Filename#Comparison_of_file_name_limitations)
|
|
if len(filename) > 250:
|
|
filename = filename[:250]
|
|
|
|
# TODO: Filename security checks?
|
|
|
|
# Try to open the file in append mode. If file.tell() returns a greater-than-zero
|
|
# value, this means that the file already exists and has some data in it, so we
|
|
# choose another filename until we successfully open a zero-length file.
|
|
filename_full = filename + ".xml"
|
|
counter = 2
|
|
while filename_full in self.filenames.values():
|
|
# There's already a scan with this filename, so we generate a new name by appending
|
|
# the counter value before the file extension.
|
|
filename_full = "%s %s.xml" % (filename, str(counter))
|
|
counter += 1
|
|
|
|
# Add the filename to the list of saved filenames
|
|
self.filenames[scan] = filename_full
|
|
|
|
def save_to_dir(self, path):
|
|
"""Saves all scans in the inventory into a given directory and returns
|
|
a list of (full-path) filenames that were used to save the scans."""
|
|
self._generate_filenames(path)
|
|
|
|
for scan, filename in self.filenames.iteritems():
|
|
f = open(os.path.join(path, filename), "w")
|
|
scan.write_xml(f)
|
|
f.close()
|
|
|
|
return self.filenames.values()
|
|
|
|
def open_from_db(self, id):
|
|
pass
|
|
|
|
def save_to_db(self):
|
|
# For now, this saves each scan making up the inventory separately in
|
|
# the database.
|
|
from time import time
|
|
from cStringIO import StringIO
|
|
from zenmapCore.UmitDB import Scans
|
|
|
|
for parsed in self.get_scans():
|
|
f = StringIO()
|
|
parsed.write_xml(f)
|
|
|
|
scan = Scans(scan_name = parsed.scan_name,
|
|
nmap_xml_output = f.getvalue(),
|
|
date = time())
|
|
|
|
class FilteredNetworkInventory(NetworkInventory):
|
|
def __init__(self, filename = None):
|
|
NetworkInventory.__init__(self, filename)
|
|
|
|
# A dictionary listing host filtering criteria
|
|
self.search_dict = {}
|
|
self.filtered_hosts = []
|
|
search_keywords = dict()
|
|
search_keywords["target"] = "target"
|
|
search_keywords["t"] = "target"
|
|
search_keywords["inroute"] = "in_route"
|
|
search_keywords["ir"] = "in_route"
|
|
search_keywords["hostname"] = "hostname"
|
|
search_keywords["service"] = "service"
|
|
search_keywords["s"] = "service"
|
|
search_keywords["os"] = "os"
|
|
search_keywords["open"] = "open"
|
|
search_keywords["op"] = "open"
|
|
search_keywords["closed"] = "closed"
|
|
search_keywords["cp"] = "closed"
|
|
search_keywords["filtered"] = "filtered"
|
|
search_keywords["fp"] = "filtered"
|
|
search_keywords["unfiltered"] = "unfiltered"
|
|
search_keywords["ufp"] = "unfiltered"
|
|
search_keywords["open|filtered"] = "open_filtered"
|
|
search_keywords["ofp"] = "open_filtered"
|
|
search_keywords["closed|filtered"] = "closed_filtered"
|
|
search_keywords["cfp"] = "closed_filtered"
|
|
self.search_parser = SearchParser(self, search_keywords)
|
|
|
|
# FIXME: This method doesn't do anything. We just need to support
|
|
# the type of interface that SearchParser expects in order to use it.
|
|
# Perhaps, we will eventually refactor the SearchParser a little bit
|
|
# more?
|
|
def init_search_dirs(self, junk):
|
|
pass
|
|
|
|
def get_hosts(self):
|
|
if len(self.search_dict) > 0:
|
|
return self.filtered_hosts
|
|
else:
|
|
return NetworkInventory.get_hosts(self)
|
|
def get_hosts_up(self):
|
|
if len(self.search_dict) > 0:
|
|
return filter(lambda h: h.get_state() == 'up', self.filtered_hosts)
|
|
else:
|
|
return NetworkInventory.get_hosts_up(self)
|
|
def get_hosts_down(self):
|
|
if len(self.search_dict) > 0:
|
|
return filter(lambda h: h.get_state() == 'down', self.filtered_hosts)
|
|
else:
|
|
return NetworkInventory.get_hosts_down(self)
|
|
def get_total_host_count(self):
|
|
return len(self.hosts)
|
|
def _match_all_args(self, host, operator, args):
|
|
"""A helper function that calls the matching function for the given
|
|
operator and each of its arguments."""
|
|
for arg in args:
|
|
positive = True
|
|
if arg != "" and arg[0] == "!":
|
|
arg = arg[1:]
|
|
positive = False
|
|
if positive != self.__getattribute__("match_%s" % operator)(host, arg):
|
|
# No match for this operator
|
|
return False
|
|
else:
|
|
# if the operator is not supported, pretend its true
|
|
# All arguments for this operator produced a match
|
|
return True
|
|
def get_host_count(self):
|
|
return len(self.network_inventory.hosts)
|
|
def match_keyword(self, host, keyword):
|
|
return self.match_os(host, keyword) or\
|
|
self.match_target(host, keyword) or\
|
|
self.match_service(host, keyword)
|
|
def match_target(self, host, name):
|
|
return HostSearch.match_target(host, name)
|
|
def match_in_route(self, host, hop):
|
|
hops = host.get_trace().get('hops', [])
|
|
return hop in hops
|
|
def match_hostname(self, host, hostname):
|
|
return HostSearch.match_hostname(host, hostname)
|
|
def match_service(self, host, service):
|
|
return HostSearch.match_service(host, service)
|
|
def match_os(self, host, os):
|
|
return HostSearch.match_os(host, os)
|
|
def match_open(self, host, portno):
|
|
host_ports = host.get_ports()
|
|
return HostSearch.match_port(host_ports, portno, "open")
|
|
def match_closed(self, host, portno):
|
|
host_ports = host.get_ports()
|
|
return HostSearch.match_port(host_ports, portno, "closed")
|
|
def match_filtered(self, host, portno):
|
|
host_ports = host.get_ports()
|
|
return HostSearch.match_port(host_ports, portno, "filtered")
|
|
def match_unfiltered(self, host, portno):
|
|
host_ports = host.get_ports()
|
|
return HostSearch.match_port(host_ports, portno, "unfiltered")
|
|
def match_open_filtered(self, host, portno):
|
|
host_ports = host.get_ports()
|
|
return HostSearch.match_port(host_ports, portno, "open|filtered")
|
|
def match_closed_filtered(self, host, portno):
|
|
host_ports = host.get_ports()
|
|
return HostSearch.match_port(host_ports, portno, "closed|filtered")
|
|
def apply_filter(self, filter_text):
|
|
self.filter_text = filter_text.lower()
|
|
self.search_parser.update(self.filter_text)
|
|
self.filtered_hosts = []
|
|
for hostname, host in self.hosts.iteritems():
|
|
# For each host in this scan
|
|
# Test each given operator against the current host
|
|
for operator, args in self.search_dict.iteritems():
|
|
if not self._match_all_args(host, operator, args):
|
|
# No match => we discard this scan_result
|
|
break
|
|
else:
|
|
# All operator-matching functions have returned True, so this
|
|
# host satisfies all conditions
|
|
self.filtered_hosts.append(host)
|
|
|
|
class NetworkInventoryTest(unittest.TestCase):
|
|
def test_no_external_modification(self):
|
|
"""Test that HostInfo objects passed into the inventory are not modified
|
|
during aggregation."""
|
|
scan_1 = zenmapCore.NmapParser.ParserBasics()
|
|
host_a = zenmapCore.NmapParser.HostInfo()
|
|
host_a.hostnames = ["a"]
|
|
host_a.set_state('up')
|
|
scan_1.start = "1000000000"
|
|
scan_1.nmap["hosts"] = [host_a]
|
|
|
|
scan_2 = zenmapCore.NmapParser.ParserBasics()
|
|
host_b = zenmapCore.NmapParser.HostInfo()
|
|
host_b.hostnames = ["b"]
|
|
host_b.set_state('up')
|
|
scan_2.start = "1000000001"
|
|
scan_2.nmap["hosts"] = [host_b]
|
|
|
|
inv = NetworkInventory()
|
|
inv.add_scan(scan_1)
|
|
inv.add_scan(scan_2)
|
|
|
|
self.assertEqual(host_a.hostnames, ["a"])
|
|
self.assertEqual(host_b.hostnames, ["b"])
|
|
self.assertEqual(scan_1.nmap["hosts"], [host_a])
|
|
self.assertEqual(scan_2.nmap["hosts"], [host_b])
|
|
self.assertEqual(inv.get_hosts_up()[0].hostnames, ["b"])
|
|
def test_cancel_and_remove_scan(self):
|
|
"""Test that canceling and removing a scan does not blow away the inventory hosts"""
|
|
added_ips = ['10.0.0.1','10.0.0.2']
|
|
removed_ips = ['10.0.0.3']
|
|
scan_1 = zenmapCore.NmapParser.ParserBasics()
|
|
host_a = zenmapCore.NmapParser.HostInfo()
|
|
host_a.hostnames = ["a"]
|
|
host_a.set_ip({'addr':added_ips[0]})
|
|
scan_1.start = "1000000000"
|
|
scan_1.nmap["hosts"] = [host_a]
|
|
|
|
scan_2 = zenmapCore.NmapParser.ParserBasics()
|
|
host_b = zenmapCore.NmapParser.HostInfo()
|
|
host_b.hostnames = ["b"]
|
|
host_b.set_ip({'addr':added_ips[1]})
|
|
scan_2.start = "1000000001"
|
|
scan_2.nmap["hosts"] = [host_b]
|
|
|
|
scan_3 = zenmapCore.NmapParser.ParserBasics()
|
|
host_c = zenmapCore.NmapParser.HostInfo()
|
|
host_c.hostnames = ["b"]
|
|
host_c.set_ip({'addr':removed_ips[0]})
|
|
scan_3.start = "1000000001"
|
|
scan_3.nmap["hosts"] = [host_c]
|
|
|
|
inv = NetworkInventory()
|
|
inv.add_scan(scan_1)
|
|
inv.add_scan(scan_2)
|
|
try:
|
|
inv.remove_scan(scan_3)
|
|
except:
|
|
pass
|
|
self.assertEqual(added_ips, inv.hosts.keys())
|
|
self.assertEqual(host_a.hostnames, ["a"])
|
|
self.assertEqual(host_b.hostnames, ["b"])
|
|
|
|
class FilteredNetworkInventoryTest(unittest.TestCase):
|
|
def test_filter(self):
|
|
"""Test that the filter still works after moving code to the """
|
|
"""HostSearch class"""
|
|
from zenmapCore.NmapParser import NmapParser
|
|
inv = FilteredNetworkInventory()
|
|
scan = NmapParser()
|
|
scan.parse_file("test/xml_test9.xml")
|
|
filter_text = "open:22 os:linux service:openssh"
|
|
inv.add_scan(scan)
|
|
inv.apply_filter(filter_text)
|
|
assert(len(inv.get_hosts()) == 2)
|
|
|
|
class PortChangeTest(unittest.TestCase):
|
|
def test_port(self):
|
|
"""Verify that the port status (open/filtered/closed) is diplayed """ \
|
|
"""correctly when the port status changes in newer scans"""
|
|
from zenmapCore.NmapParser import NmapParser
|
|
inv = NetworkInventory()
|
|
scan1 = NmapParser()
|
|
scan1.parse_file("test/xml_test13.xml")
|
|
inv.add_scan(scan1)
|
|
scan2 = NmapParser()
|
|
scan2.parse_file("test/xml_test14.xml")
|
|
inv.add_scan(scan2)
|
|
assert(len(inv.get_hosts()[0].ports) == 2)
|
|
scan3 = NmapParser()
|
|
scan3.parse_file("test/xml_test15.xml")
|
|
inv.add_scan(scan3)
|
|
assert(len(inv.get_hosts()[0].ports) == 0)
|
|
|
|
# Additional test case for when the two scans have port scan ranges
|
|
# which do not overlap. Example nmap -F -sU versus
|
|
# nmap -F scanme.nmap.org
|
|
inv = NetworkInventory()
|
|
scan4 = NmapParser()
|
|
scan4.parse_file("test/xml_test16.xml")
|
|
inv.add_scan(scan4)
|
|
assert(len(inv.get_hosts()[0].ports)==3)
|
|
scan5 = NmapParser()
|
|
scan5.parse_file("test/xml_test17.xml")
|
|
inv.add_scan(scan5)
|
|
assert(len(inv.get_hosts()[0].ports)==7)
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|
|
if False:
|
|
|
|
scan1 = NmapParser("/home/ndwi/scanz/neobee_1.xml")
|
|
scan1.parse()
|
|
scan2 = NmapParser("/home/ndwi/scanz/scanme_nmap_org.usr")
|
|
scan2.parse()
|
|
|
|
inventory1 = NetworkInventory()
|
|
inventory1.add_scan(scan1)
|
|
inventory1.add_scan(scan2)
|
|
|
|
for host in inventory1.get_hosts():
|
|
print "%s" % host.ip["addr"],
|
|
#if len(host.hostnames) > 0:
|
|
# print "[%s]:" % host.hostnames[0]["hostname"]
|
|
#else:
|
|
# print ":"
|
|
#for port in host.ports:
|
|
# print " %s: %s" % (port["portid"], port["port_state"])
|
|
#print " OS matches: %s" % host.osmatches
|
|
#print " Ports used: %s" % host.ports_used
|
|
#print " Trace: %s" % host.trace
|
|
#if "hops" in host.trace:
|
|
# print " (%d)" % len(host.trace["hops"])
|
|
|
|
inventory1.remove_scan(scan2)
|
|
print
|
|
for host in inventory1.get_hosts():
|
|
print "%s" % host.ip["addr"],
|
|
|
|
inventory1.add_scan(scan2)
|
|
print
|
|
for host in inventory1.get_hosts():
|
|
print "%s" % host.ip["addr"],
|
|
|
|
dir = "/home/ndwi/scanz/top01"
|
|
inventory1.save_to_dir(dir)
|
|
|
|
inventory2 = NetworkInventory()
|
|
inventory2.open_from_dir(dir)
|
|
|
|
print
|
|
for host in inventory2.get_hosts():
|
|
print "%s" % host.ip["addr"],
|