Skip to content

Commit

Permalink
✨ Add ability to borrow Session by ID from SocketAcceptor / SocketIni…
Browse files Browse the repository at this point in the history
…tiator
  • Loading branch information
arthurlm committed Oct 6, 2024
1 parent 9980466 commit da9095a
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 10 deletions.
2 changes: 2 additions & 0 deletions quickfix-ffi/quickfix-bind/include/quickfix_bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ int8_t FixSocketAcceptor_poll(FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_stop(FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_isLoggedOn(const FixSocketAcceptor_t *obj);
int8_t FixSocketAcceptor_isStopped(const FixSocketAcceptor_t *obj);
FixSession_t *FixSocketAcceptor_getSession(const FixSocketAcceptor_t *obj, const FixSessionID_t *sessionId);
void FixSocketAcceptor_delete(const FixSocketAcceptor_t *obj);

FixSocketInitiator_t *FixSocketInitiator_new(FixApplication_t *application, FixMessageStoreFactory_t *storeFactory,
Expand All @@ -154,6 +155,7 @@ int8_t FixSocketInitiator_poll(FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_stop(FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_isLoggedOn(const FixSocketInitiator_t *obj);
int8_t FixSocketInitiator_isStopped(const FixSocketInitiator_t *obj);
FixSession_t *FixSocketInitiator_getSession(const FixSocketInitiator_t *obj, const FixSessionID_t *sessionId);
void FixSocketInitiator_delete(const FixSocketInitiator_t *obj);

FixSessionID_t *FixSessionID_new(const char *beginString, const char *senderCompID, const char *targetCompID,
Expand Down
12 changes: 12 additions & 0 deletions quickfix-ffi/quickfix-bind/src/quickfix_bind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,12 @@ int8_t FixSocketAcceptor_isStopped(const SocketAcceptor *obj) {
CATCH_OR_RETURN_ERRNO({ return obj->isStopped(); });
}

FixSession_t *FixSocketAcceptor_getSession(const FixSocketAcceptor_t *obj, const FixSessionID_t *sessionId) {
RETURN_VAL_IF_NULL(obj, NULL);
RETURN_VAL_IF_NULL(sessionId, NULL);
CATCH_OR_RETURN_NULL({ return obj->getSession(*sessionId); });
}

void FixSocketAcceptor_delete(const SocketAcceptor *obj) {
RETURN_IF_NULL(obj);
delete obj;
Expand Down Expand Up @@ -642,6 +648,12 @@ int8_t FixSocketInitiator_isStopped(const SocketInitiator *obj) {
CATCH_OR_RETURN_ERRNO({ return obj->isStopped(); });
}

FixSession_t *FixSocketInitiator_getSession(const FixSocketInitiator_t *obj, const FixSessionID_t *sessionId) {
RETURN_VAL_IF_NULL(obj, NULL);
RETURN_VAL_IF_NULL(sessionId, NULL);
CATCH_OR_RETURN_NULL({ return obj->getSession(*sessionId); });
}

void FixSocketInitiator_delete(const SocketInitiator *obj) {
RETURN_IF_NULL(obj);
delete obj;
Expand Down
10 changes: 10 additions & 0 deletions quickfix-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ extern "C" {
#[must_use]
pub fn FixSocketAcceptor_isStopped(obj: FixSocketAcceptor_t) -> i8;

pub fn FixSocketAcceptor_getSession(
obj: FixSocketAcceptor_t,
sessionId: FixSessionID_t,
) -> Option<FixSession_t>;

pub fn FixSocketAcceptor_delete(obj: FixSocketAcceptor_t);

// Socket initiator
Expand Down Expand Up @@ -296,6 +301,11 @@ extern "C" {
#[must_use]
pub fn FixSocketInitiator_isStopped(obj: FixSocketInitiator_t) -> i8;

pub fn FixSocketInitiator_getSession(
obj: FixSocketInitiator_t,
sessionId: FixSessionID_t,
) -> Option<FixSession_t>;

pub fn FixSocketInitiator_delete(obj: FixSocketInitiator_t);

// Session ID
Expand Down
12 changes: 12 additions & 0 deletions quickfix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ pub trait ConnectionHandler {
fn is_stopped(&self) -> Result<bool, QuickFixError>;
}

/// Define a container of session
///
/// May be a socket acceptor / initiator.
pub trait SessionContainer {
/// Borrow mutable session to the container.
///
/// Session is lookup using its ID.
fn with_session_mut<F, T>(&self, session_id: SessionId, f: F) -> Result<T, QuickFixError>
where
F: FnOnce(&mut Session) -> T;
}

/// Convert object to FIX value.
///
/// This trait is like `std::fmt::Display` but it has a different meaning.
Expand Down
3 changes: 2 additions & 1 deletion quickfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ impl Session {
/// Function is unsafe because there is no way to bind FIX session lifetime
/// to rust session lifetime.
///
/// Maybe Oren Miller as a better idea / solution to solve this issue.
/// Use `SessionContainer::with_session_mut` instead. It will give you a safe scope
/// where session has been borrowed to the acceptor / initiator.
pub unsafe fn lookup(session_id: &SessionId) -> Result<Self, QuickFixError> {
match unsafe { FixSession_lookup(session_id.0) } {
Some(session) => Ok(Self(session)),
Expand Down
30 changes: 26 additions & 4 deletions quickfix/src/socket_acceptor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::marker::PhantomData;

use quickfix_ffi::{
FixSocketAcceptor_block, FixSocketAcceptor_delete, FixSocketAcceptor_isLoggedOn,
FixSocketAcceptor_isStopped, FixSocketAcceptor_new, FixSocketAcceptor_poll,
FixSocketAcceptor_start, FixSocketAcceptor_stop, FixSocketAcceptor_t,
FixSocketAcceptor_block, FixSocketAcceptor_delete, FixSocketAcceptor_getSession,
FixSocketAcceptor_isLoggedOn, FixSocketAcceptor_isStopped, FixSocketAcceptor_new,
FixSocketAcceptor_poll, FixSocketAcceptor_start, FixSocketAcceptor_stop, FixSocketAcceptor_t,
};

use crate::{
utils::{ffi_code_to_bool, ffi_code_to_result},
Application, ApplicationCallback, ConnectionHandler, FfiMessageStoreFactory, LogCallback,
LogFactory, QuickFixError, SessionSettings,
LogFactory, QuickFixError, Session, SessionContainer, SessionSettings,
};

/// Socket implementation of incoming connections handler.
Expand Down Expand Up @@ -89,6 +89,28 @@ where
}
}

impl<A, L, S> SessionContainer for SocketAcceptor<'_, A, L, S>
where
A: ApplicationCallback,
S: FfiMessageStoreFactory,
L: LogCallback,
{
fn with_session_mut<F, T>(&self, session_id: crate::SessionId, f: F) -> Result<T, QuickFixError>
where
F: FnOnce(&mut Session) -> T,
{
let mut session = unsafe {
FixSocketAcceptor_getSession(self.inner, session_id.0)
.map(Session)
.ok_or_else(|| {
QuickFixError::SessionNotFound(format!("No session found: {session_id:?}"))
})?
};

Ok(f(&mut session))
}
}

impl<A, L, S> Drop for SocketAcceptor<'_, A, L, S>
where
A: ApplicationCallback,
Expand Down
31 changes: 27 additions & 4 deletions quickfix/src/socket_initiator.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::marker::PhantomData;

use quickfix_ffi::{
FixSocketInitiator_block, FixSocketInitiator_delete, FixSocketInitiator_isLoggedOn,
FixSocketInitiator_isStopped, FixSocketInitiator_new, FixSocketInitiator_poll,
FixSocketInitiator_start, FixSocketInitiator_stop, FixSocketInitiator_t,
FixSocketInitiator_block, FixSocketInitiator_delete, FixSocketInitiator_getSession,
FixSocketInitiator_isLoggedOn, FixSocketInitiator_isStopped, FixSocketInitiator_new,
FixSocketInitiator_poll, FixSocketInitiator_start, FixSocketInitiator_stop,
FixSocketInitiator_t,
};

use crate::{
utils::{ffi_code_to_bool, ffi_code_to_result},
Application, ApplicationCallback, ConnectionHandler, FfiMessageStoreFactory, LogCallback,
LogFactory, QuickFixError, SessionSettings,
LogFactory, QuickFixError, Session, SessionContainer, SessionSettings,
};

/// Socket implementation of establishing connections handler.
Expand Down Expand Up @@ -89,6 +90,28 @@ where
}
}

impl<A, L, S> SessionContainer for SocketInitiator<'_, A, L, S>
where
A: ApplicationCallback,
S: FfiMessageStoreFactory,
L: LogCallback,
{
fn with_session_mut<F, T>(&self, session_id: crate::SessionId, f: F) -> Result<T, QuickFixError>
where
F: FnOnce(&mut Session) -> T,
{
let mut session = unsafe {
FixSocketInitiator_getSession(self.inner, session_id.0)
.map(Session)
.ok_or_else(|| {
QuickFixError::SessionNotFound(format!("No session found: {session_id:?}"))
})?
};

Ok(f(&mut session))
}
}

impl<A, L, S> Drop for SocketInitiator<'_, A, L, S>
where
A: ApplicationCallback,
Expand Down
48 changes: 47 additions & 1 deletion quickfix/tests/test_send_receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn test_full_fix_application() -> Result<(), QuickFixError> {

assert_eq!(receiver.admin_msg_count(), MsgCounter { recv: 1, sent: 1 });

// Send a message from one app to the other
// Send a message from one app to the other using `send_to_target` API.
assert_eq!(sender.user_msg_count(), MsgCounter::default());
assert_eq!(receiver.user_msg_count(), MsgCounter::default());

Expand All @@ -96,6 +96,52 @@ fn test_full_fix_application() -> Result<(), QuickFixError> {
assert_eq!(sender.user_msg_count(), MsgCounter { sent: 1, recv: 1 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 1, recv: 1 });

// Send a message from one app to the other using `SessionContainer` API.
// - Check first session are not mixable from sender / receiver.
assert_eq!(
socket_sender.with_session_mut(ServerType::Receiver.session_id(), |_session| {
unreachable!();
}),
Err(QuickFixError::SessionNotFound(
"No session found: SessionId(\"FIX.4.4:ME->THEIR\")".to_string()
))
);

assert_eq!(
socket_receiver.with_session_mut(ServerType::Sender.session_id(), |_session| {
unreachable!();
}),
Err(QuickFixError::SessionNotFound(
"No session found: SessionId(\"FIX.4.4:THEIR->ME\")".to_string()
))
);

// Then play with API 😎
socket_sender.with_session_mut(ServerType::Sender.session_id(), |session| {
let news = build_news("Hello", &[])?;
session.send(news)?;

Ok::<_, QuickFixError>(())
})??;
thread::sleep(Duration::from_millis(50));

assert_eq!(sender.user_msg_count(), MsgCounter { sent: 2, recv: 1 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 1, recv: 2 });

socket_receiver.with_session_mut(ServerType::Receiver.session_id(), |session| {
let news = build_news(
"Anyone here",
&["This news have", "some content", "that is very interesting"],
)?;
session.send(news)?;

Ok::<_, QuickFixError>(())
})??;
thread::sleep(Duration::from_millis(50));

assert_eq!(sender.user_msg_count(), MsgCounter { sent: 2, recv: 2 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 2, recv: 2 });

// Stop everything
socket_receiver.stop()?;
socket_sender.stop()?;
Expand Down

0 comments on commit da9095a

Please sign in to comment.