Skip to content

Commit

Permalink
feat: RabbitMQ Transport blocks startup until it is connected
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswiggins committed May 20, 2024
1 parent 6e5f470 commit 9ea8a4d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
1 change: 1 addition & 0 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ general: {
#queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not incoming queue should only allow one subscriber
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable.
#block_startup_until_connected = true # Whether to block the server from starting until a connection to the RabbitMQ server is established.

#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
Expand Down
23 changes: 22 additions & 1 deletion src/transports/janus_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ static gboolean declare_outgoing_queue = FALSE, declare_outgoing_queue_admin = F
amqp_boolean_t queue_durable = 0, queue_exclusive = 0, queue_autodelete = 0,
queue_durable_admin = 0, queue_exclusive_admin = 0, queue_autodelete_admin = 0;
static uint16_t heartbeat = 0;
static gboolean block_startup_until_connected = FALSE;

/* Transport implementation */
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) {
Expand Down Expand Up @@ -312,6 +313,11 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
heartbeat = 0;
}

/* Check if we need to block startup until RabbitMQ is connected */
item = janus_config_get(config, config_general, janus_config_type_item, "block_startup_until_connected");
if(item && item->value && janus_is_true(item->value))
block_startup_until_connected = TRUE;

/* Now check if the Janus API must be supported */
item = janus_config_get(config, config_general, janus_config_type_item, "enabled");
if(item == NULL) {
Expand Down Expand Up @@ -455,7 +461,22 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
rmq_client = g_malloc0(sizeof(janus_rabbitmq_client));

/* Connect */
int result = janus_rabbitmq_connect();
uint8_t retry_interval = 5; // Retry interval in seconds
int result = -1;
while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
result = janus_rabbitmq_connect();
if (result == 0) {
break;
}

if (!block_startup_until_connected) {
break;
}

JANUS_LOG(LOG_ERR, "RabbitMQ: Connection failed, retrying in %d seconds\n", retry_interval);
g_usleep(retry_interval * 1000000);
}

if(result < 0) {
goto error;
}
Expand Down

0 comments on commit 9ea8a4d

Please sign in to comment.