diff --git a/octopoes/octopoes/xtdb/client.py b/octopoes/octopoes/xtdb/client.py index 529239c3064..4a56d05203d 100644 --- a/octopoes/octopoes/xtdb/client.py +++ b/octopoes/octopoes/xtdb/client.py @@ -163,7 +163,7 @@ def delete_node(self) -> None: raise XTDBException("Could not delete node") from e - def export_transactions(self) -> Any: + def export_transactions(self): res = self._session.get(f"{self.client_url()}/tx-log?with-ops?=true", headers={"Accept": "application/json"}) self._verify_response(res) return res.json() diff --git a/octopoes/tools/xtdb-cli.py b/octopoes/tools/xtdb-cli.py index c7a9feef0f3..53a50af3ef8 100755 --- a/octopoes/tools/xtdb-cli.py +++ b/octopoes/tools/xtdb-cli.py @@ -1,219 +1,252 @@ #!/usr/bin/env python -import argparse import datetime -import sys -from pathlib import Path - -import httpx - - -class XTDB: - def __init__(self, host: str, port: int, node: str): - self.host = host - self.port = port - self.node = node - - def _root(self, target: str = ""): - return f"http://{self.host}:{self.port}/_xtdb/{self.node}/{target}" - - def status(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("status"), headers=headers) - return req.text - - def query(self, query: str = "{:query {:find [ ?var ] :where [[?var :xt/id ]]}}"): - headers = {"Accept": "application/json", "Content-Type": "application/edn"} - req = httpx.post(self._root("query"), headers=headers, data=query) - return req.text - - def entity(self, key: str): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"entity?eid={key}"), headers=headers) - return req.text - - def history(self, key: str): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"entity?eid={key}&history=true&sortOrder=asc"), headers=headers) - return req.text - - def entity_tx(self, key: str): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"entity-tx?eid={key}"), headers=headers) - return req.text - - def 
attribute_stats(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("attribute-stats"), headers=headers) - return req.text - - def sync(self, timeout: int = 500): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"sync?timeout={timeout}"), headers=headers) - return req.text - - def await_tx(self, txid: int): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"await-tx?txId={txid}"), headers=headers) - return req.text - - def await_tx_time(self, tm: str = datetime.datetime.now().isoformat()): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"await-tx-time?tx-time={tm}"), headers=headers) - return req.text - - def tx_log(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("tx-log"), headers=headers) - return req.text - - def tx_log_docs(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("tx-log?with-ops?=true"), headers=headers) - return req.text - - def submit_tx(self, txs): - headers = {"Accept": "application/json", "Content-Type": "application/json"} - data = '{{"tx-ops": {}}}'.format(" ".join(txs)) - req = httpx.post(self._root("submit-tx"), headers=headers, data=data) - return req.text - - def tx_committed(self, txid: int): - headers = {"Accept": "application/json"} - req = httpx.get(self._root(f"tx_commited?txId={txid}"), headers=headers) - return req.text - - def latest_completed_tx(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("latest-completed-tx"), headers=headers) - return req.text - - def latest_submitted_tx(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("latest-submitted-tx"), headers=headers) - return req.text - - def active_queries(self): - headers = {"Accept": "application/json"} - req = httpx.get(self._root("active-queries"), headers=headers) - return req.text - - def recent_queries(self): - headers = {"Accept": "application/json"} - req 
import json
import logging

import click
from xtdb_client import XTDBClient

logger = logging.getLogger(__name__)

# Log levels selected by -v, -vv and -vvv respectively.
_VERBOSITY_LEVELS = (logging.WARNING, logging.INFO, logging.DEBUG)

# NOTE: click uses a command function's docstring as its --help text unless an
# explicit ``help=`` is passed to the decorator, so the docstrings below are
# deliberately kept to one user-facing line.


def _echo_json(value) -> None:
    """Write *value* to stdout as one JSON document."""
    click.echo(json.dumps(value))


@click.group(
    context_settings={
        "help_option_names": ["-h", "--help"],
        "max_content_width": 120,
        "show_default": True,
    }
)
@click.option("-n", "--node", default="0", help="XTDB node")
@click.option(
    "-u",
    "--url",
    default="http://localhost:3000",
    help="XTDB server base url",
)
@click.option(
    "-t",
    "--timeout",
    type=int,
    default=5000,
    help="XTDB request timeout (in ms)",
)
@click.option("-v", "--verbosity", count=True, help="Increase the verbosity level")
@click.pass_context
def cli(ctx: click.Context, url: str, node: str, timeout: int, verbosity: int):
    """Command-line interface for an XTDB node over its HTTP API."""
    if verbosity:
        try:
            level = _VERBOSITY_LEVELS[verbosity - 1]
        except IndexError:
            # The IndexError is an implementation detail; hide it from the traceback.
            raise click.UsageError("Invalid verbosity level (use -v, -vv, or -vvv)") from None
        logging.basicConfig(level=level)

    # --timeout is expressed in milliseconds, whereas XTDBClient forwards its
    # timeout to httpx.Client, which interprets a bare number as seconds.
    client = XTDBClient(url, node, timeout / 1000)
    logger.info("Instantiated XTDB client with endpoint %s for node %s", url, node)

    # Sub-commands fetch the shared client from ctx.obj["client"].
    ctx.ensure_object(dict)
    ctx.obj["client"] = client


@cli.command
@click.pass_context
def status(ctx: click.Context) -> None:
    """Show the status of the node."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.status())


@cli.command(help='EDN Query (default: "{:query {:find [ ?var ] :where [[?var :xt/id ]]}}")')
@click.argument("edn", required=False)
@click.pass_context
def query(ctx: click.Context, edn: str) -> None:
    client: XTDBClient = ctx.obj["client"]

    # Without an argument (or with an empty one) fall back to the client's default query.
    _echo_json(client.query(edn) if edn else client.query())


@cli.command(help="List all keys in node")
@click.pass_context
def list_keys(ctx: click.Context) -> None:
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.query())


@cli.command(help="List all values in node")
@click.pass_context
def list_values(ctx: click.Context) -> None:
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.query("{:query {:find [(pull ?var [*])] :where [[?var :xt/id]]}}"))


@cli.command
@click.option("--tx-id", type=int)
@click.option("--tx-time", type=click.DateTime())
@click.option("--valid-time", type=click.DateTime())
@click.argument("key")
@click.pass_context
def entity(
    ctx: click.Context,
    key: str,
    valid_time: datetime.datetime | None = None,
    tx_time: datetime.datetime | None = None,
    tx_id: int | None = None,
) -> None:
    """Show the document stored under KEY."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.entity(key, valid_time, tx_time, tx_id))


@cli.command
@click.option("--with-docs", is_flag=True)
@click.option("--with-corrections", is_flag=True)
@click.argument("key")
@click.pass_context
def history(ctx: click.Context, key: str, with_corrections: bool, with_docs: bool) -> None:
    """Show the history of KEY in ascending order."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.history(key, with_corrections, with_docs))


@cli.command
@click.option("--tx-id", type=int)
@click.option("--tx-time", type=click.DateTime())
@click.option("--valid-time", type=click.DateTime())
@click.argument("key")
@click.pass_context
def entity_tx(
    ctx: click.Context,
    key: str,
    valid_time: datetime.datetime | None = None,
    tx_time: datetime.datetime | None = None,
    tx_id: int | None = None,
) -> None:
    """Show the transaction details for KEY."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.entity_tx(key, valid_time, tx_time, tx_id))


@cli.command
@click.pass_context
def attribute_stats(ctx: click.Context) -> None:
    """Show the attribute statistics of the node."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.attribute_stats())


@cli.command
@click.option("--timeout", type=int)
@click.pass_context
def sync(ctx: click.Context, timeout: int | None) -> None:
    """Call the node's sync endpoint."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.sync(timeout))


@cli.command
@click.option("--timeout", type=int)
@click.argument("tx-id", type=int)
@click.pass_context
def await_tx(ctx: click.Context, tx_id: int, timeout: int | None) -> None:
    """Call the node's await-tx endpoint for TX_ID."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.await_tx(tx_id, timeout))


@cli.command
@click.option("--timeout", type=int)
@click.argument("tx-time", type=click.DateTime())
@click.pass_context
def await_tx_time(
    ctx: click.Context,
    tx_time: datetime.datetime,
    timeout: int | None,
) -> None:
    """Call the node's await-tx-time endpoint for TX_TIME."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.await_tx_time(tx_time, timeout))


@cli.command
@click.option("--with-ops", is_flag=True)
@click.option("--after-tx-id", type=int)
@click.pass_context
def tx_log(ctx: click.Context, after_tx_id: int | None, with_ops: bool) -> None:
    """Show the transaction log."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.tx_log(after_tx_id, with_ops))


@cli.command(help="Show all document transactions")
@click.pass_context
def txs(ctx: click.Context) -> None:
    client: XTDBClient = ctx.obj["client"]

    # Same as `tx-log --with-ops` without a starting transaction id.
    _echo_json(client.tx_log(None, True))


@cli.command
@click.argument("txs", nargs=-1)
@click.pass_context
def submit_tx(ctx: click.Context, txs) -> None:
    """Submit the given transaction operations."""
    client: XTDBClient = ctx.obj["client"]

    # NOTE(review): the arguments are forwarded as plain strings, so they end up
    # as JSON strings inside "tx-ops". If XTDB expects each operation to be a JSON
    # array, they presumably need json.loads() first -- confirm against the server.
    _echo_json(client.submit_tx(txs))


@cli.command
@click.argument("tx-id", type=int)
@click.pass_context
def tx_committed(ctx: click.Context, tx_id: int) -> None:
    """Call the node's tx-committed endpoint for TX_ID."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.tx_committed(tx_id))


@cli.command
@click.pass_context
def latest_completed_tx(ctx: click.Context) -> None:
    """Show the latest completed transaction."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.latest_completed_tx())


@cli.command
@click.pass_context
def latest_submitted_tx(ctx: click.Context) -> None:
    """Show the latest submitted transaction."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.latest_submitted_tx())


@cli.command
@click.pass_context
def active_queries(ctx: click.Context) -> None:
    """Show the active queries."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.active_queries())


@cli.command
@click.pass_context
def recent_queries(ctx: click.Context) -> None:
    """Show the recent queries."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.recent_queries())


@cli.command
@click.pass_context
def slowest_queries(ctx: click.Context) -> None:
    """Show the slowest queries."""
    client: XTDBClient = ctx.obj["client"]

    _echo_json(client.slowest_queries())


if __name__ == "__main__":
    cli()
class XTDBClient:
    """Thin synchronous client for the HTTP API of a single XTDB node.

    Each public method issues exactly one request and returns the decoded JSON
    body of the response. The HTTP status code is not inspected here, so an
    error response is not raised as an HTTP error; its body is decoded like
    any other response.
    """

    def __init__(self, base_url: str, node: str, timeout: float | None = None):
        """Create the underlying ``httpx.Client``.

        Args:
            base_url: Base URL of the XTDB server, without the ``/_xtdb`` part.
            node: Name of the node; requests go to ``{base_url}/_xtdb/{node}``.
            timeout: Passed unchanged to ``httpx.Client``, which interprets a
                number as seconds and ``None`` as "no timeout".
        """
        self._client = httpx.Client(
            base_url=f"{base_url}/_xtdb/{node}",
            headers={"Accept": "application/json"},
            timeout=timeout,
        )

    @staticmethod
    def _point_in_time_params(
        key: str,
        valid_time: datetime.datetime | None,
        tx_time: datetime.datetime | None,
        tx_id: int | None,
    ) -> dict[str, str]:
        """Build the query parameters shared by ``entity`` and ``entity_tx``.

        Only the coordinates that are not ``None`` are included.
        """
        params = {"eid": key}
        if valid_time is not None:
            params["valid-time"] = valid_time.isoformat()
        if tx_time is not None:
            params["tx-time"] = tx_time.isoformat()
        if tx_id is not None:
            params["tx-id"] = str(tx_id)
        return params

    def status(self) -> JsonValue:
        """Return the response of ``GET /status``."""
        res = self._client.get("/status")

        return res.json()

    def query(self, query: str = "{:query {:find [ ?var ] :where [[?var :xt/id ]]}}") -> JsonValue:
        """POST *query* as EDN to ``/query`` and return the result.

        The default query finds every ``?var`` that has an ``:xt/id``.
        """
        res = self._client.post("/query", content=query, headers={"Content-Type": "application/edn"})

        return res.json()

    def entity(
        self,
        key: str,
        valid_time: datetime.datetime | None = None,
        tx_time: datetime.datetime | None = None,
        tx_id: int | None = None,
    ) -> JsonValue:
        """Return the response of ``GET /entity`` for *key*.

        *valid_time*, *tx_time* and *tx_id* are sent only when given.
        """
        params = self._point_in_time_params(key, valid_time, tx_time, tx_id)

        res = self._client.get("/entity", params=params)

        return res.json()

    def history(self, key: str, with_corrections: bool, with_docs: bool) -> JsonValue:
        """Return the history of *key* from ``GET /entity``, sorted ascending.

        ``with-corrections`` and ``with-docs`` are sent only when the matching
        flag is true.
        """
        params = {"eid": key, "history": True, "sortOrder": "asc"}
        if with_corrections:
            params["with-corrections"] = "true"
        if with_docs:
            params["with-docs"] = "true"

        res = self._client.get("/entity", params=params)

        return res.json()

    def entity_tx(
        self,
        key: str,
        valid_time: datetime.datetime | None = None,
        tx_time: datetime.datetime | None = None,
        tx_id: int | None = None,
    ) -> JsonValue:
        """Return the response of ``GET /entity-tx`` for *key*.

        *valid_time*, *tx_time* and *tx_id* are sent only when given.
        """
        params = self._point_in_time_params(key, valid_time, tx_time, tx_id)

        res = self._client.get("/entity-tx", params=params)

        return res.json()

    def attribute_stats(self) -> JsonValue:
        """Return the response of ``GET /attribute-stats``."""
        res = self._client.get("/attribute-stats")

        return res.json()

    def sync(self, timeout: int | None) -> JsonValue:
        """Return the response of ``GET /sync``.

        *timeout* is sent as the ``timeout`` query parameter when not ``None``;
        it is distinct from the HTTP client timeout given to the constructor.
        """
        if timeout is not None:
            res = self._client.get("/sync", params={"timeout": timeout})
        else:
            res = self._client.get("/sync")

        return res.json()

    def await_tx(self, transaction_id: int, timeout: int | None) -> JsonValue:
        """Return the response of ``GET /await-tx`` for *transaction_id*.

        *timeout* is sent as the ``timeout`` query parameter when not ``None``.
        """
        params = {"txId": transaction_id}
        if timeout is not None:
            params["timeout"] = timeout

        res = self._client.get("/await-tx", params=params)

        return res.json()

    def await_tx_time(
        self,
        transaction_time: datetime.datetime,
        timeout: int | None,
    ) -> JsonValue:
        """Return the response of ``GET /await-tx-time`` for *transaction_time*.

        The time is sent in ISO 8601 format; *timeout* is sent as the
        ``timeout`` query parameter when not ``None``.
        """
        params = {"tx-time": transaction_time.isoformat()}
        if timeout is not None:
            params["timeout"] = str(timeout)

        res = self._client.get("/await-tx-time", params=params)

        return res.json()

    def tx_log(
        self,
        after_tx_id: int | None,
        with_ops: bool,
    ) -> JsonValue:
        """Return the response of ``GET /tx-log``.

        ``after-tx-id`` is sent when *after_tx_id* is not ``None`` and
        ``with-ops?`` when *with_ops* is true.
        """
        params = {}
        if after_tx_id is not None:
            params["after-tx-id"] = after_tx_id
        if with_ops:
            params["with-ops?"] = True

        res = self._client.get("/tx-log", params=params)

        return res.json()

    def submit_tx(self, transactions: list[str]) -> JsonValue:
        """POST *transactions* to ``/submit-tx`` as the JSON ``tx-ops`` value."""
        res = self._client.post("/submit-tx", json={"tx-ops": transactions})

        return res.json()

    def tx_committed(self, txid: int) -> JsonValue:
        """Return the response of ``GET /tx-committed`` for *txid*."""
        res = self._client.get("/tx-committed", params={"txId": txid})

        return res.json()

    def latest_completed_tx(self) -> JsonValue:
        """Return the response of ``GET /latest-completed-tx``."""
        res = self._client.get("/latest-completed-tx")

        return res.json()

    def latest_submitted_tx(self) -> JsonValue:
        """Return the response of ``GET /latest-submitted-tx``."""
        res = self._client.get("/latest-submitted-tx")

        return res.json()

    def active_queries(self) -> JsonValue:
        """Return the response of ``GET /active-queries``."""
        res = self._client.get("/active-queries")

        return res.json()

    def recent_queries(self) -> JsonValue:
        """Return the response of ``GET /recent-queries``."""
        res = self._client.get("/recent-queries")

        return res.json()

    def slowest_queries(self) -> JsonValue:
        """Return the response of ``GET /slowest-queries``."""
        # Previously this requested /recent-queries, duplicating recent_queries().
        res = self._client.get("/slowest-queries")

        return res.json()