mirror of
https://github.com/ysoftdevs/wapifuzz.git
synced 2026-05-11 10:20:03 +02:00
Added return codes + reactions to them
This commit is contained in:
@@ -1,76 +1,76 @@
|
||||
import sys
|
||||
import json
|
||||
from typing import Union, List
|
||||
|
||||
|
||||
class ConfigurationManager:
|
||||
config = None
|
||||
|
||||
def __init__(self, config_file_pointer):
|
||||
ConfigurationManager.config = json.load(config_file_pointer)
|
||||
self._config_validation()
|
||||
|
||||
@staticmethod
|
||||
def get_startup_command():
|
||||
return ConfigurationManager.config["startup_command"] if "startup_command" in ConfigurationManager.config else None
|
||||
|
||||
@staticmethod
|
||||
def get_payloads_folders_for_boolean_json_primitive() -> Union[List, None]:
|
||||
return ConfigurationManager._get_payloads_folders_for_specific_json_primitive("boolean")
|
||||
|
||||
@staticmethod
|
||||
def get_payloads_folders_for_number_json_primitive() -> Union[List, None]:
|
||||
return ConfigurationManager._get_payloads_folders_for_specific_json_primitive("number")
|
||||
|
||||
@staticmethod
|
||||
def get_payloads_folders_for_string_json_primitive() -> Union[List, None]:
|
||||
return ConfigurationManager._get_payloads_folders_for_specific_json_primitive("string")
|
||||
|
||||
@staticmethod
|
||||
def _get_payloads_folders_for_specific_json_primitive(json_type: str) -> Union[List, None]:
|
||||
mapping = ConfigurationManager._get_payloads_to_json_primitives_mapping()
|
||||
if mapping:
|
||||
return mapping[json_type] if json_type in mapping else None
|
||||
else:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_payloads_to_json_primitives_mapping():
|
||||
return ConfigurationManager.config["payloads_to_json_primitives_mapping"] if "payloads_to_json_primitives_mapping" in ConfigurationManager.config else None
|
||||
|
||||
@staticmethod
|
||||
def get_receive_timeout():
|
||||
return ConfigurationManager.config["receive_timeout"]
|
||||
|
||||
@staticmethod
|
||||
def get_reporting_interval():
|
||||
return ConfigurationManager.config["reporting_interval"]
|
||||
|
||||
@staticmethod
|
||||
def get_keywords_for_endpoints_skipping() -> List:
|
||||
return ConfigurationManager.config["skipping_endpoints_keywords"]
|
||||
|
||||
@staticmethod
|
||||
def get_target():
|
||||
return ConfigurationManager.config["target"]
|
||||
|
||||
@staticmethod
|
||||
def is_http_fuzzing_allowed():
|
||||
return ConfigurationManager.config["http_fuzzing"]
|
||||
|
||||
def _config_validation(self):
|
||||
reporting_interval: Union[int, float] = self.config["reporting_interval"]
|
||||
receive_timeout: Union[int, float] = self.config["receive_timeout"]
|
||||
http_fuzzing: bool = self.config["http_fuzzing"]
|
||||
|
||||
if reporting_interval <= 0 or reporting_interval < receive_timeout:
|
||||
print("Wrong reporting interval. Should be smaller than receive_timeout.")
|
||||
sys.exit(-1)
|
||||
|
||||
if "target" not in ConfigurationManager.config:
|
||||
print("Missing configuration of target.")
|
||||
sys.exit(-1)
|
||||
|
||||
if http_fuzzing is None:
|
||||
print("Missing flag for enabling / disabling HTTP fuzzing.")
|
||||
sys.exit(-1)
|
||||
import sys
|
||||
import json
|
||||
from typing import Union, List
|
||||
|
||||
|
||||
class ConfigurationManager:
|
||||
config = None
|
||||
|
||||
def __init__(self, config_file_pointer):
|
||||
ConfigurationManager.config = json.load(config_file_pointer)
|
||||
self._config_validation()
|
||||
|
||||
@staticmethod
|
||||
def get_startup_command():
|
||||
return ConfigurationManager.config["startup_command"] if "startup_command" in ConfigurationManager.config else None
|
||||
|
||||
@staticmethod
|
||||
def get_payloads_folders_for_boolean_json_primitive() -> Union[List, None]:
|
||||
return ConfigurationManager._get_payloads_folders_for_specific_json_primitive("boolean")
|
||||
|
||||
@staticmethod
|
||||
def get_payloads_folders_for_number_json_primitive() -> Union[List, None]:
|
||||
return ConfigurationManager._get_payloads_folders_for_specific_json_primitive("number")
|
||||
|
||||
@staticmethod
|
||||
def get_payloads_folders_for_string_json_primitive() -> Union[List, None]:
|
||||
return ConfigurationManager._get_payloads_folders_for_specific_json_primitive("string")
|
||||
|
||||
@staticmethod
|
||||
def _get_payloads_folders_for_specific_json_primitive(json_type: str) -> Union[List, None]:
|
||||
mapping = ConfigurationManager._get_payloads_to_json_primitives_mapping()
|
||||
if mapping:
|
||||
return mapping[json_type] if json_type in mapping else None
|
||||
else:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_payloads_to_json_primitives_mapping():
|
||||
return ConfigurationManager.config["payloads_to_json_primitives_mapping"] if "payloads_to_json_primitives_mapping" in ConfigurationManager.config else None
|
||||
|
||||
@staticmethod
|
||||
def get_receive_timeout():
|
||||
return ConfigurationManager.config["receive_timeout"]
|
||||
|
||||
@staticmethod
|
||||
def get_reporting_interval():
|
||||
return ConfigurationManager.config["reporting_interval"]
|
||||
|
||||
@staticmethod
|
||||
def get_keywords_for_endpoints_skipping() -> List:
|
||||
return ConfigurationManager.config["skipping_endpoints_keywords"]
|
||||
|
||||
@staticmethod
|
||||
def get_target():
|
||||
return ConfigurationManager.config["target"]
|
||||
|
||||
@staticmethod
|
||||
def is_http_fuzzing_allowed():
|
||||
return ConfigurationManager.config["http_fuzzing"]
|
||||
|
||||
def _config_validation(self):
|
||||
reporting_interval: Union[int, float] = self.config["reporting_interval"]
|
||||
receive_timeout: Union[int, float] = self.config["receive_timeout"]
|
||||
http_fuzzing: bool = self.config["http_fuzzing"]
|
||||
|
||||
if reporting_interval <= 0 or reporting_interval < receive_timeout:
|
||||
print("Wrong reporting interval. Should be smaller than receive_timeout.")
|
||||
sys.exit(2)
|
||||
|
||||
if "target" not in ConfigurationManager.config:
|
||||
print("Missing configuration of target.")
|
||||
sys.exit(2)
|
||||
|
||||
if http_fuzzing is None:
|
||||
print("Missing flag for enabling / disabling HTTP fuzzing.")
|
||||
sys.exit(2)
|
||||
|
||||
@@ -85,3 +85,6 @@ class Fuzzer:
|
||||
def fuzz(self):
|
||||
report_progress(self._session, self._junit_logger)
|
||||
self._session.fuzz()
|
||||
|
||||
def was_there_any_failure(self):
|
||||
return self._junit_logger.was_there_any_failure
|
||||
|
||||
@@ -31,6 +31,8 @@ class JUnitLogger(ifuzz_logger_backend.IFuzzLoggerBackend):
|
||||
self._default_value = None
|
||||
self._mutant_value = None
|
||||
|
||||
self.was_there_any_failure: bool = False
|
||||
|
||||
def open_test_step(self, description):
|
||||
skipped_count = 0
|
||||
for skipped_test_case_message_regex in self.SKIPPED_TEST_CASE_MESSAGES_REGEX:
|
||||
@@ -48,6 +50,7 @@ class JUnitLogger(ifuzz_logger_backend.IFuzzLoggerBackend):
|
||||
|
||||
def log_error(self, description):
|
||||
self._error = description
|
||||
self.was_there_any_failure = True
|
||||
|
||||
def log_recv(self, data):
|
||||
self._received_bytes = helpers.hex_str(data)
|
||||
@@ -71,6 +74,7 @@ class JUnitLogger(ifuzz_logger_backend.IFuzzLoggerBackend):
|
||||
|
||||
def log_fail(self, description=""):
|
||||
self._failure = description
|
||||
self.was_there_any_failure = True
|
||||
|
||||
def log_pass(self, description=""):
|
||||
pass
|
||||
|
||||
@@ -18,7 +18,7 @@ def report_progress(session, junit_logger):
|
||||
except:
|
||||
pass
|
||||
finally:
|
||||
os._exit(1)
|
||||
os._exit(2)
|
||||
|
||||
if is_fuzzing_still_in_progress(session):
|
||||
plan_another_report(session, junit_logger, ConfigurationManager.get_reporting_interval())
|
||||
|
||||
@@ -36,7 +36,12 @@ def main():
|
||||
|
||||
fuzzer = Fuzzer(endpoints, text_logger, junit_logger, protocol)
|
||||
fuzzer.fuzz()
|
||||
return fuzzer.was_there_any_failure()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
was_there_any_failure: bool = main()
|
||||
if was_there_any_failure:
|
||||
exit(1)
|
||||
else:
|
||||
exit(0)
|
||||
|
||||
Reference in New Issue
Block a user