Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latency tests #218

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: CI
on:
push:
branches:
- master
tags: '*'
pull_request:
concurrency:
# Skip intermediate builds: always.
# Cancel intermediate builds: only if it is a pull request build.
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}
jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
version:
- '1.3'
- '1.6'
- '1.7'
os:
- ubuntu-latest
- macOS-latest
- windows-latest
arch:
- x64
steps:
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@v1
with:
version: ${{ matrix.version }}
arch: ${{ matrix.arch }}
- uses: actions/cache@v1
env:
cache-name: cache-artifacts
with:
path: ~/.julia/artifacts
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
restore-keys: |
${{ runner.os }}-test-${{ env.cache-name }}-
${{ runner.os }}-test-
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get update
sudo apt-get install --yes --no-install-recommends autoconf automake build-essential ca-certificates git libkrb5-dev libsodium-dev libtool pkg-config
git clone https://github.com/zeromq/libzmq
cd libzmq
./autogen.sh
./configure --prefix=/usr/local --with-libsodium --with-libgssapi_krb5
make
cd perf
for size in 1 2 4 8 16 32 64 128 256 512 1024; do
./local_lat tcp://*:5000 $size 1000 &
./remote_lat tcp://127.0.0.1:5000 $size 1000
sleep 1
done
for size in 1 2 4 8 16 32 64 128 256 512 1024; do
./inproc_lat $size 1000
sleep 1
done
- uses: julia-actions/julia-runtest@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: 4
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v2
with:
files: lcov.info
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ ZeroMQ_jll = "4"
julia = "1.3"

[extras]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test"]
test = ["Dates", "Test", "Statistics"]
127 changes: 127 additions & 0 deletions test/latency_tests.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
using Dates
using Statistics

@show Threads.nthreads()

@testset "Latency tests" begin
function send_messages(ctx::ZMQ.Context, msg, N::Int, Δt::TimePeriod, ready_to_start::Channel{Nothing}, start_condition::Threads.Condition)
socket = Socket(ctx, REP)
try
bind(socket, "tcp://*:6666")
timestamps = Nanosecond[]
@info "Sender ready to start on thread $(Threads.threadid())"
put!(ready_to_start, nothing)
lock(start_condition) do
wait(start_condition)
end
@info "Sender starting"
for _ in 1:N
sleep(Δt)
ZMQ.recv(socket)
ZMQ.send(socket, msg)
push!(timestamps, Nanosecond(time_ns()))
end
return timestamps
finally
close(socket)
end
end

function receive_messages(ctx::ZMQ.Context, N::Int, ready_to_start::Channel{Nothing}, start_condition::Threads.Condition)
socket = Socket(ctx, REQ)
msg = [0x1]
try
connect(socket, "tcp://localhost:6666")
timestamps = Nanosecond[]
@info "Receiver ready to start on thread $(Threads.threadid())"
put!(ready_to_start, nothing)
lock(start_condition) do
wait(start_condition)
end
@info "Receiver starting"
for _ in 1:N
ZMQ.send(socket, msg)
ZMQ.recv(socket)
push!(timestamps, Nanosecond(time_ns()))
end
return timestamps
finally
close(socket)
end
end

function time_parallel_send_receive(ctx::ZMQ.Context, msg, N::Int, Δt::TimePeriod)
ready_to_start = Channel{Nothing}(2)
start_condition = Threads.Condition()

sender = Threads.@spawn send_messages(ctx, msg, N, Δt, ready_to_start, start_condition)
receiver = Threads.@spawn receive_messages(ctx, N, ready_to_start, start_condition)
@info "Awaiting ready_to_start"
for _ in 1:2
take!(ready_to_start)
end
@info "Starting"
lock(start_condition) do
notify(start_condition)
end
@info "Awaiting completion"
send_timestamps = fetch(sender)
recv_timestamps = fetch(receiver)

return send_timestamps, recv_timestamps
end

function test_timestamps(send_timestamps, recv_timestamps, N, expected_max_latency, expected_mean, expected_median, expected_tol, max_tol)
@assert length(send_timestamps) == N
@assert length(recv_timestamps) == N

latencies = recv_timestamps .- send_timestamps
Δsend_timestamps = [Dates.toms(send_timestamps[i] - send_timestamps[i-1]) for i in 2:N]
Δrecv_timestamps = [Dates.toms(recv_timestamps[i] - recv_timestamps[i-1]) for i in 2:N]

@show maximum(latencies)
@test maximum(latencies) <= expected_max_latency

@show mean(Δsend_timestamps)
@test mean(Δsend_timestamps) ≈ expected_mean atol=expected_tol
@show median(Δsend_timestamps)
@test median(Δsend_timestamps) ≈ expected_median atol=expected_tol
@show std(Δsend_timestamps)
@test std(Δsend_timestamps) < expected_tol
@show maximum(Δsend_timestamps)
@test maximum(Δsend_timestamps) < expected_mean+max_tol

@show mean(Δrecv_timestamps)
@test mean(Δrecv_timestamps) ≈ expected_mean atol=expected_tol
@show median(Δrecv_timestamps)
@test median(Δrecv_timestamps) ≈ expected_median atol=expected_tol
@show std(Δrecv_timestamps)
@test std(Δrecv_timestamps) < expected_tol
@show maximum(Δrecv_timestamps)
@test maximum(Δrecv_timestamps) < expected_mean+max_tol
end

for period_ms in [2^n for n in 0:4]
@testset "REQ/REP with period $period_ms ms" begin
N = 1000 # Number of messages
Δt = Millisecond(period_ms)
msg = [0x1]

expected_max_latency = Microsecond(1000)
expected_mean = Dates.toms(Δt)
expected_median = expected_mean
expected_tol = Dates.toms(Millisecond(5))
expected_max_tol = Dates.toms(Millisecond(10))

ctx = ZMQ.context()
@show ctx.io_threads

try
push_timestamps, pull_timestamps = time_parallel_send_receive(ctx, msg, N, Δt)
test_timestamps(push_timestamps, pull_timestamps, N, expected_max_latency, expected_mean, expected_median, expected_tol, expected_max_tol)
finally
close(ctx)
end
end
end
end
2 changes: 2 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,5 @@ end
end
@test !isopen(leaked_ctx)
end

include("latency_tests.jl")