concurrent send in benchmark script (#80)
* concurrent writes

* use better time capture

* exec psycopg directly

* exec psycopg directly

* tidy

* no tx

* update toml

* exec many

* exec many

* filter

* snip window

* lint
ChuckHend authored Aug 25, 2023
1 parent a555c24 commit b8eebd5
Showing 2 changed files with 165 additions and 80 deletions.
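The diff below moves the producer's hot path off the SQLAlchemy-backed PGMQueue wrapper and onto a raw psycopg2 connection: each pgmq_send call is timed with time.perf_counter(), and per-message results are written back with one executemany batch instead of a DataFrame.to_sql round trip. A minimal sketch of that pattern (illustrative only; the connection URL, queue name, and results table are assumptions, not the exact script):

import time
import psycopg2

# Assumed connection URL, queue, and results table -- the real script builds these from CLI args.
conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/postgres")
conn.autocommit = True  # every send is its own statement, no explicit transaction
cur = conn.cursor()

results = []
deadline = time.time() + 10  # publish for ~10 seconds
while time.time() < deadline:
    start = time.perf_counter()  # monotonic timer for sub-second durations
    cur.execute("""select * from pgmq_send('bench_queue', '{"hello": "world"}')""")
    msg_id = cur.fetchall()[0][0]
    results.append(("write", time.perf_counter() - start, msg_id, time.time()))

# one batched insert of (operation, duration, msg_id, epoch) rows
cur.executemany(
    "INSERT INTO bench_results_bench_queue (operation, duration, msg_id, epoch) VALUES (%s, %s, %s, %s);",
    results,
)
cur.close()
conn.close()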
242 changes: 163 additions & 79 deletions tembo-pgmq-python/benches/bench.py
@@ -4,6 +4,7 @@
from typing import Optional

import pandas as pd
import psycopg2
from matplotlib import pyplot as plt # type: ignore
from scipy.ndimage import gaussian_filter1d
from sqlalchemy import create_engine, text
@@ -147,35 +148,46 @@ def produce(
queue_name: The name of the queue to publish to
duration_seconds: The number of seconds to publish messages
"""
queue = PGMQueue(**connection_info)

msg = {"hello": "world"}
user = connection_info["username"]
host = connection_info["host"]
port = connection_info["port"]
password = connection_info["password"]
database = connection_info["database"]
url = f"postgresql://{user}:{password}@{host}:{port}/{database}"
conn = psycopg2.connect(url)
conn.autocommit = True
cur = conn.cursor()

all_results = []

start_time = int(time.time())
start_time = time.time()

num_msg = 0
running_duration = 0
last_print_time = start_time
last_print_time = time.time()

while running_duration < duration_seconds:
send_start = time.time()
msg_id: int = queue.send(queue_name, msg)
send_duration = time.time() - send_start
all_results.append(
{"operation": "write", "duration": round(send_duration, 4), "msg_id": msg_id, "epoch": send_start}
)
send_start = time.perf_counter()
cur.execute(f"""select * from pgmq_send('{queue_name}', '{{"hello": "world"}}')""")
msg_id = cur.fetchall()[0][0]
send_duration = time.perf_counter() - send_start
all_results.append({"operation": "write", "duration": send_duration, "msg_id": msg_id, "epoch": time.time()})
num_msg += 1
running_duration = int(time.time()) - start_time
running_duration = int(time.time() - start_time)
# log every 5 seconds
if send_start - last_print_time >= 5:
last_print_time = send_start
if time.time() - last_print_time >= 5:
last_print_time = time.time()
print(f"Total Messages Sent: {num_msg}, {running_duration} / {duration_seconds} seconds")
print(f"Total Messages Sent: {num_msg}, {int(running_duration)} / {duration_seconds} seconds")
df = pd.DataFrame(all_results)
con = create_engine(f"postgresql://{queue.username}:{queue.password}@{queue.host}:{queue.port}/{queue.database}")
df.to_sql(f"bench_results_{queue_name}", con=con, if_exists="append", index=False)
data_tuples = list(df.itertuples(index=False, name=None))
insert_query = (
f"INSERT INTO bench_results_{queue_name} (operation, duration, msg_id, epoch) VALUES (%s, %s, %s, %s);"
)
cur.executemany(insert_query, data_tuples)

cur.close()
conn.close()
print("Finished publishing messages")


@@ -184,38 +196,55 @@ def consume(queue_name: str, connection_info: dict):
Halts consumption after 5 seconds of no messages.
"""
queue = PGMQueue(**connection_info)
url = f"postgresql://{connection_info['username']}:{connection_info['password']}@{connection_info['host']}:{connection_info['port']}/{connection_info['database']}"
conn = psycopg2.connect(url)
cur = conn.cursor()

conn.autocommit = True

cur = conn.cursor()
results = []
no_message_timeout = 0
while no_message_timeout < 5:
read_start = time.time()
message: Optional[Message] = queue.read(queue_name, vt=10)
if message is None:
stmt = f"select * from pgmq_read('{queue_name}', 1, 1)"
read_start = time.perf_counter()
cur.execute(stmt)
# cur.execute("select * from pgmq_read(%s, %s, %s);", [queue_name, 1, 1])
read_duration = time.perf_counter() - read_start
message = cur.fetchall()

if len(message) == 0:
no_message_timeout += 1
if no_message_timeout > 2:
print(f"No messages for {no_message_timeout} consecutive reads")
time.sleep(0.500)
continue
else:
no_message_timeout = 0
msg_id = message[0][0]

read_duration = time.time() - read_start
results.append({"operation": "read", "duration": read_duration, "msg_id": message.msg_id, "epoch": read_start})
results.append({"operation": "read", "duration": read_duration, "msg_id": msg_id, "epoch": time.time()})

archive_start = time.time()
queue.archive(queue_name, message.msg_id)
archive_duration = time.time() - archive_start
results.append(
{"operation": "archive", "duration": archive_duration, "msg_id": message.msg_id, "epoch": archive_start}
)
archive_start = time.perf_counter()
cur.execute("select * from pgmq_archive(%s, %s);", [queue_name, msg_id])
cur.fetchall()

archive_duration = time.perf_counter() - archive_start
results.append({"operation": "archive", "duration": archive_duration, "msg_id": msg_id, "epoch": time.time()})

# divide by 2 because we're appending two results (read/archive) per message
num_consumed = len(results) / 2
print(f"Consumed {num_consumed} messages")

# divide by 2 because we're appending two results (read/archive) per message
df = pd.DataFrame(results)
con = create_engine(f"postgresql://{queue.username}:{queue.password}@{queue.host}:{queue.port}/{queue.database}")
df.to_sql(f"bench_results_{queue_name}", con=con, if_exists="append", index=False)
data_tuples = list(df.itertuples(index=False, name=None))
print("writing results: ", len(data_tuples))
insert_query = (
f"INSERT INTO bench_results_{queue_name} (operation, duration, msg_id, epoch) VALUES (%s, %s, %s, %s);"
)
cur.executemany(insert_query, data_tuples)
cur.close()
conn.close()


def summarize(queue_name: str, queue: PGMQueue, results_file: str, duration_seconds: int):
@@ -229,7 +258,6 @@ def summarize(queue_name: str, queue: PGMQueue, results_file: str, duration_seco
queue_depth["operation"] = "queue_depth"
queue_depth.rename(
columns={
# "queue_length": "duration",
"total_messages": "msg_id",
"time": "epoch",
},
@@ -238,9 +266,7 @@ def summarize(queue_name: str, queue: PGMQueue, results_file: str, duration_seco
df = pd.concat([df, queue_depth[["operation", "queue_length", "msg_id", "epoch"]]])

# iteration
trial = queue_name

all_results_csv = f"all_results_{trial}.csv"
all_results_csv = f"all_results_{queue_name}.csv"
df.to_csv(all_results_csv, index=False)

_num_df = df[df["operation"] == "archive"]
@@ -253,9 +279,6 @@ def summarize(queue_name: str, queue: PGMQueue, results_file: str, duration_seco
bbplot = _df.boxplot(
column="duration", by="operation", fontsize=12, layout=(2, 1), rot=90, figsize=(25, 20), return_type="axes"
)

bbplot[0].set_ylabel("Milliseconds")

title = f"""
num_messages = {num_messages}
duration = {duration_seconds}
@@ -273,23 +296,50 @@ def plot_rolling(csv: str, bench_name: str, duration_sec: int, params: dict):
# convert seconds to milliseconds
df["duration_ms"] = df["duration"] * 1000
df["time"] = pd.to_datetime(df["epoch"], unit="s")

max_read = df[df.operation == "read"].time.max()
result = (
df[df["time"] <= max_read].groupby("operation").agg({"time": lambda x: x.max() - x.min(), "operation": "size"})
)
result.columns = ["range", "num_messages"]
result.reset_index(inplace=True)

result.columns = ["operation", "range", "num_messages"]
result["total_duration_seconds"] = result["range"].apply(lambda x: x.total_seconds())
result["messages_per_second"] = result["num_messages"] / result["total_duration_seconds"]
output_str = []
for _, row in result.iterrows():
operation = row["operation"]
if operation == "queue_depth":
continue
s = f"{operation}: total seconds: {row['total_duration_seconds']}, total messages: {row['num_messages']}, message / sec = {row['messages_per_second']:.2f}"
output_str.append(s)

output_str = "\n".join(output_str)

# Plotting
_, ax1 = plt.subplots(figsize=(20, 10))

# plot th operations
# plot the operations
color_map = {"read": "orange", "write": "blue", "archive": "green"}
sigma = 1000 # Adjust as needed for the desired smoothing level
for op in ["read", "write", "archive"]:
_df = df[df["operation"] == op].sort_values("time")
ax1.plot(_df["time"], _df[["duration_ms"]].apply(lambda x: gaussian_filter1d(x, sigma)), label=op)
ax1.plot(
_df["time"],
_df[["duration_ms"]].apply(lambda x: gaussian_filter1d(x, sigma)),
label=op,
color=color_map[op],
)
ax1.legend(loc="upper left")

ax1.set_xlabel("time")
ax1.set_ylabel("Duration (ms)")
plt.suptitle("PGMQ Concurrent Produce/Consumer Benchmark")
plt.suptitle(f"PGMQ Concurrent Produce/Consumer Benchmark\n{output_str}")
plt.title(params)
# Create a second y-axis for 'queue_length'
ax2 = ax1.twinx()
queue_depth_data = df[df["operation"] == "queue_depth"]
queue_depth_data = df[df["time"] <= max_read][df["operation"] == "queue_depth"]
ax2.plot(queue_depth_data["time"], queue_depth_data["queue_length"], color="gray", label="queue_depth")
ax2.set_ylabel("queue_depth", color="gray")
ax2.tick_params("y", colors="gray")
@@ -304,47 +354,41 @@ def plot_rolling(csv: str, bench_name: str, duration_sec: int, params: dict):


def queue_depth(queue_name: str, connection_info: dict, kill_flag: multiprocessing.Value):
from sqlalchemy import create_engine, text

url = f"postgresql://{connection_info['username']}:{connection_info['password']}@{connection_info['host']}:{connection_info['port']}/{connection_info['database']}"

conn = psycopg2.connect(url)
eng = create_engine(url)

cur = conn.cursor()
conn.autocommit = True
all_metrics = []
while not kill_flag.value:
with eng.connect() as conn:
cur = conn.execute(text(f"select * from pgmq_metrics_all() where queue_name = '{queue_name}'"))
metrics = cur.fetchall()
depth = metrics[0][1]
total_messages = metrics[0][-2]
all_metrics.append(
{
"queue_name": metrics[0][0],
"queue_length": depth,
"total_messages": total_messages,
"time": time.time(),
}
)
print(f"Number messages in queue: {depth}, max_msg_id: {total_messages}")
cur.execute(f"select * from pgmq_metrics('{queue_name}')")
metrics = cur.fetchall()[0]
depth = metrics[1]
total_messages = metrics[-2]
all_metrics.append(
{
"queue_name": metrics[0],
"queue_length": depth,
"total_messages": total_messages,
"time": time.time(),
}
)
print(f"Number messages in queue: {depth}, max_msg_id: {total_messages}")

read_start = time.perf_counter()
cur.execute("select 1")
cur.fetchall()
sel_duration = time.perf_counter() - read_start
print("Select 1 latency (ms): ", sel_duration * 1000)
time.sleep(5)
cur.close()
conn.close()
print("Writing queue length results")
df = pd.DataFrame(all_metrics)
df.to_sql(f"bench_results_{queue_name}_queue_depth", con=eng, if_exists="append", index=False)

return all_metrics


def bench_select_1(pool):
all_durations = []
for _ in range(1000):
start = time.time()
with pool.connection() as c:
c.execute("select 1")
duration = time.time() - start
print("lat (ms)", duration * 1000)
all_durations.append(duration)


if __name__ == "__main__":
# run the multiproc benchmark
# 1 process publishing messages
@@ -362,6 +406,7 @@ def bench_select_1(pool):
"--duration_seconds", type=int, required=True, help="how long the benchmark should run, in seconds"
)
parser.add_argument("--read_concurrency", type=int, default=1, help="number of concurrent consumers")
parser.add_argument("--write_concurrency", type=int, default=1, help="number of concurrent producers")
parser.add_argument("--bench_name", type=str, required=False, help="the name of the benchmark")

# partitioned queue configurations
@@ -370,7 +415,6 @@ def bench_select_1(pool):
parser.add_argument("--message_retention", type=int, default=1_000_000, help="number of messages per partition")

args = parser.parse_args()
print(args)

# default postgres connection
if args.postgres_connection is None:
@@ -400,10 +444,37 @@ def bench_select_1(pool):
if bench_name is None:
bench_name = int(time.time())

queue = PGMQueue(**connection_info) # type: ignore
url = f"postgresql://{connection_info['username']}:{connection_info['password']}@{connection_info['host']}:{connection_info['port']}/{connection_info['database']}"
eng = create_engine(url)

# setup results table
test_queue = f"bench_queue_{bench_name}"
with eng.connect() as con:
con.execute(
text(
f"""
CREATE TABLE "bench_results_{test_queue}"(
operation text NULL,
duration float8 NULL,
msg_id int8 NULL,
epoch float8 NULL
)
"""
)
)
con.commit()

with eng.connect() as con:
con.execute(
text(
f"""
select pg_stat_statements_reset()
"""
)
).fetchall()
con.commit()

queue = PGMQueue(**connection_info)
if partitioned_queue:
print(f"Creating partitioned queue: {test_queue}")
queue.create_partitioned_queue(
@@ -419,9 +490,11 @@ def bench_select_1(pool):
consume_csv = f"consume_{test_queue}.csv"

# run producing and consuming in parallel, separate processes

proc_produce = Process(target=produce, args=(test_queue, connection_info, duration_seconds))
proc_produce.start()
producer_procs = {}
for i in range(args.write_concurrency):
producer = f"producer_{i}"
producer_procs[producer] = Process(target=produce, args=(test_queue, connection_info, duration_seconds))
producer_procs[producer].start()

# start a proc to poll for queue depth
kill_flag = multiprocessing.Value("b", False)
@@ -443,15 +516,26 @@ def bench_select_1(pool):
kill_flag.value = True
queue_depth_proc.join()

for producer, proc in producer_procs.items():
print("Closing producer: ", producer)
proc.terminate()

# save pg_stat_statements
with eng.connect() as con:
pg_stat_df = pd.read_sql("select * from pg_stat_statements", con=con)
pg_stat_df.to_sql(f"{bench_name}_pg_stat", index=None, con=eng)

# once consuming finishes, summarize
results_file = f"results_{test_queue}.jpg"
# TODO: organize results in a directory or something, log all the params
filename = summarize(test_queue, queue, results_file=results_file, duration_seconds=duration_seconds)

params = {
"duration_seconds": duration_seconds,
"read_concurrency": args.read_concurrency,
"bench_name": bench_name,
"host": connection_info["host"],
"produce_time_seconds": duration_seconds,
"read_concurrency": args.read_concurrency,
"write_concurrency": args.write_concurrency,
}
if partitioned_queue:
params["partition_interval"] = partition_interval
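The __main__ changes fan producers out across processes: one Process per --write_concurrency, plus a shared flag that stops the queue-depth poller once consumers finish. A stripped-down sketch of that orchestration (hypothetical values; produce and queue_depth stand for the functions shown in the diff above):

import multiprocessing
import time
from multiprocessing import Process

from bench import produce, queue_depth  # hypothetical import of the functions above

connection_info = {"host": "localhost", "port": 5432, "username": "postgres",
                   "password": "postgres", "database": "postgres"}
duration_seconds = 60
write_concurrency = 2  # value of the new --write_concurrency flag

producers = [
    Process(target=produce, args=("bench_queue", connection_info, duration_seconds))
    for _ in range(write_concurrency)
]
for p in producers:
    p.start()

# poll queue depth in the background until the shared flag flips
kill_flag = multiprocessing.Value("b", False)
depth_proc = Process(target=queue_depth, args=("bench_queue", connection_info, kill_flag))
depth_proc.start()

time.sleep(duration_seconds)  # consumers would run here in the real script
kill_flag.value = True
depth_proc.join()
for p in producers:
    p.terminate()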
