Skip to content

Commit

Permalink
urtps: Simplified timesync
Browse files Browse the repository at this point in the history
  • Loading branch information
jnippula committed Jul 8, 2021
1 parent 1129599 commit 913cf7b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 15 deletions.
29 changes: 27 additions & 2 deletions msg/templates/uorb_microcdr/msg.cpp.em
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ topic_name = spec.short_name
#include <ucdr/microcdr.h>
#include <uORB/topics/@(topic_name).h>
#include <uORB_microcdr/topics/@(topic_name).h>
#include <drivers/drv_hrt.h>

@#################################################
@# Searching for serialize function per each field
Expand Down Expand Up @@ -94,12 +95,31 @@ def get_serialization_type_name(type_name):
else:
raise Exception("Type {0} not supported, add to type_serialize_map!".format(type_name))

def serialize_timestamp_delta(field, scope_name, abs_time_exists):
if (not abs_time_exists):
print(" const hrt_abstime now = hrt_absolute_time();")
print(" hrt_abstime " + scope_name+str(field.name) + "__ = now - input->" + scope_name+str(field.name) + ";")
print(" ucdr_serialize_" + str(get_serialization_type_name(field.type)) + "(writer, " + scope_name+str(field.name) + "__);")
return True

def deserialize_timestamp_delta(field, scope_name, abs_time_exists):
if (not abs_time_exists):
print(" const hrt_abstime now = hrt_absolute_time();")
print(" ucdr_deserialize_" + str(get_serialization_type_name(field.type)) + "(reader, &output->" + scope_name+str(field.name) + ");")
print(" output->" + scope_name+str(field.name) + " = now - output->" + scope_name+str(field.name) + ";")
return True

def add_serialize_functions(fields, scope_name):
hrt_abstime_read = False
for field in fields:
if (not field.is_header):
if (field.is_builtin):
if (not field.is_array):
print(" ucdr_serialize_" + str(get_serialization_type_name(field.type)) + "(writer, input->" + scope_name+str(field.name) + ");")
if (topic_name != "timesync" and
(scope_name+str(field.name) == "timestamp")):
hrt_abstime_read = serialize_timestamp_delta(field, scope_name, hrt_abstime_read)
else:
print(" ucdr_serialize_" + str(get_serialization_type_name(field.type)) + "(writer, input->" + scope_name+str(field.name) + ");")
else:
print(" ucdr_serialize_array_" + str(get_serialization_type_name(field.base_type)) + "(writer, input->" + scope_name+str(field.name) + ", " + str(field.array_len) + ");")
else:
Expand All @@ -113,11 +133,16 @@ def add_serialize_functions(fields, scope_name):
add_serialize_functions(children_fields, name + ('[%d].' %i))

def add_deserialize_functions(fields, scope_name):
hrt_abstime_read = False
for field in fields:
if (not field.is_header):
if (field.is_builtin):
if (not field.is_array):
print(" ucdr_deserialize_" + str(get_serialization_type_name(field.type)) + "(reader, &output->" + scope_name+str(field.name) + ");")
if (topic_name != "timesync" and
(scope_name+str(field.name) == "timestamp")):
hrt_abstime_read = deserialize_timestamp_delta(field, scope_name, hrt_abstime_read)
else:
print(" ucdr_deserialize_" + str(get_serialization_type_name(field.type)) + "(reader, &output->" + scope_name+str(field.name) + ");")
else:
print(" ucdr_deserialize_array_" + str(get_serialization_type_name(field.base_type)) + "(reader, output->" + scope_name+str(field.name) + ", " + str(field.array_len) + ");")
else:
Expand Down
32 changes: 22 additions & 10 deletions msg/templates/urtps/RtpsTopics.cpp.em
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ package = package[0]

#include "RtpsTopics.h"

bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns)
bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns, const uint32_t transmission_speed_bytes_per_sec)
{
_tr_speed = transmission_speed_bytes_per_sec;

@[if recv_topics]@
// Initialise subscribers
std::cout << "\033[0;36m--- Subscribers ---\033[0m" << std::endl;
Expand Down Expand Up @@ -107,10 +109,16 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)

if (getMsgSysID(&st) == 1) {
@[ end if]@
// apply timestamp offset
// Replace timestamp with time delta from original ts to 'now'
uint64_t now = _timesync->getMonoTimeUSec();
uint64_t timestamp = getMsgTimestamp(&st);
_timesync->subtractOffset(timestamp);
setMsgTimestamp(&st, timestamp);
uint64_t ts_delta = now - timestamp + (len * 1000000 / _tr_speed);
setMsgTimestamp(&st, ts_delta);
uint64_t timestampsample = getMsgTimestampSample(&st);
if (timestampsample) {
ts_delta = now - timestampsample + (len * 1000000 / _tr_speed);
setMsgTimestampSample(&st, ts_delta);
}
_@(topic)_pub.publish(&st);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
Expand Down Expand Up @@ -139,13 +147,17 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
@[ if topic == 'Timesync' or topic == 'timesync']@
if (getMsgSysID(&msg) == 0) {
@[ end if]@
// apply timestamps offset
uint64_t timestamp = getMsgTimestamp(&msg);
uint64_t timestamp_sample = getMsgTimestampSample(&msg);
_timesync->addOffset(timestamp);
// Substract time_delta and transmission delay from current time and set new timestamp
uint64_t now = _timesync->getMonoTimeUSec();
uint64_t ts_delta = getMsgTimestamp(&msg);
uint64_t timestamp = now - ts_delta;
setMsgTimestamp(&msg, timestamp);
_timesync->addOffset(timestamp_sample);
setMsgTimestampSample(&msg, timestamp_sample);
uint64_t timestamp_sample;
ts_delta = getMsgTimestampSample(&msg);
if (ts_delta) {
timestamp_sample = now - ts_delta;
setMsgTimestampSample(&msg, timestamp_sample);
}
msg.serialize(scdr);
ret = true;
@[ if topic == 'Timesync' or topic == 'timesync']@
Expand Down
9 changes: 8 additions & 1 deletion msg/templates/urtps/RtpsTopics.h.em
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ using @(topic)_msg_t = @(topic);

class RtpsTopics {
public:
bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns);
bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue, const std::string& ns, const uint32_t transmission_speed_bytes_per_sec);
void set_timesync(const std::shared_ptr<TimeSync>& timesync) { _timesync = timesync; };
@[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len);
Expand Down Expand Up @@ -204,4 +204,11 @@ private:
* messages timestamps.
*/
std::shared_ptr<TimeSync> _timesync;

/**
* @@brief Transmission speed.
* Speed (bytes per second) how fast the data goes through the transmission line
* This is used for calculating timestamp for received messages
*/
uint32_t _tr_speed;
};
4 changes: 2 additions & 2 deletions msg/templates/urtps/microRTPS_agent.cpp.em
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct options {
static void usage(const char *name)
{
printf("usage: %s [options]\n\n"
" -b <baudrate> UART device baudrate. Default 460800\n"
" -b <baudrate> UART device baudrate. Default 460800. Used for transmission delay also in UDP case\n"
" -d <device> UART device. Default /dev/ttyACM0\n"
" -f <sw flow control> Activates UART link SW flow control\n"
" -h <hw flow control> Activates UART link HW flow control\n"
Expand Down Expand Up @@ -276,7 +276,7 @@ int main(int argc, char** argv)
topics.set_timesync(timeSync);

@[if recv_topics]@
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns, _options.baudrate);
@[end if]@

running = true;
Expand Down

0 comments on commit 913cf7b

Please sign in to comment.