Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kv client #6

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ jobs:

- name: Install Hatch
run: pip install hatch

- name: Generate API
uses: ./.github/workflows/protobuf

- name: Run lint & fmt
run: hatch run lint:all ./client
Expand Down
30 changes: 30 additions & 0 deletions client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Xline Client"""

from __future__ import annotations
from client.protocol import ProtocolClient
from client.kv import KvClient


class Client:
"""
Xline client

Attributes:
kv: Kv client
"""

kv_client: KvClient

def __init__(self, kv: KvClient) -> None:
self.kv_client = kv

@classmethod
async def connect(cls, addrs: list[str]) -> Client:
"""
New `Client`
"""
protocol_client = await ProtocolClient.build_from_addrs(addrs)

kv_client = KvClient("client", protocol_client, "")

return cls(kv_client)
189 changes: 189 additions & 0 deletions client/kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Kv Client"""

import uuid
from typing import Optional, Literal
from client.protocol import ProtocolClient as CurpClient
from client.txn import Txn
from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange
from api.xline.rpc_pb2 import (
RangeRequest,
RangeResponse as _RangeResponse,
PutRequest,
PutResponse as _PutResponse,
DeleteRangeRequest,
DeleteRangeResponse as _DeleteRangeResponse,
CompactionRequest,
CompactionResponse as _CompactionResponse,
)

RangeResponse = _RangeResponse
PutResponse = _PutResponse
DeleteRangeResponse = _DeleteRangeResponse
CompactionResponse = _CompactionResponse


class KvClient:
"""
Client for KV operations.

Attributes:
name: Name of the kv client, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
token: The auth token.
"""

name: str
curp_client: CurpClient
token: Optional[str]

def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None:
self.name = name
self.curp_client = curp_client
self.token = token

async def range(
self,
key: bytes,
range_end: bytes | None = None,
limit: int | None = None,
revision: int | None = None,
sort_order: Literal["none", "ascend", "descend"] | None = None,
sort_target: Literal["key", "version", "create", "mod", "value"] | None = None,
serializable: bool = False,
keys_only: bool = False,
count_only: bool = False,
min_mod_revision: int | None = None,
max_mod_revision: int | None = None,
min_create_revision: int | None = None,
max_create_revision: int | None = None,
) -> RangeResponse:
"""
Get a range of keys from the store.
"""
req = RangeRequest(
key=key,
range_end=range_end,
limit=limit,
revision=revision,
sort_order=sort_order,
sort_target=sort_target,
serializable=serializable,
keys_only=keys_only,
count_only=count_only,
min_mod_revision=min_mod_revision,
max_mod_revision=max_mod_revision,
min_create_revision=min_create_revision,
max_create_revision=max_create_revision,
)
key_ranges = [KeyRange(key=key, range_end=range_end)]
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
range_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.range_response

async def put(
self,
key: bytes,
value: bytes,
lease: int | None = None,
prev_kv: bool = False,
ignore_value: bool = False,
ignore_lease: bool = False,
) -> PutResponse:
"""
Put a key-value into the store.
"""
req = PutRequest(
key=key, value=value, lease=lease, prev_kv=prev_kv, ignore_value=ignore_value, ignore_lease=ignore_lease
)
key_ranges = [KeyRange(key=key, range_end=key)]
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
put_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.put_response

async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool = False) -> DeleteRangeResponse:
"""
Delete a range of keys from the store.
"""
req = DeleteRangeRequest(
key=key,
range_end=range_end,
prev_kv=prev_kv,
)
key_ranges = [KeyRange(key=key, range_end=range_end)]
propose_id = generate_propose_id(self.name)

request_with_token = RequestWithToken(
delete_range_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.delete_range_response

def txn(self) -> Txn:
"""
Creates a transaction, which can provide serializable writes.
"""

return Txn(
self.name,
self.curp_client,
self.token,
)

async def compact(self, revision: int, physical: bool = False) -> CompactionResponse:
"""
Compacts the key-value store up to a given revision.
"""
req = CompactionRequest(
revision=revision,
physical=physical,
)
use_fast_path = physical
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
compaction_request=req,
token=self.token,
)
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)

if use_fast_path:
er, asr = await self.curp_client.propose(cmd, True)
return er.compaction_response

Check warning on line 177 in client/kv.py

View check run for this annotation

Codecov / codecov/patch

client/kv.py#L176-L177

Added lines #L176 - L177 were not covered by tests
else:
er, asr = await self.curp_client.propose(cmd, False)
if asr is None:
msg = "sync_res is always Some when use_fast_path is false"
raise Exception(msg)

Check warning on line 182 in client/kv.py

View check run for this annotation

Codecov / codecov/patch

client/kv.py#L181-L182

Added lines #L181 - L182 were not covered by tests
return er.compaction_response


def generate_propose_id(prefix: str) -> str:
"""Generate propose id with the given prefix"""
propose_id = f"{prefix}-{uuid.uuid4()}"
return propose_id
4 changes: 2 additions & 2 deletions client/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def fast_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse |
"""
for futures in asyncio.as_completed([self.fast_round(cmd), self.slow_round(cmd)]):
first, second = await futures
if isinstance(first, CommandResponse) and isinstance(second, bool):
if isinstance(first, CommandResponse) and second:
return (first, None)
if isinstance(second, CommandResponse) and isinstance(first, SyncResponse):
return (second, first)
Expand Down Expand Up @@ -137,7 +137,7 @@ async def fast_round(self, cmd: Command) -> tuple[CommandResponse | None, bool]:
exe_err.inner.ParseFromString(cmd_result.error)
raise exe_err
elif res.HasField("error"):
raise res.error
logging.info(res.error)
else:
ok_cnt += 1

Expand Down
88 changes: 88 additions & 0 deletions client/txn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"Transaction"

import uuid
from typing import List, Optional
from client.client import ProtocolClient as CurpClient
from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange
from api.xline.rpc_pb2 import (
RangeRequest as _RangeRequest,
PutRequest as _PutRequest,
DeleteRangeRequest as _DeleteRangeRequest,
TxnRequest as _TxnRequest,
Compare as _Compare,
RequestOp as _RequestOp,
TxnResponse as _TxnResponse,
)

RangeRequest = _RangeRequest
PutRequest = _PutRequest
DeleteRangeRequest = _DeleteRangeRequest
TxnRequest = _TxnRequest
Compare = _Compare
RequestOp = _RequestOp
TxnResponse = _TxnResponse


class Txn:
"""
Transaction.

Attributes:
name: Name of the Transaction, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
token: The auth token.
"""

name: str
curp_client: CurpClient
token: Optional[str]

cmps: List[Compare]
sus: List[RequestOp]
fas: List[RequestOp]

def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None:
self.name = name
self.curp_client = curp_client
self.token = token

def when(self, cmps: List[Compare]):
"compare"
self.cmps = cmps
return self

def and_then(self, op: List[RequestOp]):
"true"
self.sus = op
return self

def or_else(self, op: List[RequestOp]):
"false"
self.fas = op
return self

async def commit(self) -> TxnResponse:
"commit"
# TODO: https://github.com/xline-kv/Xline/issues/470
krs = []
for cmp in self.cmps:
krs.append(KeyRange(key=cmp.key, range_end=cmp.range_end))
LingKa28 marked this conversation as resolved.
Show resolved Hide resolved
propose_id = self.generate_propose_id(self.name)
r = TxnRequest(compare=self.cmps, success=self.sus, failure=self.fas)
req = RequestWithToken(
txn_request=r,
token=self.token,
)
cmd = Command(
keys=krs,
request=req,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, False)
return er.txn_response

@staticmethod
def generate_propose_id(prefix: str) -> str:
"""Generate propose id with the given prefix"""
propose_id = f"{prefix}-{uuid.uuid4()}"
return propose_id
Loading
Loading