From 5b47fc8affc2a8234483a1165aca91a6fc9e8c84 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Thu, 20 Mar 2025 14:24:46 +0100 Subject: [PATCH 01/21] Create new process worker script and run it on context creation --- aikido_zen/context/__init__.py | 2 ++ aikido_zen/process_worker/__init__.py | 38 +++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 aikido_zen/process_worker/__init__.py diff --git a/aikido_zen/context/__init__.py b/aikido_zen/context/__init__.py index 296bc68e..9ecfd1ab 100644 --- a/aikido_zen/context/__init__.py +++ b/aikido_zen/context/__init__.py @@ -14,6 +14,7 @@ from .wsgi import set_wsgi_attributes_on_context from .asgi import set_asgi_attributes_on_context from .extract_route_params import extract_route_params +from .. import process_worker UINPUT_SOURCES = ["body", "cookies", "query", "headers", "xml", "route_params"] current_context = contextvars.ContextVar("current_context", default=None) @@ -37,6 +38,7 @@ class Context: """ def __init__(self, context_obj=None, body=None, req=None, source=None): + process_worker.start_worker() if context_obj: logger.debug("Creating Context instance based on dict object.") self.__dict__.update(context_obj) diff --git a/aikido_zen/process_worker/__init__.py b/aikido_zen/process_worker/__init__.py new file mode 100644 index 00000000..fe635aba --- /dev/null +++ b/aikido_zen/process_worker/__init__.py @@ -0,0 +1,38 @@ +""" +process worker -> When a web server like gUnicorn makes new processes, and those have multiple threads, +Aikido's process worker is linked to those new processes, so in essence it's 1 extra thread. This thread +is responsible for syncing statistics, route data, ... +""" +import multiprocessing +import threading +import time + +from aikido_zen.helpers.logging import logger + + +def start_worker(): + # Find out the running process: + logger.info("[%s](%s) <-- `%s`", + multiprocessing.current_process().name, + multiprocessing.current_process().pid, + threading.current_thread().name) + + # The name is aikido-process-worker- + the current PID + thread_name = "aikido-process-worker-" + str(multiprocessing.current_process().pid) + if any([thread.name == thread_name for thread in threading.enumerate()]): + return # The thread already exists, returning. + + # Create a new daemon thread tht will handle communication to and from background agent + thread = threading.Thread(target=aikido_process_worker_thread, name=thread_name) + thread.daemon = True + thread.start() + + +def aikido_process_worker_thread(): + # Get the current process + current_process = multiprocessing.current_process() + + while True: + # Print information about the process + logger.info(f"Process ID: {current_process.pid}, Name: {current_process.name}") + time.sleep(5) From 6aa778ba9502790e6319fe1a5222ed40f1feb2b2 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:06:12 +0100 Subject: [PATCH 02/21] Remove INITIALIZE_ROUTE command --- .../background_process/commands/__init__.py | 2 - .../commands/initialize_route.py | 11 ------ .../commands/initialize_route_test.py | 37 ------------------- 3 files changed, 50 deletions(-) delete mode 100644 aikido_zen/background_process/commands/initialize_route.py delete mode 100644 aikido_zen/background_process/commands/initialize_route_test.py diff --git a/aikido_zen/background_process/commands/__init__.py b/aikido_zen/background_process/commands/__init__.py index dd244219..0cd75ccc 100644 --- a/aikido_zen/background_process/commands/__init__.py +++ b/aikido_zen/background_process/commands/__init__.py @@ -3,7 +3,6 @@ from aikido_zen.helpers.logging import logger from .attack import process_attack from .read_property import process_read_property -from .initialize_route import process_initialize_route from .user import process_user from .should_ratelimit import process_should_ratelimit from .kill import process_kill @@ -16,7 +15,6 @@ # This maps to a tuple : (function, returns_data?) # Commands that don't return data : "ATTACK": (process_attack, False), - "INITIALIZE_ROUTE": (process_initialize_route, False), "USER": (process_user, False), "KILL": (process_kill, False), "STATISTICS": (process_statistics, False), diff --git a/aikido_zen/background_process/commands/initialize_route.py b/aikido_zen/background_process/commands/initialize_route.py deleted file mode 100644 index 83737e98..00000000 --- a/aikido_zen/background_process/commands/initialize_route.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Exports `process_initialize_route`""" - - -def process_initialize_route(connection_manager, data, queue=None): - """ - This is called the first time a route is discovered to initialize it and add one hit. - data is a dictionary called route_metadata which includes: route, method and url. - """ - if connection_manager: - connection_manager.routes.initialize_route(route_metadata=data) - connection_manager.routes.increment_route(route_metadata=data) diff --git a/aikido_zen/background_process/commands/initialize_route_test.py b/aikido_zen/background_process/commands/initialize_route_test.py deleted file mode 100644 index 69e7abbb..00000000 --- a/aikido_zen/background_process/commands/initialize_route_test.py +++ /dev/null @@ -1,37 +0,0 @@ -import pytest -from unittest.mock import MagicMock, patch -from .initialize_route import process_initialize_route - - -@pytest.fixture -def mock_connection_manager(): - """Fixture to create a mock connection_manager with a routes attribute.""" - connection_manager = MagicMock() - connection_manager.routes = MagicMock() - return connection_manager - - -def test_process_initialize_route(mock_connection_manager): - """Test that process_initialize_route adds a route when connection_manager is present.""" - data = 123456 - - process_initialize_route( - mock_connection_manager, data, None - ) # conn is not used in this function - - # Check that increment_route and initialize_route methods were called with the correct arguments - mock_connection_manager.routes.initialize_route.assert_called_once_with( - route_metadata=123456 - ) - mock_connection_manager.routes.increment_route.assert_called_once_with( - route_metadata=123456 - ) - - -def test_process_initialize_route_no_connection_manager(): - """Test that process_initialize_route does nothing when connection_manager is not present.""" - data = 123456 - - process_initialize_route(None, data, None) # conn is not used in this function - - # Check that no error occurs From ead26f850c6d9c9e7878a7d3d3411b8d671b6849 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:06:41 +0100 Subject: [PATCH 03/21] Cleanup of process_worker logic --- aikido_zen/context/__init__.py | 2 -- aikido_zen/process_worker/__init__.py | 38 --------------------------- 2 files changed, 40 deletions(-) delete mode 100644 aikido_zen/process_worker/__init__.py diff --git a/aikido_zen/context/__init__.py b/aikido_zen/context/__init__.py index 9ecfd1ab..296bc68e 100644 --- a/aikido_zen/context/__init__.py +++ b/aikido_zen/context/__init__.py @@ -14,7 +14,6 @@ from .wsgi import set_wsgi_attributes_on_context from .asgi import set_asgi_attributes_on_context from .extract_route_params import extract_route_params -from .. import process_worker UINPUT_SOURCES = ["body", "cookies", "query", "headers", "xml", "route_params"] current_context = contextvars.ContextVar("current_context", default=None) @@ -38,7 +37,6 @@ class Context: """ def __init__(self, context_obj=None, body=None, req=None, source=None): - process_worker.start_worker() if context_obj: logger.debug("Creating Context instance based on dict object.") self.__dict__.update(context_obj) diff --git a/aikido_zen/process_worker/__init__.py b/aikido_zen/process_worker/__init__.py deleted file mode 100644 index fe635aba..00000000 --- a/aikido_zen/process_worker/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -process worker -> When a web server like gUnicorn makes new processes, and those have multiple threads, -Aikido's process worker is linked to those new processes, so in essence it's 1 extra thread. This thread -is responsible for syncing statistics, route data, ... -""" -import multiprocessing -import threading -import time - -from aikido_zen.helpers.logging import logger - - -def start_worker(): - # Find out the running process: - logger.info("[%s](%s) <-- `%s`", - multiprocessing.current_process().name, - multiprocessing.current_process().pid, - threading.current_thread().name) - - # The name is aikido-process-worker- + the current PID - thread_name = "aikido-process-worker-" + str(multiprocessing.current_process().pid) - if any([thread.name == thread_name for thread in threading.enumerate()]): - return # The thread already exists, returning. - - # Create a new daemon thread tht will handle communication to and from background agent - thread = threading.Thread(target=aikido_process_worker_thread, name=thread_name) - thread.daemon = True - thread.start() - - -def aikido_process_worker_thread(): - # Get the current process - current_process = multiprocessing.current_process() - - while True: - # Print information about the process - logger.info(f"Process ID: {current_process.pid}, Name: {current_process.name}") - time.sleep(5) From 932d840d6233fc571a6c628093d8591e5159f252 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:07:01 +0100 Subject: [PATCH 04/21] Create a process_worker and process_worker_loader in thread --- aikido_zen/thread/process_worker.py | 28 ++++++++++++++++++++++ aikido_zen/thread/process_worker_loader.py | 20 ++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 aikido_zen/thread/process_worker.py create mode 100644 aikido_zen/thread/process_worker_loader.py diff --git a/aikido_zen/thread/process_worker.py b/aikido_zen/thread/process_worker.py new file mode 100644 index 00000000..bff09780 --- /dev/null +++ b/aikido_zen/thread/process_worker.py @@ -0,0 +1,28 @@ +import multiprocessing +import time + +from aikido_zen.helpers.logging import logger +from aikido_zen.thread import thread_cache + +# Renew the cache from this background worker every 5 seconds +RENEW_CACHE_EVERY_X_SEC = 5 + + +def aikido_process_worker_thread(): + """ + process worker -> When a web server like gUnicorn makes new processes, and those have multiple threads, + Aikido process worker is linked to those new processes, so in essence it's 1 extra thread. This thread + is responsible for syncing statistics, route data, configuration, ... + """ + # Get the current process + current_process = multiprocessing.current_process() + + while True: + # Print information about the process + logger.debug( + f"Process ID: {current_process.pid}, Name: {current_process.name} - process_worker running." + ) + + # Renew the cache every 5 seconds + thread_cache.renew() + time.sleep(RENEW_CACHE_EVERY_X_SEC) diff --git a/aikido_zen/thread/process_worker_loader.py b/aikido_zen/thread/process_worker_loader.py new file mode 100644 index 00000000..a1f87bbb --- /dev/null +++ b/aikido_zen/thread/process_worker_loader.py @@ -0,0 +1,20 @@ +import multiprocessing +import threading + +from aikido_zen.thread.process_worker import aikido_process_worker_thread + + +def load_worker(): + """ + Loads in a new process worker if one does not already exist for the current process + """ + + # The name is aikido-process-worker- + the current PID + thread_name = "aikido-process-worker-" + str(multiprocessing.current_process().pid) + if any([thread.name == thread_name for thread in threading.enumerate()]): + return # The thread already exists, returning. + + # Create a new daemon thread tht will handle communication to and from background agent + thread = threading.Thread(target=aikido_process_worker_thread, name=thread_name) + thread.daemon = True + thread.start() From c10d834e87f188da07ae810a33484815b1421427 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:07:11 +0100 Subject: [PATCH 05/21] make thread_cache process-llocal --- aikido_zen/thread/thread_cache.py | 61 +++++++++++++------------------ 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/aikido_zen/thread/thread_cache.py b/aikido_zen/thread/thread_cache.py index 0dd8dcc1..257b95fd 100644 --- a/aikido_zen/thread/thread_cache.py +++ b/aikido_zen/thread/thread_cache.py @@ -1,39 +1,19 @@ """Exports class ThreadConfig""" -from threading import local import aikido_zen.background_process.comms as comms -import aikido_zen.helpers.get_current_unixtime_ms as t -from aikido_zen.context import get_current_context from aikido_zen.background_process.routes import Routes from aikido_zen.background_process.service_config import ServiceConfig from aikido_zen.helpers.logging import logger - -THREAD_CONFIG_TTL_MS = 60 * 1000 # Time-To-Live is 60 seconds for the thread cache - -threadlocal_storage = local() - - -def get_cache(): - """Returns the current ThreadCache""" - cache = getattr(threadlocal_storage, "cache", None) - if not cache: - return ThreadCache() - return cache +from aikido_zen.thread import process_worker_loader class ThreadCache: """ - A thread-local cache object that holds routes, bypassed ips, endpoints amount of requests - With a Time-To-Live given by THREAD_CONFIG_TTL_MS + A process-local cache object that holds routes, bypassed ips, endpoints amount of requests """ def __init__(self): - # Load initial data : - self.reset() - self.renew() - - # Save as a thread-local object : - threadlocal_storage.cache = self + self.reset() # Initialize values def is_bypassed_ip(self, ip): """Checks the given IP against the list of bypassed ips""" @@ -43,14 +23,6 @@ def is_user_blocked(self, user_id): """Checks if the user id is blocked""" return user_id in self.config.blocked_uids - def renew_if_ttl_expired(self): - """Renews the data only if TTL has expired""" - ttl_has_expired = ( - t.get_unixtime_ms(monotonic=True) - self.last_renewal > THREAD_CONFIG_TTL_MS - ) - if ttl_has_expired: - self.renew() - def get_endpoints(self): return self.config.endpoints @@ -65,16 +37,14 @@ def reset(self): received_any_stats=False, ) self.reqs = 0 - self.last_renewal = 0 self.middleware_installed = False def renew(self): """ Makes an IPC call to store the amount of hits and requests and renew the config """ - # Don't try to fetch a thread cache if communications don't work or - # if we are not inside the context of a web request - if not comms.get_comms() or not get_current_context(): + + if not comms.get_comms(): return res = comms.get_comms().send_data_to_bg_process( @@ -95,9 +65,28 @@ def renew(self): self.routes.routes = res["data"]["routes"] for route in self.routes.routes.values(): route["hits_delta_since_sync"] = 0 - self.last_renewal = t.get_unixtime_ms(monotonic=True) logger.debug("Renewed thread cache") def increment_stats(self): """Increments the requests""" self.reqs += 1 + + +# For these 2 functions and the data they process, we rely on Python's GIL +# See here: https://wiki.python.org/moin/GlobalInterpreterLock +global_thread_cache = ThreadCache() + + +def get_cache(): + """ + Returns the cache, protected by Python's GIL (so not our own mutex), + and starts the process worker (which syncs info between the cache and agent), if it doesn't already exist. + """ + global global_thread_cache + process_worker_loader.load_worker() + return global_thread_cache + + +def renew(): + global global_thread_cache + global_thread_cache.renew() From 312c115f21bf086ce1fae7d94dbcf175a07263a6 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:07:34 +0100 Subject: [PATCH 06/21] Improve route increment stuff --- aikido_zen/background_process/routes/__init__.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/aikido_zen/background_process/routes/__init__.py b/aikido_zen/background_process/routes/__init__.py index 27b21d53..094ff6e6 100644 --- a/aikido_zen/background_process/routes/__init__.py +++ b/aikido_zen/background_process/routes/__init__.py @@ -19,20 +19,17 @@ def __init__(self, max_size=1000): def initialize_route(self, route_metadata): """ - Initializes a route for the first time. + Initializes a route for the first time. `hits_delta_since_sync` counts delta between syncs. """ self.manage_routes_size() key = route_to_key(route_metadata) - if self.routes.get(key): - return self.routes[key] = { "method": route_metadata.get("method"), "path": route_metadata.get("route"), "hits": 0, + "hits_delta_since_sync": 0, "apispec": {}, } - # This field counts the difference in hits in between synchronisation for threads : - self.routes[key]["hits_delta_since_sync"] = 0 def increment_route(self, route_metadata): """ @@ -41,8 +38,8 @@ def increment_route(self, route_metadata): """ key = route_to_key(route_metadata) if not self.routes.get(key): - return - # Add a hit to the route : + self.initialize_route(route_metadata) + # Add hits to route = self.routes.get(key) route["hits"] += 1 route["hits_delta_since_sync"] += 1 From a1f9a2574b638cd3efe299b0ea877d4924dc3035 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:10:23 +0100 Subject: [PATCH 07/21] fix thread_cache_test test cases --- aikido_zen/thread/thread_cache_test.py | 145 +------------------------ 1 file changed, 3 insertions(+), 142 deletions(-) diff --git a/aikido_zen/thread/thread_cache_test.py b/aikido_zen/thread/thread_cache_test.py index 284ad56e..7259d227 100644 --- a/aikido_zen/thread/thread_cache_test.py +++ b/aikido_zen/thread/thread_cache_test.py @@ -1,7 +1,7 @@ import pytest from unittest.mock import patch, MagicMock from aikido_zen.background_process.routes import Routes -from .thread_cache import ThreadCache, THREAD_CONFIG_TTL_MS, threadlocal_storage +from .thread_cache import ThreadCache, get_cache from ..background_process.service_config import ServiceConfig from ..context import current_context, Context from aikido_zen.helpers.iplist import IPList @@ -24,7 +24,7 @@ def run_around_tests(): yield # Make sure to reset thread cache after every test so it does not # interfere with other tests - setattr(threadlocal_storage, "cache", None) + get_cache().reset() current_context.set(None) @@ -35,7 +35,6 @@ def test_initialization(thread_cache: ThreadCache): assert thread_cache.get_endpoints() == [] assert thread_cache.config.blocked_uids == set() assert thread_cache.reqs == 0 - assert thread_cache.last_renewal == 0 def test_is_bypassed_ip(thread_cache: ThreadCache): @@ -55,70 +54,6 @@ def test_is_user_blocked(thread_cache: ThreadCache): assert thread_cache.is_user_blocked("user456") is False -@patch("aikido_zen.background_process.comms.get_comms") -@patch("aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms") -def test_renew_if_ttl_expired( - mock_get_unixtime_ms, mock_get_comms, thread_cache: ThreadCache -): - """Test renewing the cache if TTL has expired.""" - mock_get_unixtime_ms.return_value = ( - THREAD_CONFIG_TTL_MS + 1 - ) # Simulate TTL expiration - mock_get_comms.return_value = MagicMock() - mock_get_comms.return_value.send_data_to_bg_process.return_value = { - "success": True, - "data": { - "config": ServiceConfig( - endpoints=[ - { - "graphql": False, - "method": "POST", - "route": "/v2", - "rate_limiting": { - "enabled": False, - }, - "force_protection_off": False, - } - ], - bypassed_ips=["192.168.1.1"], - blocked_uids={"user123"}, - last_updated_at=-1, - received_any_stats=True, - ), - "routes": {}, - }, - } - - thread_cache.renew_if_ttl_expired() - assert thread_cache.is_bypassed_ip("192.168.1.1") - assert thread_cache.get_endpoints() == [ - { - "graphql": False, - "method": "POST", - "route": "/v2", - "rate_limiting": { - "enabled": False, - }, - "force_protection_off": False, - } - ] - assert thread_cache.is_user_blocked("user123") - assert thread_cache.last_renewal > 0 - - -@patch("aikido_zen.background_process.comms.get_comms") -@patch("aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms") -def test_renew_if_ttl_not_expired( - mock_get_unixtime_ms, mock_get_comms, thread_cache: ThreadCache -): - """Test that renew is not called if TTL has not expired.""" - mock_get_unixtime_ms.return_value = 0 # Simulate TTL not expired - thread_cache.last_renewal = 0 # Set last renewal to 0 - - thread_cache.renew_if_ttl_expired() - assert thread_cache.last_renewal == 0 # Should not change - - def test_reset(thread_cache: ThreadCache): """Test that reset empties the cache.""" thread_cache.config.bypassed_ips.add("192.168.1.1") @@ -128,7 +63,6 @@ def test_reset(thread_cache: ThreadCache): assert isinstance(thread_cache.config.bypassed_ips, IPList) assert thread_cache.config.blocked_uids == set() assert thread_cache.reqs == 0 - assert thread_cache.last_renewal == 0 def test_increment_stats(thread_cache): @@ -148,7 +82,6 @@ def test_renew_with_no_comms(thread_cache: ThreadCache): assert thread_cache.get_endpoints() == [] assert thread_cache.config.blocked_uids == set() assert thread_cache.reqs == 0 - assert thread_cache.last_renewal == 0 @patch("aikido_zen.background_process.comms.get_comms") @@ -167,7 +100,6 @@ def test_renew_with_invalid_response(mock_get_comms, thread_cache: ThreadCache): assert isinstance(thread_cache.config.bypassed_ips, IPList) assert thread_cache.get_endpoints() == [] assert thread_cache.config.blocked_uids == set() - assert thread_cache.last_renewal > 0 # Should update last_renewal def test_is_bypassed_ip_case_insensitivity(thread_cache: ThreadCache): @@ -194,83 +126,12 @@ def increment_in_thread(): assert thread_cache.reqs == 1000 # 10 threads incrementing 100 times -@patch("aikido_zen.background_process.comms.get_comms") -@patch("aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms") -def test_renew_if_ttl_expired_multiple_times( - mock_get_unixtime_ms, mock_get_comms, thread_cache: ThreadCache -): - """Test renewing the cache multiple times if TTL has expired.""" - mock_get_unixtime_ms.return_value = ( - THREAD_CONFIG_TTL_MS + 1 - ) # Simulate TTL expiration - mock_get_comms.return_value = MagicMock() - mock_get_comms.return_value.send_data_to_bg_process.return_value = { - "success": True, - "data": { - "config": ServiceConfig( - endpoints=[ - { - "graphql": False, - "method": "POST", - "route": "/v2", - "rate_limiting": { - "enabled": False, - }, - "force_protection_off": False, - } - ], - bypassed_ips=["192.168.1.1"], - blocked_uids={"user123"}, - last_updated_at=-1, - received_any_stats=True, - ), - "routes": {}, - }, - } - - # First renewal - thread_cache.renew_if_ttl_expired() - assert thread_cache.is_bypassed_ip("192.168.1.1") - assert thread_cache.get_endpoints() == [ - { - "graphql": False, - "method": "POST", - "route": "/v2", - "rate_limiting": { - "enabled": False, - }, - "force_protection_off": False, - } - ] - assert thread_cache.is_user_blocked("user123") - - # Simulate another TTL expiration - mock_get_unixtime_ms.return_value += THREAD_CONFIG_TTL_MS + 1 - thread_cache.renew_if_ttl_expired() - assert thread_cache.is_bypassed_ip("192.168.1.1") - assert thread_cache.get_endpoints() == [ - { - "graphql": False, - "method": "POST", - "route": "/v2", - "rate_limiting": { - "enabled": False, - }, - "force_protection_off": False, - } - ] - assert thread_cache.is_user_blocked("user123") - - @patch("aikido_zen.background_process.comms.get_comms") @patch("aikido_zen.helpers.get_current_unixtime_ms.get_unixtime_ms") def test_parses_routes_correctly( mock_get_unixtime_ms, mock_get_comms, thread_cache: ThreadCache ): """Test renewing the cache multiple times if TTL has expired.""" - mock_get_unixtime_ms.return_value = ( - THREAD_CONFIG_TTL_MS + 1 - ) # Simulate TTL expiration mock_get_comms.return_value = MagicMock() mock_get_comms.return_value.send_data_to_bg_process.return_value = { "success": True, @@ -312,7 +173,7 @@ def test_parses_routes_correctly( } # First renewal - thread_cache.renew_if_ttl_expired() + thread_cache.renew() assert thread_cache.is_bypassed_ip("192.168.1.1") assert thread_cache.get_endpoints() == [ { From a74b3463f6882987b48e16cbf89fb07373523781 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:21:11 +0100 Subject: [PATCH 08/21] Fix all test cases that relied on internal thread cache logic --- aikido_zen/middleware/init_test.py | 15 ++++++--------- aikido_zen/sinks/tests/requests_test.py | 6 +++--- aikido_zen/sinks/tests/urrlib3_test.py | 6 +++--- aikido_zen/vulnerabilities/init_test.py | 16 ++++++---------- 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/aikido_zen/middleware/init_test.py b/aikido_zen/middleware/init_test.py index dea1e69a..8ee6f2a2 100644 --- a/aikido_zen/middleware/init_test.py +++ b/aikido_zen/middleware/init_test.py @@ -2,16 +2,18 @@ import pytest from aikido_zen.context import current_context, Context, get_current_context -from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage +from aikido_zen.thread.thread_cache import ThreadCache, get_cache from . import should_block_request @pytest.fixture(autouse=True) def run_around_tests(): + get_cache().reset() yield # Make sure to reset context and cache after every test so it does not # interfere with other tests current_context.set(None) + get_cache().reset() def test_without_context(): @@ -39,20 +41,15 @@ def set_context(user=None, executed_middleware=False): ).set_as_current_context() -class MyThreadCache(ThreadCache): - def renew_if_ttl_expired(self): - return - - def test_with_context_without_cache(): set_context() - threadlocal_storage.cache = None + get_cache().cache = None assert should_block_request() == {"block": False} def test_with_context_with_cache(): set_context(user={"id": "123"}) - thread_cache = MyThreadCache() + thread_cache = get_cache() thread_cache.config.blocked_uids = ["123"] assert get_current_context().executed_middleware == False @@ -76,7 +73,7 @@ def test_with_context_with_cache(): def test_cache_comms_with_endpoints(): set_context(user={"id": "456"}) - thread_cache = MyThreadCache() + thread_cache = get_cache() thread_cache.config.blocked_uids = ["123"] thread_cache.config.endpoints = [ { diff --git a/aikido_zen/sinks/tests/requests_test.py b/aikido_zen/sinks/tests/requests_test.py index ebc04dfd..628a78c7 100644 --- a/aikido_zen/sinks/tests/requests_test.py +++ b/aikido_zen/sinks/tests/requests_test.py @@ -1,7 +1,7 @@ import os import pytest from aikido_zen.context import Context, current_context -from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage +from aikido_zen.thread.thread_cache import ThreadCache, get_cache from aikido_zen.errors import AikidoSSRF from aikido_zen.background_process.comms import reset_comms import aikido_zen.sinks.socket @@ -19,11 +19,12 @@ @pytest.fixture(autouse=True) def run_around_tests(): + get_cache().reset() yield # Make sure to reset context and cache after every test so it does not # interfere with other tests current_context.set(None) - setattr(threadlocal_storage, "cache", None) + get_cache().reset() def set_context_and_lifecycle(url): @@ -48,7 +49,6 @@ def set_context_and_lifecycle(url): source="flask", ) context.set_as_current_context() - ThreadCache() def test_srrf_test(monkeypatch): diff --git a/aikido_zen/sinks/tests/urrlib3_test.py b/aikido_zen/sinks/tests/urrlib3_test.py index 3bcc8122..411c3135 100644 --- a/aikido_zen/sinks/tests/urrlib3_test.py +++ b/aikido_zen/sinks/tests/urrlib3_test.py @@ -1,7 +1,7 @@ import os import pytest from aikido_zen.context import Context, current_context -from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage +from aikido_zen.thread.thread_cache import get_cache from aikido_zen.errors import AikidoSSRF from aikido_zen.background_process.comms import reset_comms import aikido_zen.sinks.socket @@ -19,11 +19,12 @@ @pytest.fixture(autouse=True) def run_around_tests(): + get_cache().reset() yield # Make sure to reset context and cache after every test so it does not # interfere with other tests current_context.set(None) - setattr(threadlocal_storage, "cache", None) + get_cache().reset() def set_context_and_lifecycle(url): @@ -48,7 +49,6 @@ def set_context_and_lifecycle(url): source="flask", ) context.set_as_current_context() - ThreadCache() def test_srrf_test(monkeypatch): diff --git a/aikido_zen/vulnerabilities/init_test.py b/aikido_zen/vulnerabilities/init_test.py index 32264c46..c5d6062b 100644 --- a/aikido_zen/vulnerabilities/init_test.py +++ b/aikido_zen/vulnerabilities/init_test.py @@ -3,17 +3,18 @@ from . import run_vulnerability_scan from aikido_zen.context import current_context, Context from aikido_zen.errors import AikidoSQLInjection -from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage +from aikido_zen.thread.thread_cache import get_cache from aikido_zen.helpers.iplist import IPList @pytest.fixture(autouse=True) def run_around_tests(): + get_cache().reset() yield # Make sure to reset context and cache after every test so it does not # interfere with other tests current_context.set(None) - setattr(threadlocal_storage, "cache", None) + get_cache().reset() @pytest.fixture @@ -49,14 +50,14 @@ def get_context(): def test_run_vulnerability_scan_no_context(caplog): current_context.set(None) - threadlocal_storage.cache = 1 + get_cache().cache = 1 run_vulnerability_scan(kind="test", op="test", args=tuple()) assert len(caplog.text) == 0 def test_run_vulnerability_scan_no_context_no_lifecycle(caplog): current_context.set(None) - threadlocal_storage.cache = None + get_cache().cache = None run_vulnerability_scan(kind="test", op="test", args=tuple()) assert len(caplog.text) == 0 @@ -64,13 +65,12 @@ def test_run_vulnerability_scan_no_context_no_lifecycle(caplog): def test_run_vulnerability_scan_context_no_lifecycle(caplog): with pytest.raises(Exception): current_context.set(1) - threadlocal_storage.cache = None + get_cache().cache = None run_vulnerability_scan(kind="test", op="test", args=tuple()) def test_lifecycle_cache_ok(caplog, get_context): get_context.set_as_current_context() - cache = ThreadCache() run_vulnerability_scan(kind="test", op="test", args=tuple()) assert "Vulnerability type test currently has no scans implemented" in caplog.text @@ -92,7 +92,6 @@ def test_lifecycle_cache_bypassed_ip(caplog, get_context): def test_sql_injection(caplog, get_context, monkeypatch): get_context.set_as_current_context() - cache = ThreadCache() monkeypatch.setenv("AIKIDO_BLOCK", "1") with pytest.raises(AikidoSQLInjection): run_vulnerability_scan( @@ -104,7 +103,6 @@ def test_sql_injection(caplog, get_context, monkeypatch): def test_sql_injection_with_route_params(caplog, get_context, monkeypatch): get_context.set_as_current_context() - cache = ThreadCache() monkeypatch.setenv("AIKIDO_BLOCK", "1") with pytest.raises(AikidoSQLInjection): run_vulnerability_scan( @@ -116,8 +114,6 @@ def test_sql_injection_with_route_params(caplog, get_context, monkeypatch): def test_sql_injection_with_comms(caplog, get_context, monkeypatch): get_context.set_as_current_context() - cache = ThreadCache() - cache.last_renewal = 9999999999999999999999 monkeypatch.setenv("AIKIDO_BLOCK", "1") with patch("aikido_zen.background_process.comms.get_comms") as mock_get_comms: # Create a mock comms object From 2f08b88c87de660e80eaff51a5a3c9abd2d6f419 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:23:34 +0100 Subject: [PATCH 09/21] Cleanup thread_cache.py --- aikido_zen/thread/thread_cache.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/aikido_zen/thread/thread_cache.py b/aikido_zen/thread/thread_cache.py index 257b95fd..9da965f6 100644 --- a/aikido_zen/thread/thread_cache.py +++ b/aikido_zen/thread/thread_cache.py @@ -40,13 +40,10 @@ def reset(self): self.middleware_installed = False def renew(self): - """ - Makes an IPC call to store the amount of hits and requests and renew the config - """ - if not comms.get_comms(): return + # send stored data and receive new config and routes res = comms.get_comms().send_data_to_bg_process( action="SYNC_DATA", obj={ @@ -56,16 +53,19 @@ def renew(self): }, receive=True, ) + if not res["success"] and res["data"]: + return self.reset() - if res["success"] and res["data"]: - if isinstance(res["data"].get("config"), ServiceConfig): - self.config = res["data"]["config"] - if isinstance(res["data"].get("routes"), dict): - self.routes.routes = res["data"]["routes"] - for route in self.routes.routes.values(): - route["hits_delta_since_sync"] = 0 - logger.debug("Renewed thread cache") + # update config + if isinstance(res["data"].get("config"), ServiceConfig): + self.config = res["data"]["config"] + + # update routes + if isinstance(res["data"].get("routes"), dict): + self.routes.routes = res["data"]["routes"] + for route in self.routes.routes.values(): + route["hits_delta_since_sync"] = 0 def increment_stats(self): """Increments the requests""" From 950849c6797a190ed5bdcf402ccc39c7150cfa10 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:24:02 +0100 Subject: [PATCH 10/21] make log message make some sense --- aikido_zen/thread/process_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aikido_zen/thread/process_worker.py b/aikido_zen/thread/process_worker.py index bff09780..bdf14b7b 100644 --- a/aikido_zen/thread/process_worker.py +++ b/aikido_zen/thread/process_worker.py @@ -20,7 +20,7 @@ def aikido_process_worker_thread(): while True: # Print information about the process logger.debug( - f"Process ID: {current_process.pid}, Name: {current_process.name} - process_worker running." + f"Process ID: {current_process.pid}, Name: {current_process.name} - process_worker renewing thread cache." ) # Renew the cache every 5 seconds From 436f6300b6c590a4a0f129020bdbd9bbba461649 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 10:29:32 +0100 Subject: [PATCH 11/21] fix thread_cache res check and only start process_worker with context --- aikido_zen/thread/process_worker_loader.py | 3 +++ aikido_zen/thread/thread_cache.py | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/aikido_zen/thread/process_worker_loader.py b/aikido_zen/thread/process_worker_loader.py index a1f87bbb..bfb6a183 100644 --- a/aikido_zen/thread/process_worker_loader.py +++ b/aikido_zen/thread/process_worker_loader.py @@ -1,6 +1,7 @@ import multiprocessing import threading +from aikido_zen.context import get_current_context from aikido_zen.thread.process_worker import aikido_process_worker_thread @@ -8,6 +9,8 @@ def load_worker(): """ Loads in a new process worker if one does not already exist for the current process """ + if get_current_context() is None: + return # don't start a worker if it's not related to a request. # The name is aikido-process-worker- + the current PID thread_name = "aikido-process-worker-" + str(multiprocessing.current_process().pid) diff --git a/aikido_zen/thread/thread_cache.py b/aikido_zen/thread/thread_cache.py index 9da965f6..0b6fe1ef 100644 --- a/aikido_zen/thread/thread_cache.py +++ b/aikido_zen/thread/thread_cache.py @@ -3,6 +3,7 @@ import aikido_zen.background_process.comms as comms from aikido_zen.background_process.routes import Routes from aikido_zen.background_process.service_config import ServiceConfig +from aikido_zen.context import get_current_context from aikido_zen.helpers.logging import logger from aikido_zen.thread import process_worker_loader @@ -53,7 +54,7 @@ def renew(self): }, receive=True, ) - if not res["success"] and res["data"]: + if not res["success"] or not res["data"]: return self.reset() From f6d6cfc63397c1e4adf24c81af31f86e1c0a4408 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Fri, 21 Mar 2025 11:03:26 +0100 Subject: [PATCH 12/21] Fix broken routes test --- aikido_zen/background_process/routes/init_test.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/aikido_zen/background_process/routes/init_test.py b/aikido_zen/background_process/routes/init_test.py index 73dd9dd5..44471f62 100644 --- a/aikido_zen/background_process/routes/init_test.py +++ b/aikido_zen/background_process/routes/init_test.py @@ -102,13 +102,6 @@ def test_increment_route_twice(): assert routes.routes["GET:/api/resource"]["hits"] == 2 -def test_increment_route_that_does_not_exist(): - routes = Routes(max_size=3) - routes.increment_route(gen_route_metadata(route="/api/resource")) - routes.increment_route(gen_route_metadata(route="/api/resource")) - assert len(routes.routes) == 0 - - def test_clear_routes(): routes = Routes(max_size=3) routes.initialize_route(gen_route_metadata(route="/api/resource")) From 3f570f382708c815afc91194c614a96625afffc3 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 25 Mar 2025 11:03:35 +0100 Subject: [PATCH 13/21] Fix django mysql e2e test --- end2end/django_mysql_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end2end/django_mysql_test.py b/end2end/django_mysql_test.py index 7e499be4..de5b4366 100644 --- a/end2end/django_mysql_test.py +++ b/end2end/django_mysql_test.py @@ -81,7 +81,7 @@ def test_initial_heartbeat(): [{ "apispec": {}, "hits": 1, - "hits_delta_since_sync": 1, + "hits_delta_since_sync": 0, "method": "POST", "path": "/app/create" }], From f41512e403b98125c8be22cb772c79613fd000f1 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Tue, 25 Mar 2025 11:18:58 +0100 Subject: [PATCH 14/21] Also add apispec to e2e test --- end2end/django_mysql_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end2end/django_mysql_test.py b/end2end/django_mysql_test.py index de5b4366..5443caf6 100644 --- a/end2end/django_mysql_test.py +++ b/end2end/django_mysql_test.py @@ -79,7 +79,7 @@ def test_initial_heartbeat(): assert len(heartbeat_events) == 1 validate_heartbeat(heartbeat_events[0], [{ - "apispec": {}, + "apispec": {'body': {'type': 'form-urlencoded', 'schema': {'type': 'object', 'properties': {'dog_name': {'type': 'string'}}}}, 'query': None, 'auth': None}, "hits": 1, "hits_delta_since_sync": 0, "method": "POST", From 19c7b62e5d8a814e049bc6261cee15eaf1e606a1 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Wed, 26 Mar 2025 11:21:06 +0100 Subject: [PATCH 15/21] Total is 3 requests for django_mysql e2e --- end2end/django_mysql_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end2end/django_mysql_test.py b/end2end/django_mysql_test.py index 5443caf6..95c8f456 100644 --- a/end2end/django_mysql_test.py +++ b/end2end/django_mysql_test.py @@ -85,5 +85,5 @@ def test_initial_heartbeat(): "method": "POST", "path": "/app/create" }], - {"aborted":0,"attacksDetected":{"blocked":2,"total":2},"total":0} + {"aborted":0,"attacksDetected":{"blocked":2,"total":2},"total":3} ) From 5b83da45acbab4f903b6820546897de6a679e5f5 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Wed, 26 Mar 2025 11:25:48 +0100 Subject: [PATCH 16/21] Fix request_handler --- aikido_zen/sources/functions/request_handler.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/aikido_zen/sources/functions/request_handler.py b/aikido_zen/sources/functions/request_handler.py index 0ea7cb00..00cae7a7 100644 --- a/aikido_zen/sources/functions/request_handler.py +++ b/aikido_zen/sources/functions/request_handler.py @@ -15,12 +15,9 @@ def request_handler(stage, status_code=0): """This will check for rate limiting, Allowed IP's, useful routes, etc.""" try: if stage == "init": - # Initial stage of the request, called after context is stored. thread_cache = get_cache() - thread_cache.renew_if_ttl_expired() # Only check TTL at the start of a request. if ctx.get_current_context() and thread_cache: thread_cache.increment_stats() # Increment request statistics if a context exists. - if stage == "pre_response": return pre_response() if stage == "post_response": @@ -79,6 +76,8 @@ def post_response(status_code): comms = communications.get_comms() if not context or not comms: return + route_metadata = context.get_route_metadata() + is_curr_route_useful = is_useful_route( status_code, context.route, @@ -86,17 +85,12 @@ def post_response(status_code): ) if not is_curr_route_useful: return - route_metadata = context.get_route_metadata() + cache = get_cache() if cache: - route = cache.routes.get(route_metadata) - if not route: - # This route does not exist yet, initialize it: - cache.routes.initialize_route(route_metadata) - comms.send_data_to_bg_process("INITIALIZE_ROUTE", route_metadata) + cache.routes.increment_route(route_metadata) + # Run API Discovery : update_route_info( new_apispec=get_api_info(context), route=cache.routes.get(route_metadata) ) - # Add hit : - cache.routes.increment_route(route_metadata) From 0c929cc9d12de43ca6e92d69f63020861612cc54 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Wed, 26 Mar 2025 11:29:42 +0100 Subject: [PATCH 17/21] Fix test cases of request_handler --- aikido_zen/sources/functions/request_handler_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aikido_zen/sources/functions/request_handler_test.py b/aikido_zen/sources/functions/request_handler_test.py index f85c1688..826f0c72 100644 --- a/aikido_zen/sources/functions/request_handler_test.py +++ b/aikido_zen/sources/functions/request_handler_test.py @@ -1,6 +1,6 @@ import pytest from unittest.mock import patch, MagicMock -from aikido_zen.thread.thread_cache import get_cache, ThreadCache, threadlocal_storage +from aikido_zen.thread.thread_cache import get_cache, ThreadCache from .request_handler import request_handler, post_response from ...background_process.service_config import ServiceConfig from ...context import Context, current_context @@ -22,9 +22,10 @@ def mock_context(): @pytest.fixture(autouse=True) def run_around_tests(): + get_cache().reset() current_context.set(None) yield - setattr(threadlocal_storage, "cache", None) + get_cache().reset() current_context.set(None) From 05a87f56603196aaf9432c4d536c3d6aa9481a81 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Wed, 26 Mar 2025 11:36:36 +0100 Subject: [PATCH 18/21] Fix request_handler and it's test cases --- .../sources/functions/request_handler.py | 4 +--- .../sources/functions/request_handler_test.py | 20 ++++--------------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/aikido_zen/sources/functions/request_handler.py b/aikido_zen/sources/functions/request_handler.py index 00cae7a7..16b58788 100644 --- a/aikido_zen/sources/functions/request_handler.py +++ b/aikido_zen/sources/functions/request_handler.py @@ -1,6 +1,5 @@ """Exports request_handler function""" -import aikido_zen.background_process as communications import aikido_zen.context as ctx from aikido_zen.api_discovery.get_api_info import get_api_info from aikido_zen.api_discovery.update_route_info import update_route_info @@ -73,8 +72,7 @@ def pre_response(): def post_response(status_code): """Checks if the current route is useful""" context = ctx.get_current_context() - comms = communications.get_comms() - if not context or not comms: + if not context: return route_metadata = context.get_route_metadata() diff --git a/aikido_zen/sources/functions/request_handler_test.py b/aikido_zen/sources/functions/request_handler_test.py index 826f0c72..855ff6b0 100644 --- a/aikido_zen/sources/functions/request_handler_test.py +++ b/aikido_zen/sources/functions/request_handler_test.py @@ -1,7 +1,7 @@ import pytest from unittest.mock import patch, MagicMock from aikido_zen.thread.thread_cache import get_cache, ThreadCache -from .request_handler import request_handler, post_response +from .request_handler import request_handler from ...background_process.service_config import ServiceConfig from ...context import Context, current_context @@ -29,13 +29,10 @@ def run_around_tests(): current_context.set(None) -@patch("aikido_zen.background_process.get_comms") -def test_post_response_useful_route(mock_get_comms, mock_context): +def test_post_response_useful_route(mock_context): """Test post_response when the route is useful.""" - comms = MagicMock() - mock_get_comms.return_value = comms - cache = ThreadCache() # Creates a new cache + cache = get_cache() # Creates a new cache assert cache.routes.routes == {} with patch("aikido_zen.context.get_current_context", return_value=mock_context): request_handler("post_response", status_code=200) @@ -51,15 +48,6 @@ def test_post_response_useful_route(mock_get_comms, mock_context): } } - comms.send_data_to_bg_process.assert_called_once_with( - "INITIALIZE_ROUTE", - { - "route": "/test/route", - "method": "GET", - "url": "http://localhost:8080/test/route", - }, - ) - @patch("aikido_zen.background_process.get_comms") def test_post_response_not_useful_route(mock_get_comms, mock_context): @@ -130,7 +118,7 @@ def create_service_config(blocked_ips=None): ) if blocked_ips: config.set_blocked_ips(blocked_ips) - ThreadCache().config = config + get_cache().config = config return config From 4ddd294d1342c4df7060435a8dad33f31d920a76 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Wed, 26 Mar 2025 11:39:26 +0100 Subject: [PATCH 19/21] Fix broken test case for vulnerability scanner --- aikido_zen/vulnerabilities/init_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aikido_zen/vulnerabilities/init_test.py b/aikido_zen/vulnerabilities/init_test.py index c5d6062b..21d2ac9a 100644 --- a/aikido_zen/vulnerabilities/init_test.py +++ b/aikido_zen/vulnerabilities/init_test.py @@ -82,7 +82,7 @@ def test_ssrf(caplog, get_context): def test_lifecycle_cache_bypassed_ip(caplog, get_context): get_context.set_as_current_context() - cache = ThreadCache() + cache = get_cache() cache.config.bypassed_ips = IPList() cache.config.bypassed_ips.add("198.51.100.23") assert cache.is_bypassed_ip("198.51.100.23") From c43ab2f837444d34203a356c5fadbb64e90c5bc9 Mon Sep 17 00:00:00 2001 From: Wout Feys Date: Wed, 26 Mar 2025 14:06:24 +0100 Subject: [PATCH 20/21] validate the firewall lists API response correctly --- .../cloud_connection_manager/update_firewall_lists.py | 11 +++++------ end2end/server/mock_aikido_core.py | 2 ++ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/aikido_zen/background_process/cloud_connection_manager/update_firewall_lists.py b/aikido_zen/background_process/cloud_connection_manager/update_firewall_lists.py index c4a95515..41b6cf3c 100644 --- a/aikido_zen/background_process/cloud_connection_manager/update_firewall_lists.py +++ b/aikido_zen/background_process/cloud_connection_manager/update_firewall_lists.py @@ -17,12 +17,11 @@ def update_firewall_lists(connection_manager): blocked_user_agents = res.get("blockedUserAgents") # Validate response - if ( - not res.get("success") - and blocked_ips - and allowed_ips - and blocked_user_agents - ): + if not res.get("success"): + return + if not isinstance(blocked_ips, list) or not isinstance(allowed_ips, list): + return + if not isinstance(blocked_user_agents, str): return connection_manager.conf.set_blocked_ips(blocked_ips) diff --git a/end2end/server/mock_aikido_core.py b/end2end/server/mock_aikido_core.py index cb7f2567..4fbc2748 100644 --- a/end2end/server/mock_aikido_core.py +++ b/end2end/server/mock_aikido_core.py @@ -36,6 +36,8 @@ "ips": ["1.2.3.4"] } ], + "allowedIPAddresses": [], + "blockedUserAgents": "", }, "configUpdatedAt": {}, } From 57825879b6464032a7bacacd772fcaa89e0247f2 Mon Sep 17 00:00:00 2001 From: BitterPanda63 Date: Tue, 1 Apr 2025 11:10:13 +0200 Subject: [PATCH 21/21] Improves heartbeat event validation Enhances heartbeat event validation by adding informative assertion messages, making debugging easier when validation fails. --- end2end/server/check_events_from_mock.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/end2end/server/check_events_from_mock.py b/end2end/server/check_events_from_mock.py index af8724be..8da473f3 100644 --- a/end2end/server/check_events_from_mock.py +++ b/end2end/server/check_events_from_mock.py @@ -22,6 +22,7 @@ def validate_started_event(event, stack, dry_mode=False, serverless=False, os_na # assert set(event["agent"]["stack"]) == set(stack) def validate_heartbeat(event, routes, req_stats): - assert event["type"] == "heartbeat" - assert event["routes"] == routes - assert event["stats"]["requests"] == req_stats + assert event["type"] == "heartbeat", f"Expected event type 'heartbeat', but got '{event['type']}'" + assert event["routes"] == routes, f"Expected routes '{routes}', but got '{event['routes']}'" + assert event["stats"]["requests"] == req_stats, f"Expected request stats '{req_stats}', but got '{event['stats']['requests']}'" +