diff --git a/Cargo.lock b/Cargo.lock index 2c014fc2b8..ca708073f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,7 +49,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cipher 0.3.0", "cpufeatures", "opaque-debug", @@ -106,7 +106,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "once_cell", "version_check", ] @@ -157,12 +157,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" -[[package]] -name = "arrayref" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" - [[package]] name = "arrayvec" version = "0.5.2" @@ -310,7 +304,7 @@ dependencies = [ "async-lock", "autocfg", "blocking", - "cfg-if 1.0.0", + "cfg-if", "event-listener", "futures-lite", "libc", @@ -318,27 +312,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "async-session" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345022a2eed092cd105cc1b26fd61c341e100bd5fcbbd792df4baf31c2cc631f" -dependencies = [ - "anyhow", - "async-std", - "async-trait", - "base64 0.12.3", - "bincode", - "blake3", - "chrono", - "hmac 0.8.1", - "kv-log-macro", - "rand 0.7.3", - "serde", - "serde_json", - "sha2 0.9.9", -] - [[package]] name = "async-sse" version = "4.1.0" @@ -876,6 +849,68 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes 1.4.0", + "futures-util", + "http", + "http-body", + "hyper", + "itoa 1.0.5", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite 0.2.9", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded 0.7.1", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34" +dependencies = [ + "async-trait", + "bytes 1.4.0", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-macros" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -884,7 +919,7 @@ checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" dependencies = [ "addr2line", "cc", - "cfg-if 1.0.0", + "cfg-if", "libc", "miniz_oxide", "object", @@ -1004,21 +1039,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "blake3" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b64485778c4f16a6a5a9d335e80d449ac6c70cdd6a06d2af18a6f6f775a125b3" -dependencies = [ - "arrayref", - "arrayvec", - "cc", - "cfg-if 0.1.10", - "constant_time_eq", - "crypto-mac 0.8.0", - "digest 0.9.0", -] - [[package]] name = "block-buffer" version = "0.9.0" @@ -1184,12 +1204,6 @@ dependencies = [ "nom 7.1.3", ] -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - [[package]] name = "cfg-if" version = "1.0.0" @@ -1611,7 +1625,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -1667,7 +1681,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", @@ -1681,7 +1695,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", ] @@ -1691,7 +1705,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] @@ -1703,7 +1717,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" dependencies = [ "autocfg", - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", "memoffset", "scopeguard", @@ -1715,7 +1729,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", ] @@ -1725,7 +1739,7 @@ version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -1744,16 +1758,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "crypto-mac" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" -dependencies = [ - "generic-array", - "subtle", -] - [[package]] name = "crypto-mac" version = "0.10.1" @@ -1919,7 +1923,7 @@ version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "hashbrown 0.12.3", "lock_api", "once_cell", @@ -2030,7 +2034,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "dirs-sys-next", ] @@ -2158,7 +2162,7 @@ version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -2200,15 +2204,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "erased-serde" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4ca605381c017ec7a5fef5e548f1cfaa419ed0f6df6367339300db74c92aa7d" -dependencies = [ - "serde", -] - [[package]] name = "errno" version = "0.2.8" @@ -2267,22 +2262,6 @@ dependencies = [ "instant", ] -[[package]] -name = "femme" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc04871e5ae3aa2952d552dae6b291b3099723bf779a8054281c1366a54613ef" -dependencies = [ - "cfg-if 1.0.0", - "js-sys", - "log", - "serde", - "serde_derive", - "serde_json", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "file-mode" version = "0.1.2" @@ -2298,7 +2277,7 @@ version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e884668cd0c7480504233e951174ddc3b382f7c2666e3b7310b5c4e7b0c37f9" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall", "windows-sys 0.42.0", @@ -2518,7 +2497,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "wasi 0.9.0+wasi-snapshot-preview1", ] @@ -2529,7 +2508,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "wasi 0.11.0+wasi-snapshot-preview1", ] @@ -2760,23 +2739,13 @@ dependencies = [ "hmac 0.10.1", ] -[[package]] -name = "hmac" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "126888268dcc288495a26bf004b38c5fdbb31682f992c84ceb046a1f0fe38840" -dependencies = [ - "crypto-mac 0.8.0", - "digest 0.9.0", -] - [[package]] name = "hmac" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" dependencies = [ - "crypto-mac 0.10.1", + "crypto-mac", "digest 0.9.0", ] @@ -2832,7 +2801,7 @@ dependencies = [ "async-std", "async-tls", "async-trait", - "cfg-if 1.0.0", + "cfg-if", "dashmap", "deadpool", "futures", @@ -2841,6 +2810,12 @@ dependencies = [ "rustls 0.18.1", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "http-types" version = "2.12.0" @@ -3048,7 +3023,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -3224,7 +3199,7 @@ checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" dependencies = [ "arrayvec", "bitflags", - "cfg-if 1.0.0", + "cfg-if", "ryu", "static_assertions", ] @@ -3325,7 +3300,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "winapi", ] @@ -3399,7 +3374,7 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "serde", "value-bag", ] @@ -3503,6 +3478,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "matrixmultiply" version = "0.3.2" @@ -3819,7 +3800,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1" dependencies = [ "bitflags", - "cfg-if 1.0.0", + "cfg-if", "foreign-types", "libc", "once_cell", @@ -3948,7 +3929,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "instant", "libc", "redox_syscall", @@ -3962,7 +3943,7 @@ version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall", "smallvec", @@ -4173,7 +4154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22122d5ec4f9fe1b3916419b76be1e80bcb93f618d071d2edf841b137b2a2bd6" dependencies = [ "autocfg", - "cfg-if 1.0.0", + "cfg-if", "libc", "log", "wepoll-ffi", @@ -4977,22 +4958,22 @@ dependencies = [ ] [[package]] -name = "serde_fmt" -version = "1.0.1" +name = "serde_json" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2963a69a2b3918c1dc75a45a18bd3fcd1120e31d3f59deb1b2f9b5d5ffb8baa4" +checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" dependencies = [ + "itoa 1.0.5", + "ryu", "serde", ] [[package]] -name = "serde_json" -version = "1.0.93" +name = "serde_path_to_error" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" +checksum = "26b04f22b563c91331a10074bda3dd5492e3cc39d56bd557e91c0af42b6c7341" dependencies = [ - "itoa 1.0.5", - "ryu", "serde", ] @@ -5089,7 +5070,7 @@ dependencies = [ "base64 0.13.1", "bitflags", "bytes 1.4.0", - "cfg-if 1.0.0", + "cfg-if", "dashmap", "flate2", "futures", @@ -5139,7 +5120,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.10.6", ] @@ -5159,7 +5140,7 @@ version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.10.6", ] @@ -5177,7 +5158,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ "block-buffer 0.9.0", - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.9.0", "opaque-debug", @@ -5189,7 +5170,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.10.6", ] @@ -5510,7 +5491,7 @@ checksum = "718b1ae6b50351982dedff021db0def601677f2120938b070eadb10ba4038dd7" dependencies = [ "async-std", "async-trait", - "cfg-if 1.0.0", + "cfg-if", "encoding_rs", "futures-util", "getrandom 0.2.8", @@ -5531,9 +5512,6 @@ name = "sval" version = "1.0.0-alpha.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45f6ee7c7b87caf59549e9fe45d6a69c75c8019e79e212a835c5da0e92f0ba08" -dependencies = [ - "serde", -] [[package]] name = "syn" @@ -5546,6 +5524,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "syslog_loose" version = "0.18.0" @@ -5630,7 +5614,7 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "fastrand", "libc", "redox_syscall", @@ -5673,7 +5657,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72dc21b5887f4032c4656502d085dc28f2afbb686f25f216472bb0526f4b1b88" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "proc-macro-error", "proc-macro2", "quote", @@ -5765,11 +5749,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c459573f0dd2cc734b539047f57489ea875af8ee950860ded20cf93a79a1dee0" dependencies = [ "async-h1", - "async-session", "async-sse", "async-std", "async-trait", - "femme", "futures-util", "http-client", "http-types", @@ -6126,6 +6108,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +dependencies = [ + "bitflags", + "bytes 1.4.0", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite 0.2.9", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -6144,7 +6145,7 @@ version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "log", "pin-project-lite 0.2.9", "tracing-attributes", @@ -6370,6 +6371,8 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-http", "aws-types", + "axum", + "axum-macros", "base64 0.21.0", "beef", "bimap", @@ -6452,7 +6455,6 @@ dependencies = [ "tempfile", "test-case", "testcontainers", - "tide", "tokio", "tokio-rustls 0.23.4", "tokio-stream", @@ -6550,7 +6552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26" dependencies = [ "async-trait", - "cfg-if 1.0.0", + "cfg-if", "data-encoding", "enum-as-inner", "futures-channel", @@ -6574,7 +6576,7 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "futures-util", "ipconfig", "lazy_static", @@ -6822,9 +6824,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" dependencies = [ "ctor", - "erased-serde", - "serde", - "serde_fmt", "sval", "version_check", ] @@ -6940,9 +6939,7 @@ version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" dependencies = [ - "cfg-if 1.0.0", - "serde", - "serde_json", + "cfg-if", "wasm-bindgen-macro", ] @@ -6967,7 +6964,7 @@ version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "wasm-bindgen", "web-sys", diff --git a/Cargo.toml b/Cargo.toml index e193c94fe5..2daf7776ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,8 +103,9 @@ tremor-script = { path = "tremor-script", features = ["arena-delete"] } tremor-value = { path = "tremor-value" } url = "2.3" value-trait = "0.5" -tide = "0.16" zstd = "*" +axum = "0.6" +axum-macros = "0.3" # blaster / blackhole hdrhistogram = "7" @@ -232,7 +233,7 @@ num_cpus = "*" [features] -default = [] +default = ["integration"] # support for 128bit numbers in tremor-value 128bit = ["tremor-value/128bit"] diff --git a/src/connectors/google.rs b/src/connectors/google.rs index 1d44fffefc..e39b62bea9 100644 --- a/src/connectors/google.rs +++ b/src/connectors/google.rs @@ -116,7 +116,7 @@ pub(crate) mod tests { use super::*; use crate::{ - connectors::utils::EnvHelper, + connectors::{tests::free_port, utils::EnvHelper}, errors::{Error, Result}, }; use std::{convert::Infallible, io::Write, net::ToSocketAddrs}; @@ -199,7 +199,7 @@ PX8efvDMhv16QqDFF0k80d0= async fn gouth_token() -> Result<()> { let mut file = tempfile::NamedTempFile::new()?; - let port = crate::utils::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let sa = ServiceAccount { client_email: "snot@tremor.rs".to_string(), private_key_id: "badger".to_string(), @@ -243,9 +243,7 @@ PX8efvDMhv16QqDFF0k80d0= if let Ok(token) = provider.get_token() { break token; } - if attempt >= 20 { - panic!("Failed to get token"); - } + assert!(attempt < 20, "Failed to get token"); attempt += 1; tokio::time::sleep(std::time::Duration::from_millis(100)).await; }; diff --git a/src/connectors/impls/gbq/writer/sink.rs b/src/connectors/impls/gbq/writer/sink.rs index 648bf0d5fb..fefd649fd5 100644 --- a/src/connectors/impls/gbq/writer/sink.rs +++ b/src/connectors/impls/gbq/writer/sink.rs @@ -826,6 +826,7 @@ mod test { scale: 0, }], &SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -864,6 +865,7 @@ mod test { scale: 0, }], &SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -904,6 +906,7 @@ mod test { scale: 0, }], &SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1129,6 +1132,7 @@ mod test { let (rx, _tx) = bounded(1024); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new(FlowInstanceId::new(AppId::default(), "flow"), "connector"), ConnectorType::default(), @@ -1178,6 +1182,7 @@ mod test { let (rx, _tx) = bounded(1024); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1223,6 +1228,7 @@ mod test { let (rx, _tx) = bounded(1024); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1269,6 +1275,7 @@ mod test { let (rx, _tx) = bounded(1024); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1313,6 +1320,7 @@ mod test { let (rx, _tx) = bounded(1024); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1348,6 +1356,7 @@ mod test { let (rx, _tx) = bounded(1024); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1408,6 +1417,7 @@ mod test { "", Event::signal_tick(), &SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1450,6 +1460,7 @@ mod test { "", Event::signal_tick(), &SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1523,6 +1534,7 @@ mod test { ); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), Alias::new("flow", "connector"), ConnectorType::default(), @@ -1605,8 +1617,9 @@ mod test { ); let ctx = SinkContext::new( + openraft::NodeId::default(), SinkUId::default(), - AAlias::new(FlowInstanceId::new(AppId::default(), "flow"), "connector"), + Alias::new(FlowInstanceId::new(AppId::default(), "flow"), "connector"), ConnectorType::default(), QuiescenceBeacon::default(), ConnectionLostNotifier::new(crate::channel::bounded(1024).0), diff --git a/src/connectors/impls/gcl/writer/sink.rs b/src/connectors/impls/gcl/writer/sink.rs index e367e9bfe2..2e93adfd83 100644 --- a/src/connectors/impls/gcl/writer/sink.rs +++ b/src/connectors/impls/gcl/writer/sink.rs @@ -254,6 +254,7 @@ mod test { google::tests::TestTokenProvider, utils::quiescence::QuiescenceBeacon, }; use crate::ids::{AppId, FlowInstanceId}; + use bytes::Bytes; use futures::future::Ready; use googapis::google::logging::r#type::LogSeverity; use googapis::google::logging::v2::WriteLogEntriesResponse; @@ -371,7 +372,7 @@ mod test { ConnectionLostNotifier::new(connection_lost_tx), ); - sink.connect(&ctx, &Attempt::default()).await?; + sink.connect(&sink_context, &Attempt::default()).await?; let event = Event { id: EventId::new(1, 2, 3, 4), @@ -388,7 +389,7 @@ mod test { sink.on_event( "", event.clone(), - &ctx, + &sink_context, &mut EventSerializer::new( None, CodecReq::Structured, @@ -460,7 +461,7 @@ mod test { "", Event::signal_tick(), &SinkContext::new( - // NodeId::default() + openraft::NodeId::default(), SinkUId::default(), Alias::new(FlowInstanceId::new(AppId::default(), ""), ""), ConnectorType::default(), diff --git a/src/connectors/tests/clickhouse/more_complex_test.rs b/src/connectors/tests/clickhouse/more_complex_test.rs index d20f94288c..0925469891 100644 --- a/src/connectors/tests/clickhouse/more_complex_test.rs +++ b/src/connectors/tests/clickhouse/more_complex_test.rs @@ -59,9 +59,11 @@ use tremor_value::literal; use super::utils; use crate::{ - connectors::{impls::clickhouse, tests::ConnectorHarness}, + connectors::{ + impls::clickhouse, + tests::{free_port, ConnectorHarness}, + }, errors::{Error, Result}, - utils::free_port, }; macro_rules! assert_row_equals { diff --git a/src/connectors/tests/clickhouse/simple_test.rs b/src/connectors/tests/clickhouse/simple_test.rs index 0e062170d2..3c19375a61 100644 --- a/src/connectors/tests/clickhouse/simple_test.rs +++ b/src/connectors/tests/clickhouse/simple_test.rs @@ -18,10 +18,9 @@ use crate::{ connectors::{ impls::clickhouse, - tests::{clickhouse::utils, ConnectorHarness}, + tests::{clickhouse::utils, free_port, ConnectorHarness}, }, errors::{Error, Result}, - utils::free_port, }; use std::time::{Duration, Instant}; diff --git a/src/connectors/tests/elastic.rs b/src/connectors/tests/elastic.rs index 11d7a10f8e..d723c21f6c 100644 --- a/src/connectors/tests/elastic.rs +++ b/src/connectors/tests/elastic.rs @@ -15,8 +15,8 @@ use std::time::{Duration, Instant}; use super::{setup_for_tls, ConnectorHarness}; -use crate::connectors::impls::elastic; use crate::connectors::impls::http::auth::Auth; +use crate::connectors::{impls::elastic, tests::free_port}; use crate::errors::{Error, Result}; use elasticsearch::auth::{ClientCertificate, Credentials}; use elasticsearch::cert::{Certificate, CertificateValidation}; @@ -44,7 +44,7 @@ async fn connector_elastic() -> Result<()> { let _ = env_logger::try_init(); let docker = clients::Cli::default(); - let port = crate::utils::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let image = RunnableImage::from( GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION) .with_env_var("discovery.type", "single-node") @@ -468,7 +468,7 @@ async fn elastic_routing() -> Result<()> { let _ = env_logger::try_init(); let docker = clients::Cli::default(); - let port = super::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let image = RunnableImage::from( GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION) .with_env_var("discovery.type", "single-node") @@ -792,7 +792,7 @@ async fn elastic_routing() -> Result<()> { async fn auth_basic() -> Result<()> { let _ = env_logger::try_init(); let docker = clients::Cli::default(); - let port = crate::utils::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION) @@ -856,7 +856,7 @@ async fn auth_basic() -> Result<()> { async fn auth_api_key() -> Result<()> { let _ = env_logger::try_init(); let docker = clients::Cli::default(); - let port = crate::utils::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION) @@ -948,7 +948,7 @@ async fn auth_client_cert() -> Result<()> { }; let docker = clients::Cli::default(); - let port = crate::utils::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION) @@ -1089,7 +1089,7 @@ async fn elastic_https() -> Result<()> { }; let docker = clients::Cli::default(); - let port = crate::utils::free_port::find_free_tcp_port().await?; + let port = free_port::find_free_tcp_port().await?; let password = "snot"; let image = RunnableImage::from( GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION) diff --git a/src/connectors/tests/http/client.rs b/src/connectors/tests/http/client.rs index 1a5fe3ef42..81a919697f 100644 --- a/src/connectors/tests/http/client.rs +++ b/src/connectors/tests/http/client.rs @@ -20,7 +20,6 @@ use crate::{ utils::url::HttpDefaults, }, errors::Result, - utils::free_port::find_free_tcp_port, }; use hyper::StatusCode; use hyper::{ diff --git a/src/connectors/tests/http/server.rs b/src/connectors/tests/http/server.rs index 007f541c19..caa92b6e96 100644 --- a/src/connectors/tests/http/server.rs +++ b/src/connectors/tests/http/server.rs @@ -19,7 +19,6 @@ use crate::{ utils::tls::TLSClientConfig, }, errors::Result, - utils::free_port, }; use http::StatusCode; use http_body::Body as BodyTrait; diff --git a/src/connectors/tests/kafka.rs b/src/connectors/tests/kafka.rs index 1cf1f6cf07..b458e7d889 100644 --- a/src/connectors/tests/kafka.rs +++ b/src/connectors/tests/kafka.rs @@ -14,7 +14,8 @@ mod consumer; mod producer; -use crate::{errors::Result, utils::free_port::find_free_tcp_port}; +use super::free_port::find_free_tcp_port; +use crate::errors::Result; use std::time::Duration; use testcontainers::{ clients::Cli as DockerCli, core::WaitFor, images::generic::GenericImage, Container, diff --git a/src/connectors/tests/kafka/consumer.rs b/src/connectors/tests/kafka/consumer.rs index baae798a33..ff7df1c5ef 100644 --- a/src/connectors/tests/kafka/consumer.rs +++ b/src/connectors/tests/kafka/consumer.rs @@ -16,12 +16,12 @@ use crate::{ connectors::{ impls::kafka, tests::{ + free_port::find_free_tcp_port, kafka::{redpanda_container, PRODUCE_TIMEOUT}, ConnectorHarness, }, }, errors::Result, - utils::free_port, }; use beef::Cow; use rdkafka::{ @@ -687,7 +687,7 @@ async fn performance() -> Result<()> { #[tokio::test(flavor = "multi_thread")] #[serial(kafka)] async fn connector_kafka_consumer_unreachable() -> Result<()> { - let kafka_port = free_port::find_free_tcp_port().await?; + let kafka_port = find_free_tcp_port().await?; let _ = env_logger::try_init(); let connector_config = literal!({ "reconnect": { @@ -725,7 +725,7 @@ async fn connector_kafka_consumer_unreachable() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn invalid_rdkafka_options() -> Result<()> { let _ = env_logger::try_init(); - let kafka_port = free_port::find_free_tcp_port().await?; + let kafka_port = find_free_tcp_port().await?; let broker = format!("127.0.0.1:{kafka_port}"); let topic = "tremor_test_pause_resume"; let group_id = "invalid_rdkafka_options"; diff --git a/src/connectors/tests/s3.rs b/src/connectors/tests/s3.rs index fff256a7c7..39e8b1fa06 100644 --- a/src/connectors/tests/s3.rs +++ b/src/connectors/tests/s3.rs @@ -20,7 +20,8 @@ use rand::{distributions::Alphanumeric, Rng}; use std::time::{Duration, Instant}; use testcontainers::{clients::Cli, images::generic::GenericImage, Container, RunnableImage}; -use crate::utils::free_port::find_free_tcp_port; +use super::free_port::find_free_tcp_port; + const IMAGE: &str = "minio/minio"; const TAG: &str = "RELEASE.2023-01-12T02-06-16Z"; diff --git a/src/connectors/tests/tcp/client.rs b/src/connectors/tests/tcp/client.rs index 022dd05b54..42f09b7ddc 100644 --- a/src/connectors/tests/tcp/client.rs +++ b/src/connectors/tests/tcp/client.rs @@ -15,10 +15,9 @@ use crate::{ connectors::{ impls::tcp, - tests::{setup_for_tls, tcp::EchoServer, ConnectorHarness}, + tests::{free_port::find_free_tcp_port, setup_for_tls, tcp::EchoServer, ConnectorHarness}, }, errors::Result, - utils::free_port, }; use std::time::Duration; use tokio::net::lookup_host; @@ -41,7 +40,7 @@ async fn tcp_client() -> Result<()> { async fn tcp_client_test(use_tls: bool) -> Result<()> { let _ = env_logger::try_init(); - let free_port = free_port::find_free_tcp_port().await?; + let free_port = find_free_tcp_port().await?; let server_addr = format!("localhost:{free_port}"); diff --git a/src/raft/api.rs b/src/raft/api.rs index ad8d465676..7b500acae6 100644 --- a/src/raft/api.rs +++ b/src/raft/api.rs @@ -21,8 +21,9 @@ mod cluster; mod kv; pub(crate) mod worker; -use crate::channel::{bounded, Sender}; +use self::apps::AppState; use crate::{ + channel::{bounded, Sender}, ids::{AppId, FlowDefinitionId, FlowInstanceId}, raft::{ node::Addr, @@ -30,7 +31,12 @@ use crate::{ TremorRaftImpl, }, }; -use futures::Future; +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, + Router, +}; +use http::{header::LOCATION, HeaderMap, Uri}; use openraft::{ error::{ AddLearnerError, ChangeMembershipError, ClientReadError, ClientWriteError, Fatal, @@ -41,17 +47,14 @@ use openraft::{ }; use std::collections::HashMap; use std::{collections::BTreeSet, num::ParseIntError, sync::Arc, time::Duration}; -use tide::{http::headers::LOCATION, Body, Endpoint, Request, Response, StatusCode}; use tokio::{task::JoinHandle, time::timeout}; -use self::apps::AppState; - -type Server = tide::Server>; -type APIRequest = Request>; +pub(crate) type APIRequest = Arc; pub type APIResult = Result; const API_WORKER_TIMEOUT: Duration = Duration::from_secs(5); +#[derive(Debug)] enum APIStoreReq { GetApp(AppId, Sender>), GetApps(Sender>), @@ -95,50 +98,11 @@ pub(crate) fn initialize( (handle, state) } -pub(crate) fn install_rest_endpoints(app: &mut Server) { - let mut v1 = app.at("/v1"); - cluster::install_rest_endpoints(&mut v1); - - let mut api_endpoint = v1.at("/api"); - apps::install_rest_endpoints(&mut api_endpoint); - kv::install_rest_endpoints(&mut api_endpoint); -} - -/// Endpoint implementation that turns our errors into responses -/// and makes successful results automatically a 200 Ok -struct WrappingEndpoint -where - T: serde::Serialize + serde::Deserialize<'static>, - F: Fn(APIRequest) -> Fut, - Fut: Future> + Send + 'static, -{ - f: F, -} - -#[async_trait::async_trait] -impl Endpoint> for WrappingEndpoint -where - T: serde::Serialize + serde::Deserialize<'static> + 'static, - F: Send + Sync + 'static + Fn(APIRequest) -> Fut, - Fut: Future> + Send + 'static, -{ - async fn call(&self, req: APIRequest) -> tide::Result { - Ok(match (self.f)(req).await { - Ok(serializable) => Response::builder(StatusCode::Ok) - .body(Body::from_json(&serializable)?) - .build(), - Err(e) => e.into(), - }) - } -} - -fn wrapp(f: F) -> WrappingEndpoint -where - T: serde::Serialize + serde::Deserialize<'static> + 'static, - F: Send + Sync + 'static + Fn(APIRequest) -> Fut, - Fut: Future> + Send + 'static, -{ - WrappingEndpoint { f } +pub(crate) fn endpoints() -> Router { + Router::new() + .nest("/v1/cluster", cluster::endpoints()) + .nest("/v1/api/apps", apps::endpoints()) + .nest("/v1/api/kv", kv::endpoints()) } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -224,6 +188,14 @@ impl std::fmt::Display for AppError { } } +#[allow(clippy::trivially_copy_pass_by_ref)] +fn round_sc(x: &StatusCode, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u16(x.as_u16()) +} + #[derive(Debug, Serialize)] pub enum APIError { /// We need to send this API request to the leader_url @@ -234,6 +206,7 @@ pub enum APIError { }, /// HTTP related error HTTP { + #[serde(serialize_with = "round_sc")] status: StatusCode, message: String, }, @@ -259,22 +232,57 @@ pub enum APIError { Other(String), } +impl IntoResponse for APIError { + fn into_response(self) -> Response { + let status = match &self { + APIError::ForwardToLeader { .. } => StatusCode::TEMPORARY_REDIRECT, + APIError::ChangeMembership(ChangeMembershipError::LearnerNotFound(_)) => { + StatusCode::NOT_FOUND + } + APIError::ChangeMembership(ChangeMembershipError::EmptyMembership(_)) => { + StatusCode::BAD_REQUEST + } + APIError::Other(_) + | APIError::ChangeMembership(_) + | APIError::Store(_) + | APIError::Storage(_) + | APIError::Fatal(_) + | APIError::NoQuorum(_) + | APIError::Runtime(_) + | APIError::Recv + | APIError::App(_) => StatusCode::INTERNAL_SERVER_ERROR, + APIError::Timeout => StatusCode::GATEWAY_TIMEOUT, + APIError::HTTP { status, .. } => *status, + }; + + if let APIError::ForwardToLeader { leader_url, .. } = &self { + let mut headers = HeaderMap::new(); + if let Ok(v) = leader_url.parse() { + headers.insert(LOCATION, v); + } + (status, headers).into_response() + } else { + (status, self).into_response() + } + } +} + #[async_trait::async_trait] trait ToAPIResult where T: serde::Serialize + serde::Deserialize<'static>, { - async fn to_api_result(self, req: &APIRequest) -> APIResult; + async fn to_api_result(self, uri: &Uri, req: &APIRequest) -> APIResult; } #[async_trait::async_trait()] impl + Send> ToAPIResult for Result { - async fn to_api_result(self, req: &APIRequest) -> APIResult { + async fn to_api_result(self, uri: &Uri, req: &APIRequest) -> APIResult { match self { Ok(t) => Ok(t), - Err(ClientReadError::ForwardToLeader(e)) => forward_to_leader(e, req).await, + Err(ClientReadError::ForwardToLeader(e)) => forward_to_leader(e, uri, req).await, Err(ClientReadError::Fatal(e)) => Err(APIError::Fatal(e)), Err(ClientReadError::QuorumNotEnough(e)) => Err(APIError::NoQuorum(e)), } @@ -283,10 +291,10 @@ impl + Send> ToAPIResult #[async_trait::async_trait] impl ToAPIResult> for Result { - async fn to_api_result(self, req: &APIRequest) -> APIResult> { + async fn to_api_result(self, uri: &Uri, req: &APIRequest) -> APIResult> { match self { Ok(response) => Ok(response.matched), - Err(AddLearnerError::ForwardToLeader(e)) => forward_to_leader(e, req).await, + Err(AddLearnerError::ForwardToLeader(e)) => forward_to_leader(e, uri, req).await, Err(AddLearnerError::Fatal(e)) => Err(APIError::Fatal(e)), Err(AddLearnerError::Exists(_node_id)) => Ok(None), // we want the API call to be idempotent and not error if the node is already a learner } @@ -295,13 +303,13 @@ impl ToAPIResult> for Result #[async_trait::async_trait] impl ToAPIResult<()> for Result<(), RemoveLearnerError> { - async fn to_api_result(self, req: &APIRequest) -> APIResult<()> { + async fn to_api_result(self, uri: &Uri, req: &APIRequest) -> APIResult<()> { match self { Ok(()) | Err(RemoveLearnerError::NotExists(_)) => Ok(()), // if the node is not part of the cluster, the effect is the same, so we say it is fine - Err(RemoveLearnerError::ForwardToLeader(e)) => forward_to_leader(e, req).await, + Err(RemoveLearnerError::ForwardToLeader(e)) => forward_to_leader(e, uri, req).await, Err(RemoveLearnerError::Fatal(e)) => Err(APIError::Fatal(e)), Err(e @ RemoveLearnerError::NotLearner(_)) => Err(APIError::HTTP { - status: StatusCode::Conflict, + status: StatusCode::CONFLICT, message: e.to_string(), }), } @@ -311,32 +319,37 @@ impl ToAPIResult<()> for Result<(), RemoveLearnerError> { #[async_trait::async_trait()] impl ToAPIResult for Result, ClientWriteError> { // we need the request context here to construct the redirect url properly - async fn to_api_result(self, req: &APIRequest) -> APIResult { + async fn to_api_result(self, uri: &Uri, state: &APIRequest) -> APIResult { match self { Ok(response) => Ok(response.data), - Err(ClientWriteError::ForwardToLeader(e)) => forward_to_leader(e, req).await, + Err(ClientWriteError::ForwardToLeader(e)) => forward_to_leader(e, uri, state).await, Err(ClientWriteError::Fatal(e)) => Err(APIError::Fatal(e)), Err(ClientWriteError::ChangeMembershipError(e)) => Err(APIError::ChangeMembership(e)), } } } -async fn forward_to_leader(e: ForwardToLeader, req: &APIRequest) -> APIResult +#[allow(clippy::unused_async)] +async fn forward_to_leader(e: ForwardToLeader, uri: &Uri, state: &APIRequest) -> APIResult where T: serde::Serialize + serde::Deserialize<'static>, { Err(if let Some(leader_id) = e.leader_id { let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(APIStoreReq::GetNode(leader_id, tx)) .await?; // we can only forward to the leader if we have the node in our state machine if let Some(leader_addr) = timeout(API_WORKER_TIMEOUT, rx.recv()).await?.flatten() { - let mut leader_url = - url::Url::parse(&format!("{}://{}", req.url().scheme(), leader_addr.api()))?; - leader_url.set_path(req.url().path()); - leader_url.set_query(req.url().query()); + let mut leader_url = url::Url::parse(&format!( + "{}://{}", + uri.scheme() + .map_or_else(|| "http".to_string(), ToString::to_string), + leader_addr.api() + ))?; + leader_url.set_path(uri.path()); + leader_url.set_query(uri.query()); debug!("Forwarding to leader: {leader_url}"); // we don't care about fragment @@ -352,45 +365,9 @@ where }) } -impl From for Response { - fn from(e: APIError) -> Self { - let status = match &e { - APIError::ForwardToLeader { .. } => StatusCode::TemporaryRedirect, - APIError::ChangeMembership(ChangeMembershipError::LearnerNotFound(_)) => { - StatusCode::NotFound - } - APIError::ChangeMembership(ChangeMembershipError::EmptyMembership(_)) => { - StatusCode::BadRequest - } - APIError::Other(_) - | APIError::ChangeMembership(_) - | APIError::Store(_) - | APIError::Storage(_) - | APIError::Fatal(_) - | APIError::NoQuorum(_) - | APIError::Runtime(_) - | APIError::Recv - | APIError::App(_) => StatusCode::InternalServerError, - APIError::Timeout => StatusCode::GatewayTimeout, - APIError::HTTP { status, .. } => *status, - }; - let mut builder = Response::builder(status); - if let APIError::ForwardToLeader { leader_url, .. } = &e { - builder = builder.header(LOCATION, leader_url); - builder.build() - } else { - builder - .body(Body::from_json(&e).expect("Serialization should work")) - .build() - } - } -} - -impl From<&TremorResponse> for Response { - fn from(e: &TremorResponse) -> Self { - Response::builder(StatusCode::Ok) - .body(Body::from_json(&e).expect("Serialization should work")) - .build() +impl IntoResponse for TremorResponse { + fn into_response(self) -> Response { + (StatusCode::OK, self).into_response() } } @@ -432,15 +409,6 @@ impl From for APIError { } } -impl From for APIError { - fn from(e: tide::Error) -> Self { - Self::HTTP { - status: e.status(), - message: e.to_string(), - } - } -} - impl From for APIError { fn from(e: StorageError) -> Self { Self::Storage(e) diff --git a/src/raft/api/apps.rs b/src/raft/api/apps.rs index 74b435e817..c01d1a3910 100644 --- a/src/raft/api/apps.rs +++ b/src/raft/api/apps.rs @@ -18,8 +18,8 @@ use crate::{ instance::IntendedState, raft::{ api::{ - wrapp, APIRequest, APIResult, APIStoreReq, AppError, ArgsError, ServerState, - ToAPIResult, API_WORKER_TIMEOUT, + APIRequest, APIResult, APIStoreReq, AppError, ArgsError, ToAPIResult, + API_WORKER_TIMEOUT, }, archive::{get_app, TremorAppDef}, store::{ @@ -28,29 +28,36 @@ use crate::{ }, }, }; +use axum::{ + extract::{self, Json, State}, + routing::{delete, post}, + Router, +}; use std::collections::HashMap; -use std::{fmt::Display, sync::Arc}; -use tide::Route; +use std::fmt::Display; use tokio::time::timeout; -pub(crate) fn install_rest_endpoints(parent: &mut Route>) { - let mut apps_endpoint = parent.at("/apps"); - apps_endpoint.post(wrapp(install_app)).get(wrapp(list)); - apps_endpoint.at("/:app").delete(wrapp(uninstall_app)); - apps_endpoint.at("/:app/flows/:flow").post(wrapp(start)); - apps_endpoint - .at("/:app/instances/:instance") - .post(wrapp(manage_instance)) - .delete(wrapp(stop_instance)); +pub(crate) fn endpoints() -> Router { + Router::::new() + .route("/", post(install_app).get(list)) + .route("/:app", delete(uninstall_app)) + .route("/:app/flows/:flow", post(start)) + .route( + "/:app/instances/:instance", + post(manage_instance).delete(stop_instance), + ) } -async fn install_app(mut req: APIRequest) -> APIResult { - let file: Vec = req.body_json().await?; +async fn install_app( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + Json(file): Json>, +) -> APIResult> { let app = get_app(&file)?; let app_id = app.name().clone(); let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(APIStoreReq::GetApp(app_id.clone(), tx)) .await?; @@ -65,19 +72,22 @@ async fn install_app(mut req: APIRequest) -> APIResult { app, file: file.clone(), }); - req.state() + state .raft .client_write(request) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; - Ok(app_id) + Ok(Json(app_id)) } -async fn uninstall_app(req: APIRequest) -> APIResult { - let app_id = AppId(req.param("app")?.to_string()); +async fn uninstall_app( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path(app_id): extract::Path, +) -> APIResult { let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(APIStoreReq::GetApp(app_id.clone(), tx)) .await?; @@ -100,11 +110,11 @@ async fn uninstall_app(req: APIRequest) -> APIResult { app: app_id.clone(), force: false, }); - req.state() + state .raft .client_write(request) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await } @@ -172,26 +182,27 @@ impl Display for AppState { } } -async fn list(req: APIRequest) -> APIResult> { +async fn list(State(state): State) -> APIResult>> { let (tx, mut rx) = bounded(1); - req.state().store_tx.send(APIStoreReq::GetApps(tx)).await?; + state.store_tx.send(APIStoreReq::GetApps(tx)).await?; let apps = timeout(API_WORKER_TIMEOUT, rx.recv()) .await? .ok_or(APIError::Recv)?; - Ok(apps) + Ok(Json(apps)) } -async fn start(mut req: APIRequest) -> APIResult { - let body: TremorStart = req.body_json().await?; - +async fn start( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path((app_id, flow_id)): extract::Path<(AppId, FlowDefinitionId)>, + Json(body): Json, +) -> APIResult> { let instance_id = body.instance.clone(); - let app_name = AppId(req.param("app")?.to_string()); - let flow_name = FlowDefinitionId(req.param("flow")?.to_string()); let (tx, mut rx) = bounded(1); - req.state() + state .store_tx - .send(APIStoreReq::GetApp(app_name.clone(), tx)) + .send(APIStoreReq::GetApp(app_id.clone(), tx)) .await?; let app = timeout(API_WORKER_TIMEOUT, rx.recv()) .await? @@ -201,43 +212,44 @@ async fn start(mut req: APIRequest) -> APIResult { if app.instances.contains_key(instance_id.alias()) { return Err(AppError::InstanceAlreadyExists(body.instance).into()); } - if let Some(flow) = app.app.flows.get(&flow_name) { + if let Some(flow) = app.app.flows.get(&flow_id) { if let Some(errors) = config_errors(&flow.args, &body.config) { return Err(AppError::InvalidArgs { - flow: flow_name, + flow: flow_id, instance: body.instance, errors, } .into()); } } else { - return Err(AppError::FlowNotFound(app_name, flow_name).into()); + return Err(AppError::FlowNotFound(app_id, flow_id).into()); } } else { - return Err(AppError::AppNotFound(app_name).into()); + return Err(AppError::AppNotFound(app_id).into()); } let request = TremorRequest::Apps(AppsCmd::Deploy { - app: app_name.clone(), - flow: flow_name.clone(), + app: app_id.clone(), + flow: flow_id.clone(), instance: body.instance.clone(), config: body.config.clone(), // FIXME: make this a parameter state: body.state(), }); - req.state() + state .raft .client_write(request) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; - Ok(instance_id) + Ok(Json(instance_id)) } -async fn manage_instance(mut req: APIRequest) -> APIResult { - let body: TremorInstanceState = req.body_json().await?; - - let app_id = AppId(req.param("app")?.to_string()); - let instance_id = FlowInstanceId::new(app_id.clone(), req.param("instance")?.to_string()); +async fn manage_instance( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path((app_id, instance_id)): extract::Path<(AppId, FlowInstanceId)>, + Json(body): Json, +) -> APIResult> { // FIXME: this is not only for this but all the API functions as we're running in a potentially // problematic situation here. // @@ -263,7 +275,7 @@ async fn manage_instance(mut req: APIRequest) -> APIResult { // serializes all commands to ensure no command is executed before the previous one has been fully // handled let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(APIStoreReq::GetApp(app_id.clone(), tx)) .await?; @@ -277,29 +289,31 @@ async fn manage_instance(mut req: APIRequest) -> APIResult { } else { return Err(AppError::AppNotFound(app_id).into()); } - let state = match body { + let body_state = match body { TremorInstanceState::Pause => IntendedState::Paused, TremorInstanceState::Resume => IntendedState::Running, }; let request = TremorRequest::Apps(AppsCmd::InstanceStateChange { instance: instance_id.clone(), - state, + state: body_state, }); - req.state() + state .raft .client_write(request) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; - Ok(instance_id) + Ok(Json(instance_id)) } -async fn stop_instance(req: APIRequest) -> APIResult { - let app_id = AppId(req.param("app")?.to_string()); - let instance_id = FlowInstanceId::new(app_id.clone(), req.param("instance")?.to_string()); +async fn stop_instance( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path((app_id, instance_id)): extract::Path<(AppId, FlowInstanceId)>, +) -> APIResult> { let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(APIStoreReq::GetApp(app_id.clone(), tx)) .await?; @@ -314,13 +328,13 @@ async fn stop_instance(req: APIRequest) -> APIResult { return Err(AppError::AppNotFound(app_id).into()); } let request = TremorRequest::Apps(AppsCmd::Undeploy(instance_id.clone())); - req.state() + state .raft .client_write(request) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; - Ok(instance_id) + Ok(Json(instance_id)) } /// check the given `args` for errors according to the specified `config` from the flow definition diff --git a/src/raft/api/cluster.rs b/src/raft/api/cluster.rs index 3181b7ae7e..ea6de663f3 100644 --- a/src/raft/api/cluster.rs +++ b/src/raft/api/cluster.rs @@ -12,66 +12,78 @@ // See the License for the specific language governing permissions and // limitations under the License. +use super::{APIRequest, APIStoreReq, API_WORKER_TIMEOUT}; use crate::{ channel::bounded, raft::{ - api::{wrapp, APIError, APIResult, ServerState, ToAPIResult}, + api::{APIError, APIResult, ToAPIResult}, node::Addr, store::{NodesRequest, TremorRequest}, }, }; +use axum::{ + extract::{self, Json}, + routing::{delete, get, put}, + Router, +}; +use http::StatusCode; use openraft::{LogId, NodeId, RaftMetrics}; use std::collections::HashMap; -use std::sync::Arc; -use tide::{http::StatusCode, Route}; use tokio::time::timeout; -use super::{APIRequest, APIStoreReq, API_WORKER_TIMEOUT}; - -pub(crate) fn install_rest_endpoints(app: &mut Route>) { - let mut cluster = app.at("/cluster"); - cluster - .at("/nodes") - .post(wrapp(add_node)) - .get(wrapp(get_nodes)); - cluster.at("nodes/:node_id").delete(wrapp(remove_node)); - cluster - .at("/learners/:node_id") - .put(wrapp(add_learner)) - .patch(wrapp(add_learner)) - .delete(wrapp(remove_learner)); - cluster - .at("/voters/:node_id") - .put(wrapp(promote_voter)) - .patch(wrapp(promote_voter)) - .delete(wrapp(demote_voter)); - cluster.at("/metrics").get(wrapp(metrics)); +pub(crate) fn endpoints() -> Router { + Router::::new() + .route("/nodes", get(get_nodes).post(add_node)) + .route("/nodes/:node_id", delete(remove_node)) + .route( + "/learners/:node_id", + put(add_learner).patch(add_learner).delete(remove_learner), + ) + .route( + "/voters/:node_id", + put(promote_voter).patch(promote_voter).delete(demote_voter), + ) + .route("/metrics", get(metrics)) } /// Get a list of all currently known nodes (be it learner, leader, voter etc.) -async fn get_nodes(req: APIRequest) -> APIResult> { - let state = req.state(); - state.raft.client_read().await.to_api_result(&req).await?; +async fn get_nodes( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, +) -> APIResult>> { + state + .raft + .client_read() + .await + .to_api_result(&uri, &state) + .await?; let (tx, mut rx) = bounded(1); state.store_tx.send(APIStoreReq::GetNodes(tx)).await?; let nodes = timeout(API_WORKER_TIMEOUT, rx.recv()) .await? .ok_or(APIError::Recv)?; - Ok(nodes) + Ok(Json(nodes)) } /// Make a node known to cluster by putting it onto the cluster state /// /// This is a precondition for the node being added as learner and promoted to voter -async fn add_node(mut req: APIRequest) -> APIResult { +async fn add_node( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + Json(addr): Json, +) -> APIResult> { // FIXME: returns 500 if not the leader // FIXME: better client errors - let addr: Addr = req.body_json().await?; - let state = req.state(); // 1. ensure we are on the leader, as we need to read some state-machine state // in order to give a good answer here - state.raft.client_read().await.to_api_result(&req).await?; + state + .raft + .client_read() + .await + .to_api_result(&uri, &state) + .await?; // 2. ensure we don't add the node twice if it is already there // we need to make sure we don't hold on to the state machine lock any further here @@ -85,7 +97,7 @@ async fn add_node(mut req: APIRequest) -> APIResult { .await? .ok_or(APIError::Recv)?; if let Some(existing_node_id) = maybe_existing_node_id { - Ok(existing_node_id) + Ok(Json(existing_node_id)) } else { // 2a. add the node with its metadata to the state machine, so the network impl can reach it // this will fail, when we are not on the leader @@ -98,14 +110,14 @@ async fn add_node(mut req: APIRequest) -> APIResult { addr: addr.clone(), })) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; let node_id = response .value .ok_or_else(|| APIError::Other("Invalid node_id".to_string()))? .parse::()?; debug!("node {addr} added to the cluster as node {node_id}"); - Ok(node_id) + Ok(Json(node_id)) } } @@ -113,11 +125,14 @@ async fn add_node(mut req: APIRequest) -> APIResult { /// /// # Errors /// if the API call fails -async fn remove_node(req: APIRequest) -> APIResult<()> { - let node_id = req.param("node_id")?.parse::()?; +async fn remove_node( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path(node_id): extract::Path, +) -> APIResult> { // make sure the node is not a learner or a voter let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(APIStoreReq::GetLastMembership(tx)) .await?; @@ -126,32 +141,38 @@ async fn remove_node(req: APIRequest) -> APIResult<()> { .ok_or(APIError::Recv)?; if membership.contains(&node_id) { return Err(APIError::HTTP { - status: StatusCode::Conflict, + status: StatusCode::CONFLICT, message: format!("Node {node_id} cannot be removed as it is still a voter."), }); } // TODO: how to check if the node is a learner? // remove the node metadata from the state machine - req.state() + state .raft .client_write(TremorRequest::Nodes(NodesRequest::RemoveNode { node_id })) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; - Ok(()) + Ok(Json(())) } /// Add a node as **Learner**. /// /// A Learner receives log replication from the leader but does not vote. /// This should be done before adding a node as a member into the cluster -async fn add_learner(req: APIRequest) -> APIResult> { - let node_id = req.param("node_id")?.parse::()?; - let state = req.state(); - +async fn add_learner( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path(node_id): extract::Path, +) -> APIResult>> { // 1. ensure we are on the leader, as we need to read some state-machine state // in order to give a good answer here - state.raft.client_read().await.to_api_result(&req).await?; + state + .raft + .client_read() + .await + .to_api_result(&uri, &state) + .await?; // 2. check that the node has already been added // we need to make sure we don't hold on to the state machine lock any further here @@ -165,40 +186,41 @@ async fn add_learner(req: APIRequest) -> APIResult> { .ok_or(APIError::Recv)?; if node_addr.is_none() { return Err(APIError::HTTP { - status: StatusCode::NotFound, + status: StatusCode::NOT_FOUND, message: format!("Node {node_id} is not known to the cluster yet."), }); } // add the node as learner debug!("Adding node {node_id} as learner..."); - req.state() + state .raft .add_learner(node_id, true) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await + .map(Json) } /// Removes a node from **Learners** only /// -async fn remove_learner(req: APIRequest) -> APIResult<()> { - let node_id = req.param("node_id")?.parse::()?; - debug!( - "[API {} {}] Removing learner {node_id}", - req.method(), - req.url().path() - ); - let state = req.state(); +async fn remove_learner( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path(node_id): extract::Path, +) -> APIResult> { + debug!("[API] Removing learner {node_id}",); // remove the node as learner let result = state.raft.remove_learner(node_id).await; - result.to_api_result(&req).await + result.to_api_result(&uri, &state).await.map(Json) } /// Changes specified learners to members, or remove members. -async fn promote_voter(req: APIRequest) -> APIResult> { - let node_id: NodeId = req.param("node_id")?.parse::()?; - let state = req.state(); +async fn promote_voter( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path(node_id): extract::Path, +) -> APIResult>> { // we introduce a new scope here to release the lock on the state machine // not releasing it can lead to dead-locks, if executed on the leader (as the store is shared between the API and the raft engine) let (tx, mut rx) = bounded(1); @@ -217,19 +239,21 @@ async fn promote_voter(req: APIRequest) -> APIResult> { .raft .change_membership(membership, true) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; Some(node_id) } else { None }; - Ok(value) + Ok(Json(value)) } /// Changes specified learners to members, or remove members. -async fn demote_voter(req: APIRequest) -> APIResult> { - let node_id: NodeId = req.param("node_id")?.parse::()?; - let state = req.state(); +async fn demote_voter( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Path(node_id): extract::Path, +) -> APIResult>> { // scoping here to not hold the state machine locked for too long let (tx, mut rx) = bounded(1); state @@ -240,21 +264,23 @@ async fn demote_voter(req: APIRequest) -> APIResult> { .await? .ok_or(APIError::Recv)?; let value = if membership.remove(&node_id) { - req.state() + state .raft .change_membership(membership, true) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; Some(node_id) } else { None }; - Ok(value) + Ok(Json(value)) } /// Get the latest metrics of the cluster (from the viewpoint of the targeted node) #[allow(clippy::unused_async)] -async fn metrics(req: APIRequest) -> APIResult { - Ok(req.state().raft.metrics().borrow().clone()) +async fn metrics( + extract::State(state): extract::State, +) -> APIResult> { + Ok(Json(state.raft.metrics().borrow().clone())) } diff --git a/src/raft/api/kv.rs b/src/raft/api/kv.rs index 3c9b3ff6ae..07cc2b46c9 100644 --- a/src/raft/api/kv.rs +++ b/src/raft/api/kv.rs @@ -15,31 +15,32 @@ use crate::{ channel::bounded, raft::{ - api::{wrapp, APIError, APIRequest, APIResult, ServerState, ToAPIResult}, + api::{APIError, APIRequest, APIResult, ToAPIResult}, store::{TremorResponse, TremorSet}, }, }; -use std::sync::Arc; -use tide::Route; +use axum::{extract, routing::post, Router}; use tokio::time::timeout; use super::API_WORKER_TIMEOUT; -pub(crate) fn install_rest_endpoints(parent: &mut Route>) { - let mut kv_route = parent.at("/kv"); - kv_route.at("/write").post(wrapp(write)); - kv_route.at("/read").post(wrapp(read)); - kv_route.at("/consistent_read").post(wrapp(consistent_read)); +pub(crate) fn endpoints() -> Router { + Router::::new() + .route("/write", post(write)) + .route("/read", post(read)) + .route("/consistent_read", post(consistent_read)) } -async fn write(mut req: APIRequest) -> APIResult { - let body: TremorSet = req.body_json().await?; - let tremor_res = req - .state() +async fn write( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Json(body): extract::Json, +) -> APIResult { + let tremor_res = state .raft .client_write(body.into()) .await - .to_api_result(&req) + .to_api_result(&uri, &state) .await?; debug_assert!( tremor_res.value.is_some(), @@ -55,10 +56,12 @@ async fn write(mut req: APIRequest) -> APIResult { } /// read a value from the current node, not necessarily the leader, thus this value can be stale -async fn read(mut req: APIRequest) -> APIResult { - let key: String = req.body_json().await?; +async fn read( + extract::State(state): extract::State, + extract::Json(key): extract::Json, +) -> APIResult { let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(super::APIStoreReq::KVGet(key, tx)) .await?; @@ -69,14 +72,21 @@ async fn read(mut req: APIRequest) -> APIResult { } /// read a value from the leader. If this request is received by another node, it will return a redirect -async fn consistent_read(mut req: APIRequest) -> APIResult { - let key: String = req.body_json().await?; - let state = req.state(); +async fn consistent_read( + extract::State(state): extract::State, + extract::OriginalUri(uri): extract::OriginalUri, + extract::Json(key): extract::Json, +) -> APIResult { // this will fail if we are not a leader - state.raft.client_read().await.to_api_result(&req).await?; + state + .raft + .client_read() + .await + .to_api_result(&uri, &state) + .await?; // here we are safe to read let (tx, mut rx) = bounded(1); - req.state() + state .store_tx .send(super::APIStoreReq::KVGet(key, tx)) .await?; diff --git a/src/raft/api/worker.rs b/src/raft/api/worker.rs index 892e4c0581..867e8984ca 100644 --- a/src/raft/api/worker.rs +++ b/src/raft/api/worker.rs @@ -24,8 +24,8 @@ use crate::{ use super::apps::AppState; async fn send(tx: Sender, t: T) { - if tx.send(t).await.is_err() { - error!("Error sending response to API"); + if let Err(e) = tx.send(t).await { + error!("Error sending response to API: {e}"); } } diff --git a/src/raft/archive.rs b/src/raft/archive.rs index d6062e368c..143e771bca 100644 --- a/src/raft/archive.rs +++ b/src/raft/archive.rs @@ -16,9 +16,9 @@ use crate::{ errors::Result, ids::{AppId, FlowDefinitionId}, }; -use async_std::path::PathBuf; use sha2::{Digest, Sha256}; use simd_json::OwnedValue; +use std::path::PathBuf; use std::{ collections::{BTreeSet, HashMap}, io::Read, diff --git a/src/raft/node.rs b/src/raft/node.rs index 4d218755db..6392015942 100644 --- a/src/raft/node.rs +++ b/src/raft/node.rs @@ -90,8 +90,9 @@ impl Running { listener.config_mut().max_frame_length(usize::MAX); let http_api_addr = server_state.addr().api().to_string(); - let mut http_api_server = tide::Server::with_state(server_state.clone()); - api::install_rest_endpoints(&mut http_api_server); + let app = api::endpoints().with_state(server_state.clone()); + let http_api_server = axum::Server::bind(&http_api_addr.parse().map_err(|_e| "badaddr")?) + .serve(app.into_make_service()); let run_handle = task::spawn(async move { let mut tcp_future = Box::pin( @@ -112,7 +113,7 @@ impl Running { .for_each(|_| async {}) .fuse(), ); - let mut http_future = Box::pin(http_api_server.listen(http_api_addr).fuse()); + let mut http_future = Box::pin(http_api_server.fuse()); let mut runtime_future = Box::pin(runtime_handle.fuse()); let mut kill_switch_future = Box::pin(kill_switch_rx.recv().fuse()); futures::select! { diff --git a/src/raft/store.rs b/src/raft/store.rs index 99ad11d46d..d6b366efa3 100644 --- a/src/raft/store.rs +++ b/src/raft/store.rs @@ -23,7 +23,6 @@ use crate::{ raft::{archive::TremorAppDef, ClusterError}, system::Runtime, }; -use async_std::sync::RwLock; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use openraft::{ async_trait::async_trait, @@ -44,6 +43,7 @@ use std::{ string::FromUtf8Error, sync::{Arc, Mutex}, }; +use tokio::sync::RwLock; use super::node::Addr;