Switching from WAF scripts to identYwaf (avoiding redundant work from my side)

This commit is contained in:
Miroslav Stampar
2019-05-24 13:09:28 +02:00
parent ef7d4bb404
commit 0c79504ff1
98 changed files with 1534 additions and 2119 deletions

View File

@@ -108,6 +108,7 @@ from lib.request.templates import getPageTemplate
from lib.techniques.union.test import unionTest
from lib.techniques.union.use import configUnion
from thirdparty import six
from thirdparty.identywaf import identYwaf
from thirdparty.six.moves import http_client as _http_client
def checkSqlInjection(place, parameter, value):
@@ -1402,119 +1403,54 @@ def checkWaf():
kb.resendPostOnRedirect = popValue()
kb.redirectChoice = popValue()
# TODO: today
if retVal:
warnMsg = "heuristics detected that the target "
warnMsg += "is protected by some kind of WAF/IPS"
logger.critical(warnMsg)
pass
# identYwaf
#if conf.timeout == defaults.timeout:
#logger.warning("dropping timeout to %d seconds (i.e. '--timeout=%d')" % (IDS_WAF_CHECK_TIMEOUT, IDS_WAF_CHECK_TIMEOUT))
#conf.timeout = IDS_WAF_CHECK_TIMEOUT
if not conf.identifyWaf:
message = "do you want sqlmap to try to detect backend "
message += "WAF/IPS? [y/N] "
# identYwaf
if readInput(message, default='N', boolean=True):
conf.identifyWaf = True
#def _(*args, **kwargs):
#page, headers, code = None, None, None
#try:
#pushValue(kb.redirectChoice)
#pushValue(kb.resendPostOnRedirect)
if conf.timeout == defaults.timeout:
logger.warning("dropping timeout to %d seconds (i.e. '--timeout=%d')" % (IDS_WAF_CHECK_TIMEOUT, IDS_WAF_CHECK_TIMEOUT))
conf.timeout = IDS_WAF_CHECK_TIMEOUT
#kb.redirectChoice = REDIRECTION.YES
#kb.resendPostOnRedirect = True
#if kwargs.get("get"):
#kwargs["get"] = urlencode(kwargs["get"])
#kwargs["raise404"] = False
#kwargs["silent"] = True
#kwargs["finalCode"] = True
#page, headers, code = Request.getPage(*args, **kwargs)
#except Exception:
#pass
#finally:
#kb.resendPostOnRedirect = popValue()
#kb.redirectChoice = popValue()
#message = "are you sure that you want to "
#message += "continue with further target testing? [y/N] "
#choice = readInput(message, default='N', boolean=True)
#if not conf.tamper:
#warnMsg = "please consider usage of tamper scripts (option '--tamper')"
#singleTimeWarnMessage(warnMsg)
#if not choice:
#raise SqlmapUserQuitException
hashDBWrite(HASHDB_KEYS.CHECK_WAF_RESULT, retVal, True)
return retVal
@stackedmethod
def identifyWaf():
if not conf.identifyWaf:
return None
if not kb.wafFunctions:
setWafFunctions()
kb.testMode = True
infoMsg = "using WAF scripts to detect "
infoMsg += "backend WAF/IPS protection"
logger.info(infoMsg)
@cachedmethod
def _(*args, **kwargs):
page, headers, code = None, None, None
try:
pushValue(kb.redirectChoice)
pushValue(kb.resendPostOnRedirect)
kb.redirectChoice = REDIRECTION.YES
kb.resendPostOnRedirect = True
if kwargs.get("get"):
kwargs["get"] = urlencode(kwargs["get"])
kwargs["raise404"] = False
kwargs["silent"] = True
kwargs["finalCode"] = True
page, headers, code = Request.getPage(*args, **kwargs)
except Exception:
pass
finally:
kb.resendPostOnRedirect = popValue()
kb.redirectChoice = popValue()
return page or "", headers or {}, code
retVal = []
for function, product in kb.wafFunctions:
if retVal and "unknown" in product.lower():
continue
try:
logger.debug("checking for WAF/IPS product '%s'" % product)
found = function(_)
except Exception as ex:
errMsg = "exception occurred while running "
errMsg += "WAF script for '%s' ('%s')" % (product, getSafeExString(ex))
logger.critical(errMsg)
found = False
if found:
errMsg = "WAF/IPS identified as '%s'" % product
logger.critical(errMsg)
retVal.append(product)
if retVal:
if kb.wafSpecificResponse and "You don't have permission to access" not in kb.wafSpecificResponse and len(retVal) == 1 and "unknown" in retVal[0].lower():
handle, filename = tempfile.mkstemp(prefix=MKSTEMP_PREFIX.SPECIFIC_RESPONSE)
os.close(handle)
with openFile(filename, "w+b") as f:
f.write(kb.wafSpecificResponse)
message = "WAF/IPS specific response can be found in '%s'. " % filename
message += "If you know the details on used protection please "
message += "report it along with specific response "
message += "to '%s'" % DEV_EMAIL_ADDRESS
logger.warn(message)
message = "are you sure that you want to "
message += "continue with further target testing? [y/N] "
choice = readInput(message, default='N', boolean=True)
if not conf.tamper:
warnMsg = "please consider usage of tamper scripts (option '--tamper')"
singleTimeWarnMessage(warnMsg)
if not choice:
raise SqlmapUserQuitException
else:
warnMsg = "WAF/IPS product hasn't been identified"
logger.warn(warnMsg)
kb.testType = None
kb.testMode = False
return retVal
@stackedmethod
def checkNullConnection():
"""
@@ -1666,6 +1602,3 @@ def checkInternet():
def setVerbosity(): # Cross-referenced function
raise NotImplementedError
def setWafFunctions(): # Cross-referenced function
raise NotImplementedError

View File

@@ -20,7 +20,6 @@ from lib.controller.checks import checkInternet
from lib.controller.checks import checkNullConnection
from lib.controller.checks import checkWaf
from lib.controller.checks import heuristicCheckSqlInjection
from lib.controller.checks import identifyWaf
from lib.core.agent import agent
from lib.core.common import dataToStdout
from lib.core.common import extractRegexResult
@@ -423,9 +422,6 @@ def start():
checkWaf()
if conf.identifyWaf:
identifyWaf()
if conf.nullConnection:
checkNullConnection()