Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simple asyncio wrapper API with basic tests #646

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions kazoo/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Simple asyncio integration of the threaded async executor engine.
"""
79 changes: 79 additions & 0 deletions kazoo/aio/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio

from kazoo.aio.handler import AioSequentialThreadingHandler
from kazoo.client import KazooClient, TransactionRequest


class AioKazooClient(KazooClient):
"""
The asyncio compatibility mostly mimics the behaviour of the base async
one. All calls are wrapped in asyncio.shield() to prevent cancellation
that is not supported in the base async implementation.

The sync and base-async API are still completely functional. Mixing the
use of any of the 3 should be okay.
"""

def __init__(self, *args, **kwargs):
if not kwargs.get("handler"):
kwargs["handler"] = AioSequentialThreadingHandler()
KazooClient.__init__(self, *args, **kwargs)

# asyncio compatible api wrappers
async def start_aio(self):
return await asyncio.shield(self.start_async().future)

async def add_auth_aio(self, *args, **kwargs):
return await asyncio.shield(self.add_auth_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def sync_aio(self, *args, **kwargs):
return await asyncio.shield(self.sync_async(*args, **kwargs).future)

async def create_aio(self, *args, **kwargs):
return await asyncio.shield(self.create_async(*args, **kwargs).future)

async def ensure_path_aio(self, *args, **kwargs):
return await asyncio.shield(
self.ensure_path_async(*args, **kwargs).future
)

async def exists_aio(self, *args, **kwargs):
return await asyncio.shield(self.exists_async(*args, **kwargs).future)

async def get_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_async(*args, **kwargs).future)

async def get_children_aio(self, *args, **kwargs):
return await asyncio.shield(
self.get_children_async(*args, **kwargs).future
)

async def get_acls_aio(self, *args, **kwargs):
return await asyncio.shield(self.get_acls_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def set_acls_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_acls_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def set_aio(self, *args, **kwargs):
return await asyncio.shield(self.set_async(*args, **kwargs).future)

def transaction_aio(self):
return AioTransactionRequest(self)

async def delete_aio(self, *args, **kwargs):
return await asyncio.shield(self.delete_async(*args, **kwargs).future)

async def reconfig_aio(self, *args, **kwargs):
return await asyncio.shield(self.reconfig_async(*args, **kwargs).future)
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved


class AioTransactionRequest(TransactionRequest):
async def commit_aio(self):
return await asyncio.shield(self.commit_async().future)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, exc_tb):
if not exc_type:
await self.commit_aio()
60 changes: 60 additions & 0 deletions kazoo/aio/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import asyncio
import threading

from kazoo.handlers.threading import AsyncResult, SequentialThreadingHandler


class AioAsyncResult(AsyncResult):
def __init__(self, handler):
self.future = handler.loop.create_future()
AsyncResult.__init__(self, handler)

def set(self, value=None):
"""
The completion of the future has the same guarantees as the
notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set(self, value)
self._handler.loop.call_soon_threadsafe(self.future.set_result, value)

def set_exception(self, exception):
"""
The completion of the future has the same guarantees as the
notification emitting of the condition.
Provided that no callbacks raise it will complete.
"""
AsyncResult.set_exception(self, exception)
self._handler.loop.call_soon_threadsafe(
self.future.set_exception, exception
)


class AioSequentialThreadingHandler(SequentialThreadingHandler):
def __init__(self):
"""
Creating the handler must be done on the asyncio-loop's thread.
"""
self.loop = asyncio.get_running_loop()
self._aio_thread = threading.current_thread()
SequentialThreadingHandler.__init__(self)

def async_result(self):
"""
Almost all async-result objects are created by a method that is
invoked from the user's thead. The one exception I'm aware of is
in the PatientChildrenWatch utility, that creates an async-result
in its worker thread. Just because of that it is imperative to
only create asyncio compatible results when the invoking code is
from the loop's thread. There is no PEP/API guarantee that
implementing the create_future() has to be thread-safe. The default
is mostly thread-safe. The only thing that may get synchronization
issue is a debug-feature for asyncio development. Quickly looking at
the alternate implementation of uvloop, they use the default Future
implementation, so no change there.
For now, just to be safe, we check the current thread and create an
async-result object based on the invoking thread's identity.
"""
if threading.current_thread() is self._aio_thread:
return AioAsyncResult(self)
return AsyncResult(self)
4 changes: 2 additions & 2 deletions kazoo/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kazoo.testing.harness import KazooTestCase, KazooTestHarness
from kazoo.testing.harness import KazooAioTestCase, KazooTestCase, KazooTestHarness
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved


__all__ = ('KazooTestHarness', 'KazooTestCase', )
__all__ = ('KazooTestHarness', 'KazooTestCase', 'KazooAioTestCase', )
29 changes: 27 additions & 2 deletions kazoo/testing/harness.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Kazoo testing harnesses"""
import asyncio
import logging
import os
import uuid
import unittest

from kazoo import python2atexit as atexit
from kazoo.aio.client import AioKazooClient
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
Expand Down Expand Up @@ -144,6 +146,7 @@ def test_something_else(self):

"""
DEFAULT_CLIENT_TIMEOUT = 15
CLIENT_CLS = KazooClient

def __init__(self, *args, **kw):
super(KazooTestHarness, self).__init__(*args, **kw)
Expand All @@ -159,14 +162,14 @@ def servers(self):
return ",".join([s.address for s in self.cluster])

def _get_nonchroot_client(self):
c = KazooClient(self.servers)
c = self.CLIENT_CLS(self.servers)
self._clients.append(c)
return c

def _get_client(self, **client_options):
if 'timeout' not in client_options:
client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
c = KazooClient(self.hosts, **client_options)
c = self.CLIENT_CLS(self.hosts, **client_options)
self._clients.append(c)
return c

Expand Down Expand Up @@ -245,3 +248,25 @@ def setUp(self):

def tearDown(self):
self.teardown_zookeeper()


class KazooAioTestCase(KazooTestHarness):
CLIENT_CLS = AioKazooClient

def __init__(self, *args, **kw):
super(KazooAioTestCase, self).__init__(*args, **kw)
self.loop = None

async def setup_zookeeper_aio(self):
self.setup_zookeeper() # NOTE: could enhance this to call start_aio() on the client
Traktormaster marked this conversation as resolved.
Show resolved Hide resolved

async def teardown_zookeeper_aio(self):
self.teardown_zookeeper()

def setUp(self):
self.loop = asyncio.get_event_loop_policy().new_event_loop()
self.loop.run_until_complete(self.setup_zookeeper_aio())

def tearDown(self):
self.loop.run_until_complete(self.teardown_zookeeper_aio())
self.loop.close()
34 changes: 34 additions & 0 deletions kazoo/tests/test_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from kazoo.exceptions import NotEmptyError, NoNodeError
from kazoo.protocol.states import ZnodeStat
from kazoo.testing import KazooAioTestCase


class KazooAioTests(KazooAioTestCase):
def test_basic_aio_functionality(self):
self.loop.run_until_complete(self._test_basic_aio_functionality())

async def _test_basic_aio_functionality(self):
assert await self.client.create_aio("/tmp") == "/tmp"
assert await self.client.get_children_aio("/") == ["tmp"]
assert await self.client.ensure_path_aio("/tmp/x/y") == "/tmp/x/y"
assert await self.client.exists_aio("/tmp/x/y")
assert isinstance(
await self.client.set_aio("/tmp/x/y", b"very aio"), ZnodeStat
)
data, stat = await self.client.get_aio("/tmp/x/y")
assert data == b"very aio"
assert isinstance(stat, ZnodeStat)
try:
await self.client.delete_aio("/tmp/x")
except NotEmptyError:
pass
await self.client.delete_aio("/tmp/x/y")
try:
await self.client.get_aio("/tmp/x/y")
except NoNodeError:
pass
async with self.client.transaction_aio() as tx:
tx.create("/tmp/z", b"ZZZ")
tx.set_data("/tmp/x", b"XXX")
assert (await self.client.get_aio("/tmp/x"))[0] == b"XXX"
assert (await self.client.get_aio("/tmp/z"))[0] == b"ZZZ"