Skip to content

Commit

Permalink
feat: add kv clinet
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <[email protected]>
  • Loading branch information
LingKa28 committed Dec 4, 2023
1 parent 42e35cd commit 8134b71
Show file tree
Hide file tree
Showing 6 changed files with 504 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ jobs:

- name: Install Hatch
run: pip install hatch

- name: Generate API
uses: ./.github/workflows/protobuf

- name: Run lint & fmt
run: hatch run lint:all ./client
Expand Down
30 changes: 30 additions & 0 deletions client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Xline Client"""

from __future__ import annotations
from client.protocol import ProtocolClient
from client.kv import KvClient


class Client:
"""
Xline client
Attributes:
kv: Kv client
"""

kv_client: KvClient

def __init__(self, kv: KvClient) -> None:
self.kv_client = kv

@classmethod
async def connect(cls, addrs: list[str]) -> Client:
"""
New `Client`
"""
protocol_client = await ProtocolClient.build_from_addrs(addrs)

kv_client = KvClient("client", protocol_client, "")

return cls(kv_client)
189 changes: 189 additions & 0 deletions client/kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Kv Client"""

import uuid
from typing import Optional, Literal
from client.protocol import ProtocolClient as CurpClient
from client.txn import Txn
from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange
from api.xline.rpc_pb2 import (
RangeRequest,
RangeResponse as _RangeResponse,
PutRequest,
PutResponse as _PutResponse,
DeleteRangeRequest,
DeleteRangeResponse as _DeleteRangeResponse,
CompactionRequest,
CompactionResponse as _CompactionResponse,
)

RangeResponse = _RangeResponse
PutResponse = _PutResponse
DeleteRangeResponse = _DeleteRangeResponse
CompactionResponse = _CompactionResponse


class KvClient:
"""
Client for KV operations.
Attributes:
name: Name of the kv client, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
token: The auth token.
"""

name: str
curp_client: CurpClient
token: Optional[str]

def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None:
self.name = name
self.curp_client = curp_client
self.token = token

async def range(
self,
key: bytes,
range_end: bytes | None = None,
limit: int | None = None,
revision: int | None = None,
sort_order: Literal["none", "ascend", "descend"] | None = None,
sort_target: Literal["key", "version", "create", "mod", "value"] | None = None,
serializable: bool = False,
keys_only: bool = False,
count_only: bool = False,
min_mod_revision: int | None = None,
max_mod_revision: int | None = None,
min_create_revision: int | None = None,
max_create_revision: int | None = None,
) -> RangeResponse:
"""
Get a range of keys from the store.
"""
req = RangeRequest(
key=key,
range_end=range_end,
limit=limit,
revision=revision,
sort_order=sort_order,
sort_target=sort_target,
serializable=serializable,
keys_only=keys_only,
count_only=count_only,
min_mod_revision=min_mod_revision,
max_mod_revision=max_mod_revision,
min_create_revision=min_create_revision,
max_create_revision=max_create_revision,
)
key_ranges = [KeyRange(key=key, range_end=range_end)]
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
range_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.range_response

async def put(
self,
key: bytes,
value: bytes,
lease: int | None = None,
prev_kv: bool = False,
ignore_value: bool = False,
ignore_lease: bool = False,
) -> PutResponse:
"""
Put a key-value into the store.
"""
req = PutRequest(
key=key, value=value, lease=lease, prev_kv=prev_kv, ignore_value=ignore_value, ignore_lease=ignore_lease
)
key_ranges = [KeyRange(key=key, range_end=key)]
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
put_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.put_response

async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool = False) -> DeleteRangeResponse:
"""
Delete a range of keys from the store.
"""
req = DeleteRangeRequest(
key=key,
range_end=range_end,
prev_kv=prev_kv,
)
key_ranges = [KeyRange(key=key, range_end=range_end)]
propose_id = generate_propose_id(self.name)

request_with_token = RequestWithToken(
delete_range_request=req,
token=self.token,
)
cmd = Command(
keys=key_ranges,
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.delete_range_response

def txn(self) -> Txn:
"""
Creates a transaction, which can provide serializable writes.
"""

return Txn(
self.name,
self.curp_client,
self.token,
)

async def compact(self, revision: int, physical: bool = False) -> CompactionResponse:
"""
Compacts the key-value store up to a given revision.
"""
req = CompactionRequest(
revision=revision,
physical=physical,
)
use_fast_path = physical
propose_id = generate_propose_id(self.name)
request_with_token = RequestWithToken(
compaction_request=req,
token=self.token,
)
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)

if use_fast_path:
er, asr = await self.curp_client.propose(cmd, True)
return er.compaction_response

Check warning on line 177 in client/kv.py

View check run for this annotation

Codecov / codecov/patch

client/kv.py#L176-L177

Added lines #L176 - L177 were not covered by tests
else:
er, asr = await self.curp_client.propose(cmd, False)
if asr is None:
msg = "sync_res is always Some when use_fast_path is false"
raise Exception(msg)

Check warning on line 182 in client/kv.py

View check run for this annotation

Codecov / codecov/patch

client/kv.py#L181-L182

Added lines #L181 - L182 were not covered by tests
return er.compaction_response


def generate_propose_id(prefix: str) -> str:
"""Generate propose id with the given prefix"""
propose_id = f"{prefix}-{uuid.uuid4()}"
return propose_id
4 changes: 2 additions & 2 deletions client/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def fast_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse |
"""
for futures in asyncio.as_completed([self.fast_round(cmd), self.slow_round(cmd)]):
first, second = await futures
if isinstance(first, CommandResponse) and isinstance(second, bool):
if isinstance(first, CommandResponse) and isinstance(second, bool) and second:
return (first, None)
if isinstance(second, CommandResponse) and isinstance(first, SyncResponse):
return (second, first)
Expand Down Expand Up @@ -137,7 +137,7 @@ async def fast_round(self, cmd: Command) -> tuple[CommandResponse | None, bool]:
exe_err.inner.ParseFromString(cmd_result.error)
raise exe_err
elif res.HasField("error"):
raise res.error
logging.info(res.error)
else:
ok_cnt += 1

Expand Down
87 changes: 87 additions & 0 deletions client/txn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"Transaction"

import uuid
from typing import List, Optional
from client.client import ProtocolClient as CurpClient
from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange
from api.xline.rpc_pb2 import (
RangeRequest as _RangeRequest,
PutRequest as _PutRequest,
DeleteRangeRequest as _DeleteRangeRequest,
TxnRequest as _TxnRequest,
Compare as _Compare,
RequestOp as _RequestOp,
TxnResponse as _TxnResponse,
)

RangeRequest = _RangeRequest
PutRequest = _PutRequest
DeleteRangeRequest = _DeleteRangeRequest
TxnRequest = _TxnRequest
Compare = _Compare
RequestOp = _RequestOp
TxnResponse = _TxnResponse


class Txn:
"""
Transaction.
Attributes:
name: Name of the Transaction, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
token: The auth token.
"""

name: str
curp_client: CurpClient
token: Optional[str]

cmps: List[Compare]
sus: List[RequestOp]
fas: List[RequestOp]

def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None:
self.name = name
self.curp_client = curp_client
self.token = token

def when(self, cmps: List[Compare]):
"compare"
self.cmps = cmps
return self

def and_then(self, op: List[RequestOp]):
"true"
self.sus = op
return self

def or_else(self, op: List[RequestOp]):
"false"
self.fas = op
return self

async def commit(self) -> TxnResponse:
"commit"
krs = []
for cmp in self.cmps:
krs.append(KeyRange(key=cmp.key, range_end=cmp.range_end))
propose_id = self.generate_propose_id(self.name)
r = TxnRequest(compare=self.cmps, success=self.sus, failure=self.fas)
req = RequestWithToken(
txn_request=r,
token=self.token,
)
cmd = Command(
keys=krs,
request=req,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, False)
return er.txn_response

@staticmethod
def generate_propose_id(prefix: str) -> str:
"""Generate propose id with the given prefix"""
propose_id = f"{prefix}-{uuid.uuid4()}"
return propose_id
Loading

0 comments on commit 8134b71

Please sign in to comment.