Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async-runtime agnostic length-prefixed-stream #7

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions length_prefixed_stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ edition = "2018"
license = "BSD-3-Clause"
name = "length-prefixed-stream"
readme = "readme.md"
version = "1.0.0"
version = "1.1.0"

[dependencies]
async-std = "1.9.0"
desert = { path = "../desert" }
futures = "0.3.28"
futures-core = "0.3.28"
futures = { version = "0.3.29", default-features = false, features = ["std"] }
pin-project-lite = "0.2.10"

[dev-dependencies]
smol = "1.3.0"
22 changes: 15 additions & 7 deletions length_prefixed_stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,42 @@

Decode a byte stream of varint length-encoded messages into a stream of chunks.

This crate is similar to and compatible with the
This crate is async-runtime agnostic and is similar to and compatible with the
[javascript length-prefixed-stream](https://www.npmjs.com/package/length-prefixed-stream) package.

# example
# Example

Note that we're using the [smol](https://crates.io/crates/smol) async runtime in this example.
One could just as easily use [tokio](https://crates.io/crates/tokio) or [async-std](https://crates.io/crates/async-std).

```rust
use async_std::{prelude::*, stream, task};
use futures::stream::TryStreamExt;
use futures::stream::{self, TryStreamExt, StreamExt};
use smol;

use length_prefixed_stream::decode;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

// this program will print:
// This program will print:
//
// [97,98,99,100,101,102]
// [65,66,67,68]

fn main() -> Result<(), Error> {
task::block_on(async {
let input = stream::from_iter(vec![
smol::block_on(async {
let input = stream::iter(vec![
Ok(vec![6, 97, 98, 99]),
Ok(vec![100, 101]),
Ok(vec![102, 4, 65, 66]),
Ok(vec![67, 68]),
])
.into_async_read();

let mut decoder = decode(input);
while let Some(chunk) = decoder.next().await {
println!["{:?}", chunk?];
}

Ok(())
})
}
Expand Down
15 changes: 10 additions & 5 deletions length_prefixed_stream/examples/decode.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use async_std::{prelude::*, stream, task};
use futures::stream::TryStreamExt;
use futures::stream::{self, StreamExt, TryStreamExt};

use length_prefixed_stream::decode;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

// this program will print:
// This program will print:
//
// [97,98,99,100,101,102]
// [65,66,67,68]

fn main() -> Result<(), Error> {
task::block_on(async {
let input = stream::from_iter(vec![
smol::block_on(async {
let input = stream::iter(vec![
Ok(vec![6, 97, 98, 99]),
Ok(vec![100, 101]),
Ok(vec![102, 4, 65, 66]),
Ok(vec![67, 68]),
])
.into_async_read();

let mut decoder = decode(input);

while let Some(chunk) = decoder.next().await {
println!["{:?}", chunk?];
}

Ok(())
})
}
3 changes: 1 addition & 2 deletions length_prefixed_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ mod unfold;

use std::{collections::VecDeque, marker::Unpin};

use async_std::{prelude::*, stream::Stream};
use desert::varint;
use futures::io::AsyncRead;
use futures::{io::AsyncRead, stream::Stream, AsyncReadExt};

pub use error::{DecodeError, DecodeErrorKind};
use unfold::unfold;
Expand Down
15 changes: 8 additions & 7 deletions length_prefixed_stream/src/unfold.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// vendored version of futures::stream::unfold
// Vendored version of futures::stream::unfold
// modified to use async_std

// The original source file from which this is derived is
Expand All @@ -9,12 +9,13 @@
// https://github.com/rust-lang/futures-rs/blob/master/LICENSE-MIT
// https://github.com/rust-lang/futures-rs/blob/master/LICENSE-APACHE

use async_std::future::Future;
use async_std::stream::Stream;
use async_std::task::{Context, Poll};
use core::fmt;
use core::pin::Pin;
use futures_core::ready;
use core::{fmt, pin::Pin};
use futures::{
future::Future,
ready,
stream::Stream,
task::{Context, Poll},
};

pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
where
Expand Down
26 changes: 18 additions & 8 deletions length_prefixed_stream/tests/decode.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,52 @@
use async_std::{prelude::*, stream, task};
use futures::stream::TryStreamExt;
use futures::{stream, StreamExt, TryStreamExt};

use length_prefixed_stream::decode;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

#[test]
fn simple_0() -> Result<(), Error> {
task::block_on(async {
let input = stream::from_iter(vec![
smol::block_on(async {
let input = stream::iter(vec![
Ok(vec![6, 97, 98, 99]),
Ok(vec![100, 101]),
Ok(vec![102, 4, 65, 66]),
Ok(vec![67, 68]),
])
.into_async_read();

let mut decoder = decode(input);
let mut observed = vec![];
while let Some(chunk) = decoder.next().await {
observed.push(chunk?);
}

assert_eq![
observed,
vec![vec![97, 98, 99, 100, 101, 102], vec![65, 66, 67, 68],]
];

Ok(())
})
}

#[test]
fn simple_1() -> Result<(), Error> {
task::block_on(async {
let input = stream::from_iter(vec![
smol::block_on(async {
let input = stream::iter(vec![
Ok(vec![3, 10, 20, 30, 5]),
Ok(vec![11, 12, 13, 14, 15]),
Ok(vec![1, 6, 3, 103]),
Ok(vec![102, 101]),
])
.into_async_read();

let mut decoder = decode(input);
let mut observed = vec![];
while let Some(chunk) = decoder.next().await {
observed.push(chunk?);
}

assert_eq![
observed,
vec![
Expand All @@ -50,14 +56,15 @@ fn simple_1() -> Result<(), Error> {
vec![103, 102, 101],
]
];

Ok(())
})
}

#[test]
fn multibyte_msg_len() -> Result<(), Error> {
task::block_on(async {
let input = stream::from_iter(vec![
smol::block_on(async {
let input = stream::iter(vec![
Ok(vec![4, 200, 201, 202, 203, 144]), // encode(400) = [144,3]
Ok([vec![3], (0..200).collect()].concat()),
Ok((200..395).map(|c| (c % 256) as u8).collect()),
Expand All @@ -67,11 +74,13 @@ fn multibyte_msg_len() -> Result<(), Error> {
Ok(vec![55]),
])
.into_async_read();

let mut decoder = decode(input);
let mut observed = vec![];
while let Some(chunk) = decoder.next().await {
observed.push(chunk?);
}

assert_eq![
observed,
vec![
Expand All @@ -82,6 +91,7 @@ fn multibyte_msg_len() -> Result<(), Error> {
vec![55],
]
];

Ok(())
})
}
13 changes: 9 additions & 4 deletions length_prefixed_stream/tests/options.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
use async_std::{prelude::*, stream, task};
use futures::stream::TryStreamExt;
use futures::{stream, StreamExt, TryStreamExt};

use length_prefixed_stream::{decode_with_options, DecodeOptions};

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

#[test]
fn options_include_len() -> Result<(), Error> {
task::block_on(async {
let input = stream::from_iter(vec![
smol::block_on(async {
let input = stream::iter(vec![
Ok(vec![6, 97, 98, 99]),
Ok(vec![100, 101]),
Ok(vec![102, 4, 65, 66]),
Ok(vec![67, 68]),
])
.into_async_read();

let mut options = DecodeOptions::default();
options.include_len = true;

let mut decoder = decode_with_options(input, options);
let mut observed = vec![];
while let Some(chunk) = decoder.next().await {
observed.push(chunk?);
}

assert_eq![
observed,
vec![vec![6, 97, 98, 99, 100, 101, 102], vec![4, 65, 66, 67, 68],]
];

Ok(())
})
}