Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bridge subscription failure will cause a disconnect #2935

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ Broker:
choose TLS v1.1, but this is not recommended and will be removed in a future
version.
- Add -q option to allow logging to be disabled at the command line.
- Add suport for PROXY protocol v2.
- Add support for PROXY protocol v2.
- Add `bridge_fatal_sub_errors` option to control what happens when a bridge
fails to subscribe to a topic.

Plugins / plugin interface:
- Add persist-sqlite plugin.
Expand Down
1 change: 1 addition & 0 deletions fuzzing/corpora/broker_conf.dict
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"bridge_max_packet_size"
"bridge_max_topic_alias"
"bridge_outgoing_retain"
"bridge_fatal_sub_errors"
"bridge_protocol_version"
"bridge_psk"
"bridge_receive_maximum"
Expand Down
8 changes: 8 additions & 0 deletions lib/handle_suback.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ int handle__suback(struct mosquitto *mosq)
mosquitto_property_free_all(&properties);
return rc;
}

#ifdef WITH_BRIDGE
if(mosq->bridge){
rc = bridge__on_suback(mosq, qos);
if(rc) return rc;
}
#endif

granted_qos[i] = (int)qos;
i++;
}
Expand Down
11 changes: 10 additions & 1 deletion man/mosquitto.conf.5.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1974,7 +1974,16 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
other setting. Defaults to <replaceable>true</replaceable>.</para>
</listitem>
</varlistentry>

<varlistentry>
<term><option>bridge_fatal_sub_errors</option> [ true | false ]</term>
<listitem>
<para> A failure to subscribe to a topic will cause an immediate
disconnection. The hope is that a temporary failure will disappear
after reconnecting. If you desire to silently ignore subscription
errors (not advised), you can set the bridge_fatal_sub_errors
option to false. Defaults to <replaceable>true</replaceable>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_protocol_version</option> <replaceable>version</replaceable></term>
<listitem>
Expand Down
6 changes: 6 additions & 0 deletions mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,12 @@
# all outgoing messages to that bridge, regardless of any other setting.
#bridge_outgoing_retain true

# A failure to subscribe to a topic will cause an immediate disconnection. The
# hope is that a temporary failure will disappear after reconnecting. If you
# desire to silently ignore subscription errors (not advised), you can set the
# bridge_fatal_sub_errors option to false.
#bridge_fatal_sub_errors true

# If you wish to restrict the size of messages sent to a remote bridge, use the
# bridge_max_packet_size option. This sets the maximum number of bytes for
# the total message, including headers and payload.
Expand Down
10 changes: 10 additions & 0 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,16 @@ int bridge__on_connect(struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}

int bridge__on_suback(struct mosquitto *context, int qos) {
if(qos>2){
log__printf(NULL, MOSQ_LOG_ERR, "Error on bridge subscription: %s", mosquitto_reason_string(qos));
if(context->bridge->fatal_sub_errors){
do_disconnect(context, MOSQ_ERR_CONN_LOST);
}
}

return MOSQ_ERR_SUCCESS;
}

int bridge__register_local_connections(void)
{
Expand Down
8 changes: 8 additions & 0 deletions src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,13 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
if(conf__parse_bool(&token, "bridge_outgoing_retain", &cur_bridge->outgoing_retain, &saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_fatal_sub_errors")){
#if defined(WITH_BRIDGE)
REQUIRE_BRIDGE(token);
if(conf__parse_bool(&token, "bridge_fatal_sub_errors", &cur_bridge->fatal_sub_errors, &saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_keyfile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
Expand Down Expand Up @@ -1566,6 +1573,7 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
cur_bridge->outgoing_retain = true;
cur_bridge->fatal_sub_errors = true;
cur_bridge->clean_start_local = -1;
cur_bridge->reload_type = brt_lazy;
cur_bridge->max_topic_alias = 10;
Expand Down
2 changes: 2 additions & 0 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ struct mosquitto__bridge{
bool attempt_unsubscribe;
bool initial_notification_done;
bool outgoing_retain;
bool fatal_sub_errors;
enum mosquitto_bridge_reload_type reload_type;
uint16_t max_topic_alias;
#ifdef WITH_TLS
Expand Down Expand Up @@ -801,6 +802,7 @@ int bridge__connect(struct mosquitto *context);
int bridge__connect_step3(struct mosquitto *context);
#endif
int bridge__on_connect(struct mosquitto *context);
int bridge__on_suback(struct mosquitto *context, int qos);
void bridge_check(void);
int bridge__register_local_connections(void);
int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix);
Expand Down
88 changes: 88 additions & 0 deletions test/broker/06-bridge-fatal-sub-errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3

# Does a bridge with bridge_fatal_sub_errors enabled
# disconnect on subscription errors? Does it remain connected otherwise?

from mosq_test_helper import *

def write_config(filename, port1, port2, fatal_sub_errors):
with open(filename, 'w') as f:
f.write("listener %d\n" % (port2))
f.write("allow_anonymous true\n")
f.write("\n")
f.write("connection bridge_sample\n")
f.write("address 127.0.0.1:%d\n" % (port1))
f.write("topic in_topic in\n")
f.write("notifications false\n")
f.write("restart_timeout 5\n")
f.write("cleansession true\n")
f.write("bridge_fatal_sub_errors %s\n" % str(fatal_sub_errors).lower())

def is_connected(sock):
try:
sock.recv(1) # if still connected, the recv will timeout
return False
except TimeoutError as e:
return True

def do_test(fatal_sub_errors):
(port1, port2) = mosq_test.get_port(2)
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port1, port2, fatal_sub_errors)

rc = 1
client_id = socket.gethostname()+".bridge_sample"
connect_packet = mosq_test.gen_connect(client_id, proto_ver=132)
connack_packet = mosq_test.gen_connack()

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "in_topic", 0)
suback_packet = mosq_test.gen_suback(mid, 0x80)

ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock.settimeout(40)
ssock.bind(('', port1))
ssock.listen(1)

broker = None

try:
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True)

(bridge, address) = ssock.accept()
bridge.settimeout(5)

mosq_test.expect_packet(bridge, "connect", connect_packet)
bridge.send(connack_packet)

mosq_test.expect_packet(bridge, "subscribe", subscribe_packet)
bridge.send(suback_packet)

time.sleep(0.25) # give the broker some time to react

# if (connected and not fatal) or (disconnected and fatal): success, else: failure
rc = 0 if is_connected(bridge) != fatal_sub_errors else 1
except mosq_test.TestError:
pass
finally:
os.remove(conf_file)
try:
bridge.close()
except NameError:
pass

broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
rc = 1
(stdo, stde) = broker.communicate()
ssock.close()
if rc:
print(stde.decode('utf-8'))
exit(rc)

do_test(True)
do_test(False)

exit(0)
1 change: 1 addition & 0 deletions test/broker/16-config-huge.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def write_config(filename, ports, per_listener_settings, plugver, acl_file):
f.write("bridge_max_packet_size 10000\n")
f.write("bridge_max_topic_alias 1000\n")
f.write("bridge_outgoing_retain false\n")
f.write("bridge_fatal_sub_errors false\n")
f.write("bridge_protocol_version mqttv50\n")
#f.write("bridge_psk\n")
f.write("bridge_receive_maximum 100\n")
Expand Down
1 change: 1 addition & 0 deletions test/broker/16-config-parse-errors-without-tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
do_test_broker_failure(conf_file, ["bridge_max_packet_size 1000"], port, 3, "Error: The 'bridge_max_packet_size' option requires a bridge to be defined first.") # Missing bridge config
do_test_broker_failure(conf_file, ["bridge_max_topic_alias 1000"], port, 3, "Error: The 'bridge_max_topic_alias' option requires a bridge to be defined first.") # Missing bridge config
do_test_broker_failure(conf_file, ["bridge_outgoing_retain false"], port, 3, "Error: The 'bridge_outgoing_retain' option requires a bridge to be defined first.") # Missing bridge config
do_test_broker_failure(conf_file, ["bridge_fatal_sub_errors false"], 3, "Error: The 'bridge_fatal_sub_errors' option requires a bridge to be defined first.") # Missing bridge config
do_test_broker_failure(conf_file, ["bridge_protocol_version string"], port, 3, "Error: The 'bridge_protocol_version' option requires a bridge to be defined first.") # Missing bridge config
do_test_broker_failure(conf_file, ["bridge_receive_maximum 10"], port, 3, "Error: The 'bridge_receive_maximum' option requires a bridge to be defined first.") # Missing bridge config
do_test_broker_failure(conf_file, ["bridge_reload_type string"], port, 3, "Error: The 'bridge_reload_type' option requires a bridge to be defined first.") # Missing bridge config
Expand Down
1 change: 1 addition & 0 deletions test/broker/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
(2, './06-bridge-reconnect-local-out.py'),
(2, './06-bridge-remote-shutdown.py'),
(2, './06-bridge-config-reload.py'),
(2, './06-bridge-fatal-sub-errors.py'),

(1, './07-will-control.py'),
(1, './07-will-delay-invalid-573191.py'),
Expand Down
Loading