Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed outgoing client message state from persistence plugin API #3127

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ enum mosquitto_msg_direction {
};

enum mosquitto_msg_state {
mosq_ms_any = -1,
mosq_ms_invalid = 0,
mosq_ms_publish_qos0 = 1,
mosq_ms_publish_qos1 = 2,
Expand Down
20 changes: 16 additions & 4 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
if(client_msg->data.mid == mid){
if(client_msg->data.qos != qos){
return MOSQ_ERR_PROTOCOL;
}else if(qos == 2 && client_msg->data.state != expect_state){
}else if(qos == 2 && client_msg->data.state != expect_state && expect_state != mosq_ms_any){
return MOSQ_ERR_PROTOCOL;
}
db__message_remove_inflight(context, &context->msgs_out, client_msg);
Expand All @@ -426,7 +426,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
if(client_msg->data.mid == mid){
if(client_msg->data.qos != qos){
return MOSQ_ERR_PROTOCOL;
}else if(qos == 2 && client_msg->data.state != expect_state){
}else if(qos == 2 && client_msg->data.state != expect_state && expect_state != mosq_ms_any){
return MOSQ_ERR_PROTOCOL;
}
db__message_remove_queued(context, &context->msgs_out, client_msg);
Expand Down Expand Up @@ -704,11 +704,12 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
return rc;
}

int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
static inline int db__message_update_outgoing_state(struct mosquitto *context, struct mosquitto__client_msg *head,
uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
{
struct mosquitto__client_msg *client_msg;

DL_FOREACH(context->msgs_out.inflight, client_msg){
DL_FOREACH(head, client_msg){
if(client_msg->data.mid == mid){
if(client_msg->data.qos != qos){
return MOSQ_ERR_PROTOCOL;
Expand All @@ -723,6 +724,17 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo
return MOSQ_ERR_NOT_FOUND;
}

int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
{
int rc;

rc = db__message_update_outgoing_state(context, context->msgs_out.inflight, mid, state, qos, persist);
if (!persist && rc == MOSQ_ERR_NOT_FOUND){
rc = db__message_update_outgoing_state(context, context->msgs_out.queued, mid, state, qos, persist);
}
return rc;
}


static void db__messages_delete_list(struct mosquitto__client_msg **head)
{
Expand Down
10 changes: 5 additions & 5 deletions src/plugin_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,14 +655,14 @@ BROKER_EXPORT int mosquitto_persist_client_msg_delete(struct mosquitto_client_ms
return MOSQ_ERR_NOT_FOUND;
}


int rc = MOSQ_ERR_INVAL;
if(client_msg->direction == mosq_md_out){
return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos);
rc = db__message_delete_outgoing(context, client_msg->mid, mosq_ms_any, client_msg->qos);
}else if(client_msg->direction == mosq_md_in){
return db__message_remove_incoming(context, client_msg->mid);
}else{
return MOSQ_ERR_INVAL;
rc = db__message_remove_incoming(context, client_msg->mid);
}
return MOSQ_ERR_SUCCESS;
return rc;
}


Expand Down
Loading