mirror of
https://github.com/espressif/esp-mqtt.git
synced 2026-06-05 21:04:46 +00:00
Feature: Enable SUBSCRIBE to multiple topics
- Adds an api for multiple topics on SUBSCRIBE message. Apply 2 suggestion(s) to 1 file(s) Removing headers y
This commit is contained in:
@@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
|
||||
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
|
||||
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
|
||||
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
|
||||
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
|
||||
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
|
||||
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
|
||||
|
||||
@@ -138,7 +138,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
|
||||
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
|
||||
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
|
||||
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id);
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
|
||||
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
|
||||
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
|
||||
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
|
||||
|
||||
+41
-38
@@ -1,5 +1,6 @@
|
||||
#include <string.h>
|
||||
#include "mqtt5_msg.h"
|
||||
#include "mqtt_client.h"
|
||||
#include "mqtt_config.h"
|
||||
#include "platform.h"
|
||||
#include "esp_log.h"
|
||||
@@ -764,7 +765,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top
|
||||
}
|
||||
snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info);
|
||||
if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) {
|
||||
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
|
||||
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
|
||||
free(response_topic);
|
||||
return fail_message(connection);
|
||||
}
|
||||
@@ -849,14 +850,10 @@ int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length)
|
||||
return -1;
|
||||
}
|
||||
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
|
||||
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic_list, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property)
|
||||
{
|
||||
init_message(connection);
|
||||
|
||||
if (topic == NULL || topic[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if ((*message_id = append_message_id(connection, 0)) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
@@ -877,41 +874,47 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *t
|
||||
}
|
||||
}
|
||||
APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection));
|
||||
if (property && property->is_share_subscribe) {
|
||||
uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
|
||||
char *shared_topic = calloc(1, shared_topic_size);
|
||||
if (!shared_topic) {
|
||||
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
|
||||
fail_message(connection);
|
||||
}
|
||||
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
|
||||
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
|
||||
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
|
||||
free(shared_topic);
|
||||
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
if (topic_list[topic_number].filter[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
free(shared_topic);
|
||||
} else {
|
||||
APPEND_CHECK(append_property(connection, 0, 2, topic, strlen(topic)), fail_message(connection));
|
||||
}
|
||||
if (property && property->is_share_subscribe) {
|
||||
uint16_t shared_topic_size = strlen(topic_list[topic_number].filter) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name);
|
||||
char *shared_topic = calloc(1, shared_topic_size);
|
||||
if (!shared_topic) {
|
||||
ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size);
|
||||
fail_message(connection);
|
||||
}
|
||||
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic_list[topic_number].filter);
|
||||
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
|
||||
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
|
||||
free(shared_topic);
|
||||
return fail_message(connection);
|
||||
}
|
||||
free(shared_topic);
|
||||
} else {
|
||||
APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection));
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length] = 0;
|
||||
if (property) {
|
||||
if (property->retain_handle > 0 && property->retain_handle < 3) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
|
||||
}
|
||||
if (property->no_local_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
|
||||
}
|
||||
if (property->retain_as_published_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
|
||||
}
|
||||
}
|
||||
connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3);
|
||||
connection->message.length ++;
|
||||
}
|
||||
connection->buffer[connection->message.length] = 0;
|
||||
if (property) {
|
||||
if (property->retain_handle > 0 && property->retain_handle < 3) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4;
|
||||
}
|
||||
if (property->no_local_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->no_local_flag << 2);
|
||||
}
|
||||
if (property->retain_as_published_flag) {
|
||||
connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3);
|
||||
}
|
||||
}
|
||||
connection->buffer[connection->message.length] |= (qos & 3);
|
||||
connection->message.length ++;
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
|
||||
@@ -975,7 +978,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char
|
||||
}
|
||||
snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic);
|
||||
if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) {
|
||||
ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__);
|
||||
ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__);
|
||||
free(shared_topic);
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
+15
-11
@@ -29,6 +29,7 @@
|
||||
*
|
||||
*/
|
||||
#include <string.h>
|
||||
#include "mqtt_client.h"
|
||||
#include "mqtt_msg.h"
|
||||
#include "mqtt_config.h"
|
||||
#include "platform.h"
|
||||
@@ -518,26 +519,29 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id)
|
||||
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
|
||||
if (topic == NULL || topic[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if ((*message_id = append_message_id(connection, 0)) == 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (append_string(connection, topic, strlen(topic)) < 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
for (int topic_number = 0; topic_number < size; ++topic_number) {
|
||||
if (topic_list[topic_number].filter[0] == '\0') {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length) {
|
||||
return fail_message(connection);
|
||||
}
|
||||
connection->buffer[connection->message.length] = topic_list[topic_number].qos;
|
||||
connection->message.length ++;
|
||||
}
|
||||
connection->buffer[connection->message.length++] = qos;
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user