From ece10b3d042d1a15f0b313a40f349a760d49e079 Mon Sep 17 00:00:00 2001 From: oflatt Date: Fri, 8 Sep 2023 09:37:57 -0700 Subject: [PATCH 1/3] cleanup gj somewhat --- src/gj.rs | 155 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 81 insertions(+), 74 deletions(-) diff --git a/src/gj.rs b/src/gj.rs index bdc69f63..59f5e255 100644 --- a/src/gj.rs +++ b/src/gj.rs @@ -620,6 +620,78 @@ impl EGraph { } } + fn gj_for_atom( + &self, + atom_i: usize, + timestamp_ranges: &Vec>, + cq: &CompiledQuery, + mut f: F, + ) where + F: FnMut(&[Value]) -> Result, + { + let atom = &cq.query.atoms[atom_i]; + // do the gj + if let Some((mut ctx, program, cols)) = Context::new(self, cq, timestamp_ranges) { + let start = Instant::now(); + log::debug!( + "Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}", + q = cq.query, + order = ListDisplay(&ctx.join_var_ordering, " "), + tuple = ListDisplay(cq.vars.keys(), " "), + ); + let mut tries = Vec::with_capacity(cq.query.atoms.len()); + for ((atom, ts), col) in cq + .query + .atoms + .iter() + .zip(timestamp_ranges.iter()) + .zip(cols.iter()) + { + // tries.push(LazyTrie::default()); + if let Some(target) = col { + if let Some(col) = self.functions[&atom.head].column_index(*target, ts) { + tries.push(LazyTrie::from_column_index(col)) + } else { + tries.push(LazyTrie::default()); + } + } else { + tries.push(LazyTrie::default()); + } + } + let mut trie_refs = tries.iter().collect::>(); + let mut meausrements = HashMap::>::default(); + let stages = InputSizes { + stage_sizes: &mut meausrements, + cur_stage: 0, + }; + ctx.eval(&mut trie_refs, &program.0, stages, &mut f) + .unwrap_or(()); + let mut sums = Vec::from_iter( + meausrements + .iter() + .map(|(x, y)| (*x, y.iter().copied().sum::())), + ); + sums.sort_by_key(|(i, _sum)| *i); + if log_enabled!(log::Level::Debug) { + for (i, sum) in sums { + log::debug!("stage {i} total cost {sum}"); + } + } + let duration = start.elapsed(); + log::debug!("Matched {} times (took {:?})", ctx.matches, duration,); + let iteration = self + .ruleset_iteration + .get::(&"".into()) + .unwrap_or(&0); + if duration.as_millis() > 1000 { + log::warn!( + "Query took a long time at iter {iteration} : {:?}", + duration + ); + } + } + } + pub(crate) fn run_query(&self, cq: &CompiledQuery, timestamp: u32, mut f: F) where F: FnMut(&[Value]) -> Result, @@ -642,82 +714,17 @@ impl EGraph { let do_seminaive = self.seminaive && !global_updated; // for the later atoms, we consider everything let mut timestamp_ranges = vec![0..u32::MAX; cq.query.atoms.len()]; - for (atom_i, atom) in cq.query.atoms.iter().enumerate() { - // this time, we only consider "new stuff" for this atom - if do_seminaive { + if do_seminaive { + for (atom_i, _atom) in cq.query.atoms.iter().enumerate() { timestamp_ranges[atom_i] = timestamp..u32::MAX; + self.gj_for_atom(atom_i, ×tamp_ranges, cq, &mut f); + // now we can fix this atom to be "old stuff" only + // range is half-open; timestamp is excluded + timestamp_ranges[atom_i] = 0..timestamp; } - - // do the gj - - if let Some((mut ctx, program, cols)) = Context::new(self, cq, ×tamp_ranges) { - let start = Instant::now(); - log::debug!( - "Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}", - q = cq.query, - order = ListDisplay(&ctx.join_var_ordering, " "), - tuple = ListDisplay(cq.vars.keys(), " "), - ); - let mut tries = Vec::with_capacity(cq.query.atoms.len()); - for ((atom, ts), col) in cq - .query - .atoms - .iter() - .zip(timestamp_ranges.iter()) - .zip(cols.iter()) - { - // tries.push(LazyTrie::default()); - if let Some(target) = col { - if let Some(col) = self.functions[&atom.head].column_index(*target, ts) - { - tries.push(LazyTrie::from_column_index(col)) - } else { - tries.push(LazyTrie::default()); - } - } else { - tries.push(LazyTrie::default()); - } - } - let mut trie_refs = tries.iter().collect::>(); - let mut meausrements = HashMap::>::default(); - let stages = InputSizes { - stage_sizes: &mut meausrements, - cur_stage: 0, - }; - ctx.eval(&mut trie_refs, &program.0, stages, &mut f) - .unwrap_or(()); - let mut sums = Vec::from_iter( - meausrements - .iter() - .map(|(x, y)| (*x, y.iter().copied().sum::())), - ); - sums.sort_by_key(|(i, _sum)| *i); - if log_enabled!(log::Level::Debug) { - for (i, sum) in sums { - log::debug!("stage {i} total cost {sum}"); - } - } - let duration = start.elapsed(); - log::debug!("Matched {} times (took {:?})", ctx.matches, duration,); - let iteration = self - .ruleset_iteration - .get::(&"".into()) - .unwrap_or(&0); - if duration.as_millis() > 1000 { - log::warn!( - "Query took a long time at iter {iteration} : {:?}", - duration - ); - } - } - - if !do_seminaive { - break; - } - - // now we can fix this atom to be "old stuff" only - // range is half-open; timestamp is excluded - timestamp_ranges[atom_i] = 0..timestamp; + } else { + let atom_i = 0; + self.gj_for_atom(atom_i, ×tamp_ranges, cq, &mut f); } } else if let Some((mut ctx, program, _)) = Context::new(self, cq, &[]) { let mut meausrements = HashMap::>::default(); From 289dce338436c72293c6ba78e5bea271e53852d3 Mon Sep 17 00:00:00 2001 From: oflatt Date: Fri, 8 Sep 2023 09:40:26 -0700 Subject: [PATCH 2/3] nit --- src/gj.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gj.rs b/src/gj.rs index 59f5e255..f0b76d9e 100644 --- a/src/gj.rs +++ b/src/gj.rs @@ -623,7 +623,7 @@ impl EGraph { fn gj_for_atom( &self, atom_i: usize, - timestamp_ranges: &Vec>, + timestamp_ranges: &[Range], cq: &CompiledQuery, mut f: F, ) where From 897942240577e045f40b4968affb1d397af865d8 Mon Sep 17 00:00:00 2001 From: oflatt Date: Fri, 8 Sep 2023 11:55:56 -0700 Subject: [PATCH 3/3] more cleanup --- src/gj.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/gj.rs b/src/gj.rs index f0b76d9e..8a7b38e6 100644 --- a/src/gj.rs +++ b/src/gj.rs @@ -403,6 +403,8 @@ impl EGraph { self.make_trie_access_for_column(atom, column, timestamp_range) } + // Returns `None` when no program is needed, + // for example when there is nothing in one of the tables. fn compile_program( &self, query: &CompiledQuery, @@ -622,23 +624,29 @@ impl EGraph { fn gj_for_atom( &self, - atom_i: usize, + // for debugging, the atom seminaive is focusing on + atom_i: Option, timestamp_ranges: &[Range], cq: &CompiledQuery, mut f: F, ) where F: FnMut(&[Value]) -> Result, { - let atom = &cq.query.atoms[atom_i]; // do the gj if let Some((mut ctx, program, cols)) = Context::new(self, cq, timestamp_ranges) { let start = Instant::now(); + let atom_info = if let Some(atom_i) = atom_i { + let atom = &cq.query.atoms[atom_i]; + format!("New atom: {atom}") + } else { + "Seminaive disabled".to_string() + }; log::debug!( - "Query:\n{q}\nNew atom: {atom}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}", - q = cq.query, - order = ListDisplay(&ctx.join_var_ordering, " "), - tuple = ListDisplay(cq.vars.keys(), " "), - ); + "Query:\n{q}\n{atom_info}\nTuple: {tuple}\nJoin order: {order}\nProgram\n{program}", + q = cq.query, + order = ListDisplay(&ctx.join_var_ordering, " "), + tuple = ListDisplay(cq.vars.keys(), " "), + ); let mut tries = Vec::with_capacity(cq.query.atoms.len()); for ((atom, ts), col) in cq .query @@ -717,14 +725,13 @@ impl EGraph { if do_seminaive { for (atom_i, _atom) in cq.query.atoms.iter().enumerate() { timestamp_ranges[atom_i] = timestamp..u32::MAX; - self.gj_for_atom(atom_i, ×tamp_ranges, cq, &mut f); + self.gj_for_atom(Some(atom_i), ×tamp_ranges, cq, &mut f); // now we can fix this atom to be "old stuff" only // range is half-open; timestamp is excluded timestamp_ranges[atom_i] = 0..timestamp; } } else { - let atom_i = 0; - self.gj_for_atom(atom_i, ×tamp_ranges, cq, &mut f); + self.gj_for_atom(None, ×tamp_ranges, cq, &mut f); } } else if let Some((mut ctx, program, _)) = Context::new(self, cq, &[]) { let mut meausrements = HashMap::>::default();