Move unextractable to table
* Make row a struct so we can add attributes more easily
* Make function and table debug-friendly
saulshanabrook committed Nov 29, 2023
1 parent e7b430a commit 78bcdb5
Showing 3 changed files with 143 additions and 54 deletions.
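The core of the change: the per-entry `unextractable` flag moves off of `Function` (where it was a separate `HashSet<Vec<Value>>`) and into a new `Row` struct stored by the table itself. As a rough orientation before the diff, here is a minimal self-contained sketch of that pattern, with toy types and a plain `HashMap` standing in for egglog's `Value`, `Input`, `TupleOutput`, and `RawTable`-backed `Table` (not the actual implementation):

```rust
use std::collections::HashMap;

// Toy stand-ins; the real code uses egglog's Value/Input/TupleOutput
// and a hashbrown RawTable index over a Vec<Row>.
type Value = u64;

#[derive(Debug, Clone)]
struct Row {
    output: Value,
    // The flag now lives on the row itself instead of in a separate
    // HashSet<Vec<Value>> keyed by the input tuple.
    unextractable: bool,
}

#[derive(Debug, Default)]
struct Table {
    rows: HashMap<Vec<Value>, Row>,
}

impl Table {
    fn insert(&mut self, inputs: Vec<Value>, output: Value, unextractable: bool) {
        self.rows.insert(inputs, Row { output, unextractable });
    }

    // Like the real Table::mark_unextractable, this assumes the row exists.
    fn mark_unextractable(&mut self, inputs: &[Value]) {
        self.rows.get_mut(inputs).unwrap().unextractable = true;
    }

    fn get_row(&self, inputs: &[Value]) -> Option<&Row> {
        self.rows.get(inputs)
    }
}

fn main() {
    let mut t = Table::default();
    t.insert(vec![1, 2], 3, false);
    t.mark_unextractable(&[1, 2]);
    assert!(t.get_row(&[1, 2]).unwrap().unextractable);
    println!("{t:?}");
}
```

In the real table the rows live in a `Vec<Row>` reached through a `RawTable` index, so `mark_unextractable` flips the flag in place on the stored row and `Function::check_unextractable` reads it back via `get_row`.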
77 changes: 48 additions & 29 deletions src/function/mod.rs
@@ -22,7 +22,6 @@ pub struct Function {
index_updated_through: usize,
updates: usize,
scratch: IndexSet<usize>,
unextractable: HashSet<Vec<Value>>,
}

#[derive(Clone)]
@@ -62,6 +61,21 @@ impl ResolvedSchema {
}
}

impl Debug for Function {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Function")
.field("decl", &self.decl)
.field("schema", &self.schema)
.field("nodes", &self.nodes)
.field("indexes", &self.indexes)
.field("rebuild_indexes", &self.rebuild_indexes)
.field("index_updated_through", &self.index_updated_through)
.field("updates", &self.updates)
.field("scratch", &self.scratch)
.finish()
}
}

/// A non-Union merge discovered during rebuilding that has to be applied before
/// resuming execution.
pub(crate) type DeferredMerge = (ValueVec, Value, Value);
@@ -143,7 +157,6 @@ impl Function {
on_merge,
merge_vals,
},
unextractable: Default::default(),
// TODO figure out merge and default here
})
}
@@ -197,12 +210,12 @@ impl Function {
if !self.schema.output.is_eq_sort() {
panic!("Only eq sorts can be marked unextractable")
}
self.unextractable.insert(inputs.to_vec());
self.nodes.mark_unextractable(inputs);
}

/// Check if the given inputs are unextractable.
pub fn check_unextractable(&self, inputs: &[Value]) -> bool {
self.unextractable.contains(inputs)
self.nodes.get_row(inputs).unwrap().unextractable
}

/// Return a column index that contains (a superset of) the offsets for the
@@ -390,6 +403,7 @@ impl Function {
// Entry is stale
return result;
};
let unextractable = self.nodes.get_row(args).unwrap().unextractable;

let mut out_val = out.value;
scratch.clear();
@@ -405,36 +419,41 @@ impl Function {
return result;
}
let out_ty = &self.schema.output;
self.nodes.insert_and_merge(scratch, timestamp, |prev| {
    if let Some(mut prev) = prev {
        out_ty.canonicalize(&mut prev, uf);
        let mut appended = false;
        if self.merge.on_merge.is_some() && prev != out_val {
            deferred_merges.push((scratch.clone(), prev, out_val));
            appended = true;
        }
        match &self.merge.merge_vals {
            MergeFn::Union => {
                debug_assert!(self.schema.output.is_eq_sort());
                uf.union_values(prev, out_val, self.schema.output.name())
            }
            MergeFn::AssertEq => {
                if prev != out_val {
                    result = Err(Error::MergeError(self.decl.name, prev, out_val));
                }
                prev
            }
            MergeFn::Expr(_) => {
                if !appended && prev != out_val {
                    deferred_merges.push((scratch.clone(), prev, out_val));
                }
                prev
            }
        }
    } else {
        out_val
    }
});
self.nodes.insert_and_merge(
    scratch,
    timestamp,
    |prev| {
        if let Some(mut prev) = prev {
            out_ty.canonicalize(&mut prev, uf);
            let mut appended = false;
            if self.merge.on_merge.is_some() && prev != out_val {
                deferred_merges.push((scratch.clone(), prev, out_val));
                appended = true;
            }
            match &self.merge.merge_vals {
                MergeFn::Union => {
                    debug_assert!(self.schema.output.is_eq_sort());
                    uf.union_values(prev, out_val, self.schema.output.name())
                }
                MergeFn::AssertEq => {
                    if prev != out_val {
                        result = Err(Error::MergeError(self.decl.name, prev, out_val));
                    }
                    prev
                }
                MergeFn::Expr(_) => {
                    if !appended && prev != out_val {
                        deferred_merges.push((scratch.clone(), prev, out_val));
                    }
                    prev
                }
            }
        } else {
            out_val
        }
    },
    unextractable,
);
if let Some((inputs, _)) = self.nodes.get_index(i) {
if inputs != &scratch[..] {
scratch.clear();
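Besides moving the flag, both files gain hand-written `Debug` impls instead of `#[derive(Debug)]`, presumably because some fields (the merge action on `Function` above, the `RawTable` index on `Table` below) have no `Debug` impl and are simply left out of the output. A small sketch of that pattern, with a hypothetical `NoDebug` field standing in for those:

```rust
use std::fmt::{self, Debug, Formatter};

// Hypothetical stand-in for a field whose type has no Debug impl
// (think: the RawTable index or a merge closure in the real code).
struct NoDebug;

struct Table {
    max_ts: u32,
    n_stale: usize,
    #[allow(dead_code)]
    index: NoDebug,
    vals: Vec<u32>,
}

impl Debug for Table {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // debug_struct prints only the fields we list; `index` is skipped.
        f.debug_struct("Table")
            .field("max_ts", &self.max_ts)
            .field("n_stale", &self.n_stale)
            .field("vals", &self.vals)
            .finish()
    }
}

fn main() {
    let t = Table {
        max_ts: 7,
        n_stale: 0,
        index: NoDebug,
        vals: vec![1, 2, 3],
    };
    println!("{t:?}"); // Table { max_ts: 7, n_stale: 0, vals: [1, 2, 3] }
}
```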
111 changes: 89 additions & 22 deletions src/function/table.rs
@@ -26,6 +26,7 @@
//! It's likely that we will have to store these "on the side" or use some sort
//! of persistent data-structure for the entire table.
use std::{
fmt::{Debug, Formatter},
hash::{BuildHasher, Hash, Hasher},
mem,
ops::Range,
@@ -46,12 +47,29 @@ struct TableOffset {
off: Offset,
}

#[derive(Debug, Clone)]
pub(crate) struct Row {
pub(crate) input: Input,
pub(crate) output: TupleOutput,
pub(crate) unextractable: bool,
}

impl Row {
fn new(input: Input, output: TupleOutput, unextractable: bool) -> Row {
Row {
input,
output,
unextractable,
}
}
}

#[derive(Default, Clone)]
pub(crate) struct Table {
max_ts: u32,
n_stale: usize,
table: RawTable<TableOffset>,
pub(crate) vals: Vec<(Input, TupleOutput)>,
pub(crate) vals: Vec<Row>,
}

/// Used for the RawTable probe sequence.
@@ -64,12 +82,22 @@ macro_rules! search_for {
}
// If the hash matches, the value should not be stale, and the data
// should match.
let inp = &$slf.vals[to.off as usize].0;
let inp = &$slf.vals[to.off as usize].input;
inp.live() && inp.data() == $inp
}
};
}

impl Debug for Table {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Table")
.field("max_ts", &self.max_ts)
.field("n_stale", &self.n_stale)
.field("vals", &self.vals)
.finish()
}
}

impl Table {
/// Clear the contents of the table.
pub(crate) fn clear(&mut self) {
@@ -89,7 +117,7 @@ impl Table {
let mut src = 0usize;
let mut dst = 0usize;
self.table.clear();
self.vals.retain(|(inp, _)| {
self.vals.retain(|Row { input: inp, .. }| {
if inp.live() {
let hash = hash_values(inp.data());
self.table
@@ -108,20 +136,38 @@
/// Get the entry in the table for the given values, if they are in the
/// table.
pub(crate) fn get(&self, inputs: &[Value]) -> Option<&TupleOutput> {
self.get_row(inputs).map(|row| &row.output)
}

pub(crate) fn get_row(&self, inputs: &[Value]) -> Option<&Row> {
let hash = hash_values(inputs);
let TableOffset { off, .. } = self.table.get(hash, search_for!(self, hash, inputs))?;
debug_assert!(self.vals[*off].0.live());
Some(&self.vals[*off].1)
debug_assert!(self.vals[*off].input.live());
Some(&self.vals[*off])
}

pub(crate) fn mark_unextractable(&mut self, inputs: &[Value]) {
let hash = hash_values(inputs);
let TableOffset { off, .. } = self
.table
.get(hash, search_for!(self, hash, inputs))
.unwrap();
self.vals[*off].unextractable = true;
}

/// Insert the given data into the table at the given timestamp. Return the
/// previous value, if there was one.
pub(crate) fn insert(&mut self, inputs: &[Value], out: Value, ts: u32) -> Option<Value> {
let mut res = None;
self.insert_and_merge(inputs, ts, |prev| {
res = prev;
out
});
self.insert_and_merge(
inputs,
ts,
|prev| {
res = prev;
out
},
false,
);
res
}

@@ -137,14 +183,19 @@
inputs: &[Value],
ts: u32,
on_merge: impl FnOnce(Option<Value>) -> Value,
unextractable: bool,
) {
assert!(ts >= self.max_ts);
self.max_ts = ts;
let hash = hash_values(inputs);
if let Some(TableOffset { off, .. }) =
self.table.get_mut(hash, search_for!(self, hash, inputs))
{
let (inp, prev) = &mut self.vals[*off];
let Row {
input: inp,
output: prev,
..
} = &mut self.vals[*off];
let next = on_merge(Some(prev.value));
if next == prev.value {
return;
Expand All @@ -153,23 +204,25 @@ impl Table {
self.n_stale += 1;
let k = mem::take(&mut inp.data);
let new_offset = self.vals.len();
self.vals.push((
self.vals.push(Row::new(
Input::new(k),
TupleOutput {
value: next,
timestamp: ts,
},
unextractable,
));
*off = new_offset;
return;
}
let new_offset = self.vals.len();
self.vals.push((
self.vals.push(Row::new(
Input::new(inputs.into()),
TupleOutput {
value: on_merge(None),
timestamp: ts,
},
unextractable,
));
self.table.insert(
hash,
Expand Down Expand Up @@ -198,7 +251,7 @@ impl Table {

/// The minimum timestamp stored by the table, if there is one.
pub(crate) fn min_ts(&self) -> Option<u32> {
Some(self.vals.first()?.1.timestamp)
Some(self.vals.first()?.output.timestamp)
}

/// An upper bound for all timestamps stored in the table.
@@ -208,7 +261,7 @@

/// Get the timestamp for the entry at index `i`.
pub(crate) fn get_timestamp(&self, i: usize) -> Option<u32> {
Some(self.vals.get(i)?.1.timestamp)
Some(self.vals.get(i)?.output.timestamp)
}

/// Remove the given mapping from the table, returns whether an entry was
@@ -221,20 +274,28 @@
} else {
return false;
};
self.vals[entry.off].0.stale_at = ts;
self.vals[entry.off].input.stale_at = ts;
self.n_stale += 1;
true
}

/// Returns the entries at the given index if the entry is live and the index in bounds.
pub(crate) fn get_index(&self, i: usize) -> Option<(&[Value], &TupleOutput)> {
let (inp, out) = self.vals.get(i)?;
let Row {
input: inp,
output: out,
..
} = self.get_index_row(i)?;
if !inp.live() {
return None;
}
Some((inp.data(), out))
}

pub(crate) fn get_index_row(&self, i: usize) -> Option<&Row> {
self.vals.get(i)
}

/// Iterate over the live entries in the table, in insertion order.
pub(crate) fn iter(&self) -> impl Iterator<Item = (&[Value], &TupleOutput)> + '_ {
self.iter_range(0..self.num_offsets())
@@ -247,24 +308,30 @@
&self,
range: Range<usize>,
) -> impl Iterator<Item = (usize, &[Value], &TupleOutput)> + '_ {
self.vals[range.clone()]
.iter()
.zip(range)
.filter_map(|((inp, out), i)| {
self.vals[range.clone()].iter().zip(range).filter_map(
|(
Row {
input: inp,
output: out,
..
},
i,
)| {
if inp.live() {
Some((i, inp.data(), out))
} else {
None
}
})
},
)
}

#[cfg(debug_assertions)]
pub(crate) fn assert_sorted(&self) {
assert!(self
.vals
.windows(2)
.all(|xs| xs[0].1.timestamp <= xs[1].1.timestamp))
.all(|xs| xs[0].output.timestamp <= xs[1].output.timestamp))
}

/// Iterate over the live entries in the timestamp range, passing back their
