Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix support for 1.11 #244

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
name: CI

env:
JULIA_NUM_THREADS: 2
on:
Expand All @@ -9,6 +10,7 @@ on:
branches:
- master
tags: '*'

jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
Expand All @@ -18,6 +20,7 @@ jobs:
matrix:
version:
- '1' # Current stable version
- 'nightly'
os:
- ubuntu-latest
- windows-latest
Expand All @@ -36,6 +39,7 @@ jobs:
- os: ubuntu-latest
arch: x64
version: '1.3'

steps:
- uses: actions/checkout@v4
- uses: julia-actions/setup-julia@v2
Expand All @@ -59,6 +63,7 @@ jobs:
with:
file: lcov.info
token: ${{ secrets.CODECOV_TOKEN }}

docs:
name: Documentation
runs-on: ubuntu-latest
Expand Down
6 changes: 6 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ CurrentModule = ZMQ
This documents notable changes in ZMQ.jl. The format is based on [Keep a
Changelog](https://keepachangelog.com).

## Unreleased

### Added
- Support for creating [`Message`](@ref)'s from the new `Memory` type in Julia
1.11 ([#244]).

## [v1.2.6] - 2024-06-13

### Added
Expand Down
7 changes: 5 additions & 2 deletions src/message.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ mutable struct Message <: AbstractArray{UInt8,1}
Message(p, pointer(p.string)+p.offset, sizeof(p))

@doc """
Message(a::Array)
Message(a::T) where T <: DenseVector

Create a message with an array as a buffer (for send). Note: the same
ownership semantics as for [`Message(m::String)`](@ref) apply.

Usually `a` will be a 1D `Array`/`Vector`, but on 1.11+ it can also be a
`Memory`.
"""
Message(a::Array) = Message(a, pointer(a), sizeof(a))
Message(a::T) where T <: DenseVector = Message(a, pointer(a), sizeof(a))

@doc """
Message(io::IOBuffer)
Expand Down
281 changes: 169 additions & 112 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ using ZMQ, Test
@info("Testing with ZMQ version $(ZMQ.version)")

@testset "ZMQ contexts" begin
ctx=Context()
@test ctx isa Context
@test (ctx.io_threads = 2) == 2
@test ctx.io_threads == 2
ZMQ.close(ctx)

#try to create socket with expired context
@test_throws StateError Socket(ctx, PUB)
ctx=Context()
@test ctx isa Context
@test (ctx.io_threads = 2) == 2
@test ctx.io_threads == 2
ZMQ.close(ctx)

#try to create socket with expired context
@test_throws StateError Socket(ctx, PUB)
end

# This test is in its own function to keep it simple and try to trick Julia into
Expand All @@ -31,135 +31,192 @@ end
@testset "ZMQ sockets" begin
context_gc_test()

s=Socket(PUB)
@test s isa Socket
ZMQ.close(s)
s=Socket(PUB)
@test s isa Socket
ZMQ.close(s)

s1=Socket(REP)
s1.sndhwm = 1000
s1.linger = 1
s1.routing_id = "abcd"
s1=Socket(REP)
s1.sndhwm = 1000
s1.linger = 1
s1.routing_id = "abcd"

@test s1.routing_id == "abcd"
@test s1.sndhwm === 1000
@test s1.linger === 1
@test s1.rcvmore === false
@test s1.routing_id == "abcd"
@test s1.sndhwm === 1000
@test s1.linger === 1
@test s1.rcvmore === false

s2=Socket(REQ)
@test s1.type == REP
@test s2.type == REQ
s2=Socket(REQ)
@test s1.type == REP
@test s2.type == REQ

ZMQ.bind(s1, "tcp://*:5555")
ZMQ.connect(s2, "tcp://localhost:5555")
ZMQ.bind(s1, "tcp://*:5555")
ZMQ.connect(s2, "tcp://localhost:5555")

msg = Message("test request")
msg = Message("test request")

# Smoke tests
@test Base.elsize(msg) == 1
@test Base.strides(msg) == (1,)

# Test similar() and copy() fixes in https://github.com/JuliaInterop/ZMQ.jl/pull/165
@test similar(msg, UInt8, 12) isa Vector{UInt8}
@test copy(msg) == codeunits("test request")

ZMQ.send(s2, Message("test request"))
@test unsafe_string(ZMQ.recv(s1)) == "test request"
ZMQ.send(s1, Message("test response"))
@test unsafe_string(ZMQ.recv(s2)) == "test response"

ZMQ.send(s2, "test request 2")
@test ZMQ.recv(s1, String) == "test request 2"
ZMQ.send(s1, Vector(codeunits("test response 2")))
@test String(ZMQ.recv(s2, Vector{UInt8})) == "test response 2"
ZMQ.send(s2, 3.14159)
@test ZMQ.recv(s1, Float64) === 3.14159
ZMQ.send(s1, [314159, 12345])
@test ZMQ.recv(s2, Vector{Int}) == [314159, 12345]

# Test task-blocking behavior
c = Base.Condition()
global msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, Message("test request"))
@test (unsafe_string(ZMQ.recv(s2)) == "test response")
notify(c)
end

# This will hang forver if ZMQ blocks the entire process since
# we'll never switch to the other task
@test unsafe_string(ZMQ.recv(s1)) == "test request"
@test msg_sent == true
ZMQ.send(s1, Message("test response"))
wait(c)

# Test _Message task-blocking behavior, similar to above
c = Base.Condition()
msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, "another test request")
@test ZMQ.recv(s2, String) == "another test response"
notify(c)
end
@test ZMQ.recv(s1, String) == "another test request"
@test msg_sent == true
ZMQ.send(s1, "another test response")
wait(c)

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
seek(o, 0)
@test String(take!(o)) == "another test request"
ZMQ.send(s1) do io
print(io, "buffer ")
print(io, "this")
end
@test String(take!(ZMQ.recv(s2, IOBuffer))) == "buffer this"

@testset "Message AbstractVector interface" begin
m = Message("1")
@test m[1]==0x31
@test (m[1]=0x32) === 0x32
@test unsafe_string(m)=="2"
finalize(m)
end

# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
# Test similar() and copy() fixes in https://github.com/JuliaInterop/ZMQ.jl/pull/165
@test similar(msg, UInt8, 12) isa Vector{UInt8}
@test copy(msg) == codeunits("test request")

ZMQ.send(s2, Message("test request"))
@test unsafe_string(ZMQ.recv(s1)) == "test request"
ZMQ.send(s1, Message("test response"))
@test unsafe_string(ZMQ.recv(s2)) == "test response"

ZMQ.send(s2, "test request 2")
@test ZMQ.recv(s1, String) == "test request 2"
ZMQ.send(s1, Vector(codeunits("test response 2")))
@test String(ZMQ.recv(s2, Vector{UInt8})) == "test response 2"
ZMQ.send(s2, 3.14159)
@test ZMQ.recv(s1, Float64) === 3.14159
ZMQ.send(s1, [314159, 12345])
@test ZMQ.recv(s2, Vector{Int}) == [314159, 12345]

# Test task-blocking behavior
c = Base.Condition()
global msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, Message("test request"))
@test (unsafe_string(ZMQ.recv(s2)) == "test response")
notify(c)
end

# This will hang forver if ZMQ blocks the entire process since
# we'll never switch to the other task
@test unsafe_string(ZMQ.recv(s1)) == "test request"
@test msg_sent == true
ZMQ.send(s1, Message("test response"))
wait(c)

# Test _Message task-blocking behavior, similar to above
c = Base.Condition()
msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, "another test request")
@test ZMQ.recv(s2, String) == "another test response"
notify(c)
end
@test ZMQ.recv(s1, String) == "another test request"
@test msg_sent == true
ZMQ.send(s1, "another test response")
wait(c)

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
seek(o, 0)
@test String(take!(o)) == "another test request"
ZMQ.send(s1) do io
print(io, "buffer ")
print(io, "this")
end
@test String(take!(ZMQ.recv(s2, IOBuffer))) == "buffer this"

@testset "Message AbstractVector interface" begin
m = Message("1")
@test m[1]==0x31
@test (m[1]=0x32) === 0x32
@test unsafe_string(m)=="2"
finalize(m)
end

# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
@test !isopen(s1)
@test !isopen(s2)
end

# Test all the send constructors
@testset "Message" begin
s1 = Socket(PUB)
s2 = Socket(SUB)
ZMQ.subscribe(s2, "")
ZMQ.bind(s1, "tcp://*:5555")
ZMQ.connect(s2, "tcp://localhost:5555")

# Sleep for a bit to prevent the 'slow joiner' problem
sleep(0.5)

# Message(::Int) - construct from buffer size
data = rand(UInt8, 10)
m1 = Message(length(data))
# Note that we don't use copy!() for compatibility with Julia 1.3
for i in eachindex(data)
m1[i] = data[i]
end
ZMQ.send(s1, m1)
@test ZMQ.recv(s2) == data

# Message(::Any, ::Ptr, ::Int) - construct from pointer to existing data
buffer = rand(UInt8, 10)
m2 = Message(buffer, pointer(buffer), length(buffer))
ZMQ.send(s1, m2)
@test ZMQ.recv(s2) == buffer

# Message(::String)
str_msg = "foobar"
m3 = Message(str_msg)
ZMQ.send(s1, m3)
@test String(ZMQ.recv(s2)) == str_msg

# Message(::SubString)
m4 = Message(str_msg[1:3])
ZMQ.send(s1, m4)
@test String(ZMQ.recv(s2)) == str_msg[1:3]

# Message(::DenseVector) - construct from array
buffer2 = rand(UInt8, 10)
m5 = Message(buffer2)
ZMQ.send(s1, m5)
@test ZMQ.recv(s2) == buffer2

# Message(::IOBuffer)
buffer3 = rand(UInt8, 10)
iobuf = IOBuffer(buffer3)
m6 = Message(iobuf)
ZMQ.send(s1, m6)
@test ZMQ.recv(s2) == buffer3

close(s1)
close(s2)
end

@testset "ZMQ resource management" begin
local leaked_req_socket, leaked_rep_socket
ZMQ.Socket(ZMQ.REQ) do req_socket
leaked_req_socket = req_socket
leaked_req_socket = req_socket

ZMQ.Socket(ZMQ.REP) do rep_socket
leaked_rep_socket = rep_socket
ZMQ.Socket(ZMQ.REP) do rep_socket
leaked_rep_socket = rep_socket

ZMQ.bind(rep_socket, "inproc://tester")
ZMQ.connect(req_socket, "inproc://tester")
ZMQ.bind(rep_socket, "inproc://tester")
ZMQ.connect(req_socket, "inproc://tester")

ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
end
ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
end

@test !ZMQ.isopen(leaked_rep_socket)
@test !ZMQ.isopen(leaked_rep_socket)
end
@test !ZMQ.isopen(leaked_req_socket)

local leaked_ctx
ZMQ.Context() do ctx
leaked_ctx = ctx
leaked_ctx = ctx

@test isopen(ctx)
@test isopen(ctx)
end
@test !isopen(leaked_ctx)
end
Expand Down
Loading