Skip to content

Commit 28ba05d

Browse files
Threaded output is now streamed to tests during execution
1 parent 213a728 commit 28ba05d

File tree

3 files changed

+105
-49
lines changed

3 files changed

+105
-49
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ install: venv
1717

1818
.PHONY: test
1919
test: install
20-
$(COMMON_VARS) PROJECT_ID=$(PROJECT_ID) ${VENV_NAME}/bin/pytest -svvv --branch=$(SUBSTRATUS_BRANCH)
20+
$(COMMON_VARS) PROJECT_ID=$(PROJECT_ID) ${VENV_NAME}/bin/pytest -s --branch=$(SUBSTRATUS_BRANCH)
2121

2222

2323
.PHONY: freeze

tests/capture_output_stream.py

Lines changed: 68 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
A utility to watch and stream outputs of local ipykernel events.
44
"""
55

6-
import asyncio
7-
from jupyter_client.asynchronous import AsyncKernelClient
6+
import logging
7+
from jupyter_client.blocking.client import BlockingKernelClient
88
from watchdog.observers import Observer
99
from watchdog.events import FileSystemEventHandler
1010
import glob
1111
import os
12-
import janus
12+
import re
13+
import threading
14+
from queue import Queue
15+
16+
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
17+
logger = logging.getLogger(__name__)
1318

1419

1520
class NewFileHandler(FileSystemEventHandler):
@@ -19,8 +24,8 @@ def __init__(self, queue):
1924
def on_created(self, event):
2025
if event.is_directory or not event.src_path.endswith(".json"):
2126
return
22-
print(f"New kernel detected: {event.src_path}")
23-
self.queue.sync_q.put(event.src_path)
27+
logger.debug(f"New kernel detected: {event.src_path}")
28+
self.queue.put(event.src_path)
2429

2530

2631
def process_msg(msg):
@@ -30,52 +35,86 @@ def process_msg(msg):
3035
if "data" in msg["content"]
3136
else msg["content"].get("text", "")
3237
)
33-
print(f"Output: {output}")
38+
clean_text = ansi_escape.sub("", output)
39+
logger.info(clean_text.rstrip("\n"))
3440
else:
35-
print(f"Unhandled message type: {msg['msg_type']}")
36-
print(msg)
41+
logger.debug(f"Unhandled message type: {msg['msg_type']}")
42+
logger.debug(msg)
3743

3844

39-
async def watch_kernel(connection_file):
40-
kc = AsyncKernelClient(connection_file=connection_file)
45+
def watch_kernel(connection_file, stop_event):
46+
kc = BlockingKernelClient(connection_file=connection_file)
4147
kc.load_connection_file()
4248
kc.start_channels()
4349

44-
while True:
45-
msg = await kc.iopub_channel.get_msg()
46-
process_msg(msg)
50+
while not stop_event.is_set():
51+
try:
52+
msg = kc.get_iopub_msg(timeout=1)
53+
if msg:
54+
process_msg(msg)
55+
except Exception as _:
56+
continue
4757

4858

49-
async def watch_queue(queue, watched_files):
50-
while True:
51-
new_file = await queue.async_q.get()
59+
def watch_queue(queue, watched_files, stop_event):
60+
while not stop_event.is_set():
61+
new_file = queue.get()
5262
if new_file not in watched_files:
53-
print(f"Processing new kernel: {new_file}")
63+
logger.debug(f"Processing new kernel: {new_file}")
5464
watched_files.add(new_file)
55-
asyncio.create_task(watch_kernel(new_file))
65+
threading.Thread(target=watch_kernel, args=(new_file, stop_event)).start()
5666

5767

58-
async def main():
68+
def start_watches():
69+
stop_event = threading.Event()
70+
threads = []
5971
paths_to_watch = [
60-
f"{os.path.expanduser('~')}/Library/Jupyter/runtime/", # only works on mac OS
61-
"/private/var/folders/9n/1rd9yjf913s10bzn5w9mdf_m0000gn/T/", # I'm certain this is not portable as is
72+
f"{os.path.expanduser('~')}/Library/Jupyter/runtime/",
73+
"/private/var/folders/9n/1rd9yjf913s10bzn5w9mdf_m0000gn/T/",
6274
"/tmp/",
6375
]
6476

6577
existing_files = {
6678
f for path in paths_to_watch for f in glob.glob(os.path.join(path, "*.json"))
6779
}
68-
print(f"Watching {len(existing_files)} existing files")
69-
70-
queue = janus.Queue()
71-
72-
tasks = [watch_kernel(config_file) for config_file in existing_files]
80+
logger.info(f"Watching {len(existing_files)} existing files")
81+
82+
queue = Queue()
83+
watched_files = set(existing_files)
84+
for config_file in existing_files:
85+
watch_thread = threading.Thread(
86+
target=watch_kernel,
87+
args=(
88+
config_file,
89+
stop_event,
90+
),
91+
)
92+
watch_thread.start()
93+
threads.append(watch_thread)
7394

7495
observer = Observer()
7596
for path in paths_to_watch:
7697
observer.schedule(NewFileHandler(queue), path, recursive=True)
7798

78-
observer_task = asyncio.get_event_loop().run_in_executor(None, observer.start)
79-
queue_watcher = watch_queue(queue, existing_files)
99+
observer.start()
100+
101+
# Start the watch_queue function in a separate thread
102+
watch_queue_thread = threading.Thread(
103+
target=watch_queue,
104+
args=(
105+
queue,
106+
watched_files,
107+
stop_event,
108+
),
109+
)
110+
watch_queue_thread.start()
111+
threads.append(watch_queue_thread)
112+
return threads, stop_event
113+
114+
115+
def main():
116+
start_watches()
117+
80118

81-
await asyncio.gather(*tasks, observer_task, queue_watcher)
119+
if __name__ == "__main__":
120+
main()

tests/conftest.py

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
11
import pytest
2-
import asyncio
32
import os
43
import subprocess
54

6-
from pytest_dependency import depends
75
from testbook import testbook
86
from google.cloud import storage
97
import google.auth
108
from google.auth.transport.requests import Request
119
from testbook.testbook import TestbookNotebookClient
1210
import logging
13-
from capture_output_stream import main
11+
from capture_output_stream import start_watches
1412

15-
logging.basicConfig(level=logging.INFO)
16-
# TODO(bjb): remove
17-
logging.basicConfig(level=logging.DEBUG)
13+
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
1814
logger = logging.getLogger(__name__)
1915

2016

@@ -124,19 +120,40 @@ def auth_tb_serving_models(branch):
124120

125121
@pytest.fixture(scope="session", autouse=True)
126122
def gcp_setup(auth_tb_quickstart):
127-
logger.debug("Starting gcp_setup")
128-
129-
# Create a new event loop for this fixture
130-
loop = asyncio.new_event_loop()
131-
asyncio.set_event_loop(loop)
132-
133-
logger.info("before capturing stream")
134-
main_task = loop.create_task(main()) # Create task using the created loop
135-
logger.info("output should stream")
136-
# ... rest of the code ...
137-
138-
# Close the event loop when done
139-
loop.close()
123+
threads, stop_event = start_watches() # start watches in threads
124+
for attempt in range(3): # Retry up to 3 times
125+
logger.debug(f"Attempt {attempt} to execute installer gcp-up")
126+
try:
127+
auth_tb_quickstart.execute_cell("installer gcp-up")
128+
assert "Apply complete!" in auth_tb_quickstart.cell_output_text(
129+
"installer gcp-up"
130+
)
131+
break
132+
except Exception as err:
133+
logger.warning(f"gcp-up encountered an error: {err}")
134+
if attempt == 1:
135+
delete_state_lock()
136+
continue
137+
138+
yield # teardown below the yield
139+
140+
logger.debug("Tearing down gcp_setup")
141+
for attempt in range(3): # Retry up to 3 times
142+
try:
143+
auth_tb_quickstart.execute_cell("installer gcp-down")
144+
assert "Apply complete!" in auth_tb_quickstart.cell_output_text(
145+
"installer gcp-down"
146+
)
147+
break
148+
except Exception as err:
149+
logger.warning(f"gcp-down encountered an error: {err}")
150+
if attempt == 1:
151+
delete_state_lock()
152+
continue
153+
finally:
154+
stop_event.set()
155+
for thread in threads:
156+
thread.join() # Wait for threads to finish
140157

141158

142159
def delete_state_lock(

0 commit comments

Comments (0)