mirror of
https://github.com/ysoftdevs/wapifuzz.git
synced 2026-01-13 15:13:29 +01:00
91 lines
3.8 KiB
Python
91 lines
3.8 KiB
Python
import sys
|
|
import ssl
|
|
from typing import List
|
|
from boofuzz import Session, Target, SocketConnection, s_get, pedrpc
|
|
from progress_reporter import report_progress
|
|
from configuration_manager import ConfigurationManager
|
|
from post_test_case_callback import PostTestCaseCallback
|
|
from blocks_generator import generate_http_fuzzed_blocks, generate_url_attributes_fuzzed_blocks, \
|
|
generate_body_fuzzed_blocks
|
|
|
|
|
|
class Fuzzer:
|
|
def __init__(self, endpoints, text_logger, junit_logger, protocol: str):
|
|
self._endpoints = endpoints
|
|
self._text_logger = text_logger
|
|
self._junit_logger = junit_logger
|
|
self._protocol = protocol
|
|
self._session = None
|
|
|
|
self._configure_session()
|
|
|
|
self._remove_endpoints_by_keywords(ConfigurationManager.get_keywords_for_endpoints_skipping())
|
|
|
|
if ConfigurationManager.is_http_fuzzing_allowed():
|
|
self._generate_http_fuzzing()
|
|
self._generate_uri_attributes_fuzzing()
|
|
self._generate_request_body_fuzzing()
|
|
self._generate_request_body_fuzzing(add_quotation_marks_into_non_string_primitives=True)
|
|
|
|
def _configure_session(self):
|
|
target_config = ConfigurationManager.get_target()
|
|
startup_command = ConfigurationManager.get_startup_command()
|
|
|
|
ssl_context = None
|
|
if self._protocol == 'ssl':
|
|
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
|
|
ssl_context.check_hostname = False
|
|
ssl_context.verify_mode = ssl.CERT_NONE
|
|
|
|
recv_timeout = ConfigurationManager.get_receive_timeout()
|
|
|
|
remote_connection = SocketConnection(
|
|
target_config["hostname"],
|
|
target_config["port"],
|
|
proto=self._protocol,
|
|
sslcontext=ssl_context,
|
|
recv_timeout=recv_timeout
|
|
)
|
|
if startup_command:
|
|
process_monitor = pedrpc.Client(target_config["hostname"], 26002)
|
|
process_monitor_options = {"start_commands": [startup_command]}
|
|
target = Target(connection=remote_connection, procmon=process_monitor, procmon_options=process_monitor_options)
|
|
else:
|
|
target = Target(connection=remote_connection)
|
|
|
|
self._session = Session(
|
|
target=target,
|
|
fuzz_loggers=[self._text_logger, self._junit_logger],
|
|
post_test_case_callbacks=[PostTestCaseCallback.post_test_callback],
|
|
restart_sleep_time=0,
|
|
keep_web_open=False,
|
|
fuzz_db_keep_only_n_pass_cases=sys.maxsize,
|
|
crash_threshold_element=10,
|
|
crash_threshold_request=30)
|
|
|
|
def _generate_http_fuzzing(self):
|
|
self._session.connect(s_get(generate_http_fuzzed_blocks()))
|
|
|
|
def _generate_uri_attributes_fuzzing(self):
|
|
for endpoint in self._endpoints:
|
|
for request in endpoint["Requests"]:
|
|
request_name = generate_url_attributes_fuzzed_blocks(endpoint, request)
|
|
self._session.connect(s_get(request_name))
|
|
|
|
def _generate_request_body_fuzzing(self, add_quotation_marks_into_non_string_primitives=False):
|
|
for endpoint in self._endpoints:
|
|
for request in endpoint["Requests"]:
|
|
request_name = generate_body_fuzzed_blocks(endpoint, request, add_quotation_marks_into_non_string_primitives)
|
|
self._session.connect(s_get(request_name))
|
|
|
|
def _remove_endpoints_by_keywords(self, keywords: List[str]):
|
|
for keyword in keywords:
|
|
self._endpoints[:] = [endpoint for endpoint in self._endpoints if keyword not in endpoint.get('Uri')]
|
|
|
|
def fuzz(self):
|
|
report_progress(self._session, self._junit_logger)
|
|
self._session.fuzz()
|
|
|
|
def was_there_any_failure(self):
|
|
return self._junit_logger.was_there_any_failure
|