-
Notifications
You must be signed in to change notification settings - Fork 5
/
main.py
executable file
·122 lines (98 loc) · 4.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
import plac
import sys
from swarm import Swarm
from pathlib import Path
import logging
from config import load_config
from taskpool import TaskPool
from random import shuffle
from threading import Event
from rich.logging import RichHandler
# Configure root logging once at import time: RichHandler renders pretty
# tracebacks and a simple "[HH:MM:SS] message" line format.
logging.basicConfig(
    level="INFO",
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(rich_tracebacks=True)],
)
# Module-level logger; "rich" is the conventional logger name used alongside RichHandler.
log = logging.getLogger("rich")
@plac.pos("max_concurrent", "Max concurrent threads - recommended around 6 per core", type=int)
@plac.pos("test_plan", "A test plan file. See testplan.example.yaml", type=Path)
@plac.pos(
    "service_config", "A file containing details about the target service. See service_config.example.yaml", type=Path
)
def main(max_concurrent, test_plan, service_config):
    """Load configuration, start the swarm, and run the test plan to completion.

    Args:
        max_concurrent: Maximum worker threads for each task pool (must be > 5).
        test_plan: Path to a test plan YAML file (see testplan.example.yaml).
        service_config: Path to a service configuration YAML file
            (see service_config.example.yaml).

    Raises:
        ValueError: If ``max_concurrent`` is 5 or less.
    """
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`,
    # which would silently drop this input validation.
    if max_concurrent <= 5:
        raise ValueError("Surely you can spare more than 5 concurrent threads?")
    # NOTE: the previous redundant logging.basicConfig(level=logging.INFO) call was
    # removed — logging is already configured (with RichHandler) at module import,
    # so that second call was a no-op.
    pull_secret, service_config, test_plan = load_config(service_config, test_plan)
    with TaskPool(max_workers=max_concurrent) as agents_taskpool:
        with TaskPool(max_workers=max_concurrent) as clusters_taskpool:
            swarm = Swarm(
                pull_secret=pull_secret,
                pull_secret_file=service_config["pull_secret_file"],
                service_url=service_config["service_endpoint"],
                release_image=service_config["release_image"],
                ssh_pub_key=service_config["ssh_pub_key"],
            )
            swarm.start()
            try:
                execute_plan(agents_taskpool, clusters_taskpool, test_plan, swarm)
                swarm.logging.info("All clusters finished, exiting")
            finally:
                # Always release swarm resources, even if the plan failed mid-run.
                swarm.finalize()
def execute_plan(agents_taskpool: TaskPool, clusters_taskpool: TaskPool, test_plan, swarm: Swarm):
    """Expand the test plan into concrete cluster specs and launch them via the swarm.

    Each entry in ``test_plan["clusters"]`` is replicated ``amount`` times; the
    resulting specs are optionally shuffled and then submitted to the clusters
    task pool, chained together with threading events (see below).
    """
    specs = []
    for entry in test_plan["clusters"]:
        spec = (
            entry["single_node"],
            entry["num_workers"],
            entry.get("with_nmstate", False),
            entry.get("just_infraenv", False),
            entry.get("infraenv_labels", {}),
        )
        specs.extend([spec] * entry["amount"])

    if test_plan.get("shuffle", False):
        shuffle(specs)

    # A chain of events lets cluster X tell cluster Y that X has launched all of
    # its agents; only then does Y launch its own. Clusters still run in
    # parallel, but a cluster launched earlier keeps priority to get all of its
    # agents up before any later cluster starts launching. Without this, every
    # cluster would race to create agent threads, saturate the thread pool, and
    # deadlock: none of them would have enough agents to finish installation
    # (and a finished installation is what frees agent threads back to the
    # pool). The chaining guarantees at most one cluster at a time has a
    # partial set of agents launched — one cluster's "I've launched all of my
    # agents" event is the next cluster's "you may start your agents" event.
    #
    # We don't simply launch clusters strictly one after another because each
    # cluster has plenty of pre-agent work (mostly creating CRs and waiting for
    # the service to reconcile them), and there's no reason to delay that work.
    #
    # The first cluster waits on a pre-set dummy event, since there is no
    # earlier cluster for it to wait for.
    predecessor_done = Event()
    predecessor_done.set()

    for idx, (single_node, num_workers, with_nmstate, just_infraenv, infraenv_labels) in enumerate(specs):
        own_done = Event()
        clusters_taskpool.submit(
            swarm.launch_cluster,
            index=idx,
            task_pool=agents_taskpool,
            single_node=single_node,
            num_workers=num_workers,
            with_nmstate=with_nmstate,
            just_infraenv=just_infraenv,
            infraenv_labels=infraenv_labels,
            can_start_agents=predecessor_done,
            started_all_agents=own_done,
        )
        predecessor_done = own_done

    clusters_taskpool.wait()
    agents_taskpool.wait()
if __name__ == "__main__":
    try:
        # plac builds the CLI from main()'s decorators and calls it with parsed args.
        plac.call(main)
    except Exception as e:
        # Top-level boundary: log the full traceback and exit non-zero so
        # callers (CI, shell scripts) can detect failure.
        logging.exception(e)
        sys.exit(1)