From 5229914ec85484358f10062dafea171dda021b23 Mon Sep 17 00:00:00 2001 From: Zachary Spector Date: Tue, 2 Jul 2024 15:43:58 +1200 Subject: [PATCH] Delete `_get_slow_delta_threaded`, go back to just `_get_slow_delta` with numpy Turns out, cutting down on serialization really didn't make up for how much slower Python's equality comparisons are compared to numpy's. --- LiSE/LiSE/handle.py | 327 +------------------------------------------- 1 file changed, 1 insertion(+), 326 deletions(-) diff --git a/LiSE/LiSE/handle.py b/LiSE/LiSE/handle.py index 208e5db74..aa896d39e 100644 --- a/LiSE/LiSE/handle.py +++ b/LiSE/LiSE/handle.py @@ -525,7 +525,7 @@ def next_turn(self) -> Tuple[bytes, bytes]: *self._real._btt())) return pack(ret), packed_delta - def _get_slow_delta_np( + def _get_slow_delta( self, btt_from: Tuple[str, int, int] = None, btt_to: Tuple[str, int, int] = None) -> SlightlyPackedDeltaType: @@ -666,325 +666,6 @@ def _get_slow_delta_np( delta[RULEBOOKS] = dict(map(self.pack_pair, rbd.items())) return delta - def _get_slow_delta_threaded(self, btt_from: Tuple[str, int, int] = None, btt_to: Tuple[str, int, int] = None): - from operator import ne, sub - - def start_graph_val_futs(pool: ThreadPoolExecutor, kf_from: dict, kf_to: dict) -> list[Future]: - graph_val_futs = [] - for graph in kf_from['graph_val'].keys() | kf_to['graph_val'].keys(): - a = kf_from['graph_val'].get(graph, {}) - b = kf_to['graph_val'].get(graph, {}) - for k in a.keys() | b.keys(): - before = a.get(k) - after = b.get(k) - fut = pool.submit(ne, before, after) - fut.graph = graph - fut.key = k - graph_val_futs.append(k) - return graph_val_futs - - def pack_graph_val_fut(fut: Future) -> None: - graph = pack(fut.graph) - key = pack(fut.key) - v = pack(fut.after) - if graph in delta: - delta[graph][key] = v - else: - delta[graph] = {key: v, NODE_VAL: {}, EDGE_VAL: {}} - - def start_node_val_futs(pool: ThreadPoolExecutor, kf_from: dict, kf_to: dict) -> list[Future]: - node_val_futs = [] - for graph, node in kf_from['node_val'].keys() | kf_to['node_val'].keys(): - a = kf_from['node_val'].get((graph, node), {}) - b = kf_to['node_val'].get((graph, node), {}) - for k in a.keys() | b.keys(): - before = a.get(k) - after = b.get(k) - fut = pool.submit(ne, before, after) - fut.graph = graph - fut.node = node - fut.key = k - fut.before = before - fut.after = after - node_val_futs.append(fut) - return node_val_futs - - def pack_node_val_fut(fut: Future) -> None: - graph = pack(fut.graph) - node = pack(fut.node) - key = pack(fut.key) - v = pack(fut.after) - if graph not in delta: - delta[graph] = {NODE_VAL: {node: {key: v}}, EDGE_VAL: {}} - elif NODE_VAL not in delta[graph]: - delta[graph][NODE_VAL] = {node: {key: v}} - elif node not in delta[graph][NODE_VAL]: - delta[graph][NODE_VAL][node] = {key: v} - else: - delta[graph][NODE_VAL][node][key] = v - - def start_edge_val_futs(pool: ThreadPoolExecutor, kf_from: dict, kf_to: dict) -> list[Future]: - edge_val_futs = [] - for graph, orig, dest, i in kf_from['edge_val'].keys() | kf_to['edge_val'].keys(): - a = kf_from['edge_val'].get((graph, orig, dest, i), {}) - b = kf_to['edge_val'].get((graph, orig, dest, i), {}) - for k in a.keys() | b.keys(): - before = a.get(k) - after = b.get(k) - fut = pool.submit(ne, before, after) - fut.graph = graph - fut.orig = orig - fut.dest = dest - fut.idx = i - fut.key = k - fut.before = before - fut.after = after - edge_val_futs.append(fut) - return edge_val_futs - - def pack_edge_added(graph, orig, dest) -> None: - graph = pack(graph) - origdest = pack((orig, dest)) - if graph not in delta: - delta[graph] = {EDGES: {origdest: TRUE}} - elif EDGES not in delta[graph]: - delta[graph][EDGES] = {origdest: TRUE} - else: - delta[graph][EDGES][origdest] = TRUE - - def pack_edge_removed(graph, orig, dest) -> None: - graph = pack(graph) - origdest = pack((orig, dest)) - if graph not in delta: - delta[graph] = {EDGES: {origdest: FALSE}} - elif EDGES not in delta[graph]: - delta[graph][EDGES] = {origdest: FALSE} - else: - delta[graph][EDGES][origdest] = FALSE - - def pack_edge_val_fut(fut: Future) -> None: - graph = pack(fut.graph) - orig = pack(fut.orig) - dest = pack(fut.dest) - key = pack(fut.key) - v = pack(fut.after) - if graph not in delta: - delta[graph] = {EDGE_VAL: {orig: {dest: {key: v}}}} - elif orig not in delta[graph][EDGE_VAL]: - delta[graph][EDGE_VAL][orig] = {dest: {key: v}} - elif dest not in delta[graph][EDGE_VAL][orig]: - delta[graph][EDGE_VAL][orig][dest] = {key: v} - else: - delta[graph][EDGE_VAL][orig][dest][key] = v - - def start_nodes_added_removed_futs(pool: ThreadPoolExecutor, kf_from: dict, kf_to: dict) -> list[Future]: - futs = [] - for graph in kf_from['nodes'].keys() & kf_to['nodes'].keys(): - fut = pool.submit(sub, kf_from['nodes'][graph].keys(), kf_to['nodes'][graph].keys()) - fut.graph = graph[0] - fut.operation = 'removed nodes' - futs.append(fut) - fut = pool.submit(sub, kf_to['nodes'][graph].keys(), kf_from['nodes'][graph].keys()) - fut.graph = graph[0] - fut.operation = 'added nodes' - futs.append(fut) - return futs - - def pack_added_node_fut(fut: Future) -> None: - graph = pack(fut.graph) - for node in map(pack, fut.result()): - if graph not in delta: - delta[graph] = {NODES: {node: TRUE}} - elif NODES not in delta[graph]: - delta[graph][NODES] = {node: TRUE} - else: - delta[graph][NODES][node] = TRUE - - def pack_removed_node_fut(fut: Future) -> None: - graph = pack(fut.graph) - for node in map(pack, fut.result()): - if graph not in delta: - delta[graph] = {NODES: {node: FALSE}} - elif NODES not in delta[graph]: - delta[graph][NODES] = {node: FALSE} - else: - delta[graph][NODES][node] = FALSE - - def start_universal_futs(pool: ThreadPoolExecutor, univ_old: dict, univ_new: dict) -> list[Future]: - futs = [] - for k in univ_old.keys() & univ_new.keys(): - fut = pool.submit(ne, univ_old[k], univ_new[k]) - fut.key = k - fut.operation = 'universal changes' - futs.append(fut) - return futs - - def pack_universal_added(key) -> None: - delta[UNIVERSAL][pack(key)] = pack(new_univ[key]) - - def pack_universal_removed(key) -> None: - delta[UNIVERSAL][pack(key)] = NONE - - def pack_universal_fut(fut: Future) -> None: - if UNIVERSAL in delta: - delta[UNIVERSAL][pack(fut.key)] = pack(new_univ[fut.key]) - else: - delta[UNIVERSAL] = {pack(fut.key): pack(new_univ[fut.key])} - - def start_rules_futs(pool: ThreadPoolExecutor, rules_old: dict, rules_new: dict) -> list[Future]: - def changed_lists(rule_old, rule_new): - ret = {} - if rule_new['triggers'] != rule_old['triggers']: - ret['triggers'] = rule_new['triggers'] - if rule_new['prereqs'] != rule_old['prereqs']: - ret['prereqs'] = rule_new['prereqs'] - if rule_new['actions'] != rule_old['actions']: - ret['actions'] = rule_new['actions'] - return ret - futs = [] - for k in rules_old.keys() & rules_new.keys(): - fut = pool.submit(changed_lists, rules_old[k], rules_new[k]) - fut.rule = k - futs.append(fut) - return futs - - def pack_rule_added(key, new_rule) -> None: - delta[RULES][pack(key)] = pack(new_rule) - - def pack_rule_removed(key) -> None: - delta[RULES][pack(key)] = NONE - - def pack_rule_changed_fut(fut: Future) -> None: - if RULES in delta: - delta[RULES][pack(fut.rule)] = pack(fut.result()) - else: - delta[RULES] = {pack(fut.rule): pack(fut.result())} - - def start_rulebooks_futs(pool: ThreadPoolExecutor, rulebooks_old: dict, rulebooks_new: dict) -> list[Future]: - def changed(rulebook_old, rulebook_new): - if rulebook_old == rulebook_new: - return - return rulebook_new - futs = [] - for k in rulebooks_old.keys() & rulebooks_new.keys(): - fut = pool.submit(changed, rulebooks_old[k], rulebooks_new[k]) - fut.rulebook = k - futs.append(fut) - return futs - - def pack_rulebook_added(key, new_rulebook) -> None: - delta[RULEBOOKS][pack(key)] = pack(new_rulebook) - - def pack_rulebook_removed(key) -> None: - delta[RULEBOOKS][pack(key)] = NONE - - delta: Dict[bytes, Any] = {} - pack = self._real.pack - btt_from = self._get_btt(btt_from) - btt_to = self._get_btt(btt_to) - if btt_from == btt_to: - return delta - now = self._real._btt() - self._real._set_btt(*btt_from) - self._real.snap_keyframe() - kf_from = self._real._get_kf(*btt_from) - old_univ = dict(self._real.universal.items()) - old_rules = {rule: self.rule_copy(rule) for rule in self._real.rule.keys()} - old_rulebooks = {rulebook: self.rulebook_copy(rulebook) for rulebook in self._real.rulebook.keys()} - self._real._set_btt(*btt_to) - self._real.snap_keyframe() - kf_to = self._real._get_kf(*btt_to) - new_univ = dict(self._real.universal.items()) - new_rules = {rule: self.rule_copy(rule) for rule in self._real.rule.keys()} - new_rulebooks = {rulebook: self.rulebook_copy(rulebook) for rulebook in self._real.rulebook.keys()} - self._real._set_btt(*now) - with ThreadPoolExecutor() as pool: - graph_val_futs = pool.submit(start_graph_val_futs, pool, kf_from, kf_to) - node_val_futs = pool.submit(start_node_val_futs, pool, kf_from, kf_to) - edge_val_futs = pool.submit(start_edge_val_futs, pool, kf_from, kf_to) - nodes_added_removed_futs = pool.submit(start_nodes_added_removed_futs, pool, kf_from, kf_to) - universal_changed_futs = pool.submit(start_universal_futs, pool, old_univ, new_univ) - rule_changed_futs = pool.submit(start_rules_futs, pool, old_rules, new_rules) - rulebooks_changed_futs = pool.submit(start_rulebooks_futs, pool, old_rulebooks, new_rulebooks) - edges_added_fut = pool.submit(sub, kf_to['edges'].keys(), kf_from['edges'].keys()) - edges_removed_fut = pool.submit(sub, kf_from['edges'].keys(), kf_to['edges'].keys()) - universal_added_fut = pool.submit(sub, new_univ.keys(), old_univ.keys()) - universal_removed_fut = pool.submit(sub, old_univ.keys(), new_univ.keys()) - rules_added_fut = pool.submit(sub, new_rules.keys(), old_rules.keys()) - rules_removed_fut = pool.submit(sub, old_rules.keys(), new_rules.keys()) - rulebooks_added_fut = pool.submit(sub, new_rulebooks.keys(), old_rulebooks.keys()) - rulebooks_removed_fut = pool.submit(sub, old_rulebooks.keys(), new_rulebooks.keys()) - pack_futs = [] - for fut in as_completed([ - edges_added_fut, edges_removed_fut, universal_added_fut, universal_removed_fut, - rules_added_fut, rules_removed_fut, rulebooks_added_fut, rulebooks_removed_fut, - *graph_val_futs.result(), *node_val_futs.result(), *edge_val_futs.result(), - *nodes_added_removed_futs.result(), - *universal_changed_futs.result(), *rule_changed_futs.result(), *rulebooks_changed_futs.result() - ]): - r = fut.result() - if not r: - continue - if hasattr(fut, 'node'): - pack_futs.append(pool.submit(pack_node_val_fut, fut)) - elif hasattr(fut, 'dest'): - pack_futs.append(pool.submit(pack_edge_val_fut, fut)) - elif hasattr(fut, 'operation'): - if fut.operation == 'added nodes': - pack_futs.append(pool.submit(pack_added_node_fut, fut)) - elif fut.operation == 'removed nodes': - pack_futs.append(pool.submit(pack_removed_node_fut, fut)) - else: - assert fut.operation == 'universal changes' - pack_futs.append(pool.submit(pack_universal_fut, fut)) - elif fut is edges_added_fut: - for graph, orig, dest in fut.result(): - pack_futs.append(pool.submit(pack_edge_added, graph, orig, dest)) - elif fut is edges_removed_fut: - for graph, orig, dest in fut.result(): - pack_futs.append(pool.submit(pack_edge_removed, graph, orig, dest)) - elif fut is universal_added_fut and fut.result(): - if UNIVERSAL not in delta: - delta[UNIVERSAL] = {} - for key in fut.result(): - pack_futs.append(pool.submit(pack_universal_added, key)) - elif fut is universal_removed_fut and fut.result(): - if UNIVERSAL not in delta: - delta[UNIVERSAL] = {} - for key in fut.result(): - pack_futs.append(pool.submit(pack_universal_removed, key)) - elif fut is rules_added_fut and fut.result(): - if RULES not in delta: - delta[RULES] = {} - for key in fut.result(): - pack_futs.append(pool.submit(pack_rule_added, key, new_rules[key])) - elif fut is rules_removed_fut and fut.result(): - if RULES not in delta: - delta[RULES] = {} - for key in fut.result(): - pack_futs.append(pool.submit(pack_rule_removed, key)) - elif hasattr(fut, 'rule'): - pack_futs.append(pool.submit(pack_rule_changed_fut, fut)) - elif fut is rulebooks_added_fut and fut.result(): - if RULEBOOKS not in delta: - delta[RULEBOOKS] = {} - for key in fut.result(): - pack_futs.append(pool.submit(pack_rulebook_added, key, new_rulebooks[key])) - elif fut is rulebooks_removed_fut and fut.result(): - if RULEBOOKS not in delta: - delta[RULEBOOKS] = {} - for key in fut.result(): - pack_futs.append(pool.submit(pack_rulebook_removed, key)) - elif hasattr(fut, 'rulebook'): - if RULEBOOKS in delta: - delta[RULEBOOKS][pack(fut.rulebook)] = pack(fut.result()) - else: - delta[RULEBOOKS] = {pack(fut.rulebook): pack(fut.result())} - else: - pack_futs.append(pool.submit(pack_graph_val_fut, fut)) - wait(pack_futs) - return delta - @prepacked def time_travel( self, @@ -1833,9 +1514,3 @@ def game_start(self) -> None: prereqs = dict(self._real.prereq.iterplain()) actions = dict(self._real.action.iterplain()) return ret, kf, self._real.eternal, functions, methods, triggers, prereqs, actions - - -if hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled(): - EngineHandle._get_slow_delta = EngineHandle._get_slow_delta_threaded -else: - EngineHandle._get_slow_delta = EngineHandle._get_slow_delta_np