Skip to content

Commit 6ccd49e

Browse files
authored
feat: utility function to persist critical error (#222)
1 parent 157a90d commit 6ccd49e

File tree

7 files changed

+311
-2
lines changed

7 files changed

+311
-2
lines changed

pynumaflow/_constants.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
SIDE_INPUT_DIR_PATH = "/var/numaflow/side-inputs"
88
ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
99

10-
# Get container type from env var, default to unknown-container
10+
# Error Constants
11+
RUNTIME_APPLICATION_ERRORS_PATH = "/var/numaflow/runtime/application-errors"
12+
CURRENT_CRITICAL_ERROR_FILE = "current-udf.json"
13+
INTERNAL_ERROR_CODE = "Internal error"
1114
CONTAINER_TYPE = os.getenv(ENV_UD_CONTAINER_TYPE, "unknown-container")
12-
# UDF exception error string with container type
1315
ERR_UDF_EXCEPTION_STRING = f"UDF_EXECUTION_ERROR({CONTAINER_TYPE})"
1416

1517
# Socket configs

pynumaflow/errors/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from pynumaflow.errors.errors import persist_critical_error
2+
3+
__all__ = ["persist_critical_error"]

pynumaflow/errors/_dtypes.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from dataclasses import dataclass, asdict
2+
3+
4+
@dataclass
5+
class _RuntimeErrorEntry:
6+
"""Represents a runtime error entry to be persisted."""
7+
8+
container: str
9+
timestamp: int
10+
code: str
11+
message: str
12+
details: str
13+
14+
def to_dict(self) -> dict:
15+
"""Converts the dataclass instance to a dictionary."""
16+
return asdict(self)

pynumaflow/errors/errors.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import os
2+
import json
3+
import threading
4+
import time
5+
from pynumaflow._constants import (
6+
CONTAINER_TYPE,
7+
RUNTIME_APPLICATION_ERRORS_PATH,
8+
CURRENT_CRITICAL_ERROR_FILE,
9+
INTERNAL_ERROR_CODE,
10+
)
11+
from pynumaflow.errors._dtypes import _RuntimeErrorEntry
12+
from typing import Union
13+
14+
15+
class _PersistErrorOnce:
16+
"""Ensures that the persist_critical_error function is executed only once."""
17+
18+
def __init__(self):
19+
self.done = False
20+
self.lock = threading.Lock()
21+
22+
def execute(self, func, *args, **kwargs):
23+
with self.lock:
24+
if self.done:
25+
raise RuntimeError("Persist critical error function has already been executed.")
26+
self.done = True
27+
return func(*args, **kwargs)
28+
29+
30+
_persist_error_once = _PersistErrorOnce()
31+
32+
33+
def persist_critical_error(
34+
error_code: str, error_message: str, error_details: str
35+
) -> Union[RuntimeError, None]:
36+
"""
37+
Persists a critical error to a file. This function will only execute once.
38+
Logs the error if persisting to the file fails.
39+
Returns None if successful, or raises RuntimeError if already executed.
40+
"""
41+
try:
42+
_persist_error_once.execute(
43+
_persist_critical_error_to_file,
44+
error_code,
45+
error_message,
46+
error_details,
47+
RUNTIME_APPLICATION_ERRORS_PATH,
48+
)
49+
except RuntimeError as e:
50+
return e
51+
except Exception as e:
52+
print(f"Error in persisting critical error: {e}")
53+
return None
54+
55+
56+
def _persist_critical_error_to_file(
57+
error_code: str, error_message: str, error_details: str, dir_path: str
58+
):
59+
"""Internal function to persist a critical error to a file."""
60+
61+
os.makedirs(dir_path, mode=0o777, exist_ok=True)
62+
container_dir = os.path.join(dir_path, CONTAINER_TYPE)
63+
os.makedirs(container_dir, mode=0o777, exist_ok=True)
64+
65+
current_file_path = os.path.join(container_dir, CURRENT_CRITICAL_ERROR_FILE)
66+
error_code = error_code or INTERNAL_ERROR_CODE
67+
current_timestamp = int(time.time())
68+
69+
runtime_error_entry = _RuntimeErrorEntry(
70+
container=CONTAINER_TYPE,
71+
timestamp=current_timestamp,
72+
code=error_code,
73+
message=error_message,
74+
details=error_details,
75+
)
76+
77+
with open(current_file_path, "w") as f:
78+
json.dump(runtime_error_entry.to_dict(), f)
79+
80+
final_file_name = f"{current_timestamp}-udf.json"
81+
final_file_path = os.path.join(container_dir, final_file_name)
82+
os.rename(current_file_path, final_file_path)

tests/errors/__init__.py

Whitespace-only changes.

tests/errors/test_dtypes.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import unittest
2+
from pynumaflow.errors._dtypes import _RuntimeErrorEntry
3+
4+
5+
class TestRuntimeErrorEntry(unittest.TestCase):
6+
def test_runtime_error_entry_initialization(self):
7+
"""
8+
Test that _RuntimeErrorEntry initializes correctly with given values.
9+
"""
10+
container = "test-container"
11+
timestamp = 1680700000
12+
code = "500"
13+
message = "Test error message"
14+
details = "Test error details"
15+
16+
error_entry = _RuntimeErrorEntry(container, timestamp, code, message, details)
17+
18+
self.assertEqual(error_entry.container, container)
19+
self.assertEqual(error_entry.timestamp, timestamp)
20+
self.assertEqual(error_entry.code, code)
21+
self.assertEqual(error_entry.message, message)
22+
self.assertEqual(error_entry.details, details)
23+
24+
def test_runtime_error_entry_to_dict(self):
25+
"""
26+
Test that _RuntimeErrorEntry converts to a dictionary correctly.
27+
"""
28+
container = "test-container"
29+
timestamp = 1680700000
30+
code = "500"
31+
message = "Test error message"
32+
details = "Test error details"
33+
34+
error_entry = _RuntimeErrorEntry(container, timestamp, code, message, details)
35+
error_dict = error_entry.to_dict()
36+
37+
expected_dict = {
38+
"container": container,
39+
"timestamp": timestamp,
40+
"code": code,
41+
"message": message,
42+
"details": details,
43+
}
44+
45+
self.assertEqual(error_dict, expected_dict)
46+
47+
def test_runtime_error_entry_empty_values(self):
48+
"""
49+
Test that _RuntimeErrorEntry handles empty values correctly.
50+
"""
51+
container = ""
52+
timestamp = 0
53+
code = ""
54+
message = ""
55+
details = ""
56+
57+
error_entry = _RuntimeErrorEntry(container, timestamp, code, message, details)
58+
error_dict = error_entry.to_dict()
59+
60+
expected_dict = {
61+
"container": container,
62+
"timestamp": timestamp,
63+
"code": code,
64+
"message": message,
65+
"details": details,
66+
}
67+
68+
self.assertEqual(error_dict, expected_dict)
69+
70+
71+
if __name__ == "__main__":
72+
unittest.main()
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import os
2+
import json
3+
import shutil
4+
import threading
5+
import unittest
6+
from pynumaflow.errors.errors import persist_critical_error, _persist_error_once
7+
from pynumaflow.errors.errors import _persist_critical_error_to_file
8+
from pynumaflow._constants import CONTAINER_TYPE, INTERNAL_ERROR_CODE
9+
10+
11+
class TestErrorPersistence(unittest.TestCase):
12+
def setUp(self):
13+
"""
14+
Set up temporary directories for tests.
15+
"""
16+
self.test_dirs = ["/tmp/test_error_dir", "/tmp/test_dir"]
17+
18+
def tearDown(self):
19+
"""
20+
Clean up temporary directories after tests.
21+
"""
22+
for dir_path in self.test_dirs:
23+
if os.path.exists(dir_path):
24+
shutil.rmtree(dir_path)
25+
26+
# Writes error details to a JSON file
27+
def test_writes_error_details_to_json_file(self):
28+
"""
29+
Test that _persist_critical_error_to_file writes error details to a JSON file.
30+
"""
31+
32+
dir_path = self.test_dirs[0]
33+
34+
error_code = "500"
35+
error_message = "Server Error"
36+
error_details = "An unexpected error occurred."
37+
38+
_persist_critical_error_to_file(error_code, error_message, error_details, dir_path)
39+
40+
container_dir = os.path.join(dir_path, CONTAINER_TYPE)
41+
self.assertTrue(os.path.exists(container_dir))
42+
43+
# Debug: Check directory after the function call
44+
print(f"After: {os.listdir(container_dir)}")
45+
46+
files = os.listdir(container_dir)
47+
self.assertEqual(len(files), 1)
48+
49+
final_file_name = files[0]
50+
final_file_path = os.path.join(container_dir, final_file_name)
51+
52+
with open(final_file_path) as f:
53+
data = json.load(f)
54+
55+
self.assertEqual(data["code"], error_code)
56+
self.assertEqual(data["message"], error_message)
57+
self.assertEqual(data["details"], error_details)
58+
self.assertEqual(data["container"], CONTAINER_TYPE)
59+
self.assertTrue(isinstance(data["timestamp"], int))
60+
61+
# Uses default error code if none provided
62+
def test_uses_default_error_code_if_none_provided(self):
63+
"""
64+
Test that _persist_critical_error_to_file uses the default error code if none is provided.
65+
"""
66+
dir_path = self.test_dirs[1]
67+
68+
_persist_critical_error_to_file("", "Error Message", "Error Details", dir_path)
69+
70+
container_dir = os.path.join(dir_path, "unknown-container")
71+
self.assertTrue(os.path.exists(container_dir))
72+
73+
files = os.listdir(container_dir)
74+
self.assertEqual(len(files), 1)
75+
76+
with open(os.path.join(container_dir, files[0])) as f:
77+
error_data = json.load(f)
78+
self.assertEqual(error_data["code"], INTERNAL_ERROR_CODE)
79+
80+
def test_persist_critical_error_all_threads_fail(self):
81+
"""
82+
Test that all threads fail when persist_critical_error is executed after the first call.
83+
"""
84+
error_code = "testCode"
85+
error_message = "testMessage"
86+
error_details = "testDetails"
87+
88+
# Set `done` to True to simulate that the critical error has already been persisted
89+
_persist_error_once.done = True
90+
91+
try:
92+
# Set up threading
93+
num_threads = 10
94+
errors = []
95+
lock = threading.Lock()
96+
97+
def thread_func():
98+
nonlocal errors
99+
result = persist_critical_error(error_code, error_message, error_details)
100+
with lock:
101+
errors.append(result)
102+
103+
# Create and start threads
104+
threads = []
105+
for _ in range(num_threads):
106+
thread = threading.Thread(target=thread_func)
107+
threads.append(thread)
108+
thread.start()
109+
110+
# Wait for all threads to complete
111+
for thread in threads:
112+
thread.join()
113+
114+
# Count the number of failures
115+
fail_count = sum(
116+
1
117+
for error in errors
118+
if error is not None
119+
and "Persist critical error function has already been executed" in str(error)
120+
)
121+
122+
# Assert that all threads failed
123+
self.assertEqual(
124+
fail_count,
125+
num_threads,
126+
f"Expected all {num_threads} threads to fail, but only {fail_count} failed",
127+
)
128+
finally:
129+
# Revert `done` back to False after the test
130+
_persist_error_once.done = False
131+
132+
133+
if __name__ == "__main__":
134+
unittest.main()

0 commit comments

Comments
 (0)