Skip to content

Commit

Permalink
Sped up has_cycles using phaseline method (N^2 --> N)
Browse files Browse the repository at this point in the history
Added detailed docstring examples to phaseline.
Found much faster algorithm for determining phaselines.
  • Loading branch information
root-11 committed May 20, 2023
1 parent 28be548 commit da8b601
Showing 1 changed file with 84 additions and 45 deletions.
129 changes: 84 additions & 45 deletions graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,6 @@ def maximum_flow(graph, start, end):
# ready for the next iteration.
edges = graph.edges(path)
for n1, n2, d in edges:

# 3.a. recording:
v = flow_graph.edge(n1, n2, default=None)
if v is None:
Expand Down Expand Up @@ -911,7 +910,6 @@ def lower_bound(graph, nodes):
L = []
edges = set()
for n in nodes:

L2 = [(d, e) for s, e, d in graph.edges(from_node=n) if e in nodes - {n}]
if not L2:
continue
Expand Down Expand Up @@ -964,7 +962,6 @@ def lower_bound(graph, nodes):
remaining_nodes = all_nodes - tour_set

for n2 in remaining_nodes:

new_tour = tour + (n2,)

lb_set = remaining_nodes - {n2}
Expand Down Expand Up @@ -1174,13 +1171,14 @@ def has_cycles(graph):
if not isinstance(graph, (BasicGraph, Graph, Graph3D)):
raise TypeError(f"Expected BasicGraph, Graph or Graph3D, not {type(graph)}")

for n1, n2, d in graph.edges():
if n1 == n2:
for n1, n2, _ in graph.edges():
if n1 == n2: # detect nodes that point to themselves
return True
if graph.is_connected(n2, n1):
return True
return False

try:
_ = list(graph.phase_lines()) # tries to create a DAG.
return False
except AttributeError:
return True

def components(graph):
"""Determines the components of the graph
Expand Down Expand Up @@ -1262,7 +1260,7 @@ def topological_sort(graph, key=None):
https://en.wikipedia.org/wiki/Topological_sorting
Note: The algorithm does not check for loops before initiating
the sortation, but raise Exception at the first conflict.
the sortation, but raise AttributeError at the first conflict.
This saves O(m+n) runtime.
"""
Expand All @@ -1287,11 +1285,54 @@ def key(x):
zero_in_degree = sorted(g2.nodes(in_degree=0), key=key)

if g2.nodes():
raise TypeError(f"Graph is not acyclic: Loop found: {g2.nodes()}")
raise AttributeError(f"Graph is not acyclic: Loop found: {g2.nodes()}")


def phase_lines(graph):
"""Determines the phase lines of a directed graph.
This is useful for determining which tasks can be performed in
parallel. Each phase in the phaselines must be completed to assure
that the tasks in the next phase can be performed with complete input.
This is in contrast to Topological sort that only generates
a queue of tasks, may be fine for a single processor, but has no
mechanism for coordination that all inputs for a task have been completed
so that multiple processors can work on them.
Example: DAG with tasks:
u1 u4 u2 u3
\ \ \_______\
csg cs3 append
\ \ \
op1 \ op3
\ \ \
op2 \ cs2
\ \___________\
cs1 join
\ \
map1 map2
\___________\
save
phaselines = {
"u1": 0, "u4": 0, "u2": 0, "u3": 0,
"csg": 1, "cs3": 1, "append": 1,
"op1": 2, "op3": 2, "op2": 3, "cs2": 3,
"cs1": 4, "join": 4,
"map1": 5, "map2": 5,
"save": 6,
}
From this example it is visible that processing the 4 'uN' (uploads) is
the highest degree of concurrency. This can be determined as follows:
d = defaultdict(int)
for _, pl in graph.phaselines():
d[pl] += 1
max_processors = max(d, key=d.get)
:param graph: Graph
:return: dictionary with node id : phase in cut.
Expand All @@ -1311,40 +1352,39 @@ def phase_lines(graph):
if not isinstance(graph, (BasicGraph, Graph, Graph3D)):
raise TypeError(f"Expected BasicGraph, Graph or Graph3D, not {type(graph)}")

phases = {n: 0 for n in graph.nodes()}
sinks = {n: set() for n in phases} # sinks[e] = {s1,s2}
edges = {n: set() for n in phases}
for s, e, d in graph.edges():
sinks[e].add(s)
edges[s].add(e)

cache = {}

level = 0
while sinks:
sources = [e for e in sinks if not sinks[e]] # these nodes have in_degree=0
if not sources:
raise AttributeError("The graph does not have any sinks.")
for s in sources:
phases[s] = level # let's update the phase value
del sinks[s] # and let's remove their sink entry.
# and remove their set item from the sinks dict
for e in edges[s]:
if e not in sinks:
continue
sinks[e].discard(s)
# but also check if their descendants are loops.
for s2 in list(sinks[e]):

con = cache.get((e, s2)) # check if the edge has been seen before.
if con is None:
con = graph.is_connected(e, s2) # if not seen before, search...
cache[(e, s2)] = con

if con:
sinks[e].discard(s2)
level += 1
nmax = len(graph.nodes())
phases = {n: nmax + 1 for n in graph.nodes()}
phase_counter = 0

g2 = graph.copy()
q = list(g2.nodes(in_degree=0)) # Current iterations work queue
if not q:
raise AttributeError("The graph does not have any sources.")

q2 = set() # Next iterations work queue
while q:
for n1 in q:
if g2.in_degree(n1)!=0:
q2.add(n1) # n1 has an in coming edge, so it has to wait.
continue
phases[n1] = phase_counter # update the phaseline number
for n2 in g2.nodes(from_node=n1):
q2.add(n2) # add node for next iteration

# at this point the nodes with no incoming edges have been accounted
# for, so now they may be removed for the working graph.
for n1 in q:
if n1 not in q2:
g2.del_node(n1) # remove nodes that have no incoming edges

if set(q) == q2:
raise AttributeError(f"Loop found: The graph is not acyclic!")

# Finally turn the next iterations workqueue into current.
# and increment the phaseline counter.
q = [n for n in q2]
q2.clear()
phase_counter += 1
return phases


Expand Down Expand Up @@ -2504,7 +2544,6 @@ def critical_path_minimize_for_slack(graph):
and new_graph.edge(n2, n1) is None # ...downstream
and new_graph.edge(n1, n2) is None # ... not creating a cycle.
): # ... not already a dependency.

new_graph.add_edge(n1, n2)
new_edges.append((n1, n2))

Expand Down

0 comments on commit da8b601

Please sign in to comment.