Skip to content

Commit

Permalink
Update ndarray to 0.16.0, MSRV is now 1.66.0, bump version to 0.11.0
Browse files Browse the repository at this point in the history
  • Loading branch information
minshao committed Aug 10, 2024
1 parent 4384c20 commit b9a424f
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
rust: [stable, 1.60.0]
rust: [stable, 1.66.0]
steps:
- name: Install Rust
uses: hecrj/setup-rust-action@v1
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [0.11.0] - 2024-08-09

### Changed

- Update ndarray to 0.16.
- The minimum version of Rust required is now 1.66.

## [0.10.1] - 2023-05-23

### Fixed
Expand Down Expand Up @@ -132,6 +139,7 @@ Versioning](https://semver.org/spec/v2.0.0.html).

- Kafka input/output and an example of their usage.

[0.11.0]: https://github.com/petabi/eventio/compare/0.10.1...0.11.0
[0.10.1]: https://github.com/petabi/eventio/compare/0.10.0...0.10.1
[0.10.0]: https://github.com/petabi/eventio/compare/0.9.0...0.10.0
[0.9.0]: https://github.com/petabi/eventio/compare/0.8.0...0.9.0
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "eventio"
version = "0.10.1"
version = "0.11.0"
authors = ["Min Kim <[email protected]>", "Min Shao <[email protected]>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.66"
description = "A collection of event I/O processors for event-processing applications."
documentation = "https://docs.rs/eventio"
readme = "README.md"
Expand All @@ -24,7 +24,7 @@ pcap = ["pcap-parser"]
[dependencies]
crossbeam-channel = "0.5"
kafka = { version = "0.9", default-features = false, optional = true }
ndarray = { version = "0.15", optional = true }
ndarray = { version = "0.16", optional = true }
nom = "7"
pcap-parser = { version = "0.14", features = [
"data",
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn consume(hosts: Vec<String>) {
"eventio".into(),
"eventio-examples".into(),
TOPIC.into(),
usize::max_value(),
usize::MAX,
)
.unwrap();
let in_thread = thread::spawn(move || input.run().unwrap());
Expand Down
14 changes: 5 additions & 9 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl super::Input for Input {
/// Returns an error if it cannot fetch messages from Kafka, receives an
/// invalid message, or receives an invalid ACK from `ack_channel`.
fn run(mut self) -> Result<(), Error> {
let data_channel = if let Some(channel) = &self.data_channel {
channel
} else {
let Some(data_channel) = &self.data_channel else {

Check warning on line 101 in src/kafka.rs

View check run for this annotation

Codecov / codecov/patch

src/kafka.rs#L101

Added line #L101 was not covered by tests
return Err(Error::ChannelClosed);
};

Expand All @@ -121,7 +119,7 @@ impl super::Input for Input {
for msg in msgset.messages() {
let fwd_msg: ForwardMode = rmp_serde::from_slice(msg.value)
.map_err(|e| Error::InvalidMessage(Box::new(e)))?;
if fwd_msg.entries.len() > u32::max_value() as usize {
if fwd_msg.entries.len() > u32::MAX as usize {

Check warning on line 122 in src/kafka.rs

View check run for this annotation

Codecov / codecov/patch

src/kafka.rs#L122

Added line #L122 was not covered by tests
return Err(Error::TooManyEvents(fwd_msg.entries.len()));
}
let (remaining, overflow) =
Expand All @@ -142,7 +140,7 @@ impl super::Input for Input {
loc: EntryLocation {
remainder: remainder
.try_into()
.expect("remainder <= u32::max_values()"),
.expect("remainder <= u32::MAX"),
partition,
offset,
},
Expand All @@ -155,9 +153,7 @@ impl super::Input for Input {
break;
}
i if i == recv_ack => {
let ack = if let Ok(ack) = oper.recv(&self.ack_channel) {
ack
} else {
let Ok(ack) = oper.recv(&self.ack_channel) else {

Check warning on line 156 in src/kafka.rs

View check run for this annotation

Codecov / codecov/patch

src/kafka.rs#L156

Added line #L156 was not covered by tests
// ack_channel was disconnected. Exit the
// loop and commit consumed.
break 'poll;
Expand Down Expand Up @@ -243,7 +239,7 @@ where
/// Returns an error if message serialization or transmission fails.
pub fn run(&mut self) -> Result<(), kafka::Error> {
let mut buf = Vec::new();
for msg in self.data_channel.iter() {
for msg in &self.data_channel {

Check warning on line 242 in src/kafka.rs

View check run for this annotation

Codecov / codecov/patch

src/kafka.rs#L242

Added line #L242 was not covered by tests
msg.serialize(&mut Serializer::new(&mut buf))
.map_err(|e| kafka::Error::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?;
self.producer
Expand Down
8 changes: 3 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,9 @@ impl fmt::Display for Error {
Self::CannotFetch(e) => write!(f, "cannot fetch message from Kafka: {e}"),
Self::InvalidMessage(e) => write!(f, "invalid MessagePack format: {e}"),
Self::Fatal(s) => write!(f, "fatal error: {s}"),
Self::TooManyEvents(n) => write!(
f,
"cannot handle {n} events (expected < {})",
u32::max_value()
),
Self::TooManyEvents(n) => {
write!(f, "cannot handle {n} events (expected < {})", u32::MAX)

Check warning on line 112 in src/lib.rs

View check run for this annotation

Codecov / codecov/patch

src/lib.rs#L111-L112

Added lines #L111 - L112 were not covered by tests
}
}
}
}
4 changes: 1 addition & 3 deletions src/mbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ impl<T: Read> super::Input for Input<T> {
///
/// Returns an error if reading an email from mbox fails.
fn run(mut self) -> Result<(), Error> {
let data_channel = if let Some(channel) = &self.data_channel {
channel
} else {
let Some(data_channel) = &self.data_channel else {
return Err(Error::ChannelClosed);
};

Expand Down
4 changes: 1 addition & 3 deletions src/ndarray.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ impl super::Input for Input {
type Ack = super::SeqNo;

fn run(mut self) -> Result<(), Error> {
let data_channel = if let Some(channel) = &self.data_channel {
channel
} else {
let Some(data_channel) = &self.data_channel else {

Check warning on line 35 in src/ndarray.rs

View check run for this annotation

Codecov / codecov/patch

src/ndarray.rs#L35

Added line #L35 was not covered by tests
return Err(Error::ChannelClosed);
};
let mut sel = crossbeam_channel::Select::new();
Expand Down
7 changes: 4 additions & 3 deletions src/pcap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub struct Input {
unsafe impl Send for Input {}

impl Input {
/// # Panics
///
/// Will panic if fail to create reader
pub fn with_read<R: Read + 'static>(
data_channel: crossbeam_channel::Sender<Event>,
ack_channel: crossbeam_channel::Receiver<super::SeqNo>,
Expand All @@ -40,9 +43,7 @@ impl super::Input for Input {
type Ack = u64;

fn run(mut self) -> Result<(), Error> {
let data_channel = if let Some(channel) = &self.data_channel {
channel
} else {
let Some(data_channel) = &self.data_channel else {

Check warning on line 46 in src/pcap.rs

View check run for this annotation

Codecov / codecov/patch

src/pcap.rs#L46

Added line #L46 was not covered by tests
return Err(Error::ChannelClosed);
};
let mut sel = crossbeam_channel::Select::new();
Expand Down
4 changes: 1 addition & 3 deletions src/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ impl<T: Read> super::Input for Input<T> {
type Ack = super::SeqNo;

fn run(mut self) -> Result<(), Error> {
let data_channel = if let Some(channel) = &self.data_channel {
channel
} else {
let Some(data_channel) = &self.data_channel else {
return Err(Error::ChannelClosed);
};

Expand Down

0 comments on commit b9a424f

Please sign in to comment.