Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tcp keepalive #100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/leader_latch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() {
let zk_urls = zk_server_urls();
log::info!("connecting to {}", zk_urls);

let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher, None).unwrap();

let id = Uuid::new_v4().to_string();
log::info!("starting host with id: {:?}", id);
Expand Down
36 changes: 15 additions & 21 deletions examples/persistent_watches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ fn zk_example() {

let root = format!("/example-{}", uuid::Uuid::new_v4());
let modifying_zk =
ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();
ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap();
let recursive_watch_zk =
ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();
ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap();
let persistent_watch_zk =
ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();
ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap();

// Creating separate clients to show the example where modifications to the nodes
// take place in a different session than our own.
Expand All @@ -47,8 +47,10 @@ fn zk_example() {
// Also separate clients per type of watch as there is a bug when creating multiple type watchers in the same
// path in the same session
// https://issues.apache.org/jira/browse/ZOOKEEPER-4466
recursive_watch_zk.add_listener(|zk_state| println!("New recursive watch ZkState is {:?}", zk_state));
persistent_watch_zk.add_listener(|zk_state| println!("New peristent watch ZkState is {:?}", zk_state));
recursive_watch_zk
.add_listener(|zk_state| println!("New recursive watch ZkState is {:?}", zk_state));
persistent_watch_zk
.add_listener(|zk_state| println!("New peristent watch ZkState is {:?}", zk_state));

modifying_zk.ensure_path(&root).unwrap();

Expand All @@ -64,7 +66,9 @@ fn zk_example() {
})
.unwrap();

println!("press c to add and modify child, e to edit the watched node, anything else to proceed");
println!(
"press c to add and modify child, e to edit the watched node, anything else to proceed"
);
let stdin = io::stdin();
let inputs = stdin.lock().lines();
let mut incr = 0;
Expand All @@ -82,28 +86,18 @@ fn zk_example() {
)
.unwrap();
modifying_zk
.set_data(
&child_path,
b"new-data".to_vec(),
None,
)
.unwrap();
modifying_zk
.delete(&child_path, None)
.set_data(&child_path, b"new-data".to_vec(), None)
.unwrap();
},
modifying_zk.delete(&child_path, None).unwrap();
}
"e" => {
modifying_zk
.set_data(
&root,
format!("new-data-{incr}").into_bytes(),
None,
)
.set_data(&root, format!("new-data-{incr}").into_bytes(), None)
.unwrap();
}
other => {
println!("received {other}");
break
break;
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions examples/queue_consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ fn main() {
let zk_urls = zk_server_urls();
log::info!("connecting to {}", zk_urls);

let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher, None).unwrap();

let queue = ZkQueue::new(Arc::new(zk), "/testing2".to_string()).unwrap();

println!("waiting for a message");
let msg = queue.take();
if msg.is_err() {
eprint!("unable to listen for message. error: {}", msg.err().unwrap().to_string())
eprint!(
"unable to listen for message. error: {}",
msg.err().unwrap().to_string()
)
} else {
println!("got {:?}", String::from_utf8(msg.unwrap()).unwrap());

}

}
}
6 changes: 2 additions & 4 deletions examples/queue_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ fn main() {
let zk_urls = zk_server_urls();
log::info!("connecting to {}", zk_urls);

let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher, None).unwrap();

let queue = ZkQueue::new(Arc::new(zk), "/testing2".to_string()).unwrap();


let message = "Hello World";
let op = queue.offer(Vec::from(message.as_bytes()));
println!("{:?}", op);

}
}
21 changes: 11 additions & 10 deletions examples/zookeeper_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ extern crate zookeeper;
extern crate log;
extern crate env_logger;

use std::env;
use std::io;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use std::env;
use std::sync::mpsc;
use zookeeper::{Acl, CreateMode, Watcher, WatchedEvent, ZooKeeper};
use std::time::Duration;
use zookeeper::recipes::cache::PathChildrenCache;
use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper};

struct LoggingWatcher;
impl Watcher for LoggingWatcher {
Expand All @@ -28,12 +28,11 @@ fn zk_server_urls() -> String {
}
}


fn zk_example() {
let zk_urls = zk_server_urls();
println!("connecting to {}", zk_urls);

let zk = ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();
let zk = ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap();

zk.add_listener(|zk_state| println!("New ZkState is {:?}", zk_state));

Expand All @@ -43,10 +42,12 @@ fn zk_example() {

println!("authenticated -> {:?}", auth);

let path = zk.create("/test",
vec![1, 2],
Acl::open_unsafe().clone(),
CreateMode::Ephemeral);
let path = zk.create(
"/test",
vec![1, 2],
Acl::open_unsafe().clone(),
CreateMode::Ephemeral,
);

println!("created -> {:?}", path);

Expand Down
17 changes: 14 additions & 3 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct ZkIo {
timeout_ms: u64,
ping_timeout_duration: Duration,
conn_timeout_duration: Duration,
keepalive: Option<Duration>,
watch_sender: mpsc::Sender<WatchMessage>,
conn_resp: ConnectResponse,
zxid: i64,
Expand All @@ -100,14 +101,18 @@ impl ZkIo {
ping_timeout_duration: Duration,
watch_sender: mpsc::Sender<WatchMessage>,
state_listeners: ListenerSet<ZkState>,
keepalive: Option<Duration>,
) -> ZkIo {
trace!("ZkIo::new");
let timeout_ms = ping_timeout_duration.as_secs() * 1000
+ ping_timeout_duration.subsec_nanos() as u64 / 1000000;
let (tx, rx) = channel();

let sock = TcpStream::connect(&addrs[0]).unwrap(); // TODO I need a socket here, sorry.
sock.set_keepalive(keepalive).unwrap();

let mut zkio = ZkIo {
sock: TcpStream::connect(&addrs[0]).unwrap(), // TODO I need a socket here, sorry.
sock,
state: ZkState::Connecting,
hosts: Hosts::new(addrs),
buffer: VecDeque::new(),
Expand All @@ -119,6 +124,7 @@ impl ZkIo {
conn_timeout: None,
ping_timeout_duration: ping_timeout_duration,
conn_timeout_duration: Duration::from_secs(2),
keepalive,
timeout_ms: timeout_ms,
watch_sender: watch_sender,
conn_resp: ConnectResponse::initial(timeout_ms),
Expand Down Expand Up @@ -273,7 +279,7 @@ impl ZkIo {
.send(WatchMessage::RemoveWatch(w.path, w.watcher_type))
.unwrap(),
(_, Some(w)) => self.watch_sender.send(WatchMessage::Watch(w)).unwrap(),
(_, None) => {},
(_, None) => {}
}
}

Expand Down Expand Up @@ -335,7 +341,12 @@ impl ZkIo {
let host = self.hosts.get();
info!("Connecting to new server {:?}", host);
self.sock = match TcpStream::connect(host) {
Ok(sock) => sock,
Ok(sock) => {
if let Err(e) = sock.set_keepalive(self.keepalive) {
error!("Failed to set tcp-keepalive: {e}");
}
sock
}
Err(e) => {
error!("Failed to connect {:?}: {:?}", host, e);
continue;
Expand Down
Loading
Loading