From 6e5f47083666380a7dff5d0d05b625d62788bc71 Mon Sep 17 00:00:00 2001 From: Chris Wiggins Date: Mon, 20 May 2024 19:06:04 +1200 Subject: [PATCH] feat: RabbitMQ Event handler blocks startup until it is connected --- ...janus.eventhandler.rabbitmqevh.jcfg.sample | 1 + src/events/janus_rabbitmqevh.c | 28 ++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample index c8635e2453..6d2d1786ec 100644 --- a/conf/janus.eventhandler.rabbitmqevh.jcfg.sample +++ b/conf/janus.eventhandler.rabbitmqevh.jcfg.sample @@ -21,6 +21,7 @@ general: { route_key = "janus-events" # Routing key to use when publishing messages #exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt). #heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable. + #block_startup_until_connected = true # Whether to block the server from starting until a connection to the RabbitMQ server is established. #declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior #ssl_enable = false # Whether ssl support must be enabled diff --git a/src/events/janus_rabbitmqevh.c b/src/events/janus_rabbitmqevh.c index ca87043b28..84d16439eb 100644 --- a/src/events/janus_rabbitmqevh.c +++ b/src/events/janus_rabbitmqevh.c @@ -87,6 +87,7 @@ static GThread *handler_thread; static GThread *in_thread; static void *jns_rmqevh_hdlr(void *data); static void *jns_rmqevh_hrtbt(void *data); +int janus_rabbitmqevh_tryconnect(void); int janus_rabbitmqevh_connect(void); /* Queue of events to handle */ @@ -121,6 +122,7 @@ static gboolean ssl_verify_peer = FALSE; static gboolean ssl_verify_hostname = FALSE; static char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ; static uint16_t heartbeat = 0; +static gboolean block_startup_until_connected = FALSE; static uint16_t rmqport = AMQP_PROTOCOL_PORT; static gboolean declare_outgoing_queue = TRUE; @@ -236,6 +238,10 @@ int janus_rabbitmqevh_init(const char *config_path) { heartbeat = 0; } + item = janus_config_get(config, config_general, janus_config_type_item, "block_startup_until_connected"); + if(item && item->value && janus_is_true(item->value)) + block_startup_until_connected = TRUE; + /* SSL config*/ item = janus_config_get(config, config_general, janus_config_type_item, "ssl_enable"); if(!item || !item->value || !janus_is_true(item->value)) { @@ -346,6 +352,26 @@ int janus_rabbitmqevh_init(const char *config_path) { } int janus_rabbitmqevh_connect(void) { + uint8_t retry_interval = 5; // Retry interval in seconds + + while (!g_atomic_int_get(&stopping)) { + if (janus_rabbitmqevh_tryconnect() == 0) { + return 0; + } + + if (!block_startup_until_connected) { + break; + } + + JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Connection failed, retrying in %d seconds\n", retry_interval); + g_usleep(retry_interval * 1000000); + } + + JANUS_LOG(LOG_FATAL, "Failed to connect to RabbitMQ\n"); + return -1; +} + +int janus_rabbitmqevh_tryconnect(void){ rmq_conn = amqp_new_connection(); amqp_socket_t *socket = NULL; int status = AMQP_STATUS_OK; @@ -665,7 +691,7 @@ static void *jns_rmqevh_hrtbt(void *data) { } if(!g_atomic_int_get(&stopping)) { JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Trying to reconnect\n"); - int result = janus_rabbitmqevh_connect(); + int result = janus_rabbitmqevh_tryconnect(); if(result < 0) { g_usleep(5000000); } else {