From 2b3df5882f5b45fec5d599fe67773db97420a36f Mon Sep 17 00:00:00 2001 From: batrick Date: Thu, 12 Nov 2009 01:33:52 +0000 Subject: [PATCH] [NSE] Patch to add worker threads to NSE for scripts to use. Right now a script is limited in parallelism to working on one socket at any time. A script can now create a worker thread that will be capable of doing work on sockets in parallel with the parent script. See [1] for more information. This patch also comes with condition variables that are similar to POSIX condition variables. They are used in the same fashion as NSE's mutexes (nmap.mutex). [1] http://seclists.org/nmap-dev/2009/q4/294 --- nse_main.lua | 32 ++++++++++++++ nse_nmaplib.cc | 75 +++++++++++++++++++++++++++++++++ nselib/nmap.luadoc | 48 +++++++++++++++++++++ nselib/stdnse.lua | 102 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 257 insertions(+) diff --git a/nse_main.lua b/nse_main.lua index 6d3d1334a..8795217f7 100644 --- a/nse_main.lua +++ b/nse_main.lua @@ -51,6 +51,7 @@ local loadstring = loadstring; local next = next; local pairs = pairs; local rawget = rawget; +local rawset = rawset; local select = select; local setfenv = setfenv; local setmetatable = setmetatable; @@ -88,6 +89,8 @@ do -- Append the nselib directory to the Lua search path package.path = package.path..";"..path.."?.lua"; end +local stdnse = require "stdnse"; + (require "strict")() -- strict global checking -- NSE_YIELD_VALUE @@ -536,6 +539,34 @@ local function run (threads) _R[SELECTED_BY_NAME] = function() return current and current.selected_by_name; end + rawset(stdnse, "new_thread", function (main, ...) + assert(type(main) == "function", "function expected"); + local co = create(function(...) main(...) end); -- do not return results + print_debug(2, "%s spawning new thread (%s).", + current.parent.info, tostring(co)); + local thread = { + co = co, + args = {n = select("#", ...), ...}, + host = current.host, + port = current.port, + parent = current.parent, + info = format("'%s' worker (%s)", current.short_basename, tostring(co)); + -- d = function(...) end, -- output no debug information + }; + local thread_mt = { + __metatable = Thread, + __index = current, + }; + setmetatable(thread, thread_mt); + total, all[co], pending[co] = total+1, thread, thread; + local function info () + return status(co), rawget(thread, "error"); + end + return co, info; + end); + rawset(stdnse, "base", function () + return current.co; + end); -- Loop while any thread is running or waiting. while next(running) or next(waiting) do @@ -581,6 +612,7 @@ local function run (threads) thread:d("%THREAD against %s%s threw an error!\n%s\n", thread.host.ip, thread.port and ":"..thread.port.number or "", traceback(co, tostring(result))); + thread.error = result; thread:close(); elseif status(co) == "suspended" then if result == NSE_YIELD_VALUE then diff --git a/nse_nmaplib.cc b/nse_nmaplib.cc index b276e0ace..ab276a064 100644 --- a/nse_nmaplib.cc +++ b/nse_nmaplib.cc @@ -325,6 +325,73 @@ static int l_mutex (lua_State *L) return 1; // aux_mutex closure } +static int aux_condvar (lua_State *L) +{ + size_t i, n = 0; + enum {WAIT, SIGNAL, BROADCAST}; + static const char * op[] = {"wait", "signal", "broadcast"}; + switch (luaL_checkoption(L, 1, NULL, op)) + { + case WAIT: + lua_pushthread(L); + lua_rawseti(L, lua_upvalueindex(1), lua_objlen(L, lua_upvalueindex(1))+1); + return nse_yield(L); + case SIGNAL: + n = lua_objlen(L, lua_upvalueindex(1)); + break; + case BROADCAST: + n = 1; + break; + } + lua_pushvalue(L, lua_upvalueindex(1)); + for (i = lua_objlen(L, -1); i >= n; i--) + { + lua_rawgeti(L, -1, i); /* get the thread */ + if (lua_isthread(L, -1)) + nse_restore(lua_tothread(L, -1), 0); + lua_pop(L, 1); /* pop the thread */ + lua_pushnil(L); + lua_rawseti(L, -2, i); + } + return 0; +} + +static int aux_condvar_done (lua_State *L) +{ + lua_State *thread = lua_tothread(L, 1); + lua_pushvalue(L, lua_upvalueindex(1)); // aux_condvar closure + lua_pushliteral(L, "broadcast"); // wake up all threads waiting + luaL_checkstack(thread, 2, "aux_condvar_done"); + lua_xmove(L, thread, 2); + if (lua_pcall(thread, 1, 0, 0) != 0) lua_pop(thread, 1); // pop error msg + return 0; +} + +static int l_condvar (lua_State *L) +{ + int t = lua_type(L, 1); + if (t == LUA_TNONE || t == LUA_TNIL || t == LUA_TBOOLEAN || t == LUA_TNUMBER) + luaL_argerror(L, 1, "object expected"); + lua_pushvalue(L, 1); + lua_gettable(L, lua_upvalueindex(1)); + if (lua_isnil(L, -1)) + { + lua_newtable(L); // waiting threads + lua_pushnil(L); // placeholder for aux_mutex_done + lua_pushcclosure(L, aux_condvar, 2); + lua_pushvalue(L, -1); // aux_condvar closure + lua_pushcclosure(L, aux_condvar_done, 1); + lua_setupvalue(L, -2, 2); // replace nil upvalue with aux_condvar_done + lua_pushvalue(L, 1); // "condition variable object" + lua_pushvalue(L, -2); // condvar function + lua_settable(L, lua_upvalueindex(1)); // Add to condition variable table + } + lua_pushvalue(L, -1); // aux_condvar closure + lua_getupvalue(L, -1, 2); // aux_mutex_done closure + nse_destructor(L, 'a'); + return 1; // condition variable closure +} + Target *get_target (lua_State *L, int index) { int top = lua_gettop(L); @@ -617,6 +684,14 @@ int luaopen_nmap (lua_State *L) lua_pushcclosure(L, l_mutex, 1); /* mutex function */ lua_setfield(L, -2, "mutex"); + lua_newtable(L); + lua_createtable(L, 0, 1); + lua_pushliteral(L, "v"); + lua_setfield(L, -2, "__mode"); + lua_setmetatable(L, -2); // Allow closures to be collected (see l_condvar) + lua_pushcclosure(L, l_condvar, 1); // condvar function + lua_setfield(L, -2, "condvar"); + lua_newtable(L); lua_setfield(L, -2, "registry"); diff --git a/nselib/nmap.luadoc b/nselib/nmap.luadoc index 71772091e..9e6f6ca9f 100644 --- a/nselib/nmap.luadoc +++ b/nselib/nmap.luadoc @@ -172,6 +172,54 @@ function get_interface_link(interface_name) -- end function mutex(object) +--- Create a condition variable for an object. +-- +-- This function returns a function that works as a Condition Variable for the +-- given object parameter. The object can be any Lua data type except +-- nil, Booleans, and Numbers. The Condition Variable (returned +-- function) allows you wait, signal, and broadcast on the condition variable. +-- The Condition Variable function takes only one argument, which must be one of +-- * "wait": Wait on the condition variable until another thread wakes us. +-- * "signal": Wake up a single thread from the waiting set of threads for this condition variable. +-- * "broadcast": Wake up all threads in the waiting set of threads for this condition variable. +-- +-- NSE maintains a weak reference to the Condition Variable so other calls to +-- nmap.condvar with the same object will return the same function (Condition +-- Variable); however, if you discard your reference to the Condition +-- Variable then it may be collected; and, Subsequent calls to nmap.condvar with +-- the object will return a different Condition Variable function! +-- +-- In NSE, Condition Variables are typically used to coordinate with threads +-- created using the stdnse.new_thread facility. The worker threads must +-- wait until work is available that the master thread (the actual running +-- script) will provide. Once work is created, the master thread will awaken +-- one or more workers so that the work can be done. +-- +-- It is important to check the predicate (the test to see if your worker +-- thread should "wait" or not) BEFORE and AFTER the call to wait. You are +-- not guaranteed spurious wakeups will not occur (that is, there is no +-- guarantee your thread will not be awakened when no thread called +-- "signal" or "broadcast" on the condition variable). +-- One important check for your worker threads, before and after waiting, +-- should be to check that the master script thread is still alive. +-- (To check that the master script thread is alive, obtain the "base" thread +-- using stdnse.base and use coroutine.status). You do not want your worker +-- threads to continue when the script has ended for reasons unknown to your +-- worker thread. You are guaranteed that all threads waiting on a condition +-- variable will be awakened if any thread that has accessed the condition +-- variable via nmap.condvar ends for any reason. This is +-- essential to prevent deadlock with threads waiting for another thread to awaken +-- them that has ended unexpectedly. +-- @see stdnse.new_thread +-- @see stdnse.base +-- @param object Object to create a condition variable for. +-- @return ConditionVariable Condition variable function. +-- @usage +-- local myobject = {} +-- local cv = nmap.condvar(myobject) +-- cv "wait" -- waits until another thread calls cv "signal" +function condvar(object) + --- Creates a new exception handler. -- -- This function returns an exception handler function. The exception handler is diff --git a/nselib/stdnse.lua b/nselib/stdnse.lua index 2b7e0fb35..af19348e2 100644 --- a/nselib/stdnse.lua +++ b/nselib/stdnse.lua @@ -259,3 +259,105 @@ function string_or_blank(string, blank) end end +--- This function allows you to create worker threads that may perform +-- network tasks in parallel with your script thread. +-- +-- Any network task (e.g. socket:connect(...)) will cause the +-- running thread to yield to NSE. This allows network tasks to appear to be +-- blocking while being able to run multiple network tasks at once. +-- While this is useful for running multiple separate scripts, it is +-- unfortunately difficult for a script itself to perform network tasks in +-- parallel. In order to allow scripts to also have network tasks running in +-- parallel, we provide this function, stdnse.new_thread, to +-- create a new thread that can perform its own network related tasks +-- in parallel with the script. +-- +-- The script launches the worker thread by calling the new_thread +-- function with the parameters: +-- * The main Lua function for the script to execute, similar to the script action function. +-- * The variable number of arguments to be passed to the worker's main function. +-- +-- The stdnse.new_thread function will return two results: +-- * The worker thread's base (main) coroutine (useful for tracking status). +-- * A status query function (described below). +-- +-- The status query function shall return two values: +-- * The result of coroutine.status using the worker thread base coroutine. +-- * The error object thrown that ended the worker thread or nil if no error was thrown. This is typically a string, like most Lua errors. +-- +-- Note that NSE discards all return values of the worker's main function. You +-- must use function parameters, upvalues or environments to communicate +-- results. +-- +-- You should use the condition variable (nmap.condvar) +-- and mutex (nmap.mutex) facilities to coordinate with your +-- worker threads. Keep in mind that Nmap is single threaded so there are +-- no (memory) issues in synchronization to worry about; however, there +-- is resource contention. Your resources are usually network bandwidth, +-- network sockets, etc. Condition variables are also useful if the work for any +-- single thread is dynamic. For example, a web server spider script with a pool +-- of workers will initially have a single root html document. Following the +-- retrieval of the root document, the set of resources to be retrieved +-- (the worker's work) will become very large (an html document adds many +-- new hyperlinks (resources) to fetch). +--@name new_thread +--@class function +--@param main The main function of the worker thread. +--@param ... The arguments passed to the main worker thread. +--@return co The base coroutine of the worker thread. +--@return info A query function used to obtain status information of the worker. +--@usage +--local requests = {"/", "/index.html", --[[ long list of objects ]]} +-- +--function thread_main (host, port, responses, ...) +-- local condvar = nmap.condvar(responses); +-- local what = {n = select("#", ...), ...}; +-- local allReqs = nil; +-- for i = 1, what.n do +-- allReqs = http.pGet(host, port, what[i], nil, nil, allReqs); +-- end +-- local p = assert(http.pipeline(host, port, allReqs)); +-- for i, response in ipairs(p) do responses[#responses+1] = response end +-- condvar "signal"; +--end +-- +--function many_requests (host, port) +-- local threads = {}; +-- local responses = {}; +-- local condvar = nmap.condvar(responses); +-- local i = 1; +-- repeat +-- local j = math.min(i+10, #requests); +-- local co = stdnse.new_thread(thread_main, host, port, responses, +-- unpack(requests, i, j)); +-- threads[co] = true; +-- i = j+1; +-- until i > #requests; +-- repeat +-- condvar "wait"; +-- for thread in pairs(threads) do +-- if coroutine.status(thread) == "dead" then threads[thread] = nil end +-- end +-- until next(threads) == nil; +-- return responses; +--end +do end -- no function here, see nse_main.lua + +--- Returns the base coroutine of the running script. +-- +-- A script may be resuming multiple coroutines to facilitate its own +-- collaborative multithreading design. Because there is a "root" or "base" +-- coroutine that lets us determine whether the script is still active +-- (that is, the script did not end, possibly due to an error), we provide +-- this stdnse.base function that will retrieve the base +-- coroutine of the script. This base coroutine is the coroutine that runs +-- the action function. +-- +-- The base coroutine is useful for many reasons but here are some common +-- uses: +-- * We want to attribute the ownership of an object (perhaps a network socket) to a script. +-- * We want to identify if the script is still alive. +--@name base +--@class function +--@return coroutine Returns the base coroutine of the running script. +do end -- no function here, see nse_main.lua