Skip to content

Commit

Permalink
Merge branch 'main' into separate_indexes_primary_keys
Browse files Browse the repository at this point in the history
  • Loading branch information
vrmiguel authored Aug 22, 2023
2 parents a2de974 + 3bde389 commit 9fdd157
Showing 1 changed file with 44 additions and 13 deletions.
57 changes: 44 additions & 13 deletions tembo-pgmq-python/benches/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ def produce(
df = pd.DataFrame(all_results)
con = create_engine(f"postgresql://{queue.username}:{queue.password}@{queue.host}:{queue.port}/{queue.database}")
df.to_sql(f"bench_results_{queue_name}", con=con, if_exists="append", index=False)
# df.to_csv(f"produce_{queue_name}.csv", index=False)


def consume(queue_name: str, connection_info: dict):
Expand Down Expand Up @@ -220,7 +219,7 @@ def summarize(queue_name: str, queue: PGMQueue, results_file: str, duration_seco
"""summarizes results from two csvs into pdf"""

con = create_engine(f"postgresql://{queue.username}:{queue.password}@{queue.host}:{queue.port}/{queue.database}")
df = pd.read_sql(f"select * from bench_results_{queue_name}", con=con)
df = pd.read_sql(f'''select * from "bench_results_{queue_name}"''', con=con)

# iteration
trial = queue_name
Expand Down Expand Up @@ -278,39 +277,70 @@ def generate_plot(csv_name: str, bench_name: str, duration: int, tps: int, param
from multiprocessing import Process

def _parse_bool(value: str) -> bool:
    """Convert a command-line token such as "true" or "0" into a bool.

    ``type=bool`` cannot be used with argparse for this purpose: ``bool`` of
    any non-empty string is ``True``, so ``--partitioned_queue False`` would
    silently enable the option.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a recognised boolean
            spelling; argparse reports this as a normal usage error.
    """
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string is kept falsy, matching what ``bool("")`` returned.
    if normalized in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


# Command-line interface of the benchmark script.
parser = argparse.ArgumentParser(description="PGMQ Benchmarking")

parser.add_argument("--postgres_connection", type=str, required=False, help="postgres connection string")

parser.add_argument(
    "--duration_seconds", type=int, required=True, help="how long the benchmark should run, in seconds"
)
parser.add_argument("--tps", type=int, default=400, help="number of messages to produce per second")
parser.add_argument(
    "--agg_window", type=int, default=10_000, help="number of messages to aggregate for rolling average"
)
parser.add_argument("--read_concurrency", type=int, default=1, help="number of concurrent consumers")
parser.add_argument("--bench_name", type=str, required=False, help="the name of the benchmark")

# partitioned queue configurations
# Each option is registered exactly once; argparse raises a conflicting-option
# error if the same option string is added twice.
parser.add_argument(
    "--partitioned_queue", type=_parse_bool, default=False, help="whether to use a partitioned queue"
)
parser.add_argument("--partition_interval", type=int, default=10_000, help="number of messages per partition")
parser.add_argument(
    "--message_retention",
    type=int,
    default=1_000_000,
    help="retention interval of a partitioned queue, in number of messages",
)

args = parser.parse_args()
print(args)

# Build the keyword arguments that are later splatted into PGMQueue(...).
# Both branches must produce the same keys; the queue object exposes the
# login name as `username`, so that is the key used here.
if args.postgres_connection is None:
    # default postgres connection: local server, current OS user
    import getpass

    user = getpass.getuser()
    connection_info = dict(host="localhost", port=28815, username=user, password="postgres", database="pgmq")
else:
    from urllib.parse import urlparse

    result = urlparse(args.postgres_connection)
    connection_info = {
        "username": result.username,
        "password": result.password,
        "host": result.hostname,
        # urlparse already returns the port as an int, or None when the URL
        # does not name one; fall back to PostgreSQL's standard port then.
        "port": result.port if result.port is not None else 5432,
        "database": result.path.lstrip("/"),
    }

# Unpack the parsed options into the local names used by the rest of the script.
duration_seconds = args.duration_seconds
tps = args.tps
agg_window = args.agg_window
bench_name = args.bench_name

partitioned_queue = args.partitioned_queue
partition_interval = args.partition_interval
retention_interval = args.message_retention

if bench_name is None:
    # No name given: pick a random one. Kept as a string so it has the same
    # type as a name supplied through --bench_name.
    bench_name = str(random.randint(0, 1000))

# Connect with the settings derived from the command line. They must not be
# replaced by hard-coded values here, or --postgres_connection is ignored.
queue = PGMQueue(**connection_info)  # type: ignore

# Create the benchmark queue exactly once, in the flavour that was requested.
test_queue = f"bench_queue_{bench_name}"
if partitioned_queue:
    print(f"Creating partitioned queue: {test_queue}")
    queue.create_partitioned_queue(
        test_queue, partition_interval=partition_interval, retention_interval=retention_interval
    )
else:
    print(f"Creating non-partitioned queue: {test_queue}")
    queue.create_queue(test_queue)

produce_csv = f"produce_{test_queue}.csv"
consume_csv = f"consume_{test_queue}.csv"
Expand Down Expand Up @@ -339,11 +369,12 @@ def generate_plot(csv_name: str, bench_name: str, duration: int, tps: int, param
# Benchmark settings handed to generate_plot together with the results.
params = {
    "duration_seconds": duration_seconds,
    "tps": tps,
    "read_concurrency": args.read_concurrency,
    "bench_name": bench_name,
    "agg_window": agg_window,
}
# Partition settings are only used when a partitioned queue was created, so
# they are only reported in that case.
if partitioned_queue:
    params["partition_interval"] = partition_interval
    params["retention_interval"] = retention_interval

generate_plot(filename, bench_name, duration_seconds, tps, window=agg_window, params=params)

0 comments on commit 9fdd157

Please sign in to comment.