diff --git a/benchmarks/appsec_iast_aspects/config.yaml b/benchmarks/appsec_iast_aspects/config.yaml
index 946f2b8065d..9d79d7fdd58 100644
--- a/benchmarks/appsec_iast_aspects/config.yaml
+++ b/benchmarks/appsec_iast_aspects/config.yaml
@@ -110,6 +110,30 @@ join_noaspect:
   <<: *join_aspect
   function_name: "join_noaspect"
 
+strip_aspect: &strip_aspect
+  warmups: 1
+  function_name: "iast_strip_aspect"
+
+strip_noaspect:
+  <<: *strip_aspect
+  function_name: "strip_noaspect"
+
+rstrip_aspect: &rstrip_aspect
+  warmups: 1
+  function_name: "iast_rstrip_aspect"
+
+rstrip_noaspect:
+  <<: *rstrip_aspect
+  function_name: "rstrip_noaspect"
+
+lstrip_aspect: &lstrip_aspect
+  warmups: 1
+  function_name: "iast_lstrip_aspect"
+
+lstrip_noaspect:
+  <<: *lstrip_aspect
+  function_name: "lstrip_noaspect"
+
 lower_aspect: &lower_aspect
   warmups: 1
   function_name: "iast_lower_aspect"
diff --git a/benchmarks/appsec_iast_aspects/functions.py b/benchmarks/appsec_iast_aspects/functions.py
index 62fe7b3aaf6..701c65bc779 100644
--- a/benchmarks/appsec_iast_aspects/functions.py
+++ b/benchmarks/appsec_iast_aspects/functions.py
@@ -28,17 +28,19 @@
     "modulo_aspect",
     "replace_aspect",
     "repr_aspect",
     "rsplit_aspect",
     "slice_aspect",
     "split_aspect",
-    "split_aspect",
     "splitlines_aspect",
     "str_aspect",
     "stringio_aspect",
     "swapcase_aspect",
     "title_aspect",
     "translate_aspect",
     "upper_aspect",
+    "rstrip_aspect",
+    "lstrip_aspect",
+    "strip_aspect",
 ]
 
 notfound_symbols = []
@@ -454,3 +456,27 @@ def iast_split_aspect():
 
 def split_noaspect():
     return "foo bar baz".split()
+
+
+def iast_strip_aspect():
+    return strip_aspect(None, 1, " foo bar baz ")  # noqa: F821
+
+
+def strip_noaspect():
+    return " foo bar baz ".strip()
+
+
+def iast_rstrip_aspect():
+    return rstrip_aspect(None, 1, " foo bar baz ")  # noqa: F821
+
+
+def rstrip_noaspect():
+    return " foo bar baz ".rstrip()
+
+
+def iast_lstrip_aspect():
+    return lstrip_aspect(None, 1, " foo bar baz ")  # noqa: F821
+
+
+def lstrip_noaspect():
+    return " foo
bar baz ".lstrip()
diff --git a/ddtrace/appsec/_iast/_ast/visitor.py b/ddtrace/appsec/_iast/_ast/visitor.py
index ce352499f01..75cce51de99 100644
--- a/ddtrace/appsec/_iast/_ast/visitor.py
+++ b/ddtrace/appsec/_iast/_ast/visitor.py
@@ -71,6 +71,9 @@ def _mark_avoid_convert_recursively(node):
             "split": _PREFIX + "aspects.split_aspect",  # Both regular split and re.split
             "rsplit": _PREFIX + "aspects.rsplit_aspect",
             "splitlines": _PREFIX + "aspects.splitlines_aspect",
+            "lstrip": _PREFIX + "aspects.lstrip_aspect",
+            "rstrip": _PREFIX + "aspects.rstrip_aspect",
+            "strip": _PREFIX + "aspects.strip_aspect",
             # re module and re.Match methods
             "findall": _PREFIX + "aspects.re_findall_aspect",
             "finditer": _PREFIX + "aspects.re_finditer_aspect",
diff --git a/ddtrace/appsec/_iast/_patch_modules.py b/ddtrace/appsec/_iast/_patch_modules.py
index fcac02f74a3..96f707cf7fb 100644
--- a/ddtrace/appsec/_iast/_patch_modules.py
+++ b/ddtrace/appsec/_iast/_patch_modules.py
@@ -57,7 +57,7 @@ def patch_iast(patch_modules=IAST_PATCH):
     when_imported("werkzeug.utils")(
         lambda _: try_wrap_function_wrapper("werkzeug.utils", "secure_filename", path_traversal_sanitizer)
     )
-    # TODO: werkzeug.utils.safe_join propagation doesn't work because strip("._") which is not yet supported by IAST
+    # TODO: werkzeug.utils.safe_join propagation doesn't work: it uses normpath, which is not yet supported by IAST
     # when_imported("werkzeug.utils")(
     #     lambda _: try_wrap_function_wrapper(
     #         "werkzeug.utils",
diff --git a/ddtrace/appsec/_iast/_taint_tracking/aspects.py b/ddtrace/appsec/_iast/_taint_tracking/aspects.py
index 99a90e88204..20a11428d87 100644
--- a/ddtrace/appsec/_iast/_taint_tracking/aspects.py
+++ b/ddtrace/appsec/_iast/_taint_tracking/aspects.py
@@ -110,6 +110,9 @@
     "slice_aspect",
     "split_aspect",
     "splitlines_aspect",
+    "lstrip_aspect",
+    "rstrip_aspect",
+    "strip_aspect",
     "str_aspect",
     "stringio_aspect",
     "swapcase_aspect",
@@ -1312,3 +1315,135 @@ def ospathsplitroot_aspect(*args: Any, **kwargs: Any) -> Any:
         iast_propagation_error_log(f"_aspect_ospathsplitroot. {e}")
 
     return os.path.splitroot(*args, **kwargs)  # type: ignore[attr-defined]
+
+
+def lstrip_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> TEXT_TYPES:
+    """Taint-propagating replacement for ``lstrip``.
+
+    When ``orig_function`` is given and is not a builtin, it is called directly with the arguments
+    left after dropping the first ``flag_added_args`` ones, and its result is returned as-is. Otherwise
+    ``args[0]`` is left-stripped and its taint ranges that overlap the kept suffix are copied onto
+    the result, shifted left by the number of removed characters. Propagation errors are only logged.
+    """
+    if orig_function is not None and not isinstance(orig_function, BuiltinFunctionType):
+        if flag_added_args > 0:
+            args = args[flag_added_args:]
+        return orig_function(*args, **kwargs)
+
+    candidate_text = args[0]
+    args = args[flag_added_args:]
+
+    result = candidate_text.lstrip(*args, **kwargs)
+
+    if not isinstance(candidate_text, IAST.TEXT_TYPES):
+        return result
+
+    try:
+        _strip_lstrip_aspect(candidate_text, result)
+    except Exception as e:
+        iast_propagation_error_log(f"lstrip_aspect. {e}")
+
+    return result
+
+
+def rstrip_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> TEXT_TYPES:
+    """Taint-propagating replacement for ``rstrip``.
+
+    Same calling convention as ``lstrip_aspect``. Only trailing characters are removed, so range
+    starts are unchanged: ranges starting at or past the end of the result are dropped and the
+    others are truncated to fit. Propagation errors are only logged.
+    """
+    if orig_function is not None and not isinstance(orig_function, BuiltinFunctionType):
+        if flag_added_args > 0:
+            args = args[flag_added_args:]
+        return orig_function(*args, **kwargs)
+
+    candidate_text = args[0]
+    args = args[flag_added_args:]
+
+    result = candidate_text.rstrip(*args, **kwargs)
+
+    if not isinstance(candidate_text, IAST.TEXT_TYPES):
+        return result
+
+    try:
+        ranges = get_ranges(candidate_text)
+        len_result = len(result)
+        if len_result == len(candidate_text):
+            # Nothing was stripped: every range is still valid as-is.
+            ranges_new = tuple(ranges)
+        else:
+            ranges_new = tuple(
+                TaintRange(
+                    start=taint_range.start,
+                    length=min(len_result - taint_range.start, taint_range.length),
+                    source=taint_range.source,
+                    secure_marks=taint_range.secure_marks,
+                )
+                for taint_range in ranges
+                if taint_range.start < len_result
+            )
+        taint_pyobject_with_ranges(result, ranges_new)
+    except Exception as e:
+        iast_propagation_error_log(f"rstrip_aspect. {e}")
+
+    return result
+
+
+def strip_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> TEXT_TYPES:
+    """Taint-propagating replacement for ``strip``.
+
+    Same calling convention as ``lstrip_aspect``; surviving ranges are clipped to the kept slice and
+    shifted left by the number of removed leading characters. Propagation errors are only logged.
+    """
+    if orig_function is not None and not isinstance(orig_function, BuiltinFunctionType):
+        if flag_added_args > 0:
+            args = args[flag_added_args:]
+        return orig_function(*args, **kwargs)
+
+    candidate_text = args[0]
+    args = args[flag_added_args:]
+
+    result = candidate_text.strip(*args, **kwargs)
+
+    if not isinstance(candidate_text, IAST.TEXT_TYPES):
+        return result
+
+    try:
+        _strip_lstrip_aspect(candidate_text, result)
+    except Exception as e:
+        iast_propagation_error_log(f"strip_aspect. {e}")
+
+    return result
+
+
+def _strip_lstrip_aspect(candidate_text, result):
+    """Taint ``result`` with the ranges of ``candidate_text`` that survive a ``strip``/``lstrip``.
+
+    ``result`` must be what ``candidate_text.strip(...)`` or ``candidate_text.lstrip(...)`` returned.
+    """
+    ranges = get_ranges(candidate_text)
+    len_result = len(result)
+    if len_result == len(candidate_text):
+        # Nothing was stripped, so offsets are unchanged. Taint explicitly, as rstrip_aspect does: the
+        # result is not always candidate_text itself (bytearray.strip() always returns a new object).
+        taint_pyobject_with_ranges(result, tuple(ranges))
+        return
+
+    # index() locates the kept slice: everything before it was stripped, so no earlier match can exist.
+    start_pos = candidate_text.index(result)
+    ranges_new: List[TaintRange] = []
+    for taint_range in ranges:
+        # Clip the range to the kept slice and shift it left by the removed prefix.
+        new_start = max(taint_range.start - start_pos, 0)
+        new_end = min(taint_range.start + taint_range.length - start_pos, len_result)
+        if new_end > new_start:
+            ranges_new.append(
+                TaintRange(
+                    start=new_start,
+                    length=new_end - new_start,
+                    source=taint_range.source,
+                    secure_marks=taint_range.secure_marks,
+                )
+            )
+    taint_pyobject_with_ranges(result, tuple(ranges_new))
diff --git a/scripts/iast/mod_leak_functions.py b/scripts/iast/mod_leak_functions.py
index f53e7aa2e94..b9dffec3f01 100644
--- a/scripts/iast/mod_leak_functions.py
+++ b/scripts/iast/mod_leak_functions.py
@@ -308,14 +308,16 @@ async def test_doit():
     string12 = string11 + "_notainted"
     string13 = string12.rsplit("_", 1)[0]
     string13_2 = string13 + "
" + string13
+    string13_3 = string13_2.strip()
+    string13_4 = string13_3.rstrip()
+    string13_5 = string13_4.lstrip()
     try:
-        string13_3, string13_5, string13_5 = string13_2.split(" ")
+        string13_5_1, string13_5_2, string13_5_3 = string13_5.split(" ")
     except ValueError:
         pass
-    sink_points(string13_2)
-
+    sink_points(string13_5)
     # os path propagation
-    string14 = os.path.join(string13_2, "a")
+    string14 = os.path.join(string13_5, "a")
     string15 = os.path.split(string14)[0]
     string16 = os.path.dirname(string15 + "/" + "foobar")
     string17 = os.path.basename("/foobar/" + string16)
diff --git a/tests/appsec/iast/aspects/test_strip_aspect.py b/tests/appsec/iast/aspects/test_strip_aspect.py
new file mode 100644
index 00000000000..754906f672f
--- /dev/null
+++ b/tests/appsec/iast/aspects/test_strip_aspect.py
@@ -0,0 +1,345 @@
+from hypothesis import given
+from hypothesis.strategies import one_of
+import pytest
+
+from ddtrace.appsec._iast._taint_tracking import OriginType
+from ddtrace.appsec._iast._taint_tracking._taint_objects import get_tainted_ranges
+from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted
+from ddtrace.appsec._iast._taint_tracking._taint_objects import taint_pyobject
+import ddtrace.appsec._iast._taint_tracking.aspects as ddtrace_aspects
+from tests.appsec.iast.iast_utils import string_strategies
+
+
+@given(one_of(string_strategies))
+def test_strip_aspect(text):
+    assert ddtrace_aspects.strip_aspect(None, 1, text) == text.strip()
+
+
+@given(one_of(string_strategies))
+def test_rstrip_aspect(text):
+    assert ddtrace_aspects.rstrip_aspect(None, 1, text) == text.rstrip()
+
+
+@given(one_of(string_strategies))
+def test_lstrip_aspect(text):
+    assert ddtrace_aspects.lstrip_aspect(None, 1, text) == text.lstrip()
+
+
+@pytest.mark.parametrize(
+    "obj1,obj2,should_be_tainted,expected_start,expected_length",
+    [
+        (",;,;aaa", ",;:", True, 0, 3),
+        (",;,;aaa", "123", True, 0, 7),
+        ("aaa,;,;", ",;:", True, 0, 3),
+        (",;,;aaa,;,;", ",;:", True, 0, 3),
+        (" hello ", None, True, 0, 5),
+        ("\t\ntext\t\n", None, True, 0, 4),
+        ("xxxhelloxxx", "x", True, 0, 5),
+        ("", "abc", False, 0, 0),
+        ("abc", "", True, 0, 3),
+        (" ", None, False, 0, 0),
+        ("...###text###...", ".#", True, 0, 4),
+        ("абвгтекстабвг", "абвг", True, 0, 5),
+        ("🌟✨text🌟✨", "🌟✨", True, 0, 4),
+        ("\u200b\u200btext\u200b", "\u200b", True, 0, 4),
+        (" \t\n\r text \t\n\r ", None, True, 0, 4),
+        ("...text...", ".", True, 0, 4),
+    ],
+)
+def test_strip_aspect_tainted(obj1, obj2, should_be_tainted, expected_start, expected_length):
+    """Test strip aspect with various input combinations."""
+    obj1 = taint_pyobject(
+        pyobject=obj1,
+        source_name="test_strip_aspect",
+        source_value=obj1,
+        source_origin=OriginType.PARAMETER,
+    )
+
+    result = ddtrace_aspects.strip_aspect(None, 1, obj1, obj2)
+    if obj2 is None:
+        assert result == obj1.strip()
+    else:
+        assert result == obj1.strip(obj2)
+
+    assert is_pyobject_tainted(result) == should_be_tainted
+    if should_be_tainted:
+        ranges = get_tainted_ranges(result)
+        assert ranges[0].start == expected_start
+        assert ranges[0].length == expected_length
+
+
+@pytest.mark.parametrize(
+    "obj1,obj2,expected_start,expected_length",
+    [
+        (",;,;aaa", ",;:", 0, 3),
+        ("aaa,;,;", ",;:", 0, 3),
+        (",;,;aaa,;,;", ",;:", 0, 3),
+        (" hello ", None, 0, 5),
+        ("\t\n text \t\n", None, 0, 4),
+        ("xxxhelloxxx", "x", 0, 5),
+        ("...###text###...", ".#", 0, 4),
+        ("абвгтекстабвг", "абвг", 0, 5),
+        ("...text...", ".", 0, 4),
+    ],
+)
+@pytest.mark.parametrize("ranges_position", list(range(1, 7)))
+def test_strip_aspect_tainted_multiple_ranges(obj1, obj2, expected_start, expected_length, ranges_position):
+    from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect
+
+    concat_obj1 = add_aspect(
+        taint_pyobject(
+            pyobject=obj1[:ranges_position],
+            source_name="obj1_pos1",
+            source_value=obj1[:ranges_position],
+            source_origin=OriginType.PARAMETER,
+        ),
+        taint_pyobject(
+            pyobject=obj1[ranges_position:],
+            source_name="obj1_pos2",
+            source_value=obj1[ranges_position:],
+            source_origin=OriginType.PARAMETER,
+        ),
+    )
+
+    result = ddtrace_aspects.strip_aspect(None, 1, concat_obj1, obj2)
+    if obj2 is None:
+        assert result == concat_obj1.strip()
+    else:
+        assert result == concat_obj1.strip(obj2)
+
+    assert is_pyobject_tainted(result)
+
+    # The two source ranges tile the whole input, so what survives must tile the whole result.
+    ranges = get_tainted_ranges(result)
+    assert ranges
+    assert ranges[0].start == expected_start
+    assert sum(taint_range.length for taint_range in ranges) == expected_length
+
+
+@pytest.mark.parametrize(
+    "obj1,obj2,should_be_tainted,expected_start,expected_length",
+    [
+        ("aaa,;,;", ",;:", True, 0, 3),
+        ("text.....", ".", True, 0, 4),
+        ("text ", None, True, 0, 4),
+        ("hello\t\n\r", None, True, 0, 5),
+        ("text\u200b\u200b", "\u200b", True, 0, 4),
+        ("text🌟✨", "🌟✨", True, 0, 4),
+        ("", "abc", False, 0, 0),
+        ("abc", "", True, 0, 3),
+        (" ", None, False, 0, 0),
+        ("text...###", ".#", True, 0, 4),
+        ("текстабвг", "абвг", True, 0, 5),
+        ("text\t \n", None, True, 0, 4),
+        ("hello...", ".", True, 0, 5),
+        ("text \t\n\r", None, True, 0, 4),
+    ],
+)
+def test_rstrip_aspect_tainted(obj1, obj2, should_be_tainted, expected_start, expected_length):
+    """Test rstrip aspect with various input combinations."""
+    obj1 = taint_pyobject(
+        pyobject=obj1,
+        source_name="test_rstrip_aspect",
+        source_value=obj1,
+        source_origin=OriginType.PARAMETER,
+    )
+
+    result = ddtrace_aspects.rstrip_aspect(None, 1, obj1, obj2)
+    if obj2 is None:
+        assert result == obj1.rstrip()
+    else:
+        assert result == obj1.rstrip(obj2)
+
+    assert is_pyobject_tainted(result) == should_be_tainted
+    if should_be_tainted:
+        ranges = get_tainted_ranges(result)
+        assert ranges[0].start == expected_start
+        assert ranges[0].length == expected_length
+
+
+@pytest.mark.parametrize(
+    "obj1,obj2",
+    [
+        ("aaaa,;,;", ",;:"),
+        ("text.....", "."),
+        ("text ", None),
+        ("hello\t\n\r", None),
+        ("text\u200b\u200b", "\u200b"),
+        ("text🌟✨", "🌟✨"),
+        ("abcd", ""),
+        ("text...###", ".#"),
+        ("текстабвг", "абвг"),
+        ("text\t \n", None),
+        ("hello...", "."),
+        ("text \t\n\r", None),
+    ],
+)
+@pytest.mark.parametrize("ranges_position", list(range(1, 5)))
+def test_rstrip_aspect_tainted_multiple_ranges(obj1, obj2, ranges_position):
+    from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect
+
+    concat_obj1 = add_aspect(
+        taint_pyobject(
+            pyobject=obj1[:ranges_position],
+            source_name="obj1_pos1",
+            source_value=obj1[:ranges_position],
+            source_origin=OriginType.PARAMETER,
+        ),
+        taint_pyobject(
+            pyobject=obj1[ranges_position:],
+            source_name="obj1_pos2",
+            source_value=obj1[ranges_position:],
+            source_origin=OriginType.PARAMETER,
+        ),
+    )
+    result = ddtrace_aspects.rstrip_aspect(None, 1, concat_obj1, obj2)
+    if obj2 is None:
+        assert result == concat_obj1.rstrip()
+    else:
+        assert result == concat_obj1.rstrip(obj2)
+
+    assert is_pyobject_tainted(result)
+
+    ranges = get_tainted_ranges(result)
+    assert ranges
+
+    for i, taint_range in enumerate(ranges):
+        if i == 0:
+            len_range = ranges_position
+            start = 0
+        else:
+            start = ranges_position
+            len_range = len(result) - ranges_position
+        assert taint_range.start == start
+        assert taint_range.length == len_range
+
+
+@pytest.mark.parametrize(
+    "obj1,obj2,should_be_tainted,expected_start,expected_length",
+    [
+        (",;,;aaa", ",;:", True, 0, 3),
+        (".....text", ".", True, 0, 4),
+        (" text", None, True, 0, 4),
+        ("\t\n\rtext", None, True, 0, 4),
+        ("\u200b\u200btext", "\u200b", True, 0, 4),
+        ("🌟✨text", "🌟✨", True, 0, 4),
+        ("", "abc", False, 0, 0),
+        ("abc", "", True, 0, 3),
+        (" ", None, False, 0, 0),
+        ("...###text", ".#", True, 0, 4),
+        ("абвгтекст", "абвг", True, 0, 5),
+        ("\t \ntext", None, True, 0, 4),
+        ("...hello", ".", True, 0, 5),
+        ("\t\n\r text", None, True, 0, 4),
+    ],
+)
+def test_lstrip_aspect_tainted(obj1, obj2, should_be_tainted, expected_start, expected_length):
+    """Test lstrip aspect with various input combinations."""
+    obj1 = taint_pyobject(
+        pyobject=obj1,
+        source_name="test_lstrip_aspect",
+        source_value=obj1,
+        source_origin=OriginType.PARAMETER,
+    )
+
+    result = ddtrace_aspects.lstrip_aspect(None, 1, obj1, obj2)
+    if obj2 is None:
+        assert result == obj1.lstrip()
+    else:
+        assert result == obj1.lstrip(obj2)
+
+    assert is_pyobject_tainted(result) == should_be_tainted
+    if should_be_tainted:
+        ranges = get_tainted_ranges(result)
+        assert ranges[0].start == expected_start
+        assert ranges[0].length == expected_length
+
+
+@pytest.mark.parametrize(
+    "obj1,obj2",
+    [
+        (",;,;aaa", ",;:"),
+        (".....text", "."),
+        (" text", None),
+        ("\t\n\rtext", None),
+        ("-_text", "-_"),
+        ("abccdefg", ""),
+        ("...###text", ".#"),
+        ("абвгтекст", "абвг"),
+        ("\t \ntext", None),
+        ("...hellos", "."),
+        ("\t\n\r text1234", None),
+    ],
+)
+@pytest.mark.parametrize("ranges_position", list(range(1, 5)))
+def test_lstrip_aspect_tainted_multiple_ranges(obj1, obj2, ranges_position):
+    from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect
+
+    concat_obj1 = add_aspect(
+        taint_pyobject(
+            pyobject=obj1[:ranges_position],
+            source_name="obj1_pos1",
+            source_value=obj1[:ranges_position],
+            source_origin=OriginType.PARAMETER,
+        ),
+        taint_pyobject(
+            pyobject=obj1[ranges_position:],
+            source_name="obj1_pos2",
+            source_value=obj1[ranges_position:],
+            source_origin=OriginType.PARAMETER,
+        ),
+    )
+
+    result = ddtrace_aspects.lstrip_aspect(None, 1, concat_obj1, obj2)
+
+    if obj2 is None:
+        assert result == concat_obj1.lstrip()
+    else:
+        assert result == concat_obj1.lstrip(obj2)
+
+    assert is_pyobject_tainted(result)
+
+    ranges = get_tainted_ranges(result)
+    assert ranges
+
+    if len(ranges) == 1:
+        assert ranges[0].start == 0
+        assert ranges[0].length == len(result)
+    elif len(ranges) == 2:
+        for i, taint_range in enumerate(ranges):
+            if i == 0:
+                len_range = ranges_position - (len(concat_obj1) - len(result))
+                start = 0
+            else:
+                start = ranges_position - (len(concat_obj1) - len(result))
+                len_range = len(result) - start
+            assert taint_range.start == start, f"Assertion error: R[{taint_range}][{i}] == {start}"
+            assert taint_range.length == len_range, f"Assertion error: R[{taint_range}][{i}] == {len_range}"
+    else:
+        pytest.fail(f"Invalid ranges: {ranges}")
+
+
+def test_strip_with_multiple_ranges():
+    """Test the three strip aspects on one tainted text with separators inside and around it."""
+    text = "...hello...world..."
+    obj1 = taint_pyobject(
+        pyobject=text,
+        source_name="test_multiple_ranges",
+        source_value=text,
+        source_origin=OriginType.PARAMETER,
+    )
+
+    # Test all three strip functions
+    strip_result = ddtrace_aspects.strip_aspect(None, 1, obj1, ".")
+    rstrip_result = ddtrace_aspects.rstrip_aspect(None, 1, obj1, ".")
+    lstrip_result = ddtrace_aspects.lstrip_aspect(None, 1, obj1, ".")
+
+    # Verify results
+    assert strip_result == "hello...world"
+    assert rstrip_result == "...hello...world"
+    assert lstrip_result == "hello...world..."
+
+    # Verify the results are still tainted
+    assert is_pyobject_tainted(strip_result)
+    assert is_pyobject_tainted(rstrip_result)
+    assert is_pyobject_tainted(lstrip_result)
diff --git a/tests/appsec/iast/fixtures/propagation_path.py b/tests/appsec/iast/fixtures/propagation_path.py
index b8ecfdbd990..0383ae228d6 100644
--- a/tests/appsec/iast/fixtures/propagation_path.py
+++ b/tests/appsec/iast/fixtures/propagation_path.py
@@ -153,7 +153,10 @@ def propagation_memory_check(origin_string1, tainted_string_2):
     string13_pre = string12 + "\n"
     string13 = string13_pre + "notainted"
     # TAINTSOURCE1TAINTSOURCE2-TAINTSOURCE1TAINTSOURCE2-TAINTSOURCE1TAINTSOURCE_notainted\nnotainted
-    string14 = string13.splitlines()[0]  # string14 = string12
+    string13_1 = string13.strip()
+    string13_2 = string13_1.rstrip()
+    string13_3 = string13_2.lstrip()
+    string14 = string13_3.splitlines()[0]  # string14 = string12
     # TAINTSOURCE1TAINTSOURCE2-TAINTSOURCE1TAINTSOURCE2-TAINTSOURCE1TAINTSOURCE_notainted
     string15 = os.path.join("foo", "bar", string14)
     # /foo/bar/TAINTSOURCE1TAINTSOURCE2-TAINTSOURCE1TAINTSOURCE2-TAINTSOURCE1TAINTSOURCE_notainted
diff --git a/tests/appsec/integrations/packages_tests/test_iast_sanitizers.py
b/tests/appsec/integrations/packages_tests/test_iast_sanitizers.py
index ba832d7f086..0df3d57fcca 100644
--- a/tests/appsec/integrations/packages_tests/test_iast_sanitizers.py
+++ b/tests/appsec/integrations/packages_tests/test_iast_sanitizers.py
@@ -29,6 +29,7 @@ def patch_modules():
     """
     _ = _iast_patched_module("html")
     _ = _iast_patched_module("markupsafe")
+    _ = _iast_patched_module("werkzeug.utils")
     _ = _iast_patched_module("pymysql.connections")
     _ = _iast_patched_module("pymysql.converters")
     _ = _iast_patched_module("mysql.connector.conversion")
@@ -49,13 +50,10 @@ def test_werkzeug_secure_filename():
     value = mod.werkzeug_secure_filename(tainted)
     assert value == "a-etc_passwd_DROP_TABLE_users.txt"
 
-    # TODO: the propagation doesn't work correctly in werkzeug.secure_filename because that function implements
-    # STRING.strip("._") which is not yet supported by IAST
-    # ranges = get_tainted_ranges(value)
-    # assert len(ranges) > 0
-    # for _range in ranges:
-    #     assert _range.has_secure_mark(VulnerabilityType.PATH_TRAVERSAL)
-    # assert is_pyobject_tainted(value)
+    ranges = get_tainted_ranges(value)
+    assert len(ranges) > 0
+    assert all(_range.has_secure_mark(VulnerabilityType.PATH_TRAVERSAL) for _range in ranges)
+    assert is_pyobject_tainted(value)
 
 
 def test_werkzeug_secure_safe_join():