# experiment.py
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import uuid
import threading
import numpy as np
from threading import Lock
from datetime import datetime
from six.moves.queue import Queue
from .client import HDClient
from .constants import API_NAME_EXPERIMENT
from .constants import API_NAME_CLI_TENSORBOARD
from .monitor import monitor
from .io_buffer import IOBuffer
from .server_manager import ServerManagerHTTP
from .hyper_dash import HyperDash
from .utils import get_logger
# Python 2/3 compatibility
__metaclass__ = type

KERAS = "keras"

class ExperimentRunner:
    """
    No-op class for reusing CodeRunner architecture
    """
    def __init__(
        self,
        done=False,
        exit_cleanly=True,
    ):
        self.done = done
        self.lock = Lock()
        self.exit_cleanly = exit_cleanly
        self.start_time = None
        self.end_time = None

    def is_done(self):
        with self.lock:
            return self.exit_cleanly, self.done

    def get_return_val(self):
        with self.lock:
            return None

    def get_exception(self):
        with self.lock:
            return None

    def should_run_as_thread(self):
        return False

    def get_start_and_end_time(self):
        return self.start_time, self.end_time

    def _set_start_time(self, start_time):
        self.start_time = start_time

    def _set_end_time(self, end_time):
        self.end_time = end_time


class Experiment:
    """Experiment records hyperparameters and metrics. The recorded values
    are sent to the Hyperdash server.

    Example:
        exp = Experiment("MNIST")
        exp.param("batch size", 32)

    A fuller usage sketch appears after this class definition.
    """
    _api_name = API_NAME_EXPERIMENT

    def __init__(
        self,
        model_name,
        api_key_getter=None,
        capture_io=True,
    ):
        """Initialize the Experiment class.

        args:
            1) model_name: Name of the model. The experiment number will autoincrement.
            2) api_key_getter: Optional custom getter for the Hyperdash API key.
            3) capture_io: Whether to save stdout/stderr to a log file and upload it to Hyperdash.
        """
        self.model_name = model_name
        self.callbacks = Callbacks(self)
        self._experiment_runner = ExperimentRunner()
        self.lock = Lock()

        # Create a UUID to uniquely identify this run from the SDK's point of view
        current_sdk_run_uuid = str(uuid.uuid4())

        # Capture STDOUT/STDERR before they're modified
        self._old_out, self._old_err = sys.stdout, sys.stderr

        # Buffers to which to redirect output so we can capture it
        out = [IOBuffer(), IOBuffer()]

        self._logger = get_logger(model_name, current_sdk_run_uuid, out[0])

        if capture_io:
            # Redirect STDOUT/STDERR to the buffers
            sys.stdout, sys.stderr = out

        server_manager = ServerManagerHTTP(api_key_getter, self._logger, self._api_name)
        self._hd_client = HDClient(self._logger, server_manager, current_sdk_run_uuid)
        self._hd = HyperDash(
            model_name,
            current_sdk_run_uuid,
            server_manager,
            out,
            (self._old_out, self._old_err,),
            self._logger,
            self._experiment_runner,
        )

        # Channel to update once the experiment has finished running.
        # Syncs with the separate hyperdash messaging loop thread.
        self.done_chan = Queue()

        def run():
            self._experiment_runner._set_start_time(datetime.now())
            self._hd.run()
            self._experiment_runner._set_end_time(datetime.now())
            self.done_chan.put(True)

        exp_thread = threading.Thread(target=run)
        exp_thread.daemon = True
        exp_thread.start()
        self._ended = False
    def metric(self, name, value, log=True):
        if self._ended:
            self._logger.warn("Cannot send metric {}, experiment ended. Please start a new experiment.".format(name))
            return
        return self._hd_client.metric(name, value, log)

    def param(self, name, value, log=True):
        if self._ended:
            self._logger.warn("Cannot send param {}, experiment ended. Please start a new experiment.".format(name))
            return
        return self._hd_client.param(name, value, log)

    def iter(self, n, log=True):
        if self._ended:
            self._logger.warn("Cannot iterate, experiment ended. Please start a new experiment.")
            return
        return self._hd_client.iter(n, log)

    def end(self):
        if self._ended:
            return
        self._ended = True
        with self.lock:
            sys.stdout, sys.stderr = self._old_out, self._old_err
            self._experiment_runner.exit_cleanly = True
            self._experiment_runner.done = True
            # Makes sure the experiment runner has cleaned up fully
            self.done_chan.get(block=True, timeout=None)
"""
For selective logging while capture_io is disabled
Main use case is if you output large amounts of text to STDOUT
but only want a subset saved to logs
"""
def log(self, string):
self._logger.info(string)
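

# A minimal usage sketch, kept as a comment so that importing this module stays
# side-effect free. It assumes a Hyperdash API key is already configured; the
# experiment name and the parameter/metric values below are hypothetical.
#
#     exp = Experiment("MNIST")
#     exp.param("batch size", 32)            # record a hyperparameter
#     for epoch in range(1, 4):
#         exp.iter(epoch)                    # record the current iteration
#         exp.metric("loss", 1.0 / epoch)    # stream a metric value
#     exp.log("saved to the logs even when capture_io is False")
#     exp.end()                              # restore stdout/stderr and finish the run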


class Callbacks:
    """Callbacks is a container class for 3rd-party library callbacks.

    An instance of Experiment is injected so that the callbacks can emit
    metrics/logs/parameters on behalf of an experiment.
    """
    def __init__(self, exp):
        self._exp = exp
        self._callbacks = {}

    @property
    def keras(self):
        """Return an object that implements the Keras Callback interface.

        The Keras callback is initialized lazily to prevent any possible import
        issues from affecting users who don't use it, and to avoid importing
        Keras/TensorFlow (and all of their accompanying baggage) when they
        happen to be installed but are not being used. A usage sketch appears
        after this class definition.
        """
        cb = self._callbacks.get(KERAS)
        # Keras is not importable
        if cb is False:
            return None
        # If this is the first time, try to import Keras
        if not cb:
            # Check whether Keras is installed and fall back gracefully
            try:
                from keras.callbacks import Callback as KerasCallback

                class _KerasCallback(KerasCallback):
                    """_KerasCallback implements KerasCallback using an injected Experiment.

                    # TODO: Decide if we want to handle the additional callbacks:
                    # 1) on_epoch_begin
                    # 2) on_batch_begin
                    # 3) on_batch_end
                    # 4) on_train_begin
                    # 5) on_train_end
                    """
                    def __init__(self, exp):
                        super(_KerasCallback, self).__init__()
                        self._exp = exp

                    def on_epoch_end(self, epoch, logs=None):
                        if not logs:
                            logs = {}
                        # sorted() keeps the metric order deterministic on Python < 3.6,
                        # where dict ordering is not guaranteed
                        for k, v in sorted(logs.items()):
                            if isinstance(v, (np.ndarray, np.generic)):
                                self._exp.metric(k, v.item())
                            else:
                                self._exp.metric(k, v)

                cb = _KerasCallback(self._exp)
                self._callbacks[KERAS] = cb
                return cb
            except ImportError:
                # Mark Keras as unimportable for future calls
                self._callbacks[KERAS] = False
                return None
        return cb
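

# A usage sketch for the Keras integration, assuming Keras is installed and that
# a compiled model `model` plus training data `x_train`/`y_train` already exist
# (all hypothetical names). Note that `exp.callbacks.keras` returns None when
# Keras cannot be imported, so it is worth checking before passing it to fit():
#
#     exp = Experiment("MNIST")
#     keras_cb = exp.callbacks.keras
#     callbacks = [keras_cb] if keras_cb is not None else []
#     model.fit(x_train, y_train, epochs=3, callbacks=callbacks)
#     exp.end()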


# Internal version of Experiment that reports under a different API name; it
# should not be used directly by consumers.
class _TensorboardExperiment(Experiment):
    _api_name = API_NAME_CLI_TENSORBOARD

    def __init__(self, *args, **kwargs):
        Experiment.__init__(self, *args, **kwargs)