Skip to content

Commit 48f5412

Browse files
jeblairceache
authored andcommitted
feat(recipe): add ExistingDataWatch class
This adds a subclass of DataWatch which only operates on existing ZNodes. If a user uses a DataWatch on a path and the ZNode at that path is deleted, the DataWatch will still issue an "exists" call and set a watch right before the final callback. That means that regardless of the return value of the callback and whether or not Kazoo will invoke the callback again, the ZooKeeper server still has a watch entry for that path. In short, using a DataWatch on a path and then deleting that path can leak watch entries on the ZooKeeper server. Because the DataWatch recipe is designed to watch non-existing paths, this behavior may be desired and relied on by some users, so it's not considered a bug. But other users may want to use DataWatches for nodes where this behavior would be a problem. The ExistingDataWatch class behaves similarly to its parent class, DataWatch, but it does not set a watch on paths which do not exist (whether that's because they never existed or were recently deleted). This means that a user of an ExistingDataWatch can be assured that after the callback with the deleted event, the watch is removed from the server.
1 parent 9bb8499 commit 48f5412

File tree

4 files changed

+145
-1
lines changed

4 files changed

+145
-1
lines changed

docs/api/recipe/watchers.rst

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ Public API
1515

1616
.. automethod:: __call__
1717

18+
.. autoclass:: ExistingDataWatch
19+
:members:
20+
21+
.. automethod:: __init__
22+
23+
.. automethod:: __call__
1824

1925
.. autoclass:: ChildrenWatch
2026
:members:

kazoo/client.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
from kazoo.recipe.partitioner import SetPartitioner
6464
from kazoo.recipe.party import Party, ShallowParty
6565
from kazoo.recipe.queue import Queue, LockingQueue
66-
from kazoo.recipe.watchers import ChildrenWatch, DataWatch
66+
from kazoo.recipe.watchers import ChildrenWatch, DataWatch, ExistingDataWatch
6767

6868

6969
string_types = six.string_types
@@ -352,6 +352,7 @@ def _retry(*args, **kwargs):
352352
self.DoubleBarrier = partial(DoubleBarrier, self)
353353
self.ChildrenWatch = partial(ChildrenWatch, self)
354354
self.DataWatch = partial(DataWatch, self)
355+
self.ExistingDataWatch = partial(ExistingDataWatch, self)
355356
self.Election = partial(Election, self)
356357
self.NonBlockingLease = partial(NonBlockingLease, self)
357358
self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self)

kazoo/recipe/watchers.py

+61
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,67 @@ def _session_watcher(self, state):
217217
self._client.handler.spawn(self._get_data)
218218

219219

220+
class ExistingDataWatch(DataWatch):
221+
"""Watches a node for data updates and calls the specified
222+
function each time it changes
223+
224+
Similar to :class:`~kazoo.recipes.watchers.DataWatch`, but it does
225+
not operate on nodes which do not exist.
226+
227+
The function will also be called the very first time its
228+
registered to get the data.
229+
230+
Returning `False` from the registered function will disable future
231+
data change calls. If the client connection is closed (using the
232+
close command), the DataWatch will no longer get updates.
233+
234+
If the function supplied takes three arguments, then the third one
235+
will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will
236+
only be set if the change to the data occurs as a result of the
237+
server notifying the watch that there has been a change. Events
238+
like reconnection or the first call will not include an event.
239+
240+
If the node does not exist on creation then the function will be
241+
called with ``None`` for all values and no futher callbacks will
242+
occur. If the node is deleted after the watch is created, the
243+
function will be called with the event argument indicating a
244+
delete event and no further callbacks will occur.
245+
"""
246+
247+
@_ignore_closed
248+
def _get_data(self, event=None):
249+
# Ensure this runs one at a time, possible because the session
250+
# watcher may trigger a run
251+
with self._run_lock:
252+
if self._stopped:
253+
return
254+
255+
initial_version = self._version
256+
257+
try:
258+
data, stat = self._retry(self._client.get,
259+
self._path, self._watcher)
260+
except NoNodeError:
261+
data = stat = None
262+
263+
# No node data, clear out version
264+
if stat is None:
265+
self._version = None
266+
else:
267+
self._version = stat.mzxid
268+
269+
# Call our function if its the first time ever, or if the
270+
# version has changed
271+
if initial_version != self._version or not self._ever_called:
272+
self._log_func_exception(data, stat, event)
273+
274+
# If the node doesn't exist, we won't be watching any more
275+
if stat is None:
276+
self._stopped = True
277+
self._func = None
278+
self._client.remove_listener(self._session_watcher)
279+
280+
220281
class ChildrenWatch(object):
221282
"""Watches a node for children updates and calls the specified
222283
function each time it changes

kazoo/tests/test_watchers.py

+76
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,82 @@ def changed(val, stat):
279279
assert b is False
280280

281281

282+
class KazooExistingDataWatcherTests(KazooTestCase):
283+
def setUp(self):
284+
super(KazooExistingDataWatcherTests, self).setUp()
285+
self.path = "/" + uuid.uuid4().hex
286+
self.client.ensure_path(self.path)
287+
288+
def test_data_watcher_non_existent_path(self):
289+
update = threading.Event()
290+
data = [True]
291+
292+
# Make it a non-existent path
293+
self.path += 'f'
294+
295+
@self.client.ExistingDataWatch(self.path)
296+
def changed(d, stat):
297+
data.pop()
298+
data.append(d)
299+
update.set()
300+
301+
update.wait(10)
302+
assert data == [None]
303+
update.clear()
304+
305+
# We should not get an update
306+
self.client.create(self.path, b'fred')
307+
update.wait(0.2)
308+
assert data == [None]
309+
update.clear()
310+
311+
def test_data_watcher_existing_path(self):
312+
update = threading.Event()
313+
data = [True]
314+
315+
# Make it an existing path
316+
self.path += 'f'
317+
self.client.create(self.path, b'fred')
318+
319+
@self.client.ExistingDataWatch(self.path)
320+
def changed(d, stat):
321+
data.pop()
322+
data.append(d)
323+
update.set()
324+
325+
update.wait(10)
326+
assert data[0] == b'fred'
327+
update.clear()
328+
329+
def test_data_watcher_delete(self):
330+
update = threading.Event()
331+
data = [True]
332+
333+
# Make it an existing path
334+
self.path += 'f'
335+
self.client.create(self.path, b'fred')
336+
337+
@self.client.ExistingDataWatch(self.path)
338+
def changed(d, stat):
339+
data.pop()
340+
data.append(d)
341+
update.set()
342+
343+
update.wait(10)
344+
assert data[0] == b'fred'
345+
update.clear()
346+
347+
self.client.delete(self.path)
348+
update.wait(10)
349+
assert data == [None]
350+
update.clear()
351+
352+
self.client.create(self.path, b'ginger')
353+
update.wait(0.2)
354+
assert data == [None]
355+
update.clear()
356+
357+
282358
class KazooChildrenWatcherTests(KazooTestCase):
283359
def setUp(self):
284360
super(KazooChildrenWatcherTests, self).setUp()

0 commit comments

Comments
 (0)