Skip to content

Improvement for incoherent statistics #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5b47fc8
Create new process worker script and run it on context creation
bitterpanda63 Mar 20, 2025
6aa778b
Remove INITIALIZE_ROUTE command
bitterpanda63 Mar 21, 2025
ead26f8
Cleanup of process_worker logic
bitterpanda63 Mar 21, 2025
932d840
Create a process_worker and process_worker_loader in thread
bitterpanda63 Mar 21, 2025
c10d834
make thread_cache process-local
bitterpanda63 Mar 21, 2025
312c115
Improve route increment stuff
bitterpanda63 Mar 21, 2025
a1f9a25
fix thread_cache_test test cases
bitterpanda63 Mar 21, 2025
a74b346
Fix all test cases that relied on internal thread cache logic
bitterpanda63 Mar 21, 2025
2f08b88
Cleanup thread_cache.py
bitterpanda63 Mar 21, 2025
950849c
make log message make some sense
bitterpanda63 Mar 21, 2025
436f630
fix thread_cache res check and only start process_worker with context
bitterpanda63 Mar 21, 2025
f6d6cfc
Fix broken routes test
bitterpanda63 Mar 21, 2025
3f570f3
Fix django mysql e2e test
bitterpanda63 Mar 25, 2025
f41512e
Also add apispec to e2e test
bitterpanda63 Mar 25, 2025
19c7b62
Total is 3 requests for django_mysql e2e
bitterpanda63 Mar 26, 2025
5b83da4
Fix request_handler
bitterpanda63 Mar 26, 2025
0c929cc
Fix test cases of request_handler
bitterpanda63 Mar 26, 2025
05a87f5
Fix request_handler and its test cases
bitterpanda63 Mar 26, 2025
4ddd294
Fix broken test case for vulnerability scanner
bitterpanda63 Mar 26, 2025
c43ab2f
validate the firewall lists API response correctly
bitterpanda63 Mar 26, 2025
645e303
Merge remote-tracking branch 'origin/fix/validate-firewall-lists-corr…
bitterpanda63 Mar 26, 2025
61c9ae9
Merge branch 'main' into improve-thread-caching-separated
bitterpanda63 Mar 26, 2025
3d228bb
Merge branch 'main' into improve-thread-caching-separated
bitterpanda63 Apr 1, 2025
960212f
Merge remote-tracking branch 'origin/main' into improve-thread-cachin…
bitterpanda63 Apr 1, 2025
5782587
Improves heartbeat event validation
bitterpanda63 Apr 1, 2025
d3225ba
Merge remote-tracking branch 'origin/main' into improve-thread-cachin…
bitterpanda63 Apr 10, 2025
b08726d
Merge branch 'main' into improve-thread-caching-separated
bitterpanda63 Apr 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions aikido_zen/background_process/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from aikido_zen.helpers.logging import logger
from .attack import process_attack
from .read_property import process_read_property
from .initialize_route import process_initialize_route
from .user import process_user
from .should_ratelimit import process_should_ratelimit
from .kill import process_kill
Expand All @@ -16,7 +15,6 @@
# This maps to a tuple : (function, returns_data?)
# Commands that don't return data :
"ATTACK": (process_attack, False),
"INITIALIZE_ROUTE": (process_initialize_route, False),
"USER": (process_user, False),
"KILL": (process_kill, False),
"STATISTICS": (process_statistics, False),
Expand Down
11 changes: 0 additions & 11 deletions aikido_zen/background_process/commands/initialize_route.py

This file was deleted.

37 changes: 0 additions & 37 deletions aikido_zen/background_process/commands/initialize_route_test.py

This file was deleted.

11 changes: 4 additions & 7 deletions aikido_zen/background_process/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,17 @@ def __init__(self, max_size=1000):

def initialize_route(self, route_metadata):
"""
Initializes a route for the first time.
Initializes a route for the first time. `hits_delta_since_sync` counts delta between syncs.
"""
self.manage_routes_size()
key = route_to_key(route_metadata)
if self.routes.get(key):
return
self.routes[key] = {
"method": route_metadata.get("method"),
"path": route_metadata.get("route"),
"hits": 0,
"hits_delta_since_sync": 0,
"apispec": {},
}
# This field counts the difference in hits in between synchronisation for threads :
self.routes[key]["hits_delta_since_sync"] = 0

def increment_route(self, route_metadata):
"""
Expand All @@ -41,8 +38,8 @@ def increment_route(self, route_metadata):
"""
key = route_to_key(route_metadata)
if not self.routes.get(key):
return
# Add a hit to the route :
self.initialize_route(route_metadata)
# Add hits to
route = self.routes.get(key)
route["hits"] += 1
route["hits_delta_since_sync"] += 1
Expand Down
7 changes: 0 additions & 7 deletions aikido_zen/background_process/routes/init_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ def test_increment_route_twice():
assert routes.routes["GET:/api/resource"]["hits"] == 2


def test_increment_route_that_does_not_exist():
routes = Routes(max_size=3)
routes.increment_route(gen_route_metadata(route="/api/resource"))
routes.increment_route(gen_route_metadata(route="/api/resource"))
assert len(routes.routes) == 0


def test_clear_routes():
routes = Routes(max_size=3)
routes.initialize_route(gen_route_metadata(route="/api/resource"))
Expand Down
15 changes: 6 additions & 9 deletions aikido_zen/middleware/init_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

import pytest
from aikido_zen.context import current_context, Context, get_current_context
from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage
from aikido_zen.thread.thread_cache import ThreadCache, get_cache
from . import should_block_request


@pytest.fixture(autouse=True)
def run_around_tests():
get_cache().reset()
yield
# Make sure to reset context and cache after every test so it does not
# interfere with other tests
current_context.set(None)
get_cache().reset()


def test_without_context():
Expand Down Expand Up @@ -39,20 +41,15 @@ def set_context(user=None, executed_middleware=False):
).set_as_current_context()


class MyThreadCache(ThreadCache):
def renew_if_ttl_expired(self):
return


def test_with_context_without_cache():
set_context()
threadlocal_storage.cache = None
get_cache().cache = None
assert should_block_request() == {"block": False}


def test_with_context_with_cache():
set_context(user={"id": "123"})
thread_cache = MyThreadCache()
thread_cache = get_cache()

thread_cache.config.blocked_uids = ["123"]
assert get_current_context().executed_middleware == False
Expand All @@ -76,7 +73,7 @@ def test_with_context_with_cache():

def test_cache_comms_with_endpoints():
set_context(user={"id": "456"})
thread_cache = MyThreadCache()
thread_cache = get_cache()
thread_cache.config.blocked_uids = ["123"]
thread_cache.config.endpoints = [
{
Expand Down
6 changes: 3 additions & 3 deletions aikido_zen/sinks/tests/requests_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import pytest
from aikido_zen.context import Context, current_context
from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage
from aikido_zen.thread.thread_cache import ThreadCache, get_cache
from aikido_zen.errors import AikidoSSRF
from aikido_zen.background_process.comms import reset_comms
import aikido_zen.sinks.socket
Expand All @@ -19,11 +19,12 @@

@pytest.fixture(autouse=True)
def run_around_tests():
get_cache().reset()
yield
# Make sure to reset context and cache after every test so it does not
# interfere with other tests
current_context.set(None)
setattr(threadlocal_storage, "cache", None)
get_cache().reset()


def set_context_and_lifecycle(url):
Expand All @@ -48,7 +49,6 @@ def set_context_and_lifecycle(url):
source="flask",
)
context.set_as_current_context()
ThreadCache()


def test_srrf_test(monkeypatch):
Expand Down
6 changes: 3 additions & 3 deletions aikido_zen/sinks/tests/urrlib3_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import pytest
from aikido_zen.context import Context, current_context
from aikido_zen.thread.thread_cache import ThreadCache, threadlocal_storage
from aikido_zen.thread.thread_cache import get_cache
from aikido_zen.errors import AikidoSSRF
from aikido_zen.background_process.comms import reset_comms
import aikido_zen.sinks.socket
Expand All @@ -19,11 +19,12 @@

@pytest.fixture(autouse=True)
def run_around_tests():
get_cache().reset()
yield
# Make sure to reset context and cache after every test so it does not
# interfere with other tests
current_context.set(None)
setattr(threadlocal_storage, "cache", None)
get_cache().reset()


def set_context_and_lifecycle(url, host=None):
Expand All @@ -50,7 +51,6 @@ def set_context_and_lifecycle(url, host=None):
source="flask",
)
context.set_as_current_context()
ThreadCache()


def test_srrf_test(monkeypatch):
Expand Down
20 changes: 6 additions & 14 deletions aikido_zen/sources/functions/request_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Exports request_handler function"""

import aikido_zen.background_process as communications
import aikido_zen.context as ctx
from aikido_zen.api_discovery.get_api_info import get_api_info
from aikido_zen.api_discovery.update_route_info import update_route_info
Expand All @@ -15,12 +14,9 @@ def request_handler(stage, status_code=0):
"""This will check for rate limiting, Allowed IP's, useful routes, etc."""
try:
if stage == "init":
# Initial stage of the request, called after context is stored.
thread_cache = get_cache()
thread_cache.renew_if_ttl_expired() # Only check TTL at the start of a request.
if ctx.get_current_context() and thread_cache:
thread_cache.increment_stats() # Increment request statistics if a context exists.

if stage == "pre_response":
return pre_response()
if stage == "post_response":
Expand Down Expand Up @@ -76,27 +72,23 @@ def pre_response():
def post_response(status_code):
"""Checks if the current route is useful"""
context = ctx.get_current_context()
comms = communications.get_comms()
if not context or not comms:
if not context:
return
route_metadata = context.get_route_metadata()

is_curr_route_useful = is_useful_route(
status_code,
context.route,
context.method,
)
if not is_curr_route_useful:
return
route_metadata = context.get_route_metadata()

cache = get_cache()
if cache:
route = cache.routes.get(route_metadata)
if not route:
# This route does not exist yet, initialize it:
cache.routes.initialize_route(route_metadata)
comms.send_data_to_bg_process("INITIALIZE_ROUTE", route_metadata)
cache.routes.increment_route(route_metadata)

# Run API Discovery :
update_route_info(
new_apispec=get_api_info(context), route=cache.routes.get(route_metadata)
)
# Add hit :
cache.routes.increment_route(route_metadata)
25 changes: 7 additions & 18 deletions aikido_zen/sources/functions/request_handler_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
from unittest.mock import patch, MagicMock
from aikido_zen.thread.thread_cache import get_cache, ThreadCache, threadlocal_storage
from .request_handler import request_handler, post_response
from aikido_zen.thread.thread_cache import get_cache, ThreadCache
from .request_handler import request_handler
from ...background_process.service_config import ServiceConfig
from ...context import Context, current_context

Expand All @@ -22,19 +22,17 @@ def mock_context():

@pytest.fixture(autouse=True)
def run_around_tests():
get_cache().reset()
current_context.set(None)
yield
setattr(threadlocal_storage, "cache", None)
get_cache().reset()
current_context.set(None)


@patch("aikido_zen.background_process.get_comms")
def test_post_response_useful_route(mock_get_comms, mock_context):
def test_post_response_useful_route(mock_context):
"""Test post_response when the route is useful."""
comms = MagicMock()
mock_get_comms.return_value = comms

cache = ThreadCache() # Creates a new cache
cache = get_cache() # Creates a new cache
assert cache.routes.routes == {}
with patch("aikido_zen.context.get_current_context", return_value=mock_context):
request_handler("post_response", status_code=200)
Expand All @@ -50,15 +48,6 @@ def test_post_response_useful_route(mock_get_comms, mock_context):
}
}

comms.send_data_to_bg_process.assert_called_once_with(
"INITIALIZE_ROUTE",
{
"route": "/test/route",
"method": "GET",
"url": "http://localhost:8080/test/route",
},
)


@patch("aikido_zen.background_process.get_comms")
def test_post_response_not_useful_route(mock_get_comms, mock_context):
Expand Down Expand Up @@ -129,7 +118,7 @@ def create_service_config(blocked_ips=None):
)
if blocked_ips:
config.set_blocked_ips(blocked_ips)
ThreadCache().config = config
get_cache().config = config
return config


Expand Down
28 changes: 28 additions & 0 deletions aikido_zen/thread/process_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import multiprocessing
import time

from aikido_zen.helpers.logging import logger
from aikido_zen.thread import thread_cache

# Renew the cache from this background worker every 5 seconds
RENEW_CACHE_EVERY_X_SEC = 5


def aikido_process_worker_thread():
    """
    Body of the Aikido process worker thread.

    When a web server such as Gunicorn creates new processes (each possibly
    running several threads), one process worker is attached to every such
    process, i.e. it is one extra thread per process. It is responsible for
    syncing statistics, route data, configuration, ...

    This function never returns: on each iteration it logs the current
    process' ID and name at debug level, calls `thread_cache.renew()` and then
    sleeps for `RENEW_CACHE_EVERY_X_SEC` seconds.
    """
    # Handle on the process this thread runs in; only used for the log message.
    current_process = multiprocessing.current_process()

    while True:
        message = f"Process ID: {current_process.pid}, Name: {current_process.name} - process_worker renewing thread cache."
        logger.debug(message)

        # Renew first, then wait out the interval before the next renewal.
        thread_cache.renew()
        time.sleep(RENEW_CACHE_EVERY_X_SEC)
23 changes: 23 additions & 0 deletions aikido_zen/thread/process_worker_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import multiprocessing
import threading

from aikido_zen.context import get_current_context
from aikido_zen.thread.process_worker import aikido_process_worker_thread


def load_worker():
    """
    Start the Aikido process worker thread for the current process.

    Does nothing when there is no current context (the call is not related to
    a request), or when a live thread carrying this process' worker name is
    already present.
    """
    # No context means this is not tied to a request: do not start a worker.
    if get_current_context() is None:
        return

    # Worker threads are named "aikido-process-worker-" followed by the current PID.
    pid = multiprocessing.current_process().pid
    worker_name = f"aikido-process-worker-{pid}"

    for running_thread in threading.enumerate():
        if running_thread.name == worker_name:
            return  # A worker thread with this name already exists.

    # Daemon thread that handles communication to and from the background agent.
    worker = threading.Thread(
        target=aikido_process_worker_thread,
        name=worker_name,
        daemon=True,
    )
    worker.start()
Loading
Loading