Skip to content

Commit

Permalink
Fix creating consumer group against nonexisting stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Oct 5, 2023
1 parent b223ca2 commit ac55fad
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Integration {
.arg(&key)
.arg(&self.consumer_group)
.arg(0)
.arg("MKSTREAM")
.query_async(&mut redis_conn)
.await
{
Expand Down Expand Up @@ -445,6 +446,7 @@ mod test {
async fn test_integration() {
let redis_url = env::var("TEST_REDIS_URL").unwrap_or("redis://127.0.0.1/1".to_string());

setup_log(&Configuration::default()).unwrap();
register(Box::new(MockIntegration {})).await;

let conf = Configuration {
Expand All @@ -458,7 +460,7 @@ mod test {
};

tokio::spawn(start(conf));
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let redis_client = redis::Client::open(redis_url).unwrap();
let mut redis_conn = redis_client.get_async_connection().await.unwrap();
Expand All @@ -478,7 +480,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = UPLINK_EVENTS
.write()
Expand Down Expand Up @@ -506,7 +508,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = JOIN_EVENTS
.write()
Expand Down Expand Up @@ -534,7 +536,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = ACK_EVENTS
.write()
Expand Down Expand Up @@ -562,7 +564,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = TXACK_EVENTS
.write()
Expand Down Expand Up @@ -590,7 +592,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = LOG_EVENTS
.write()
Expand Down Expand Up @@ -618,7 +620,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = STATUS_EVENTS
.write()
Expand Down Expand Up @@ -646,7 +648,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = LOCATION_EVENTS
.write()
Expand Down Expand Up @@ -674,7 +676,7 @@ mod test {
.await
.unwrap();

sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(100)).await;

let pl_recv = INTEGRATION_EVENTS
.write()
Expand Down

0 comments on commit ac55fad

Please sign in to comment.