From fa6f5a4d7e37990dbb8dc4b598be633dbb2b2462 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 15 Apr 2024 01:31:30 +0800 Subject: [PATCH] Fix warnings --- core/src/base.rs | 6 ++-- transports/net/src/packet_processor.rs | 22 ++++-------- transports/quic/src/tests.rs | 50 -------------------------- 3 files changed, 9 insertions(+), 69 deletions(-) diff --git a/core/src/base.rs b/core/src/base.rs index 499a5dd..1f4688e 100644 --- a/core/src/base.rs +++ b/core/src/base.rs @@ -11,7 +11,7 @@ use async_channel::{Receiver, Sender}; use async_lock::{Mutex, RwLock}; use atomic_refcell::AtomicRefCell; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::stream::FuturesUnordered; use nodecraft::{resolver::AddressResolver, CheapClone, Node}; use super::{ @@ -316,7 +316,7 @@ where if let Err(e) = self.transport.shutdown().await { tracing::error!(err=%e, "memberlist: failed to shutdown transport"); return Err(e); - } + } Ok(()) } @@ -472,7 +472,7 @@ where &self, shutdown_rx: Receiver<()>, ) -> <::Spawner as AsyncSpawner>::JoinHandle<()> { - use futures::FutureExt; + use futures::{FutureExt, StreamExt}; let queue_check_interval = self.inner.opts.queue_check_interval; let this = self.clone(); diff --git a/transports/net/src/packet_processor.rs b/transports/net/src/packet_processor.rs index ebfb49f..4ae7d68 100644 --- a/transports/net/src/packet_processor.rs +++ b/transports/net/src/packet_processor.rs @@ -356,13 +356,9 @@ where }; let keys = encryptor.keys().await; if encrypted_message_size <= offload_size { - return Self::decrypt( - algo, - keys, - packet_label.as_bytes(), - &mut encrypted_message, - ) - .and_then(|_| Self::read_from_packet_without_compression_and_encryption(encrypted_message)); + return Self::decrypt(algo, keys, packet_label.as_bytes(), &mut encrypted_message).and_then( + |_| Self::read_from_packet_without_compression_and_encryption(encrypted_message), + ); } let (tx, rx) = futures::channel::oneshot::channel(); @@ -370,15 +366,9 @@ where rayon::spawn(move || { if tx .send( - Self::decrypt( - algo, - keys, - packet_label.as_bytes(), - &mut encrypted_message, - ) - .and_then(|_| { - Self::read_from_packet_without_compression_and_encryption(encrypted_message) - }), + Self::decrypt(algo, keys, packet_label.as_bytes(), &mut encrypted_message).and_then( + |_| Self::read_from_packet_without_compression_and_encryption(encrypted_message), + ), ) .is_err() { diff --git a/transports/quic/src/tests.rs b/transports/quic/src/tests.rs index 5b0a732..6b8cdf0 100644 --- a/transports/quic/src/tests.rs +++ b/transports/quic/src/tests.rs @@ -601,53 +601,3 @@ mod s2n_stream_layer { Options::new("localhost".into(), p.join("cert.pem"), p.join("key.pem")) } } - -#[cfg(test)] -mod test { - use agnostic::tokio::TokioRuntime; - use memberlist_core::transport::Lpe; - use nodecraft::resolver::socket_addr::SocketAddrResolver; - - use crate::{quinn::Quinn, QuicTransport, QuicTransportOptions}; - - use super::*; - - #[test] - fn test_shutdown_cleanup() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async move { - let layer = quinn_stream_layer::().await; - let addr = "127.0.0.1:0".parse().unwrap(); - let mut opts = QuicTransportOptions::with_stream_layer_options("test".into(), layer); - opts.add_bind_address(addr); - let transport = QuicTransport::< - SmolStr, - SocketAddrResolver, - Quinn, - Lpe<_, _>, - TokioRuntime, - >::new(opts) - .await - .unwrap(); - let resolved_addr = *transport.advertise_address(); - - // drop the transport now - transport.shutdown().await; - drop(transport); - - // we should be able to bind to the same address - let layer = quinn_stream_layer::().await; - let mut opts = QuicTransportOptions::with_stream_layer_options("test".into(), layer); - opts.add_bind_address(resolved_addr); - let _ = QuicTransport::< - SmolStr, - SocketAddrResolver, - Quinn, - Lpe<_, _>, - TokioRuntime, - >::new(opts) - .await - .unwrap(); - }); - } -}