Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for persistent recursive watches #715

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kazoo.protocol.connection import ConnectionHandler
from kazoo.protocol.paths import _prefix_root, normpath
from kazoo.protocol.serialization import (
AddWatch,
Auth,
CheckVersion,
CloseInstance,
Expand All @@ -38,6 +39,7 @@
SetACL,
GetData,
Reconfig,
RemoveWatches,
SetData,
Sync,
Transaction,
Expand All @@ -48,6 +50,8 @@
KazooState,
KeeperState,
WatchedEvent,
AddWatchMode,
WatcherType,
)
from kazoo.retry import KazooRetry
from kazoo.security import ACL, OPEN_ACL_UNSAFE
Expand Down Expand Up @@ -248,6 +252,8 @@ def __init__(
self.state_listeners = set()
self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)
self._reset()
self.read_only = read_only

Expand Down Expand Up @@ -416,8 +422,16 @@ def _reset_watchers(self):
for data_watchers in self._data_watchers.values():
watchers.extend(data_watchers)

for persistent_watchers in self._persistent_watchers.values():
watchers.extend(persistent_watchers)

for pr_watchers in self._persistent_recursive_watchers.values():
watchers.extend(pr_watchers)

self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
self._persistent_watchers = defaultdict(set)
self._persistent_recursive_watchers = defaultdict(set)

ev = WatchedEvent(EventType.NONE, self._state, None)
for watch in watchers:
Expand Down Expand Up @@ -1644,8 +1658,111 @@ def reconfig_async(self, joining, leaving, new_members, from_config):

return async_result

def add_watch(self, path, watch, mode):
"""Add a watch.

This method adds persistent watches. Unlike the data and
child watches which may be set by calls to
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
:meth:`KazooClient.get_children`, persistent watches are not
removed after being triggered.

To remove a persistent watch, use
:meth:`KazooClient.remove_all_watches` with an argument of
:attr:`~kazoo.protocol.states.WatcherType.ANY`.

The `mode` argument determines whether or not the watch is
recursive. To set a persistent watch, use
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT`. To set a
persistent recursive watch, use
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE`.

:param path: Path of node to watch.
:param watch: Watch callback to set for future changes
to this path.
:param mode: The mode to use.
:type mode: int

:raises:
:exc:`~kazoo.exceptions.MarshallingError` if mode is
unknown.

:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
"""
return self.add_watch_async(path, watch, mode).get()

def add_watch_async(self, path, watch, mode):
"""Asynchronously add a watch. Takes the same arguments as
:meth:`add_watch`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
if not isinstance(mode, int):
StephenSorriaux marked this conversation as resolved.
Show resolved Hide resolved
raise TypeError("Invalid type for 'mode' (int expected)")
if mode not in (
AddWatchMode.PERSISTENT,
AddWatchMode.PERSISTENT_RECURSIVE,
):
raise ValueError("Invalid value for 'mode'")

async_result = self.handler.async_result()
self._call(
AddWatch(_prefix_root(self.chroot, path), watch, mode),
async_result,
)
return async_result

def remove_all_watches(self, path, watcher_type):
"""Remove watches from a path.

This removes all watches of a specified type (data, child,
any) from a given path.

The `watcher_type` argument specifies which type to use. It
may be one of:

* :attr:`~kazoo.protocol.states.WatcherType.DATA`
* :attr:`~kazoo.protocol.states.WatcherType.CHILDREN`
* :attr:`~kazoo.protocol.states.WatcherType.ANY`

To remove persistent watches, specify a watcher type of
:attr:`~kazoo.protocol.states.WatcherType.ANY`.

:param path: Path of watch to remove.
:param watcher_type: The type of watch to remove.
:type watcher_type: int
"""

return self.remove_all_watches_async(path, watcher_type).get()

def remove_all_watches_async(self, path, watcher_type):
"""Asynchronously remove watches. Takes the same arguments as
:meth:`remove_all_watches`.
"""
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(watcher_type, int):
StephenSorriaux marked this conversation as resolved.
Show resolved Hide resolved
raise TypeError("Invalid type for 'watcher_type' (int expected)")
if watcher_type not in (
WatcherType.ANY,
WatcherType.CHILDREN,
WatcherType.DATA,
):
raise ValueError("Invalid value for 'watcher_type'")

async_result = self.handler.async_result()
self._call(
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
async_result,
)
return async_result


class TransactionRequest(object):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this blank line extraneous? I don't normally recall a blank line between class definition and docstring...

"""A Zookeeper Transaction Request

A Transaction provides a builder object that can be used to
Expand Down
56 changes: 49 additions & 7 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from kazoo.loggingsupport import BLATHER
from kazoo.protocol.serialization import (
AddWatch,
Auth,
Close,
Connect,
Expand All @@ -28,17 +29,20 @@
GetChildren2,
Ping,
PingInstance,
RemoveWatches,
ReplyHeader,
SASL,
Transaction,
Watch,
int_struct,
)
from kazoo.protocol.states import (
AddWatchMode,
Callback,
KeeperState,
WatchedEvent,
EVENT_TYPE_MAP,
WatcherType,
)
from kazoo.retry import (
ForceRetryError,
Expand Down Expand Up @@ -363,6 +367,18 @@ def _write(self, msg, timeout):
raise ConnectionDropped("socket connection broken")
sent += bytes_sent

def _find_persistent_recursive_watchers(self, path):
parts = path.split("/")
watchers = []
for count in range(len(parts)):
candidate = "/".join(parts[: count + 1])
if not candidate:
continue
watchers.extend(
self.client._persistent_recursive_watchers.get(candidate, [])
)
return watchers

def _read_watch_event(self, buffer, offset):
client = self.client
watch, offset = Watch.deserialize(buffer, offset)
Expand All @@ -374,9 +390,13 @@ def _read_watch_event(self, buffer, offset):

if watch.type in (CREATED_EVENT, CHANGED_EVENT):
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == DELETED_EVENT:
watchers.extend(client._data_watchers.pop(path, []))
watchers.extend(client._child_watchers.pop(path, []))
watchers.extend(client._persistent_watchers.get(path, []))
watchers.extend(self._find_persistent_recursive_watchers(path))
elif watch.type == CHILD_EVENT:
watchers.extend(client._child_watchers.pop(path, []))
else:
Expand Down Expand Up @@ -448,13 +468,35 @@ def _read_response(self, header, buffer, offset):

async_object.set(response)

# Determine if watchers should be registered
watcher = getattr(request, "watcher", None)
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
# Determine if watchers should be registered or unregistered
if not client._stopped.is_set():
watcher = getattr(request, "watcher", None)
if watcher:
if isinstance(request, AddWatch):
if request.mode == AddWatchMode.PERSISTENT:
client._persistent_watchers[request.path].add(
watcher
)
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE:
client._persistent_recursive_watchers[
request.path
].add(watcher)
Comment on lines +476 to +483
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this might benefit from an else clause?

I realize it's a no-op, but it still took me a moment to realize that's because it might be a non-persistant watch that's being added... so might be easier for a reader to understand with a code comment...

But code comments can go out of sync, so a simple assertion that it's the expected mode might be useful like:

else:
    if request.mode not in [expected modes]:
       raise <relevant exception>

that way both clarifies the code and also catches stupid errors if somehow a case was missed down the road.

Alternatively, this might read even simpler as a switch statement with a default case... but I realize those require Python 3.10 so maybe leave a:

# TODO: switch these if/else clauses to switch statements where appropriate once we drop Python 3.9 support

elif isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
if isinstance(request, RemoveWatches):
if request.watcher_type == WatcherType.CHILDREN:
client._child_watchers.pop(request.path, None)
elif request.watcher_type == WatcherType.DATA:
client._data_watchers.pop(request.path, None)
elif request.watcher_type == WatcherType.ANY:
Comment on lines +489 to +493
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might read simpler as a switch statement rather than the if/elif/elif series? technically it should be more performant too

Oh, NVM, just realized switch statements won't work in Python 3.8/3.9 😢

client._child_watchers.pop(request.path, None)
client._data_watchers.pop(request.path, None)
client._persistent_watchers.pop(request.path, None)
client._persistent_recursive_watchers.pop(
request.path, None
)

if isinstance(request, Close):
self.logger.log(BLATHER, "Read close response")
Expand Down
28 changes: 28 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
return data, stat


class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
type = 18

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.watcher_type))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100

Expand All @@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
return challenge, offset


class AddWatch(namedtuple("AddWatch", "path watcher mode")):
type = 106

def serialize(self):
b = bytearray()
b.extend(write_string(self.path))
b.extend(int_struct.pack(self.mode))
return b

@classmethod
def deserialize(cls, bytes, offset):
return None


class Watch(namedtuple("Watch", "type state path")):
@classmethod
def deserialize(cls, bytes, offset):
Expand Down
41 changes: 41 additions & 0 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,44 @@ def data_length(self):
@property
def children_count(self):
return self.numChildren


class AddWatchMode(object):
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`

.. attribute:: PERSISTENT

The watch is not removed when trigged.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The watch is not removed when trigged.
The watch is not removed when triggered.


.. attribute:: PERSISTENT_RECURSIVE

The watch is not removed when trigged, and applies to all
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The watch is not removed when trigged, and applies to all
The watch is not removed when triggered, and applies to all

paths underneath the supplied path as well.
"""

PERSISTENT = 0
PERSISTENT_RECURSIVE = 1


class WatcherType(object):
"""Watcher types for use with
:meth:`~kazoo.client.KazooClient.remove_all_watches`

.. attribute:: CHILDREN

Child watches.

.. attribute:: DATA

Data watches.

.. attribute:: ANY

Any type of watch (child, data, persistent, or persistent
recursive).

"""

CHILDREN = 1
DATA = 2
ANY = 3
Loading