From f68aeaf0919110316b475f844050d59bc55d5116 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 12 Apr 2024 19:02:33 +0200 Subject: [PATCH 1/7] Add bolt 5.4 support (telemetry) Starting with Bolt 5.4, the driver will, by default, send anonymous API usage statistics to the server if requested. An opt-out is available through `DriverConfig::with_telemetry`: ```rust let driver = Driver::new( connection_config, DriverConfig::new().with_telemetry(false), ); ``` If the server requests is, the driver will send anonymous API usage. Currently, everytime one of the following APIs is used to execute a query (for the first time), the server is informed of this (without any further information like arguments, client identifiers, etc.): * `ExecuteQueryBuilder::run()` / `ExecuteQueryBuilder::run_with_retry()` * `TransactionBuilder::run()` * `TransactionBuilder::run_with_retry()` * `AutoCommitBuilder::run()` --- CHANGELOG.md | 53 +-- README.md | 4 +- neo4j/src/driver.rs | 8 +- neo4j/src/driver/config.rs | 29 +- neo4j/src/driver/io/bolt.rs | 34 +- neo4j/src/driver/io/bolt/bolt4x4/protocol.rs | 15 +- neo4j/src/driver/io/bolt/bolt5x0/protocol.rs | 16 +- neo4j/src/driver/io/bolt/bolt5x1/protocol.rs | 15 +- neo4j/src/driver/io/bolt/bolt5x2/protocol.rs | 15 +- neo4j/src/driver/io/bolt/bolt5x3/protocol.rs | 15 +- neo4j/src/driver/io/bolt/bolt5x4.rs | 19 + neo4j/src/driver/io/bolt/bolt5x4/protocol.rs | 370 ++++++++++++++++++ .../src/driver/io/bolt/bolt5x4/translator.rs | 17 + neo4j/src/driver/io/bolt/bolt_state.rs | 1 + neo4j/src/driver/io/bolt/handshake.rs | 6 +- .../src/driver/io/bolt/message_parameters.rs | 30 ++ neo4j/src/driver/io/bolt/response.rs | 1 + neo4j/src/driver/io/pool.rs | 1 + neo4j/src/driver/io/pool/single_pool.rs | 1 + neo4j/src/driver/record_stream.rs | 25 +- neo4j/src/driver/session.rs | 68 +++- neo4j/src/driver/transaction.rs | 11 +- neo4j/src/lib.rs | 8 +- .../src/testkit_backend/requests.rs | 6 + .../src/testkit_backend/responses.rs | 5 +- 25 files changed, 709 insertions(+), 64 deletions(-) create mode 100644 neo4j/src/driver/io/bolt/bolt5x4.rs create mode 100644 neo4j/src/driver/io/bolt/bolt5x4/protocol.rs create mode 100644 neo4j/src/driver/io/bolt/bolt5x4/translator.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1974627..df43e85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,31 +1,38 @@ # Changelog ## NEXT - - Removed useless lifetime parameter from `SessionConfig::with_database()`. - - Changed return type of `ConnectionConfig::with_encryption_trust_any_certificate() ` from `Result` to `Self`. - - Add support for Bolt 5.2, which adds notification filtering. - - Add `Driver::is_encrypted()`. - - Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`. - - Fix `Transaction::rolblack()` failing if a result stream failed before. - - Introduce `neo4j::driver::Conifg::with_keep_alive()` and `without_keep_alive()`. - - Add support for Bolt 5.3 (bolt agent). + +- Removed useless lifetime parameter from `SessionConfig::with_database()`. +- Changed return type of `ConnectionConfig::with_encryption_trust_any_certificate() ` from `Result` + to `Self`. +- Add support for Bolt 5.2, which adds notification filtering. +- Add `Driver::is_encrypted()`. +- Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`. +- Fix `Transaction::rolblack()` failing if a result stream failed before. +- Introduce `neo4j::driver::Conifg::with_keep_alive()` and `without_keep_alive()`. +- Fixed errors during `BEGIN` not being properly propagated through the transaction's internals. +- Add support for Bolt 5.3 (bolt agent). +- Add support for Bolt 5.4 (telemetry). ## 0.0.2 - - Update dependencies. - Among others `rustls`. - To accommodate this change, the `rustls_dangerous_configuration` feature was removed. - This update also affects `ConnectionConfig::with_encryption_custom_tls_config()`, which accepts a custom `rustls::ClientConfig`. - - Make `Record{entries}` private and offer many helper methods instead. - - Add `EagerResult::into_scalar()`. - - Renamed `RetryableError` to `RetryError` - - Fix `Driver::execute_query()::run()` not committing the transaction. - - Removed `AutoCommitBuilder::without_transaction_timeout` and `AutoCommitBuilder::with_default_transaction_timeout` - in favor of `AutoCommitBuilder::with_transaction_timeout` in combination with `TransactionTimeout::none`, - `TransactionTimeout::from_millis` and `TransactionTimeout::default`. - Same for `TransactionBuilder`. - - Move `neo4j::Address` to `neo4j::address::Address` - - Impl `FromStr` for `neo4j::driver::ConnectionConfig` (besides `TryFrom<&str>`). - - Much more documentation. + +- Update dependencies. + Among others `rustls`. + To accommodate this change, the `rustls_dangerous_configuration` feature was removed. + This update also affects `ConnectionConfig::with_encryption_custom_tls_config()`, which accepts a + custom `rustls::ClientConfig`. +- Make `Record{entries}` private and offer many helper methods instead. +- Add `EagerResult::into_scalar()`. +- Renamed `RetryableError` to `RetryError` +- Fix `Driver::execute_query()::run()` not committing the transaction. +- Removed `AutoCommitBuilder::without_transaction_timeout` and `AutoCommitBuilder::with_default_transaction_timeout` + in favor of `AutoCommitBuilder::with_transaction_timeout` in combination with `TransactionTimeout::none`, + `TransactionTimeout::from_millis` and `TransactionTimeout::default`. + Same for `TransactionBuilder`. +- Move `neo4j::Address` to `neo4j::address::Address` +- Impl `FromStr` for `neo4j::driver::ConnectionConfig` (besides `TryFrom<&str>`). +- Much more documentation. ## 0.0.1 + Initial release diff --git a/README.md b/README.md index d754fc1..0919967 100644 --- a/README.md +++ b/README.md @@ -78,13 +78,13 @@ A bump in MSRV is considered a minor breaking change. * [x] most basic functionality * [ ] ergonomic way to access by key * [x] Bookmark Management - * [ ] Protocol Versions + * [x] Protocol Versions * [x] 4.4 * [x] 5.0 (utc fix) * [x] 5.1 (re-auth) * [x] 5.2 (notification filtering) * [x] 5.3 (bolt agent) - * [ ] 5.4 (telemetry) + * [x] 5.4 (telemetry) * [x] Types * [x] `Null` * [x] `Integer` diff --git a/neo4j/src/driver.rs b/neo4j/src/driver.rs index 1dc891a..bfb9885 100644 --- a/neo4j/src/driver.rs +++ b/neo4j/src/driver.rs @@ -40,6 +40,7 @@ pub use config::{ InvalidRoutingContextError, KeepAliveConfig, TlsConfigError, }; pub use eager_result::{EagerResult, ScalarError}; +use io::bolt::message_parameters::TelemetryAPI; #[cfg(feature = "_internal_testkit_backend")] pub use io::ConnectionPoolMetrics; use io::{AcquireConfig, Pool, PoolConfig, PooledBolt, SessionAuth, UpdateRtArgs}; @@ -111,6 +112,7 @@ impl Driver { connection_acquisition_timeout: config.connection_acquisition_timeout, resolver: config.resolver, notification_filters: Arc::new(config.notification_filter), + telemetry: config.telemetry, }; Driver { config: ReducedDriverConfig { @@ -926,7 +928,8 @@ impl< .transaction() .with_transaction_meta(meta.borrow()) .with_transaction_timeout(timeout) - .with_routing_control(mode); + .with_routing_control(mode) + .with_api_overwrite(Some(TelemetryAPI::DriverLevel)); tx_builder.run(move |tx| { let mut result_stream = tx.query(query).with_parameters(param).run()?; let res = receiver(result_stream.raw_stream_mut())?; @@ -971,7 +974,8 @@ impl< .transaction() .with_transaction_meta(meta.borrow()) .with_transaction_timeout(timeout) - .with_routing_control(mode); + .with_routing_control(mode) + .with_api_overwrite(Some(TelemetryAPI::DriverLevel)); tx_builder.run_with_retry(retry_policy, move |tx| { let mut result_stream = tx .query(query.as_ref()) diff --git a/neo4j/src/driver/config.rs b/neo4j/src/driver/config.rs index 530e0f0..b54d554 100644 --- a/neo4j/src/driver/config.rs +++ b/neo4j/src/driver/config.rs @@ -37,7 +37,7 @@ use notification::NotificationFilter; // imports for docs #[allow(unused)] -use super::session::SessionConfig; +use super::session::{AutoCommitBuilder, SessionConfig, TransactionBuilder}; #[allow(unused)] use super::ExecuteQueryBuilder; #[allow(unused)] @@ -63,6 +63,7 @@ pub struct DriverConfig { pub(crate) resolver: Option>, pub(crate) notification_filter: NotificationFilter, pub(crate) keep_alive: Option, + pub(crate) telemetry: bool, } #[derive(Debug)] @@ -155,6 +156,7 @@ impl Default for DriverConfig { resolver: None, notification_filter: Default::default(), keep_alive: None, + telemetry: true, } } } @@ -532,6 +534,31 @@ impl DriverConfig { self.keep_alive = None; self } + + /// Enable or disable telemetry. + /// + /// If enabled (default) and the server requests is, the driver will send anonymous API usage + /// statistics to the server. + /// Currently, everytime one of the following APIs is used to execute a query + /// (for the first time), the server is informed of this + /// (without any further information like arguments, client identifiers, etc.): + /// + /// * [`ExecuteQueryBuilder::run()`] / [`ExecuteQueryBuilder::run_with_retry()`] + /// * [`TransactionBuilder::run()`] + /// * [`TransactionBuilder::run_with_retry()`] + /// * [`AutoCommitBuilder::run()`] + /// + /// # Example + /// ``` + /// use neo4j::driver::DriverConfig; + /// + /// let config = DriverConfig::new().with_telemetry(false); + /// ``` + #[inline] + pub fn with_telemetry(mut self, telemetry: bool) -> Self { + self.telemetry = telemetry; + self + } } impl ConnectionConfig { diff --git a/neo4j/src/driver/io/bolt.rs b/neo4j/src/driver/io/bolt.rs index 472e1cd..320d0fe 100644 --- a/neo4j/src/driver/io/bolt.rs +++ b/neo4j/src/driver/io/bolt.rs @@ -19,6 +19,7 @@ mod bolt5x0; mod bolt5x1; mod bolt5x2; mod bolt5x3; +mod bolt5x4; mod bolt_state; mod chunk; mod handshake; @@ -57,6 +58,7 @@ use bolt5x0::{Bolt5x0, Bolt5x0StructTranslator}; use bolt5x1::{Bolt5x1, Bolt5x1StructTranslator}; use bolt5x2::{Bolt5x2, Bolt5x2StructTranslator}; use bolt5x3::{Bolt5x3, Bolt5x3StructTranslator}; +use bolt5x4::{Bolt5x4, Bolt5x4StructTranslator}; use bolt_state::{BoltState, BoltStateTracker}; use chunk::{Chunker, Dechunker}; pub(crate) use handshake::{open, TcpConnector}; @@ -64,6 +66,7 @@ use message::BoltMessage; use message_parameters::{ BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, PullParameters, ReauthParameters, RollbackParameters, RouteParameters, RunParameters, + TelemetryParameters, }; use packstream::PackStreamSerializer; pub(crate) use response::{ @@ -172,6 +175,7 @@ impl Bolt { data: BoltData::new(version, stream, socket, local_port, address), // [bolt-version-bump] search tag when changing bolt version support protocol: match version { + (5, 4) => Bolt5x4::::default().into(), (5, 3) => Bolt5x3::::default().into(), (5, 2) => Bolt5x2::::default().into(), (5, 1) => Bolt5x1::::default().into(), @@ -281,8 +285,9 @@ impl Bolt { pub(crate) fn begin + Debug>( &mut self, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()> { - self.protocol.begin(&mut self.data, parameters) + self.protocol.begin(&mut self.data, parameters, callbacks) } pub(crate) fn commit(&mut self, callbacks: ResponseCallbacks) -> Result<()> { @@ -303,6 +308,15 @@ impl Bolt { self.protocol.route(&mut self.data, parameters, callbacks) } + pub(crate) fn telemetry( + &mut self, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.protocol + .telemetry(&mut self.data, parameters, callbacks) + } + pub(crate) fn read_all( &mut self, deadline: Option, @@ -380,6 +394,9 @@ impl Bolt { pub(crate) fn is_idle_for(&self, timeout: Duration) -> bool { self.data.is_idle_for(timeout) } + pub(crate) fn set_telemetry_enabled(&mut self, enabled: bool) { + self.data.set_telemetry_enabled(enabled) + } #[inline(always)] pub(crate) fn debug_log(&self, msg: impl FnOnce() -> String) { @@ -438,6 +455,7 @@ trait BoltProtocol: Debug { &mut self, data: &mut BoltData, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()>; fn commit( &mut self, @@ -456,6 +474,12 @@ trait BoltProtocol: Debug { parameters: RouteParameters, callbacks: ResponseCallbacks, ) -> Result<()>; + fn telemetry( + &mut self, + data: &mut BoltData, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()>; fn load_value(&mut self, reader: &mut R) -> Result; fn handle_response( @@ -466,6 +490,7 @@ trait BoltProtocol: Debug { ) -> Result<()>; } +// [bolt-version-bump] search tag when changing bolt version support #[enum_dispatch(BoltProtocol)] #[derive(Debug)] enum BoltProtocolVersion { @@ -474,6 +499,7 @@ enum BoltProtocolVersion { V5x1(Bolt5x1), V5x2(Bolt5x2), V5x3(Bolt5x3), + V5x4(Bolt5x4), } #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] @@ -494,6 +520,7 @@ pub(crate) struct BoltData { bolt_state: BoltStateTracker, meta: Arc>>, server_agent: Arc>>, + telemetry_enabled: Arc>, address: Arc
, address_str: String, last_qid: Arc>>, @@ -525,6 +552,7 @@ impl BoltData { bolt_state: BoltStateTracker::new(version), meta: Default::default(), server_agent: Default::default(), + telemetry_enabled: Default::default(), address, address_str, last_qid: Default::default(), @@ -703,6 +731,10 @@ impl BoltData { fn is_idle_for(&self, timeout: Duration) -> bool { self.idle_since.elapsed() >= timeout } + + fn set_telemetry_enabled(&mut self, enabled: bool) { + *self.telemetry_enabled.borrow_mut() = enabled; + } } impl Debug for BoltData { diff --git a/neo4j/src/driver/io/bolt/bolt4x4/protocol.rs b/neo4j/src/driver/io/bolt/bolt4x4/protocol.rs index c698348..af4ec8a 100644 --- a/neo4j/src/driver/io/bolt/bolt4x4/protocol.rs +++ b/neo4j/src/driver/io/bolt/bolt4x4/protocol.rs @@ -30,7 +30,7 @@ use super::super::message::BoltMessage; use super::super::message_parameters::{ BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters, - RunParameters, + RunParameters, TelemetryParameters, }; use super::super::packstream::{ PackStreamSerializer, PackStreamSerializerDebugImpl, PackStreamSerializerImpl, @@ -267,8 +267,9 @@ impl BoltProtocol f &mut self, data: &mut BoltData, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()> { - self.bolt5x0.begin(data, parameters) + self.bolt5x0.begin(data, parameters, callbacks) } #[inline] @@ -300,6 +301,16 @@ impl BoltProtocol f self.bolt5x0.route(data, parameters, callbacks) } + #[inline] + fn telemetry( + &mut self, + data: &mut BoltData, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x0.telemetry(data, parameters, callbacks) + } + fn load_value(&mut self, reader: &mut R) -> Result { self.bolt5x0.load_value(reader) } diff --git a/neo4j/src/driver/io/bolt/bolt5x0/protocol.rs b/neo4j/src/driver/io/bolt/bolt5x0/protocol.rs index f402717..edce21d 100644 --- a/neo4j/src/driver/io/bolt/bolt5x0/protocol.rs +++ b/neo4j/src/driver/io/bolt/bolt5x0/protocol.rs @@ -31,7 +31,7 @@ use super::super::message::BoltMessage; use super::super::message_parameters::{ BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters, - RunParameters, + RunParameters, TelemetryParameters, }; use super::super::packstream::{ PackStreamDeserializer, PackStreamDeserializerImpl, PackStreamSerializer, @@ -772,6 +772,7 @@ impl BoltProtocol for Bolt5x0 { &mut self, data: &mut BoltData, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()> { let BeginParameters { bookmarks, @@ -844,7 +845,7 @@ impl BoltProtocol for Bolt5x0 { data.message_buff.push_back(vec![message_buff]); data.responses - .push_back(BoltResponse::from_message(ResponseMessage::Begin)); + .push_back(BoltResponse::new(ResponseMessage::Begin, callbacks)); debug_buf_end!(data, log_buf); Ok(()) } @@ -942,6 +943,17 @@ impl BoltProtocol for Bolt5x0 { Ok(()) } + #[inline] + fn telemetry( + &mut self, + _data: &mut BoltData, + _parameters: TelemetryParameters, + _callbacks: ResponseCallbacks, + ) -> Result<()> { + // TELEMETRY not support by this protocol version, so we ignore it. + Ok(()) + } + fn load_value(&mut self, reader: &mut R) -> Result { let mut deserializer = PackStreamDeserializerImpl::new(reader); deserializer.load(&self.translator).map_err(Into::into) diff --git a/neo4j/src/driver/io/bolt/bolt5x1/protocol.rs b/neo4j/src/driver/io/bolt/bolt5x1/protocol.rs index e4b1f40..466181e 100644 --- a/neo4j/src/driver/io/bolt/bolt5x1/protocol.rs +++ b/neo4j/src/driver/io/bolt/bolt5x1/protocol.rs @@ -26,7 +26,7 @@ use super::super::message::BoltMessage; use super::super::message_parameters::{ BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters, - RunParameters, + RunParameters, TelemetryParameters, }; use super::super::packstream::{ PackStreamSerializer, PackStreamSerializerDebugImpl, PackStreamSerializerImpl, @@ -266,8 +266,9 @@ impl BoltProtocol for Bolt5x1 { &mut self, data: &mut BoltData, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()> { - self.bolt5x0.begin(data, parameters) + self.bolt5x0.begin(data, parameters, callbacks) } #[inline] @@ -299,6 +300,16 @@ impl BoltProtocol for Bolt5x1 { self.bolt5x0.route(data, parameters, callbacks) } + #[inline] + fn telemetry( + &mut self, + data: &mut BoltData, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x0.telemetry(data, parameters, callbacks) + } + #[inline] fn load_value(&mut self, reader: &mut R) -> Result { self.bolt5x0.load_value(reader) diff --git a/neo4j/src/driver/io/bolt/bolt5x2/protocol.rs b/neo4j/src/driver/io/bolt/bolt5x2/protocol.rs index d337596..da23a71 100644 --- a/neo4j/src/driver/io/bolt/bolt5x2/protocol.rs +++ b/neo4j/src/driver/io/bolt/bolt5x2/protocol.rs @@ -27,7 +27,7 @@ use super::super::message::BoltMessage; use super::super::message_parameters::{ BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters, - RunParameters, + RunParameters, TelemetryParameters, }; use super::super::packstream::{ PackStreamSerializer, PackStreamSerializerDebugImpl, PackStreamSerializerImpl, @@ -356,6 +356,7 @@ impl BoltProtocol for Bolt5x2 { &mut self, data: &mut BoltData, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()> { let BeginParameters { bookmarks, @@ -467,7 +468,7 @@ impl BoltProtocol for Bolt5x2 { data.message_buff.push_back(vec![message_buff]); data.responses - .push_back(BoltResponse::from_message(ResponseMessage::Begin)); + .push_back(BoltResponse::new(ResponseMessage::Begin, callbacks)); debug_buf_end!(data, log_buf); Ok(()) } @@ -501,6 +502,16 @@ impl BoltProtocol for Bolt5x2 { self.bolt5x1.route(data, parameters, callbacks) } + #[inline] + fn telemetry( + &mut self, + data: &mut BoltData, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x1.telemetry(data, parameters, callbacks) + } + #[inline] fn load_value(&mut self, reader: &mut R) -> Result { self.bolt5x1.load_value(reader) diff --git a/neo4j/src/driver/io/bolt/bolt5x3/protocol.rs b/neo4j/src/driver/io/bolt/bolt5x3/protocol.rs index e0a9dfe..9414480 100644 --- a/neo4j/src/driver/io/bolt/bolt5x3/protocol.rs +++ b/neo4j/src/driver/io/bolt/bolt5x3/protocol.rs @@ -28,7 +28,7 @@ use super::super::message::BoltMessage; use super::super::message_parameters::{ BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters, - RunParameters, + RunParameters, TelemetryParameters, }; use super::super::packstream::{ PackStreamSerializer, PackStreamSerializerDebugImpl, PackStreamSerializerImpl, @@ -227,8 +227,9 @@ impl BoltProtocol for Bolt5x3 { &mut self, data: &mut BoltData, parameters: BeginParameters, + callbacks: ResponseCallbacks, ) -> Result<()> { - self.bolt5x2.begin(data, parameters) + self.bolt5x2.begin(data, parameters, callbacks) } #[inline] @@ -260,6 +261,16 @@ impl BoltProtocol for Bolt5x3 { self.bolt5x2.route(data, parameters, callbacks) } + #[inline] + fn telemetry( + &mut self, + data: &mut BoltData, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x2.telemetry(data, parameters, callbacks) + } + #[inline] fn load_value(&mut self, reader: &mut R) -> Result { self.bolt5x2.load_value(reader) diff --git a/neo4j/src/driver/io/bolt/bolt5x4.rs b/neo4j/src/driver/io/bolt/bolt5x4.rs new file mode 100644 index 0000000..94ac346 --- /dev/null +++ b/neo4j/src/driver/io/bolt/bolt5x4.rs @@ -0,0 +1,19 @@ +// Copyright Rouven Bauer +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod protocol; +mod translator; + +pub(crate) use protocol::Bolt5x4; +pub(crate) use translator::Bolt5x4StructTranslator; diff --git a/neo4j/src/driver/io/bolt/bolt5x4/protocol.rs b/neo4j/src/driver/io/bolt/bolt5x4/protocol.rs new file mode 100644 index 0000000..1706656 --- /dev/null +++ b/neo4j/src/driver/io/bolt/bolt5x4/protocol.rs @@ -0,0 +1,370 @@ +// Copyright Rouven Bauer +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use atomic_refcell::AtomicRefCell; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::fmt::Debug; +use std::io::{Read, Write}; +use std::mem; +use std::net::TcpStream; +use std::ops::Deref; +use std::sync::Arc; + +use log::{debug, log_enabled, warn, Level}; + +use super::super::bolt5x0::Bolt5x0; +use super::super::bolt5x2::Bolt5x2; +use super::super::bolt5x3::Bolt5x3; +use super::super::bolt_common::ServerAwareBoltVersion; +use super::super::message::BoltMessage; +use super::super::message_parameters::{ + BeginParameters, CommitParameters, DiscardParameters, GoodbyeParameters, HelloParameters, + PullParameters, ReauthParameters, ResetParameters, RollbackParameters, RouteParameters, + RunParameters, TelemetryAPI, TelemetryParameters, +}; +use super::super::packstream::{ + PackStreamSerializer, PackStreamSerializerDebugImpl, PackStreamSerializerImpl, +}; +use super::super::{ + bolt_debug_extra, dbg_extra, debug_buf, debug_buf_end, debug_buf_start, BoltData, BoltMeta, + BoltProtocol, BoltResponse, BoltStructTranslator, OnServerErrorCb, ResponseCallbacks, + ResponseMessage, +}; +use crate::error_::Result; +use crate::value::ValueReceive; + +const SERVER_AGENT_KEY: &str = "server"; +const HINTS_KEY: &str = "hints"; +const RECV_TIMEOUT_KEY: &str = "connection.recv_timeout_seconds"; +const TELEMETRY_ENABLED_KEY: &str = "telemetry.enabled"; + +#[derive(Debug)] +pub(crate) struct Bolt5x4 { + translator: T, + pub(in super::super) bolt5x3: Bolt5x3, + protocol_version: ServerAwareBoltVersion, +} + +impl Bolt5x4 { + pub(in super::super) fn new(protocol_version: ServerAwareBoltVersion) -> Self { + Self { + translator: T::default(), + bolt5x3: Bolt5x3::new(protocol_version), + protocol_version, + } + } + + pub(in super::super) fn hello_response_telemetry_hint( + hints: &HashMap, + telemetry_enabled: &mut bool, + ) { + if !*telemetry_enabled { + // driver config opted out of telemetry + return; + } + let Some(enabled) = hints.get(TELEMETRY_ENABLED_KEY) else { + // server implicitly opted out of telemetry + *telemetry_enabled = false; + return; + }; + let ValueReceive::Boolean(enabled) = enabled else { + warn!( + "Server sent unexpected {TELEMETRY_ENABLED_KEY} type {:?}", + enabled + ); + return; + }; + // since client didn't opt out, leave it up to the server + *telemetry_enabled = *enabled; + } + + pub(in super::super) fn hello_response_handle_connection_hints( + meta: &BoltMeta, + socket: Option<&TcpStream>, + telemetry_enabled: &mut bool, + ) { + let empty_hints = HashMap::new(); + let hints = match meta.get(HINTS_KEY) { + Some(ValueReceive::Map(hints)) => hints, + Some(value) => { + warn!("Server sent unexpected {HINTS_KEY} type {:?}", value); + &empty_hints + } + None => &empty_hints, + }; + Bolt5x0::::hello_response_handle_timeout_hint(hints, socket); + Self::hello_response_telemetry_hint(hints, telemetry_enabled); + } + + pub(in super::super) fn enqueue_hello_response(data: &mut BoltData) { + let bolt_meta = Arc::clone(&data.meta); + let telemetry_enabled = Arc::clone(&data.telemetry_enabled); + let bolt_server_agent = Arc::clone(&data.server_agent); + let socket = Arc::clone(&data.socket); + + data.responses.push_back(BoltResponse::new( + ResponseMessage::Hello, + ResponseCallbacks::new().with_on_success(move |mut meta| { + Bolt5x0::::hello_response_handle_agent(&mut meta, &bolt_server_agent); + Self::hello_response_handle_connection_hints( + &meta, + socket.deref().as_ref(), + &mut telemetry_enabled.borrow_mut(), + ); + mem::swap(&mut *bolt_meta.borrow_mut(), &mut meta); + Ok(()) + }), + )); + } + + pub(in super::super) fn encode_telemetry(api: TelemetryAPI) -> i64 { + match api { + TelemetryAPI::TxFunc => 0, + TelemetryAPI::UnmanagedTx => 1, + TelemetryAPI::AutoCommit => 2, + TelemetryAPI::DriverLevel => 3, + } + } +} + +impl Default for Bolt5x4 { + fn default() -> Self { + Self::new(ServerAwareBoltVersion::V5x4) + } +} + +impl BoltProtocol for Bolt5x4 { + fn hello( + &mut self, + data: &mut BoltData, + parameters: HelloParameters, + ) -> Result<()> { + let HelloParameters { + user_agent, + auth: _, + routing_context, + notification_filter, + } = parameters; + debug_buf_start!(log_buf); + debug_buf!(log_buf, "C: HELLO"); + let mut dbg_serializer = PackStreamSerializerDebugImpl::new(); + let mut message_buff = Vec::new(); + let mut serializer = PackStreamSerializerImpl::new(&mut message_buff); + serializer.write_struct_header(0x01, 1)?; + + let extra_size = 2 + + Bolt5x2::::notification_filter_entries_count(Some(notification_filter)) + + >::into(routing_context.is_some()); + + serializer.write_dict_header(extra_size)?; + debug_buf!(log_buf, " {}", { + dbg_serializer.write_dict_header(extra_size).unwrap(); + dbg_serializer.flush() + }); + + Bolt5x0::::write_user_agent_entry( + log_buf.as_mut(), + &mut serializer, + &mut dbg_serializer, + user_agent, + )?; + + Bolt5x3::::write_bolt_agent_entry( + log_buf.as_mut(), + &mut serializer, + &mut dbg_serializer, + )?; + + self.bolt5x3 + .bolt5x2 + .bolt5x1 + .bolt5x0 + .write_routing_context_entry( + log_buf.as_mut(), + &mut serializer, + &mut dbg_serializer, + data, + routing_context, + )?; + + Bolt5x2::::write_notification_filter_entries( + log_buf.as_mut(), + &mut serializer, + &mut dbg_serializer, + Some(notification_filter), + )?; + + data.message_buff.push_back(vec![message_buff]); + debug_buf_end!(data, log_buf); + + Self::enqueue_hello_response(data); + Ok(()) + } + + #[inline] + fn reauth( + &mut self, + data: &mut BoltData, + parameters: ReauthParameters, + ) -> Result<()> { + self.bolt5x3.reauth(data, parameters) + } + + #[inline] + fn supports_reauth(&self) -> bool { + self.bolt5x3.supports_reauth() + } + + #[inline] + fn goodbye( + &mut self, + data: &mut BoltData, + parameters: GoodbyeParameters, + ) -> Result<()> { + self.bolt5x3.goodbye(data, parameters) + } + + #[inline] + fn reset( + &mut self, + data: &mut BoltData, + parameters: ResetParameters, + ) -> Result<()> { + self.bolt5x3.reset(data, parameters) + } + + #[inline] + fn run + Debug, KM: Borrow + Debug>( + &mut self, + data: &mut BoltData, + parameters: RunParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x3.run(data, parameters, callbacks) + } + + #[inline] + fn discard( + &mut self, + data: &mut BoltData, + parameters: DiscardParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x3.discard(data, parameters, callbacks) + } + + #[inline] + fn pull( + &mut self, + data: &mut BoltData, + parameters: PullParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x3.pull(data, parameters, callbacks) + } + + #[inline] + fn begin + Debug>( + &mut self, + data: &mut BoltData, + parameters: BeginParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x3.begin(data, parameters, callbacks) + } + + #[inline] + fn commit( + &mut self, + data: &mut BoltData, + parameters: CommitParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x3.commit(data, parameters, callbacks) + } + + #[inline] + fn rollback( + &mut self, + data: &mut BoltData, + parameters: RollbackParameters, + ) -> Result<()> { + self.bolt5x3.rollback(data, parameters) + } + + #[inline] + fn route( + &mut self, + data: &mut BoltData, + parameters: RouteParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + self.bolt5x3.route(data, parameters, callbacks) + } + + fn telemetry( + &mut self, + data: &mut BoltData, + parameters: TelemetryParameters, + callbacks: ResponseCallbacks, + ) -> Result<()> { + if !AtomicRefCell::borrow(&data.telemetry_enabled).deref() { + return Ok(()); + } + + let TelemetryParameters { api } = parameters; + debug_buf_start!(log_buf); + debug_buf!(log_buf, "C: TELEMETRY"); + let mut dbg_serializer = PackStreamSerializerDebugImpl::new(); + let mut message_buff = Vec::new(); + let mut serializer = PackStreamSerializerImpl::new(&mut message_buff); + serializer.write_struct_header(0x54, 1)?; + + serializer.write_int(Self::encode_telemetry(api))?; + debug_buf!( + log_buf, + " {} // ({})", + { + dbg_serializer + .write_int(Self::encode_telemetry(api)) + .unwrap(); + dbg_serializer.flush() + }, + api.name() + ); + + data.message_buff.push_back(vec![message_buff]); + debug_buf_end!(data, log_buf); + data.responses + .push_back(BoltResponse::new(ResponseMessage::Telemetry, callbacks)); + Ok(()) + } + + #[inline] + fn load_value(&mut self, reader: &mut R) -> Result { + self.bolt5x3.load_value(reader) + } + + #[inline] + fn handle_response( + &mut self, + bolt_data: &mut BoltData, + message: BoltMessage, + on_server_error: OnServerErrorCb, + ) -> Result<()> { + self.bolt5x3 + .handle_response(bolt_data, message, on_server_error) + } +} diff --git a/neo4j/src/driver/io/bolt/bolt5x4/translator.rs b/neo4j/src/driver/io/bolt/bolt5x4/translator.rs new file mode 100644 index 0000000..71dc35a --- /dev/null +++ b/neo4j/src/driver/io/bolt/bolt5x4/translator.rs @@ -0,0 +1,17 @@ +// Copyright Rouven Bauer +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::super::bolt5x3::Bolt5x3StructTranslator; + +pub(crate) type Bolt5x4StructTranslator = Bolt5x3StructTranslator; diff --git a/neo4j/src/driver/io/bolt/bolt_state.rs b/neo4j/src/driver/io/bolt/bolt_state.rs index d2502d9..6522b40 100644 --- a/neo4j/src/driver/io/bolt/bolt_state.rs +++ b/neo4j/src/driver/io/bolt/bolt_state.rs @@ -81,6 +81,7 @@ impl BoltStateTracker { ResponseMessage::Commit => self.update_commit(), ResponseMessage::Rollback => self.update_rollback(), ResponseMessage::Route => self.update_route(), + ResponseMessage::Telemetry => {} // no state transition } if self.state != pre_state { diff --git a/neo4j/src/driver/io/bolt/handshake.rs b/neo4j/src/driver/io/bolt/handshake.rs index 5018908..d7445bb 100644 --- a/neo4j/src/driver/io/bolt/handshake.rs +++ b/neo4j/src/driver/io/bolt/handshake.rs @@ -34,7 +34,7 @@ use crate::time::Instant; const BOLT_MAGIC_PREAMBLE: [u8; 4] = [0x60, 0x60, 0xB0, 0x17]; // [bolt-version-bump] search tag when changing bolt version support const BOLT_VERSION_OFFER: [u8; 16] = [ - 0, 3, 3, 5, // BOLT 5.3 - 5.0 + 0, 4, 4, 5, // BOLT 5.4 - 5.0 0, 0, 4, 4, // BOLT 4.4 0, 0, 0, 0, // - 0, 0, 0, 0, // - @@ -243,6 +243,7 @@ fn decode_version_offer(offer: &[u8; 4]) -> Result<(u8, u8)> { [0, 0, 0, 0] => Err(Neo4jError::InvalidConfig { message: String::from("server version not supported"), }), + [_, _, 4, 5] => Ok((5, 4)), [_, _, 3, 5] => Ok((5, 3)), [_, _, 2, 5] => Ok((5, 2)), [_, _, 1, 5] => Ok((5, 1)), @@ -356,6 +357,7 @@ mod tests { #[case([0, 0, 1, 5], (5, 1))] #[case([0, 0, 2, 5], (5, 2))] #[case([0, 0, 3, 5], (5, 3))] + #[case([0, 0, 4, 5], (5, 4))] fn test_decode_version_offer( #[case] mut offer: [u8; 4], #[case] expected: (u8, u8), @@ -395,7 +397,7 @@ mod tests { #[case([0, 0, 1, 4])] // driver didn't offer version 4.1 #[case([0, 0, 2, 4])] // driver didn't offer version 4.2 #[case([0, 0, 3, 4])] // driver didn't offer version 4.3 - #[case([0, 0, 4, 5])] // driver didn't offer version 5.4 + #[case([0, 0, 5, 5])] // driver didn't offer version 5.4 #[case([0, 0, 0, 6])] // driver didn't offer version 6.0 fn test_garbage_server_version( #[case] mut offer: [u8; 4], diff --git a/neo4j/src/driver/io/bolt/message_parameters.rs b/neo4j/src/driver/io/bolt/message_parameters.rs index 2037507..e5b040e 100644 --- a/neo4j/src/driver/io/bolt/message_parameters.rs +++ b/neo4j/src/driver/io/bolt/message_parameters.rs @@ -233,3 +233,33 @@ impl<'a> RouteParameters<'a> { } } } + +#[derive(Debug, Clone, Copy)] +pub(crate) enum TelemetryAPI { + TxFunc, + UnmanagedTx, + AutoCommit, + DriverLevel, +} + +impl TelemetryAPI { + pub(crate) fn name(&self) -> &'static str { + match self { + Self::TxFunc => "Session::transaction with retry", + Self::UnmanagedTx => "Session::transaction without retry", + Self::AutoCommit => "Session::auto_commit", + Self::DriverLevel => "Driver::execute_query", + } + } +} + +#[derive(Debug, Clone, Copy)] +pub(crate) struct TelemetryParameters { + pub(super) api: TelemetryAPI, +} + +impl TelemetryParameters { + pub(crate) fn new(api: TelemetryAPI) -> Self { + Self { api } + } +} diff --git a/neo4j/src/driver/io/bolt/response.rs b/neo4j/src/driver/io/bolt/response.rs index 23915be..a99a86c 100644 --- a/neo4j/src/driver/io/bolt/response.rs +++ b/neo4j/src/driver/io/bolt/response.rs @@ -31,6 +31,7 @@ pub(crate) enum ResponseMessage { Commit, Rollback, Route, + Telemetry, } #[derive(Debug)] diff --git a/neo4j/src/driver/io/pool.rs b/neo4j/src/driver/io/pool.rs index 1342b15..5ba00ae 100644 --- a/neo4j/src/driver/io/pool.rs +++ b/neo4j/src/driver/io/pool.rs @@ -159,6 +159,7 @@ pub(crate) struct PoolConfig { pub(crate) connection_acquisition_timeout: Option, pub(crate) resolver: Option>, pub(crate) notification_filters: Arc, + pub(crate) telemetry: bool, } impl PoolConfig { diff --git a/neo4j/src/driver/io/pool/single_pool.rs b/neo4j/src/driver/io/pool/single_pool.rs index 9361798..92d0d2c 100644 --- a/neo4j/src/driver/io/pool/single_pool.rs +++ b/neo4j/src/driver/io/pool/single_pool.rs @@ -102,6 +102,7 @@ impl InnerPool { let address = Arc::clone(&self.address); let mut connection = self.open_socket(address, deadline)?; + connection.set_telemetry_enabled(self.config.telemetry); connection.hello(HelloParameters::new( &self.config.user_agent, diff --git a/neo4j/src/driver/record_stream.rs b/neo4j/src/driver/record_stream.rs index d66741e..990b55f 100644 --- a/neo4j/src/driver/record_stream.rs +++ b/neo4j/src/driver/record_stream.rs @@ -542,7 +542,9 @@ impl RecordListener { fn failure_cb(&mut self, me: Weak>, error: ServerError) -> Result<()> { if let Some(error_propagator) = &self.error_propagator { - error_propagator.borrow_mut().propagate_error(me, &error); + error_propagator + .borrow_mut() + .propagate_error(Some(me), &error); } self.state = RecordListenerState::Error(error.into()); self.summary = None; @@ -604,6 +606,8 @@ pub(crate) struct ErrorPropagator { error: Option>, } +pub(crate) type SharedErrorPropagator = Arc>; + impl ErrorPropagator { pub(crate) fn new() -> Self { Self::default() @@ -623,7 +627,7 @@ impl ErrorPropagator { fn propagate_error( &mut self, - source: Weak>, + source: Option>>, error: &ServerError, ) { let error = Arc::new(ServerError::new( @@ -634,8 +638,10 @@ impl ErrorPropagator { ), )); for listener in self.listeners.iter() { - if source.ptr_eq(listener) { - continue; + if let Some(source) = source.as_ref() { + if source.ptr_eq(listener) { + continue; + } } if let Some(listener) = listener.upgrade() { listener.borrow_mut().set_foreign_error(Arc::clone(&error)); @@ -647,9 +653,16 @@ impl ErrorPropagator { pub(crate) fn error(&self) -> &Option> { &self.error } -} -pub(crate) type SharedErrorPropagator = Arc>; + pub(crate) fn make_on_error_cb( + this: SharedErrorPropagator, + ) -> impl FnMut(ServerError) -> Result<()> + Send + Sync + 'static { + move |err| { + this.borrow_mut().propagate_error(None, &err); + Ok(()) + } + } +} #[derive(Debug, Error)] pub enum GetSingleRecordError { diff --git a/neo4j/src/driver/session.rs b/neo4j/src/driver/session.rs index 101c0e8..812abe7 100644 --- a/neo4j/src/driver/session.rs +++ b/neo4j/src/driver/session.rs @@ -16,11 +16,13 @@ pub(crate) mod bookmarks; pub(crate) mod config; pub(crate) mod retry; +use atomic_refcell::AtomicRefCell; use std::borrow::Borrow; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; +use std::ops::Deref; use std::rc::Rc; use std::result::Result as StdResult; use std::sync::Arc; @@ -28,9 +30,12 @@ use std::sync::Arc; use log::{debug, info}; use super::config::auth::AuthToken; -use super::io::bolt::message_parameters::{BeginParameters, RunParameters}; +use super::io::bolt::message_parameters::{ + BeginParameters, RunParameters, TelemetryAPI, TelemetryParameters, +}; +use super::io::bolt::ResponseCallbacks; use super::io::{AcquireConfig, Pool, PooledBolt, UpdateRtArgs}; -use super::record_stream::RecordStream; +use super::record_stream::{ErrorPropagator, RecordStream, SharedErrorPropagator}; use super::transaction::{Transaction, TransactionTimeout}; use super::{EagerResult, ReducedDriverConfig, RoutingControl}; use crate::driver::io::SessionAuth; @@ -131,9 +136,17 @@ impl<'driver> Session<'driver> { &'session mut self, builder: AutoCommitBuilder<'driver, 'session, Q, KP, P, KM, M, FRes>, ) -> Result { - let cx = self.acquire_connection(builder.mode)?; - let mut record_stream = - RecordStream::new(Rc::new(RefCell::new(cx)), self.fetch_size(), true, None); + let mut connection = self.acquire_connection(builder.mode)?; + connection.telemetry( + TelemetryParameters::new(TelemetryAPI::AutoCommit), + ResponseCallbacks::new(), + )?; + let mut record_stream = RecordStream::new( + Rc::new(RefCell::new(connection)), + self.fetch_size(), + true, + None, + ); let res = record_stream .run(RunParameters::new_auto_commit_run( builder.query.as_ref(), @@ -189,8 +202,26 @@ impl<'driver> Session<'driver> { builder: &TransactionBuilder<'driver, 'session, KM, M>, receiver: FTx, ) -> Result { - let connection = self.acquire_connection(builder.mode)?; - let mut tx = InnerTransaction::new(connection, self.fetch_size()); + let mut connection = self.acquire_connection(builder.mode)?; + let error_propagator = SharedErrorPropagator::default(); + + if let Some(api) = *builder.api.deref().borrow() { + connection.telemetry(TelemetryParameters::new(api), { + let api = Arc::clone(&builder.api); + ResponseCallbacks::new() + .with_on_success(move |_meta| { + // Once a TELEMETRY message made it successfully to the server, we can + // stop trying to send it again. + api.borrow_mut().take(); + Ok(()) + }) + .with_on_failure(ErrorPropagator::make_on_error_cb(Arc::clone( + &error_propagator, + ))) + })?; + } + let mut tx = + InnerTransaction::new(connection, self.fetch_size(), Arc::clone(&error_propagator)); let bookmarks = &*self.session_bookmarks.get_bookmarks_for_work()?; let parameters = BeginParameters::new( Some(bookmarks), @@ -205,7 +236,12 @@ impl<'driver> Session<'driver> { .map(|imp| imp.as_str()), &self.config.config.notification_filter, ); - tx.begin(parameters, self.config.eager_begin)?; + tx.begin( + parameters, + self.config.eager_begin, + ResponseCallbacks::new() + .with_on_failure(ErrorPropagator::make_on_error_cb(error_propagator)), + )?; let res = receiver(Transaction::new(&mut tx)); let res = match res { Ok(_) => { @@ -756,6 +792,7 @@ pub struct TransactionBuilder<'driver, 'session, KM, M> { meta: M, timeout: TransactionTimeout, mode: RoutingControl, + api: Arc>>, } impl<'driver, 'session> TransactionBuilder<'driver, 'session, DefaultMetaKey, DefaultMeta> { @@ -766,6 +803,7 @@ impl<'driver, 'session> TransactionBuilder<'driver, 'session, DefaultMetaKey, De meta: Default::default(), timeout: Default::default(), mode: RoutingControl::Write, + api: Default::default(), } } } @@ -810,6 +848,7 @@ impl<'driver, 'session, KM: Borrow + Debug, M: Borrow + Debug, M: Borrow + Debug, M: Borrow + Debug, M: Borrow + Debug, M: Borrow) -> Self { + self.api = Arc::new(AtomicRefCell::new(api)); + self + } + /// Run the transaction. The work to be done is specified by the given `receiver`. /// /// The `receiver` will be called with a [`Transaction`] that can be used to execute queries, @@ -937,6 +985,9 @@ impl<'driver, 'session, KM: Borrow + Debug, M: Borrow(mut self, receiver: impl FnOnce(Transaction) -> Result) -> Result { + self.api + .borrow_mut() + .get_or_insert(TelemetryAPI::UnmanagedTx); let session = self.session.take().unwrap(); session.transaction_run(&self, receiver) } @@ -952,6 +1003,7 @@ impl<'driver, 'session, KM: Borrow + Debug, M: Borrow Result, ) -> StdResult { + self.api.borrow_mut().get_or_insert(TelemetryAPI::TxFunc); let session = self.session.take().unwrap(); retry_policy.execute(|| session.transaction_run(&self, &mut receiver)) } diff --git a/neo4j/src/driver/transaction.rs b/neo4j/src/driver/transaction.rs index d3f23dc..ec0d86d 100644 --- a/neo4j/src/driver/transaction.rs +++ b/neo4j/src/driver/transaction.rs @@ -163,11 +163,15 @@ pub(crate) struct InnerTransaction<'driver> { } impl<'driver> InnerTransaction<'driver> { - pub(crate) fn new(connection: PooledBolt<'driver>, fetch_size: i64) -> Self { + pub(crate) fn new( + connection: PooledBolt<'driver>, + fetch_size: i64, + error_propagator: SharedErrorPropagator, + ) -> Self { Self { connection: Rc::new(RefCell::new(connection)), bookmark: Default::default(), - error_propagator: Default::default(), + error_propagator, fetch_size, closed: false, } @@ -177,9 +181,10 @@ impl<'driver> InnerTransaction<'driver> { &mut self, parameters: BeginParameters, eager: bool, + callbacks: ResponseCallbacks, ) -> Result<()> { let mut cx = self.connection.borrow_mut(); - cx.begin(parameters)?; + cx.begin(parameters, callbacks)?; if eager { cx.write_all(None)?; cx.read_all(None)?; diff --git a/neo4j/src/lib.rs b/neo4j/src/lib.rs index 89292f3..88c3a26 100644 --- a/neo4j/src/lib.rs +++ b/neo4j/src/lib.rs @@ -25,10 +25,10 @@ //! //! ## Compatibility // [bolt-version-bump] search tag when changing bolt version support -//! This driver supports bolt protocol version 4.4, and 5.0 - 5.3. -//! This corresponds to Neo4j versions 4.4, and 5.0 - 5.13. -//! Newer 5.x versions of the server are able to negotiate a lower, common protocol version. -//! Therefore, they, too, can be connected to but some features may be available though this driver. +//! This driver supports bolt protocol version 4.4, and 5.0 - 5.4. +//! This corresponds to Neo4j versions 4.4, and 5.0 - 5.19+. +//! For details of bolt protocol compatibility, see the +//! [official Neo4j documentation](https://neo4j.com/docs/bolt/current/bolt-compatibility/). //! //! ## Basic Example //! ``` diff --git a/testkit_backend/src/testkit_backend/requests.rs b/testkit_backend/src/testkit_backend/requests.rs index 3f1802f..9ff5609 100644 --- a/testkit_backend/src/testkit_backend/requests.rs +++ b/testkit_backend/src/testkit_backend/requests.rs @@ -77,6 +77,8 @@ pub(super) enum Request { connection_acquisition_timeout_ms: Option, notifications_min_severity: Option, notifications_disabled_categories: Option>, + #[serde(rename = "telemetryDisabled")] + telemetry_disabled: Option, encrypted: Option, trusted_certificates: Option>, }, @@ -756,6 +758,7 @@ impl Request { connection_acquisition_timeout_ms, notifications_min_severity, notifications_disabled_categories, + telemetry_disabled, encrypted, trusted_certificates, } = self @@ -829,6 +832,9 @@ impl Request { )? { driver_config = driver_config.with_notification_filter(filter); } + if let Some(telemetry_disabled) = telemetry_disabled { + driver_config = driver_config.with_telemetry(!telemetry_disabled); + } if let Some(encrypted) = encrypted { connection_config = match encrypted { true => match trusted_certificates { diff --git a/testkit_backend/src/testkit_backend/responses.rs b/testkit_backend/src/testkit_backend/responses.rs index ea9620f..ed98f38 100644 --- a/testkit_backend/src/testkit_backend/responses.rs +++ b/testkit_backend/src/testkit_backend/responses.rs @@ -35,8 +35,9 @@ use super::requests::TestKitAuth; use super::session_holder::SummaryWithQuery; use super::BackendId; +// [bolt-version-bump] search tag when changing bolt version support // https://github.com/rust-lang/rust/issues/85077 -const FEATURE_LIST: [&str; 43] = [ +const FEATURE_LIST: [&str; 44] = [ // === FUNCTIONAL FEATURES === "Feature:API:BookmarkManager", "Feature:API:ConnectionAcquisitionTimeout", @@ -74,7 +75,7 @@ const FEATURE_LIST: [&str; 43] = [ "Feature:Bolt:5.1", "Feature:Bolt:5.2", "Feature:Bolt:5.3", - // "Feature:Bolt:5.4", + "Feature:Bolt:5.4", "Feature:Bolt:Patch:UTC", "Feature:Impersonation", // "Feature:TLS:1.1", // rustls says no! For a good reason. From 038244057ca9125723d0dedc610a3a37911fe16d Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Sat, 13 Apr 2024 13:55:29 +0200 Subject: [PATCH 2/7] Target latest TestKit protocol --- CHANGELOG.md | 2 +- neo4j/src/driver/transaction.rs | 1 + .../src/testkit_backend/requests.rs | 20 +++++++++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df43e85..143ef16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ - Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`. - Fix `Transaction::rolblack()` failing if a result stream failed before. - Introduce `neo4j::driver::Conifg::with_keep_alive()` and `without_keep_alive()`. -- Fixed errors during `BEGIN` not being properly propagated through the transaction's internals. +- Fixed errors during transaction `BEGIN` not being properly propagated. - Add support for Bolt 5.3 (bolt agent). - Add support for Bolt 5.4 (telemetry). diff --git a/neo4j/src/driver/transaction.rs b/neo4j/src/driver/transaction.rs index ec0d86d..b6b81a4 100644 --- a/neo4j/src/driver/transaction.rs +++ b/neo4j/src/driver/transaction.rs @@ -188,6 +188,7 @@ impl<'driver> InnerTransaction<'driver> { if eager { cx.write_all(None)?; cx.read_all(None)?; + self.check_error()?; } Ok(()) } diff --git a/testkit_backend/src/testkit_backend/requests.rs b/testkit_backend/src/testkit_backend/requests.rs index 9ff5609..6d1c2e3 100644 --- a/testkit_backend/src/testkit_backend/requests.rs +++ b/testkit_backend/src/testkit_backend/requests.rs @@ -75,6 +75,10 @@ pub(super) enum Request { liveness_check_timeout_ms: Option, max_connection_pool_size: Option, connection_acquisition_timeout_ms: Option, + #[serde(rename = "clientCertificate")] + client_certificate: Option, + #[serde(rename = "clientCertificateProviderId")] + client_certificate_provider_id: Option, notifications_min_severity: Option, notifications_disabled_categories: Option>, #[serde(rename = "telemetryDisabled")] @@ -532,6 +536,17 @@ pub(super) enum AuthTokenAndExpiration { }, } +#[derive(Deserialize, Debug)] +#[serde(tag = "name", content = "data")] +#[allow(dead_code)] // reflects TestKit protocol +pub(super) enum ClientCertificate { + ClientCertificate { + certfile: String, + keyfile: String, + password: Option, + }, +} + #[derive(Deserialize, Debug)] #[serde(untagged)] #[allow(dead_code)] // reflects TestKit protocol @@ -756,6 +771,8 @@ impl Request { liveness_check_timeout_ms, max_connection_pool_size, connection_acquisition_timeout_ms, + client_certificate, + client_certificate_provider_id, notifications_min_severity, notifications_disabled_categories, telemetry_disabled, @@ -826,6 +843,9 @@ impl Request { Duration::from_millis(connection_acquisition_timeout_ms), ); } + if client_certificate.is_some() || client_certificate_provider_id.is_some() { + return Err(TestKitError::backend_err("mTLS not (yet) supported")); + } if let Some(filter) = load_notification_filter( notifications_min_severity, notifications_disabled_categories, From 370104d4b2af97ec01f3d674bf36ad4efbbbebac Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Sat, 13 Apr 2024 14:14:58 +0200 Subject: [PATCH 3/7] Fix propagation of `is_retryable()` of errors within transactions --- CHANGELOG.md | 3 ++- neo4j/src/driver/record_stream.rs | 35 ++++++++++++------------------- neo4j/src/driver/transaction.rs | 6 ++---- neo4j/src/error_.rs | 16 ++++++++++++++ 4 files changed, 33 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 143ef16..89e168e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,8 @@ - Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`. - Fix `Transaction::rolblack()` failing if a result stream failed before. - Introduce `neo4j::driver::Conifg::with_keep_alive()` and `without_keep_alive()`. -- Fixed errors during transaction `BEGIN` not being properly propagated. +- Fix errors during transaction `BEGIN` not being properly propagated. +- Fix propagation of `is_retryable()` of errors within transactions. - Add support for Bolt 5.3 (bolt agent). - Add support for Bolt 5.4 (telemetry). diff --git a/neo4j/src/driver/record_stream.rs b/neo4j/src/driver/record_stream.rs index 990b55f..b9cb6b1 100644 --- a/neo4j/src/driver/record_stream.rs +++ b/neo4j/src/driver/record_stream.rs @@ -18,6 +18,7 @@ use std::collections::VecDeque; use std::fmt::Debug; use std::iter::FusedIterator; use std::mem; +use std::ops::Deref; use std::rc::Rc; use std::result; use std::sync::{Arc, Weak}; @@ -72,7 +73,7 @@ impl<'driver> RecordStream<'driver> { parameters: RunParameters, ) -> Result<()> { if let RecordListenerState::ForeignError(e) = &(*self.listener).borrow().state { - return Err(ServerError::new(String::from(e.code()), String::from(e.message())).into()); + return Err(e.deref().clone().into()); } let mut callbacks = self.failure_callbacks(); @@ -129,11 +130,7 @@ impl<'driver> RecordStream<'driver> { mem::swap(state, &mut state_swap); match state_swap { RecordListenerState::ForeignError(e) => { - return Err(ServerError::new( - String::from(e.code()), - String::from(e.message()), - ) - .into()) + return Err(e.deref().clone().into()) } _ => panic!("checked state to be error above"), } @@ -420,11 +417,7 @@ impl<'driver> Iterator for RecordStream<'driver> { mem::swap(&mut listener.state, &mut state); match state { RecordListenerState::ForeignError(e) => { - return Some(Err(ServerError::new( - String::from(e.code()), - String::from(e.message()), - ) - .into())) + return Some(Err(e.deref().clone().into())) } _ => panic!("checked state to be foreign error above"), } @@ -542,9 +535,11 @@ impl RecordListener { fn failure_cb(&mut self, me: Weak>, error: ServerError) -> Result<()> { if let Some(error_propagator) = &self.error_propagator { - error_propagator - .borrow_mut() - .propagate_error(Some(me), &error); + error_propagator.borrow_mut().propagate_error( + Some(me), + &error, + "failure in a query of this transaction caused transaction to be closed", + ); } self.state = RecordListenerState::Error(error.into()); self.summary = None; @@ -629,14 +624,9 @@ impl ErrorPropagator { &mut self, source: Option>>, error: &ServerError, + reason: &str, ) { - let error = Arc::new(ServerError::new( - String::from(error.code()), - format!( - "failure in a query of this transaction caused transaction to be closed: {}", - error.message() - ), - )); + let error = Arc::new(error.clone_with_reason(reason)); for listener in self.listeners.iter() { if let Some(source) = source.as_ref() { if source.ptr_eq(listener) { @@ -658,7 +648,8 @@ impl ErrorPropagator { this: SharedErrorPropagator, ) -> impl FnMut(ServerError) -> Result<()> + Send + Sync + 'static { move |err| { - this.borrow_mut().propagate_error(None, &err); + this.borrow_mut() + .propagate_error(None, &err, "the transaction could not be started"); Ok(()) } } diff --git a/neo4j/src/driver/transaction.rs b/neo4j/src/driver/transaction.rs index b6b81a4..3234042 100644 --- a/neo4j/src/driver/transaction.rs +++ b/neo4j/src/driver/transaction.rs @@ -30,7 +30,7 @@ use super::io::bolt::ResponseCallbacks; use super::io::PooledBolt; use super::record_stream::{GetSingleRecordError, RecordStream, SharedErrorPropagator}; use super::Record; -use crate::error_::{Neo4jError, Result, ServerError}; +use crate::error_::{Neo4jError, Result}; use crate::summary::Summary; use crate::value::{ValueReceive, ValueSend}; @@ -256,9 +256,7 @@ impl<'driver> InnerTransaction<'driver> { fn check_error(&self) -> Result<()> { match self.error_propagator.deref().borrow().error() { None => Ok(()), - Some(err) => { - Err(ServerError::new(String::from(err.code()), String::from(err.message())).into()) - } + Some(err) => Err(err.deref().clone().into()), } } } diff --git a/neo4j/src/error_.rs b/neo4j/src/error_.rs index 7796f66..383dcda 100644 --- a/neo4j/src/error_.rs +++ b/neo4j/src/error_.rs @@ -350,6 +350,22 @@ impl ServerError { pub(crate) fn overwrite_retryable(&mut self) { self.retryable_overwrite = true; } + + pub(crate) fn clone(&self) -> Self { + Self { + code: self.code.clone(), + message: self.message.clone(), + retryable_overwrite: self.retryable_overwrite, + } + } + + pub(crate) fn clone_with_reason(&self, reason: &str) -> Self { + Self { + code: self.code.clone(), + message: format!("{}: {}", reason, self.message), + retryable_overwrite: self.retryable_overwrite, + } + } } impl Display for ServerError { From 2ff7d38068b8c4e303ff182660a0e94b4ee344a6 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Sat, 13 Apr 2024 14:33:54 +0200 Subject: [PATCH 4/7] Tidy up the changelog --- CHANGELOG.md | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89e168e..5d3a3a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,37 +1,49 @@ # Changelog -## NEXT +⚠️ marks breaking changes or pending breaking changes (deprecations). -- Removed useless lifetime parameter from `SessionConfig::with_database()`. -- Changed return type of `ConnectionConfig::with_encryption_trust_any_certificate() ` from `Result` - to `Self`. +## NEXT +*** +**⭐ New Features** - Add support for Bolt 5.2, which adds notification filtering. +- Add support for Bolt 5.3 (bolt agent). +- Add support for Bolt 5.4 (telemetry). - Add `Driver::is_encrypted()`. -- Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`. -- Fix `Transaction::rolblack()` failing if a result stream failed before. - Introduce `neo4j::driver::Conifg::with_keep_alive()` and `without_keep_alive()`. + +**🔧 Fixes** +- Fix `Transaction::rolblack()` failing if a result stream failed before. - Fix errors during transaction `BEGIN` not being properly propagated. - Fix propagation of `is_retryable()` of errors within transactions. -- Add support for Bolt 5.3 (bolt agent). -- Add support for Bolt 5.4 (telemetry). + +**🧹Clean-up** +- ⚠️ Removed useless lifetime parameter from `SessionConfig::with_database()`. +- ⚠️ Changed return type of `ConnectionConfig::with_encryption_trust_any_certificate() ` from `Result` to `Self`. +- ⚠️ Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`. + ## 0.0.2 +*** +**👏 Improvements** +- Impl `FromStr` for `neo4j::driver::ConnectionConfig` (besides `TryFrom<&str>`). -- Update dependencies. +- **🧹Clean-up** +- ⚠️ Update dependencies. Among others `rustls`. To accommodate this change, the `rustls_dangerous_configuration` feature was removed. This update also affects `ConnectionConfig::with_encryption_custom_tls_config()`, which accepts a custom `rustls::ClientConfig`. -- Make `Record{entries}` private and offer many helper methods instead. +- ⚠️ Make `Record{entries}` private and offer many helper methods instead. - Add `EagerResult::into_scalar()`. -- Renamed `RetryableError` to `RetryError` +- ⚠️ Renamed `RetryableError` to `RetryError` - Fix `Driver::execute_query()::run()` not committing the transaction. -- Removed `AutoCommitBuilder::without_transaction_timeout` and `AutoCommitBuilder::with_default_transaction_timeout` +- ⚠️ Removed `AutoCommitBuilder::without_transaction_timeout` and `AutoCommitBuilder::with_default_transaction_timeout` in favor of `AutoCommitBuilder::with_transaction_timeout` in combination with `TransactionTimeout::none`, `TransactionTimeout::from_millis` and `TransactionTimeout::default`. Same for `TransactionBuilder`. -- Move `neo4j::Address` to `neo4j::address::Address` -- Impl `FromStr` for `neo4j::driver::ConnectionConfig` (besides `TryFrom<&str>`). +- ⚠️ Move `neo4j::Address` to `neo4j::address::Address` + +**📚 Docs** - Much more documentation. ## 0.0.1 From dbc0c59c859700e400f1da4748dabcaac1ccab8f Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Sat, 13 Apr 2024 19:26:57 +0200 Subject: [PATCH 5/7] TestKit: improve backend logging * Add time stamp * Remove module of log messages --- testkit_backend/src/logging.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/testkit_backend/src/logging.rs b/testkit_backend/src/logging.rs index e1f18cb..8cf0e6d 100644 --- a/testkit_backend/src/logging.rs +++ b/testkit_backend/src/logging.rs @@ -15,7 +15,9 @@ use std::io::{self, Cursor, Result as IoResult, Write}; use std::mem::swap; use std::sync::{Arc, Mutex, OnceLock}; +use std::time::SystemTime; +use chrono::{DateTime, SecondsFormat, Utc}; use log::LevelFilter; #[derive(Debug, Default, Clone)] @@ -56,8 +58,9 @@ pub(super) fn init() { fern::Dispatch::new() .format(move |out, message, record| { out.finish(format_args!( - "[{}][{:<7}] {}", - record.target(), + "{} [{:<7}] {}", + DateTime::::from(SystemTime::now()) + .to_rfc3339_opts(SecondsFormat::Nanos, true), record.level(), message )) From e3061f93074c0f5adae02777ce2688802a951379 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Sat, 13 Apr 2024 19:28:06 +0200 Subject: [PATCH 6/7] TestKit: add executeQuery sessionAuth config --- testkit_backend/src/testkit_backend/driver_holder.rs | 5 +++++ testkit_backend/src/testkit_backend/requests.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/testkit_backend/src/testkit_backend/driver_holder.rs b/testkit_backend/src/testkit_backend/driver_holder.rs index 3026346..f840cfd 100644 --- a/testkit_backend/src/testkit_backend/driver_holder.rs +++ b/testkit_backend/src/testkit_backend/driver_holder.rs @@ -655,6 +655,7 @@ impl DriverHolderRunner { bookmark_manager, tx_meta, timeout, + auth, }) => { let mut builder = self.driver.execute_query(query); if let Some(params) = params { @@ -684,6 +685,9 @@ impl DriverHolderRunner { if let Some(timeout) = timeout { builder = builder.with_transaction_timeout(timeout); } + if let Some(auth) = auth { + builder = builder.with_session_auth(auth); + } let result = builder .run_with_retry(ExponentialBackoff::default()) .map_err(Into::into); @@ -1051,6 +1055,7 @@ pub(super) struct ExecuteQuery { pub(crate) bookmark_manager: ExecuteQueryBookmarkManager, pub(super) tx_meta: Option>, pub(super) timeout: Option, + pub(super) auth: Option>, } impl From for Command { diff --git a/testkit_backend/src/testkit_backend/requests.rs b/testkit_backend/src/testkit_backend/requests.rs index 6d1c2e3..e147f79 100644 --- a/testkit_backend/src/testkit_backend/requests.rs +++ b/testkit_backend/src/testkit_backend/requests.rs @@ -599,6 +599,8 @@ pub(super) struct ExecuteQueryConfig { bookmark_manager_id: Option, tx_meta: Option>, timeout: Option, + #[serde(rename = "authorizationToken")] + auth: MaybeTestKitAuth, } #[derive(Deserialize, Debug)] @@ -1466,6 +1468,7 @@ impl Request { bookmark_manager_id, tx_meta, timeout, + auth, } = config; let data = backend.data.borrow_mut(); let driver_holder = get_driver(&data, &driver_id)?; @@ -1483,6 +1486,7 @@ impl Request { .map(cypher_value_map_to_value_send_map) .transpose()?; let timeout = read_transaction_timeout(timeout)?; + let auth = auth.0.map(|auth| Arc::new(auth.into())); let result = driver_holder .execute_query(ExecuteQuery { query, @@ -1493,6 +1497,7 @@ impl Request { bookmark_manager, tx_meta, timeout, + auth, }) .result?; let response: Response = result.try_into()?; From 35d539166d6c30ac18fc07c0935afba845aa9eb9 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Sat, 13 Apr 2024 19:28:46 +0200 Subject: [PATCH 7/7] Fix socket deadline never overwriting connection hint timeout --- CHANGELOG.md | 1 + neo4j/src/driver/io/deadline.rs | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d3a3a7..833a38b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - Fix `Transaction::rolblack()` failing if a result stream failed before. - Fix errors during transaction `BEGIN` not being properly propagated. - Fix propagation of `is_retryable()` of errors within transactions. +- Fix connection hint `connection.recv_timeout_seconds` not always being respected leading to connections timeing out too late. **🧹Clean-up** - ⚠️ Removed useless lifetime parameter from `SessionConfig::with_database()`. diff --git a/neo4j/src/driver/io/deadline.rs b/neo4j/src/driver/io/deadline.rs index a95292c..89cc03b 100644 --- a/neo4j/src/driver/io/deadline.rs +++ b/neo4j/src/driver/io/deadline.rs @@ -121,6 +121,12 @@ impl<'tcp, S: Read + Write> DeadlineIO<'tcp, S> { // deadline in the future Some(timeout) => timeout, }; + if let Some(old_timeout) = old_timeout { + if timeout >= old_timeout { + let res = work(self); + return self.wrap_io_error(res, ReaderErrorDuring::IO); + } + } self.wrap_io_error( set_socket_timeout(socket, Some(timeout)), ReaderErrorDuring::SetTimeout,