Skip to content

Commit

Permalink
Merge pull request #227 from oflatt/oflatt-binary-joins
Browse files Browse the repository at this point in the history
Small gj refactor
  • Loading branch information
yihozhang authored Sep 11, 2023
2 parents b8912bd + 8979422 commit 64e27cd
Showing 1 changed file with 88 additions and 74 deletions.
162 changes: 88 additions & 74 deletions src/gj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ impl EGraph {
self.make_trie_access_for_column(atom, column, timestamp_range)
}

// Returns `None` when no program is needed,
// for example when there is nothing in one of the tables.
fn compile_program(
&self,
query: &CompiledQuery,
Expand Down Expand Up @@ -620,6 +622,84 @@ impl EGraph {
}
}

/// Runs a single generic-join evaluation of `cq` under the given per-atom
/// timestamp ranges, calling `f` through `Context::eval` for the matches found.
///
/// * `atom_i` - only used for debug logging: the atom seminaive evaluation is
///   currently focusing on, or `None` when seminaive is disabled.
/// * `timestamp_ranges` - one range per atom of `cq.query.atoms`; passed to
///   `Context::new` and to each atom's `column_index` lookup.
/// * `cq` - the compiled query to evaluate.
/// * `f` - callback handed to `Context::eval`.
///
/// Nothing is run when `Context::new` returns `None`.
/// A warning is logged when the evaluation takes longer than 1000 ms.
fn gj_for_atom<F>(
    &self,
    // for debugging, the atom seminaive is focusing on
    atom_i: Option<usize>,
    timestamp_ranges: &[Range<u32>],
    cq: &CompiledQuery,
    mut f: F,
) where
    F: FnMut(&[Value]) -> Result,
{
    // do the gj
    if let Some((mut ctx, program, cols)) = Context::new(self, cq, timestamp_ranges) {
        let start = Instant::now();

        // Only build the (allocating) description string when it will actually be logged.
        if log_enabled!(log::Level::Debug) {
            let atom_info = match atom_i {
                Some(atom_i) => {
                    let atom = &cq.query.atoms[atom_i];
                    format!("New atom: {atom}")
                }
                None => "Seminaive disabled".to_string(),
            };
            log::debug!(
                "Query:\n{q}\n{atom_info}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}",
                q = cq.query,
                order = ListDisplay(&ctx.join_var_ordering, " "),
                tuple = ListDisplay(cq.vars.keys(), " "),
            );
        }

        // One trie per atom: seeded from a column index when a target column was
        // chosen for the atom and the function has such an index for this
        // timestamp range; otherwise a default (empty) lazy trie.
        let tries: Vec<_> = cq
            .query
            .atoms
            .iter()
            .zip(timestamp_ranges.iter())
            .zip(cols.iter())
            .map(|((atom, ts), col)| {
                let index = match col {
                    Some(target) => self.functions[&atom.head].column_index(*target, ts),
                    None => None,
                };
                match index {
                    Some(index) => LazyTrie::from_column_index(index),
                    None => LazyTrie::default(),
                }
            })
            .collect();
        let mut trie_refs = tries.iter().collect::<Vec<_>>();

        let mut measurements = HashMap::<usize, Vec<usize>>::default();
        let stages = InputSizes {
            stage_sizes: &mut measurements,
            cur_stage: 0,
        };
        // The result of `eval` is deliberately discarded.
        // NOTE(review): presumably `Err` only signals an early stop requested by `f` - confirm.
        ctx.eval(&mut trie_refs, &program.0, stages, &mut f)
            .unwrap_or(());

        // The per-stage totals are only ever logged, so skip summing and sorting
        // them entirely unless debug logging is on.
        if log_enabled!(log::Level::Debug) {
            let mut sums = Vec::from_iter(
                measurements
                    .iter()
                    .map(|(stage, sizes)| (*stage, sizes.iter().copied().sum::<usize>())),
            );
            // Keys come from a map and are therefore unique, so an unstable sort
            // yields the same order as a stable one.
            sums.sort_unstable_by_key(|(stage, _sum)| *stage);
            for (i, sum) in sums {
                log::debug!("stage {i} total cost {sum}");
            }
        }

        let duration = start.elapsed();
        log::debug!("Matched {} times (took {:?})", ctx.matches, duration,);

        if duration.as_millis() > 1000 {
            // Look up the iteration counter only when it is going to be reported.
            let iteration = self
                .ruleset_iteration
                .get::<Symbol>(&"".into())
                .unwrap_or(&0);
            log::warn!(
                "Query took a long time at iter {iteration} : {:?}",
                duration
            );
        }
    }
}

pub(crate) fn run_query<F>(&self, cq: &CompiledQuery, timestamp: u32, mut f: F)
where
F: FnMut(&[Value]) -> Result,
Expand All @@ -642,82 +722,16 @@ impl EGraph {
let do_seminaive = self.seminaive && !global_updated;
// for the later atoms, we consider everything
let mut timestamp_ranges = vec![0..u32::MAX; cq.query.atoms.len()];
for (atom_i, atom) in cq.query.atoms.iter().enumerate() {
// this time, we only consider "new stuff" for this atom
if do_seminaive {
if do_seminaive {
for (atom_i, _atom) in cq.query.atoms.iter().enumerate() {
timestamp_ranges[atom_i] = timestamp..u32::MAX;
self.gj_for_atom(Some(atom_i), &timestamp_ranges, cq, &mut f);
// now we can fix this atom to be "old stuff" only
// range is half-open; timestamp is excluded
timestamp_ranges[atom_i] = 0..timestamp;
}

// do the gj

if let Some((mut ctx, program, cols)) = Context::new(self, cq, &timestamp_ranges) {
let start = Instant::now();
log::debug!(
"Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}",
q = cq.query,
order = ListDisplay(&ctx.join_var_ordering, " "),
tuple = ListDisplay(cq.vars.keys(), " "),
);
let mut tries = Vec::with_capacity(cq.query.atoms.len());
for ((atom, ts), col) in cq
.query
.atoms
.iter()
.zip(timestamp_ranges.iter())
.zip(cols.iter())
{
// tries.push(LazyTrie::default());
if let Some(target) = col {
if let Some(col) = self.functions[&atom.head].column_index(*target, ts)
{
tries.push(LazyTrie::from_column_index(col))
} else {
tries.push(LazyTrie::default());
}
} else {
tries.push(LazyTrie::default());
}
}
let mut trie_refs = tries.iter().collect::<Vec<_>>();
let mut meausrements = HashMap::<usize, Vec<usize>>::default();
let stages = InputSizes {
stage_sizes: &mut meausrements,
cur_stage: 0,
};
ctx.eval(&mut trie_refs, &program.0, stages, &mut f)
.unwrap_or(());
let mut sums = Vec::from_iter(
meausrements
.iter()
.map(|(x, y)| (*x, y.iter().copied().sum::<usize>())),
);
sums.sort_by_key(|(i, _sum)| *i);
if log_enabled!(log::Level::Debug) {
for (i, sum) in sums {
log::debug!("stage {i} total cost {sum}");
}
}
let duration = start.elapsed();
log::debug!("Matched {} times (took {:?})", ctx.matches, duration,);
let iteration = self
.ruleset_iteration
.get::<Symbol>(&"".into())
.unwrap_or(&0);
if duration.as_millis() > 1000 {
log::warn!(
"Query took a long time at iter {iteration} : {:?}",
duration
);
}
}

if !do_seminaive {
break;
}

// now we can fix this atom to be "old stuff" only
// range is half-open; timestamp is excluded
timestamp_ranges[atom_i] = 0..timestamp;
} else {
self.gj_for_atom(None, &timestamp_ranges, cq, &mut f);
}
} else if let Some((mut ctx, program, _)) = Context::new(self, cq, &[]) {
let mut meausrements = HashMap::<usize, Vec<usize>>::default();
Expand Down

0 comments on commit 64e27cd

Please sign in to comment.