Skip to content

Commit

Permalink
feat: RabbitMQ Event handler blocks startup until it is connected
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswiggins committed May 20, 2024
1 parent fdd90a5 commit 6e5f470
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
1 change: 1 addition & 0 deletions conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ general: {
route_key = "janus-events" # Routing key to use when publishing messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable.
#block_startup_until_connected = true # Whether to block the server from starting until a connection to the RabbitMQ server is established.
#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#ssl_enable = false # Whether ssl support must be enabled
Expand Down
28 changes: 27 additions & 1 deletion src/events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ static GThread *handler_thread;
static GThread *in_thread;
static void *jns_rmqevh_hdlr(void *data);
static void *jns_rmqevh_hrtbt(void *data);
int janus_rabbitmqevh_tryconnect(void);
int janus_rabbitmqevh_connect(void);

/* Queue of events to handle */
Expand Down Expand Up @@ -121,6 +122,7 @@ static gboolean ssl_verify_peer = FALSE;
static gboolean ssl_verify_hostname = FALSE;
static char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ;
static uint16_t heartbeat = 0;
static gboolean block_startup_until_connected = FALSE;
static uint16_t rmqport = AMQP_PROTOCOL_PORT;
static gboolean declare_outgoing_queue = TRUE;

Expand Down Expand Up @@ -236,6 +238,10 @@ int janus_rabbitmqevh_init(const char *config_path) {
heartbeat = 0;
}

item = janus_config_get(config, config_general, janus_config_type_item, "block_startup_until_connected");
if(item && item->value && janus_is_true(item->value))
block_startup_until_connected = TRUE;

/* SSL config*/
item = janus_config_get(config, config_general, janus_config_type_item, "ssl_enable");
if(!item || !item->value || !janus_is_true(item->value)) {
Expand Down Expand Up @@ -346,6 +352,26 @@ int janus_rabbitmqevh_init(const char *config_path) {
}

int janus_rabbitmqevh_connect(void) {
uint8_t retry_interval = 5; // Retry interval in seconds

while (!g_atomic_int_get(&stopping)) {
if (janus_rabbitmqevh_tryconnect() == 0) {
return 0;
}

if (!block_startup_until_connected) {
break;
}

JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Connection failed, retrying in %d seconds\n", retry_interval);
g_usleep(retry_interval * 1000000);
}

JANUS_LOG(LOG_FATAL, "Failed to connect to RabbitMQ\n");
return -1;
}

int janus_rabbitmqevh_tryconnect(void){
rmq_conn = amqp_new_connection();
amqp_socket_t *socket = NULL;
int status = AMQP_STATUS_OK;
Expand Down Expand Up @@ -665,7 +691,7 @@ static void *jns_rmqevh_hrtbt(void *data) {
}
if(!g_atomic_int_get(&stopping)) {
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Trying to reconnect\n");
int result = janus_rabbitmqevh_connect();
int result = janus_rabbitmqevh_tryconnect();
if(result < 0) {
g_usleep(5000000);
} else {
Expand Down

0 comments on commit 6e5f470

Please sign in to comment.