Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust: enable write conformance tests #970

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions rust/examples/conformance_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) {
.expect("Couldn't create writer");

let mut channels = HashMap::<u16, mcap::Channel>::new();
let mut schemas = HashMap::<u64, mcap::Schema>::new();
let mut schemas = HashMap::<u16, mcap::Schema>::new();

for record in &spec.records {
match record.record_type.as_str() {
Expand All @@ -35,7 +35,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) {
}
"Channel" => {
let id = record.get_field_u16("id");
let schema_id = record.get_field_u64("schema_id");
let schema_id = record.get_field_u16("schema_id");
let topic = record.get_field_str("topic");
let message_encoding = record.get_field_str("message_encoding");
let schema = schemas.get(&schema_id).expect("Missing schema");
Expand All @@ -46,7 +46,7 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) {
metadata: std::collections::BTreeMap::new(),
};
writer
.add_channel(&channel)
.add_channel_with_id(&channel, id)
.expect("Couldn't write channel");
channels.insert(id, channel);
}
Expand Down Expand Up @@ -104,13 +104,16 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) {
"Schema" => {
let name = record.get_field_str("name");
let encoding = record.get_field_str("encoding");
let id = record.get_field_u64("id");
let id = record.get_field_u16("id");
let data: Vec<u8> = record.get_field_data(&"data");
let schema = mcap::Schema {
name: name.to_owned(),
encoding: encoding.to_owned(),
data: Cow::from(data),
};
writer
.add_schema_with_id(&schema, id)
.expect("Couldn't write schema");
schemas.insert(id, schema);
}
"Statistics" => {
Expand Down
6 changes: 6 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ pub enum McapError {
ConflictingChannels(String),
#[error("Schema `{0}` has mulitple records that don't match.")]
ConflictingSchemas(String),
#[error("Cannot add multiple schemas with id `{0}`")]
ConflictingSchemaIds(u16),
#[error("Cannot add multiple channels with id `{0}`")]
ConflictingChannelIds(u16),
#[error("Schema cannot have ID 0")]
UnexpectedSchemaIdZero,
#[error("Record parse failed")]
Parse(#[from] binrw::Error),
#[error("I/O error from writing, or reading a compression stream")]
Expand Down
65 changes: 61 additions & 4 deletions rust/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ pub struct Writer<'a, W: Write + Seek> {
/// Message start and end time, or None if there are no messages yet.
message_bounds: Option<(u64, u64)>,
channel_message_counts: BTreeMap<u16, u64>,
highest_channel_id: u16,
highest_schema_id: u16,
}

impl<'a, W: Write + Seek> Writer<'a, W> {
Expand Down Expand Up @@ -206,6 +208,10 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
metadata_indexes: Vec::new(),
message_bounds: None,
channel_message_counts: BTreeMap::new(),
highest_channel_id: 0,
// Schema IDs cannot be zero, that's the sentinel value in a channel
// for "no schema"
highest_schema_id: 1,
})
}

Expand All @@ -222,7 +228,8 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
return Ok(*id);
}

let next_channel_id = self.channels.len() as u16;
let next_channel_id = self.highest_channel_id + 1;
self.highest_channel_id = next_channel_id;
assert!(self
.channels
.insert(chan.clone(), next_channel_id)
Expand All @@ -232,14 +239,64 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
Ok(next_channel_id)
}

/// Adds a channel (and its provided schema, if any), specifying its ID.
///
/// Useful to preserve channel IDs when copying from an existing MCAP.
pub fn add_channel_with_id(&mut self, chan: &Channel<'a>, id: u16) -> McapResult<()> {
let schema_id = match &chan.schema {
Some(s) => self.add_schema(s)?,
None => 0,
};
for (known_channel, known_id) in &(self.channels) {
if id == *known_id {
if chan == known_channel {
// tried adding the same channel twice with the same ID, not an error
return Ok(());
}
return Err(McapError::ConflictingChannelIds(id));
}
}
assert!(self
.channels
.insert(chan.clone(), id)
.is_none());
self.highest_channel_id = std::cmp::max(self.highest_channel_id, id);
self.chunkin_time()?
.write_channel(id, schema_id, chan)?;
Ok(())
}

/// Adds a schema separately from adding a channel.
///
/// Useful to preserve schema IDs when copying from an existing MCAP.
pub fn add_schema_with_id(&mut self, schema: &Schema<'a>, id: u16) -> McapResult<()> {
if id == 0 {
return Err(McapError::UnexpectedSchemaIdZero);
}
for (known_schema, known_id) in &(self.schemas) {
if id == *known_id {
if schema == known_schema {
return Ok(());
}
return Err(McapError::ConflictingSchemaIds(id));
}
}
assert!(self
.schemas
.insert(schema.clone(), id)
.is_none());
self.highest_schema_id = std::cmp::max(self.highest_schema_id, id);

self.chunkin_time()?.write_schema(id, schema)
}

fn add_schema(&mut self, schema: &Schema<'a>) -> McapResult<u16> {
if let Some(id) = self.schemas.get(schema) {
return Ok(*id);
}

// Schema IDs cannot be zero, that's the sentinel value in a channel
// for "no schema"
let next_schema_id = self.schemas.len() as u16 + 1;
let next_schema_id = self.highest_schema_id + 1;
self.highest_schema_id = next_schema_id;
assert!(self
.schemas
.insert(schema.clone(), next_schema_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ export default class RustWriterTestRunner extends WriteTestRunner {
}

supportsVariant(_variant: TestVariant): boolean {
return false;
return true;
}
}
Loading