From 1f6d7664b2c2fb9f04a61111570a78b2bfd7786e Mon Sep 17 00:00:00 2001 From: Serial <69764315+Serial-ATA@users.noreply.github.com> Date: Thu, 18 Apr 2024 13:00:31 -0400 Subject: [PATCH] ogg_pager: Fix writing of large packets --- Cargo.toml | 2 +- ogg_pager/CHANGELOG.md | 3 ++ ogg_pager/src/lib.rs | 2 - ogg_pager/src/paginate.rs | 105 +++++++++++++++++--------------------- src/ogg/read.rs | 6 +-- 5 files changed, 53 insertions(+), 65 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dd4616b6b..11c96f0a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ lofty_attr = { path = "lofty_attr" } # Debug logging log = "0.4.20" # OGG Vorbis/Opus -ogg_pager = "0.6.0" +ogg_pager = { path = "ogg_pager" } # Key maps paste = "1.0.14" diff --git a/ogg_pager/CHANGELOG.md b/ogg_pager/CHANGELOG.md index 501ca1f59..4fc1a56ef 100644 --- a/ogg_pager/CHANGELOG.md +++ b/ogg_pager/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- When writing large packets, the size would slowly shift out of sync, causing the pages to be written incorrectly. ([issue](https://github.com/Serial-ATA/lofty-rs/issues/350)) ([PR](https://github.com/Serial-ATA/lofty-rs/pull/375)) + ## [0.6.0] - 2024-01-03 ### Added diff --git a/ogg_pager/src/lib.rs b/ogg_pager/src/lib.rs index ee2465d95..1782f95de 100644 --- a/ogg_pager/src/lib.rs +++ b/ogg_pager/src/lib.rs @@ -107,8 +107,6 @@ impl Page { /// Attempts to get a Page from a reader /// - /// Use `skip_content` to only read the header, and skip over the content. - /// /// # Errors /// /// * [`std::io::Error`] diff --git a/ogg_pager/src/paginate.rs b/ogg_pager/src/paginate.rs index 512ec8870..7471f73ed 100644 --- a/ogg_pager/src/paginate.rs +++ b/ogg_pager/src/paginate.rs @@ -1,8 +1,7 @@ use crate::error::Result; use crate::{ - segment_table, Page, PageHeader, CONTAINS_FIRST_PAGE_OF_BITSTREAM, - CONTAINS_LAST_PAGE_OF_BITSTREAM, CONTINUED_PACKET, MAX_WRITTEN_CONTENT_SIZE, - MAX_WRITTEN_SEGMENT_COUNT, + Page, PageHeader, CONTAINS_FIRST_PAGE_OF_BITSTREAM, CONTAINS_LAST_PAGE_OF_BITSTREAM, + CONTINUED_PACKET, MAX_WRITTEN_CONTENT_SIZE, MAX_WRITTEN_SEGMENT_COUNT, }; use std::io::Read; @@ -17,6 +16,7 @@ struct PaginateContext { idx: usize, remaining_page_size: usize, current_packet_len: usize, + last_segment_size: u8, } impl PaginateContext { @@ -36,10 +36,19 @@ impl PaginateContext { idx: 0, remaining_page_size: MAX_WRITTEN_CONTENT_SIZE, current_packet_len: 0, + last_segment_size: 0, } } - fn flush(&mut self, content: &mut Vec, segment_table: &mut Vec) { + fn fresh_packet(&mut self, packet: &[u8]) { + self.flags.fresh_packet = true; + self.pos = 0; + + self.current_packet_len = packet.len(); + self.last_segment_size = (packet.len() % 255) as u8; + } + + fn flush_page(&mut self, content: &mut Vec) { let mut header = PageHeader { start: self.pos, header_type_flag: { @@ -62,7 +71,7 @@ impl PaginateContext { stream_serial: self.stream_serial, sequence_number: self.idx as u32, segments: Vec::new(), - // No need to calculate this yet + // Calculated later checksum: 0, }; @@ -71,7 +80,8 @@ impl PaginateContext { self.pos += content_len as u64; // Moving on to a new packet - if self.pos > self.current_packet_len as u64 { + debug_assert!(self.pos <= self.current_packet_len as u64); + if self.pos == self.current_packet_len as u64 { self.flags.packet_spans_multiple_pages = false; } @@ -79,15 +89,17 @@ impl PaginateContext { // If it takes up the remainder of the segment table for the entire packet, // we'll just consume it as is. let segments_occupied = if content_len >= 255 { - content_len / 255 + content_len.div_ceil(255) } else { 1 }; + debug_assert!(segments_occupied <= MAX_WRITTEN_SEGMENT_COUNT); if self.flags.packet_spans_multiple_pages { - header.segments = segment_table.drain(..segments_occupied).collect(); + header.segments = vec![255; segments_occupied]; } else { - header.segments = core::mem::take(segment_table); + header.segments = vec![255; segments_occupied - 1]; + header.segments.push(self.last_segment_size); } self.pages.push(Page { @@ -137,62 +149,43 @@ where { let mut ctx = PaginateContext::new(abgp, stream_serial, flags); - let mut packets_iter = packets.into_iter(); - let mut packet = match packets_iter.next() { - Some(packet) => packet, - // We weren't given any content to paginate - None => return Ok(ctx.pages), - }; - ctx.current_packet_len = packet.len(); + for packet in packets { + ctx.fresh_packet(packet); + paginate_packet(&mut ctx, packet)?; + } + + if flags & CONTAINS_LAST_PAGE_OF_BITSTREAM == 0x04 { + if let Some(last) = ctx.pages.last_mut() { + last.header.header_type_flag |= CONTAINS_LAST_PAGE_OF_BITSTREAM; + } + } - let mut segments = segment_table(packet.len()); - let mut page_content = Vec::new(); + Ok(ctx.pages) +} +fn paginate_packet(ctx: &mut PaginateContext, packet: &[u8]) -> Result<()> { + let mut page_content = Vec::with_capacity(MAX_WRITTEN_CONTENT_SIZE); + let mut packet = packet; loop { - if !ctx.flags.packet_spans_multiple_pages && !ctx.flags.first_page { - match packets_iter.next() { - Some(packet_) => { - packet = packet_; - segments.append(&mut segment_table(packet.len())); - ctx.current_packet_len = packet.len(); - ctx.flags.fresh_packet = true; - }, - None => break, - }; + if packet.is_empty() { + break; } - // We read as much of the packet as we can, given the amount of space left in the page. - // The packet may need to span multiple pages. let bytes_read = packet .take(ctx.remaining_page_size as u64) .read_to_end(&mut page_content)?; ctx.remaining_page_size -= bytes_read; packet = &packet[bytes_read..]; - // We need to indicate whether or not any packet was finished on this page. - // This is used for the absolute granule position. - if packet.is_empty() { + + if bytes_read <= MAX_WRITTEN_CONTENT_SIZE && packet.is_empty() { ctx.flags.packet_finished_on_page = true; + } else { + ctx.flags.packet_spans_multiple_pages = true; } - // The first packet of the bitstream must have its own page, unlike any other packet. - let first_page_of_bitstream = ctx.header_flags & CONTAINS_FIRST_PAGE_OF_BITSTREAM != 0; - let first_packet_finished_on_page = - ctx.flags.first_page && first_page_of_bitstream && ctx.flags.packet_finished_on_page; - - // We have a maximum of `MAX_WRITTEN_SEGMENT_COUNT` segments available per page, if we require more than - // is left in the segment table, we'll have to split the packet into multiple pages. - let segments_required = (packet.len() / MAX_WRITTEN_SEGMENT_COUNT) + 1; - let remaining_segments = MAX_WRITTEN_SEGMENT_COUNT.saturating_sub(segments.len()); - ctx.flags.packet_spans_multiple_pages = segments_required > remaining_segments; - - if first_packet_finished_on_page - // We've completely filled this page, we need to flush before moving on - || (ctx.remaining_page_size == 0 || remaining_segments == 0) - // We've read all this packet has to offer - || packet.is_empty() - { - ctx.flush(&mut page_content, &mut segments); + if ctx.remaining_page_size == 0 || packet.is_empty() { + ctx.flush_page(&mut page_content); } ctx.flags.first_page = false; @@ -201,14 +194,8 @@ where // Flush any content leftover if !page_content.is_empty() { - ctx.flush(&mut page_content, &mut segments); + ctx.flush_page(&mut page_content); } - if flags & CONTAINS_LAST_PAGE_OF_BITSTREAM == 0x04 { - if let Some(last) = ctx.pages.last_mut() { - last.header.header_type_flag |= CONTAINS_LAST_PAGE_OF_BITSTREAM; - } - } - - Ok(ctx.pages) + Ok(()) } diff --git a/src/ogg/read.rs b/src/ogg/read.rs index 17c26364b..a7709c85a 100644 --- a/src/ogg/read.rs +++ b/src/ogg/read.rs @@ -72,15 +72,15 @@ where }, }; - let comments_total_len = data.read_u32::()?; + let number_of_items = data.read_u32::()?; let mut tag = VorbisComments { vendor, - items: Vec::with_capacity(comments_total_len as usize), + items: Vec::with_capacity(number_of_items as usize), pictures: Vec::new(), }; - for _ in 0..comments_total_len { + for _ in 0..number_of_items { let comment_len = data.read_u32::()?; if u64::from(comment_len) > len { err!(SizeMismatch);