From 9ea8a4d15dae1b16776e0b870b3fb5ee8a9f489e Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Mon, 20 May 2024 19:07:01 +1200 Subject: [PATCH] feat: RabbitMQ Transport blocks startup until it is connected --- conf/janus.transport.rabbitmq.jcfg.sample | 1 + src/transports/janus_rabbitmq.c | 23 ++++++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/conf/janus.transport.rabbitmq.jcfg.sample b/conf/janus.transport.rabbitmq.jcfg.sample index 27cfebd0a0..c9bf9e642b 100644 --- a/conf/janus.transport.rabbitmq.jcfg.sample +++ b/conf/janus.transport.rabbitmq.jcfg.sample @@ -35,6 +35,7 @@ general: { #queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ #queue_exclusive = false # Whether or not incoming queue should only allow one subscriber #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable. + #block_startup_until_connected = true # Whether to block the server from starting until a connection to the RabbitMQ server is established. #ssl_enabled = false # Whether ssl support must be enabled #ssl_verify_peer = true # Whether peer verification must be enabled diff --git a/src/transports/janus_rabbitmq.c b/src/transports/janus_rabbitmq.c index 8ed3675b3c..2582708687 100644 --- a/src/transports/janus_rabbitmq.c +++ b/src/transports/janus_rabbitmq.c @@ -190,6 +190,7 @@ static gboolean declare_outgoing_queue = FALSE, declare_outgoing_queue_admin = F amqp_boolean_t queue_durable = 0, queue_exclusive = 0, queue_autodelete = 0, queue_durable_admin = 0, queue_exclusive_admin = 0, queue_autodelete_admin = 0; static uint16_t heartbeat = 0; +static gboolean block_startup_until_connected = FALSE; /* Transport implementation */ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) { @@ -312,6 +313,11 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ heartbeat = 0; } + /* Check if we need to block startup until RabbitMQ is connected */ + item = janus_config_get(config, config_general, janus_config_type_item, "block_startup_until_connected"); + if(item && item->value && janus_is_true(item->value)) + block_startup_until_connected = TRUE; + /* Now check if the Janus API must be supported */ item = janus_config_get(config, config_general, janus_config_type_item, "enabled"); if(item == NULL) { @@ -455,7 +461,22 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_ rmq_client = g_malloc0(sizeof(janus_rabbitmq_client)); /* Connect */ - int result = janus_rabbitmq_connect(); + uint8_t retry_interval = 5; // Retry interval in seconds + int result = -1; + while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) { + result = janus_rabbitmq_connect(); + if (result == 0) { + break; + } + + if (!block_startup_until_connected) { + break; + } + + JANUS_LOG(LOG_ERR, "RabbitMQ: Connection failed, retrying in %d seconds\n", retry_interval); + g_usleep(retry_interval * 1000000); + } + if(result < 0) { goto error; }