From 913cf7bbd972c6e7254fb7879a17447ccb2d16bc Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Fri, 2 Jul 2021 15:27:27 +0300 Subject: [PATCH] urtps: Simplified timesync --- msg/templates/uorb_microcdr/msg.cpp.em | 29 ++++++++++++++++++-- msg/templates/urtps/RtpsTopics.cpp.em | 32 +++++++++++++++------- msg/templates/urtps/RtpsTopics.h.em | 9 +++++- msg/templates/urtps/microRTPS_agent.cpp.em | 4 +-- 4 files changed, 59 insertions(+), 15 deletions(-) diff --git a/msg/templates/uorb_microcdr/msg.cpp.em b/msg/templates/uorb_microcdr/msg.cpp.em index ea7251c47bea..ebec954a2f39 100644 --- a/msg/templates/uorb_microcdr/msg.cpp.em +++ b/msg/templates/uorb_microcdr/msg.cpp.em @@ -65,6 +65,7 @@ topic_name = spec.short_name #include #include #include +#include @################################################# @# Searching for serialize function per each field @@ -94,12 +95,31 @@ def get_serialization_type_name(type_name): else: raise Exception("Type {0} not supported, add to type_serialize_map!".format(type_name)) +def serialize_timestamp_delta(field, scope_name, abs_time_exists): + if (not abs_time_exists): + print(" const hrt_abstime now = hrt_absolute_time();") + print(" hrt_abstime " + scope_name+str(field.name) + "__ = now - input->" + scope_name+str(field.name) + ";") + print(" ucdr_serialize_" + str(get_serialization_type_name(field.type)) + "(writer, " + scope_name+str(field.name) + "__);") + return True + +def deserialize_timestamp_delta(field, scope_name, abs_time_exists): + if (not abs_time_exists): + print(" const hrt_abstime now = hrt_absolute_time();") + print(" ucdr_deserialize_" + str(get_serialization_type_name(field.type)) + "(reader, &output->" + scope_name+str(field.name) + ");") + print(" output->" + scope_name+str(field.name) + " = now - output->" + scope_name+str(field.name) + ";") + return True + def add_serialize_functions(fields, scope_name): + hrt_abstime_read = False for field in fields: if (not field.is_header): if (field.is_builtin): if (not field.is_array): - print(" ucdr_serialize_" + str(get_serialization_type_name(field.type)) + "(writer, input->" + scope_name+str(field.name) + ");") + if (topic_name != "timesync" and + (scope_name+str(field.name) == "timestamp")): + hrt_abstime_read = serialize_timestamp_delta(field, scope_name, hrt_abstime_read) + else: + print(" ucdr_serialize_" + str(get_serialization_type_name(field.type)) + "(writer, input->" + scope_name+str(field.name) + ");") else: print(" ucdr_serialize_array_" + str(get_serialization_type_name(field.base_type)) + "(writer, input->" + scope_name+str(field.name) + ", " + str(field.array_len) + ");") else: @@ -113,11 +133,16 @@ def add_serialize_functions(fields, scope_name): add_serialize_functions(children_fields, name + ('[%d].' %i)) def add_deserialize_functions(fields, scope_name): + hrt_abstime_read = False for field in fields: if (not field.is_header): if (field.is_builtin): if (not field.is_array): - print(" ucdr_deserialize_" + str(get_serialization_type_name(field.type)) + "(reader, &output->" + scope_name+str(field.name) + ");") + if (topic_name != "timesync" and + (scope_name+str(field.name) == "timestamp")): + hrt_abstime_read = deserialize_timestamp_delta(field, scope_name, hrt_abstime_read) + else: + print(" ucdr_deserialize_" + str(get_serialization_type_name(field.type)) + "(reader, &output->" + scope_name+str(field.name) + ");") else: print(" ucdr_deserialize_array_" + str(get_serialization_type_name(field.base_type)) + "(reader, output->" + scope_name+str(field.name) + ", " + str(field.array_len) + ");") else: diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index e9e1b271053a..576c5c25b86b 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -56,8 +56,10 @@ package = package[0] #include "RtpsTopics.h" -bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns) +bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns, const uint32_t transmission_speed_bytes_per_sec) { + _tr_speed = transmission_speed_bytes_per_sec; + @[if recv_topics]@ // Initialise subscribers std::cout << "\033[0;36m--- Subscribers ---\033[0m" << std::endl; @@ -107,10 +109,16 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len) if (getMsgSysID(&st) == 1) { @[ end if]@ - // apply timestamp offset + // Replace timestamp with time delta from original ts to 'now' + uint64_t now = _timesync->getMonoTimeUSec(); uint64_t timestamp = getMsgTimestamp(&st); - _timesync->subtractOffset(timestamp); - setMsgTimestamp(&st, timestamp); + uint64_t ts_delta = now - timestamp + (len * 1000000 / _tr_speed); + setMsgTimestamp(&st, ts_delta); + uint64_t timestampsample = getMsgTimestampSample(&st); + if (timestampsample) { + ts_delta = now - timestampsample + (len * 1000000 / _tr_speed); + setMsgTimestampSample(&st, ts_delta); + } _@(topic)_pub.publish(&st); @[ if topic == 'Timesync' or topic == 'timesync']@ } @@ -139,13 +147,17 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr) @[ if topic == 'Timesync' or topic == 'timesync']@ if (getMsgSysID(&msg) == 0) { @[ end if]@ - // apply timestamps offset - uint64_t timestamp = getMsgTimestamp(&msg); - uint64_t timestamp_sample = getMsgTimestampSample(&msg); - _timesync->addOffset(timestamp); + // Substract time_delta and transmission delay from current time and set new timestamp + uint64_t now = _timesync->getMonoTimeUSec(); + uint64_t ts_delta = getMsgTimestamp(&msg); + uint64_t timestamp = now - ts_delta; setMsgTimestamp(&msg, timestamp); - _timesync->addOffset(timestamp_sample); - setMsgTimestampSample(&msg, timestamp_sample); + uint64_t timestamp_sample; + ts_delta = getMsgTimestampSample(&msg); + if (ts_delta) { + timestamp_sample = now - ts_delta; + setMsgTimestampSample(&msg, timestamp_sample); + } msg.serialize(scdr); ret = true; @[ if topic == 'Timesync' or topic == 'timesync']@ diff --git a/msg/templates/urtps/RtpsTopics.h.em b/msg/templates/urtps/RtpsTopics.h.em index 4f3f9d4e15fb..91fa452d48ff 100644 --- a/msg/templates/urtps/RtpsTopics.h.em +++ b/msg/templates/urtps/RtpsTopics.h.em @@ -93,7 +93,7 @@ using @(topic)_msg_t = @(topic); class RtpsTopics { public: - bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns); + bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue* t_send_queue, const std::string& ns, const uint32_t transmission_speed_bytes_per_sec); void set_timesync(const std::shared_ptr& timesync) { _timesync = timesync; }; @[if send_topics]@ void publish(uint8_t topic_ID, char data_buffer[], size_t len); @@ -204,4 +204,11 @@ private: * messages timestamps. */ std::shared_ptr _timesync; + + /** + * @@brief Transmission speed. + * Speed (bytes per second) how fast the data goes through the transmission line + * This is used for calculating timestamp for received messages + */ + uint32_t _tr_speed; }; diff --git a/msg/templates/urtps/microRTPS_agent.cpp.em b/msg/templates/urtps/microRTPS_agent.cpp.em index 728eb2a9d22b..bddcc60eb255 100644 --- a/msg/templates/urtps/microRTPS_agent.cpp.em +++ b/msg/templates/urtps/microRTPS_agent.cpp.em @@ -113,7 +113,7 @@ struct options { static void usage(const char *name) { printf("usage: %s [options]\n\n" - " -b UART device baudrate. Default 460800\n" + " -b UART device baudrate. Default 460800. Used for transmission delay also in UDP case\n" " -d UART device. Default /dev/ttyACM0\n" " -f Activates UART link SW flow control\n" " -h Activates UART link HW flow control\n" @@ -276,7 +276,7 @@ int main(int argc, char** argv) topics.set_timesync(timeSync); @[if recv_topics]@ - topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns); + topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns, _options.baudrate); @[end if]@ running = true;