Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async and receiver transmitter #19

Merged
merged 29 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1963de8
Switch tcp acceptor to async
Robert108 Nov 16, 2023
15fcb38
Spin udp threads on start
Robert108 Nov 16, 2023
b7a12fa
Remove some thread prints
Robert108 Nov 16, 2023
06b4f7f
Clean up TestObject destructor
Robert108 Nov 16, 2023
e92b40a
Fix busy wait in udp threads
Robert108 Nov 16, 2023
1559746
FIx traj size
Robert108 Nov 17, 2023
cab6672
Update HEAD timeout print
Robert108 Nov 17, 2023
87b2f62
Add shutdown function
Robert108 Nov 17, 2023
dee1ddc
Only send MONR when a HEAB has been received (ie the endpoint is reso…
Robert108 Nov 20, 2023
d46b07c
Change sigslot submodule to use https
Robert108 Nov 23, 2023
bfbc41c
Update iso22133 submodule ref
Robert108 Nov 23, 2023
11d67db
Fix faulty url
Robert108 Nov 23, 2023
a66833e
Call shutdown in destructor
Robert108 Dec 1, 2023
a9f0017
Reset context on disconnect
Robert108 Dec 1, 2023
2842fbb
fix, removed unused header
victorjarlow Nov 20, 2023
7227dcd
rebase and fix merge between two prs
Jan 11, 2024
82e5a89
Rebased and fixed tests
Jan 10, 2024
2368b21
Update src/iso22133object.cpp
Jan 15, 2024
d02ffea
Update src/iso22133object.cpp
Jan 15, 2024
9b69bee
Update src/iso22133state.cpp
Jan 15, 2024
d27392a
Update tests/isoObject.cpp
Jan 15, 2024
9014a6b
Update tests/isoObject.cpp
Jan 15, 2024
1fe62c2
Update tests/isoObject.cpp
Jan 15, 2024
b0eb86a
Update src/iso22133object.cpp
Jan 15, 2024
cdf1961
Update inc/iso22133object.hpp
Jan 15, 2024
90d48a5
New tests for tcpServer and review fixes
Jan 16, 2024
793ff67
Review comments, removed internal state functions
Jan 18, 2024
6c0f8f8
Build fail fix
Jan 19, 2024
9ab203a
WITH_SWIG ifdef for swig-specific functions
Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
url = https://github.com/RI-SE/iso22133
[submodule "sigslot"]
path = sigslot
url = git@github.com:palacaze/sigslot.git
url = https://github.com/palacaze/sigslot.git
100 changes: 100 additions & 0 deletions .vscode/settings.json
Angeleon marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{
"files.associations": {
"*.ipp": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"csetjmp": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"any": "cpp",
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"compare": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"coroutine": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"forward_list": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"fstream": "cpp",
"initializer_list": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"ranges": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"cinttypes": "cpp",
"typeinfo": "cpp",
"semaphore": "cpp",
"codecvt": "cpp",
"__nullptr": "cpp",
"future": "cpp",
"cerrno": "cpp",
"__split_buffer": "cpp",
"regex": "cpp",
"__hash_table": "cpp",
"__tree": "cpp",
"queue": "cpp",
"stack": "cpp",
"variant": "cpp",
"filesystem": "cpp",
"shared_mutex": "cpp",
"__mutex_base": "cpp",
"__functional_base": "cpp",
"locale": "cpp",
"strstream": "cpp",
"header.h": "c",
"iso22133.h": "c",
"__locale": "cpp",
"__string": "cpp",
"valarray": "cpp",
"__node_handle": "cpp",
"source_location": "cpp",
"typeindex": "cpp",
"complex": "cpp",
"iomanip": "cpp"
}
}
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,4 @@ if (BUILD_TESTING)
)
include(GoogleTest)
gtest_discover_tests(${ISOOBJECT_LIBRARY}_test)
endif()
endif()
41 changes: 32 additions & 9 deletions inc/iso22133object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <thread>
#include <mutex>
#include <atomic>
#include <optional>

#include "header.h"
#include "iso22133.h"
Expand Down Expand Up @@ -51,9 +50,11 @@ class TestObject {

public:
TestObject(const std::string& listenIP = "0.0.0.0");
TestObject(int tcpSocket);
Angeleon marked this conversation as resolved.
Show resolved Hide resolved
virtual ~TestObject();

void disconnect();
void shutdown();

void setPosition(const CartesianPosition& pos) { position = pos; }
void setSpeed(const SpeedType& spd) { speed = spd; }
Expand All @@ -76,11 +77,28 @@ class TestObject {
GeographicPositionType getOrigin() const { return origin; }
std::string getLocalIP() const { return localIP; }
uint32_t getTransmitterID() const { return transmitterID; }
uint32_t getServerID() const { return serverID; }
ObjectSettingsType getObjectSettings() const { return objectSettings; }


/** SWIG Wrappers **/
bool isAwaitingFirstHeab() { return awaitingFirstHeab;}
Angeleon marked this conversation as resolved.
Show resolved Hide resolved
//! Set the endpoint for the process channel
void setProcessChannelEndpoint(int udpSocket, char *addr, const uint32_t port);
//! Wrapper for handling function that converts to char vector
int handleMessage(char *buffer, int bufferLen);
//! Wrapper for state change requests
void requestStateChange(ISO22133::Events::EventType event) { state->handleEvent(*this, event); }
Angeleon marked this conversation as resolved.
Show resolved Hide resolved

//! Used to start the threads
void startHandleTCP() { tcpReceiveThread = std::thread(&TestObject::receiveTCP, this); }
void startHandleUDP() { udpReceiveThread = std::thread(&TestObject::receiveUDP, this); }
void startHEABCheck() { heabTimeoutThread = std::thread(&TestObject::checkHeabLoop, this); }
void startSendMonr() { monrThread = std::thread(&TestObject::sendMonrLoop, this); }

protected:
//! Fill message header with receiver/transmitter id and messageCounter. Returns pointer to input header.
MessageHeaderType *populateMessageHeader(MessageHeaderType *header);

//! Pure virtual safety function that must be implemented by the user.
virtual void handleAbort() { throw std::logic_error("Use of unimplemented abort handler"); }
Expand Down Expand Up @@ -116,7 +134,6 @@ class TestObject {
//! Must be overridden if modifying the Pre-Running state
virtual PreRunning* createPreRunning() const { return new PreRunning; }


//! Signals for events
sigslot::signal<> stateChangeSig;
sigslot::signal<ObjectSettingsType&> osemSig;
Expand Down Expand Up @@ -147,6 +164,8 @@ class TestObject {

ISO22133::State* state;
private:
//! Initializer for commonalities of the constructs
void initialize();

//! TCP receiver loop that should be run in its own thread.
void receiveTCP();
Expand All @@ -157,12 +176,6 @@ class TestObject {
//! MONR sending loop that should be run in its own thread.
void sendMonrLoop();

//! Used to start the threads
void startHandleTCP() { tcpReceiveThread = std::thread(&TestObject::receiveTCP, this); }
void startHandleUDP() { udpReceiveThread = std::thread(&TestObject::receiveUDP, this); }
void startHEABCheck() { heabTimeoutThread = std::thread(&TestObject::checkHeabLoop, this); }
void startSendMonr() { monrThread = std::thread(&TestObject::sendMonrLoop, this); }

//! Function for handling received ISO messages. Calls corresponding
//! handler in the current state.
int handleMessage(std::vector<char>&);
Expand All @@ -179,11 +192,16 @@ class TestObject {
//! Set estimated network delay from HEAB times
void setNetworkDelay(std::chrono::milliseconds);

//! Get the Next message counter to send
char getMessageCounter() { return sentMessageCounter = (sentMessageCounter + 1) % 256; }
Angeleon marked this conversation as resolved.
Show resolved Hide resolved

//! Check if the received message counter is correct and update regardless to the next expected
void checkAndUpdateMessageCounter(const char receivedMessageCounter);

sigslot::signal<>heabTimeout;
std::mutex recvMutex;
std::mutex heabMutex;
std::mutex netwrkDelayMutex;
std::mutex disconnectMutex;
std::string localIP;
std::thread tcpReceiveThread;
std::thread udpReceiveThread;
Expand All @@ -206,8 +224,13 @@ class TestObject {
std::atomic<ObjectStateID> objectState { ISO_OBJECT_STATE_UNKNOWN };
std::atomic<int> readyToArm { OBJECT_READY_TO_ARM_UNAVAILABLE };
std::atomic<int> transmitterID;
std::atomic<int> serverID;
std::atomic<char> expectedMessageCounter;
std::atomic<char> sentMessageCounter;
std::atomic<bool> socketsReceivedFromController { false };
std::atomic<char> errorState { 0 };
std::atomic<bool> awaitingFirstHeab { true };
std::atomic<bool> osemReceived { false };
std::atomic<bool> on { true };

std::chrono::milliseconds estimatedNetworkDelay = std::chrono::milliseconds(0);
Expand Down
2 changes: 1 addition & 1 deletion inc/iso22133state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <functional>
extern "C"{
#include "iso22133.h"
#include "header.h"
}
namespace ISO22133 {
class TestObject;
Expand Down Expand Up @@ -149,7 +150,6 @@ class Off : public State {
class Init : public State {
public:
virtual ObjectStateID getStateID() const final override { return ISO_OBJECT_STATE_INIT; }
virtual void onExit(TestObject&) override;
private:
void handleTRAJ(TestObject&, std::atomic<HeaderType>&) final override { unexpectedMessageWarning("TRAJ"); }
void handleOSEM(TestObject&, ObjectSettingsType&) final override { unexpectedMessageWarning("OSEM"); }
Expand Down
48 changes: 39 additions & 9 deletions inc/tcpServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,52 @@
*/
class TcpServer {
public:
TcpServer(std::string ip, uint32_t port) : acceptor(context, boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::from_string(ip), port)), socket(context) {
TcpServer(std::string ip, uint32_t port) :
acceptor(context, boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::from_string(ip), port)),
socket(context),
acceptIncoming(true) {
Angeleon marked this conversation as resolved.
Show resolved Hide resolved
setBufferSize(defaultBufferSize);
};

TcpServer(int sock) :
socket(context),
acceptor(context),
acceptIncoming(false) {
setBufferSize(defaultBufferSize);
socket.assign(boost::asio::ip::tcp::v4(), sock);
};

virtual ~TcpServer() = default;
void disconnect() {
try {
socket.shutdown(boost::asio::socket_base::shutdown_both);
socket.close();
} catch (boost::system::system_error& e) {}
if (this->acceptIncoming) {
acceptor.cancel();
}
if (socket.is_open()){
socket.shutdown(boost::asio::socket_base::shutdown_both);
socket.close();
}
context.reset();
} catch (boost::system::system_error& e) {
std::cerr << "Error when closing socket: " << e.what() << std::endl;
}
};

void acceptConnection() {
try {
acceptor.accept(socket);
} catch (boost::system::system_error& e) {
std::cerr << "TCP socket accept failed: " << e.what() << std::endl;
}
acceptor.async_accept(socket, [this](const boost::system::error_code& error) {
if (error) {
// Accepting failed, handle the error
// Print the error message for example
if (error == boost::asio::error::operation_aborted) {
std::cerr << "TCP Accept aborted" << std::endl;
} else {
std::cerr << "TCP Accept error: " << error.message() << std::endl;
throw boost::system::system_error(boost::asio::error::eof);
}
}
});
context.run_one();
context.restart();
}

void setBufferSize(size_t size) { dataBuffer.resize(size); };
Expand Down Expand Up @@ -77,6 +106,7 @@ class TcpServer {
private:
std::vector<char> dataBuffer;
size_t defaultBufferSize = 4096;
bool acceptIncoming;

boost::asio::io_context context;
boost::asio::ip::tcp::acceptor acceptor;
Expand Down
1 change: 1 addition & 0 deletions inc/trajDecoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <atomic>

#include "iso22133.h"
#include "traj.h"

/**
* @brief Class for decoding TRAJ messages. Stores TRAJ data and
Expand Down
12 changes: 11 additions & 1 deletion inc/udpServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
*/
class UdpServer {
public:
UdpServer(std::string ip, uint32_t port) : socket(context, boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::from_string(ip), port)) {
UdpServer(const std::string &ip, uint32_t port) :
socket(context, boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::from_string(ip), port)) {
setBufferSize(defaultBufferSize);
};
UdpServer() :
socket(context) {
setBufferSize(defaultBufferSize);
};

void setEndpoint(int native_socket, boost::asio::ip::udp::endpoint &ep) {
socket.assign(boost::asio::ip::udp::v4(), native_socket);
senderEndpoint = ep;
};

void setBufferSize(size_t size) { dataBuffer.resize(size); };

Expand Down
Loading