nxd_mqtt_client_secure_connect
nxd_mqtt_client_receive_notify_set
static ssp_err_t NX_MQTT_Subscribe(mqtt_client_ctrl_t * p_ctrl, char const * p_name, mqtt_client_qos_t qos) { UINT status; status = nxd_mqtt_client_subscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name), qos); if(status) { NX_MQTT_log_msg(p_ctrl, "Could not subscribe to MQTT topic %s (0x%02x)\r\n", p_name, status); return SSP_ERR_ABORTED; } status = nxd_mqtt_client_receive_notify_set(p_ctrl->p_secure_client, NX_MQTT_notify_callback); if(status) { nxd_mqtt_client_unsubscribe(p_ctrl->p_secure_client, (CHAR *)p_name, strlen(p_name)); NX_MQTT_log_msg(p_ctrl, "Could not set notify function (0x%02x)\r\n", status); return SSP_ERR_ABORTED; } return SSP_SUCCESS; } static void NX_MQTT_notify_callback(NXD_MQTT_CLIENT * p_client, UINT number_of_messages) { mqtt_client_ctrl_t *p_ctrl; SSP_PARAMETER_NOT_USED (p_client); p_ctrl = g_mqtt_client.p_ctrl; p_ctrl->num_messages += number_of_messages; tx_semaphore_put(&g_mqtt_notify_sem); } void mqtt_callback_thread_entry(void) { UINT tstatus; mqtt_client_ctrl_t *p_ctrl; mqtt_client_instance_t *p_mqtt_client; iot_service_instance_t *p_iots; /* Get the IoT cloud provider via the message queue */ tx_queue_receive(&g_msg_queue, &p_iots, TX_WAIT_FOREVER); while (1) { /* This thread will be woken up by the MQTT notify function when * a message is received on the subscribed topic. */ tstatus = tx_semaphore_get(&g_mqtt_notify_sem, TX_WAIT_FOREVER); if (tstatus != TX_SUCCESS) continue; /* Get the selected IoT service and MQTT client and call the notify callback */ p_mqtt_client = p_iots->p_ctrl->p_mqtt_client; p_ctrl = p_mqtt_client->p_ctrl; while (p_ctrl->num_messages > 0) { p_mqtt_client->p_api->messageGet(p_ctrl); if (p_ctrl->notify) p_ctrl->notify(p_iots->p_ctrl); p_ctrl->num_messages--; } } } static void aws_callback(void *p_context) { iot_service_ctrl_t *p_ctrl = (iot_service_ctrl_t *)p_context; tx_semaphore_put(&p_ctrl->prop_sem); } static ssp_err_t AWS_Open(iot_service_ctrl_t * p_ctrl, iot_service_cfg_t const * const p_cfg) { ssp_err_t status; mqtt_client_api_t *p_api; mqtt_client_ctrl_t *p_mqtt_ctrl; p_ctrl->p_mqtt_client = p_cfg->p_mqtt_client; p_api = p_ctrl->p_mqtt_client->p_api; p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl; p_mqtt_ctrl->p_iot_service_ctrl = (void *)p_ctrl; p_mqtt_ctrl->notify = aws_callback; p_ctrl->prop_sem = g_aws_semaphore; p_ctrl->prop_mutex = g_aws_mutex; } static ssp_err_t AWS_PropertyGet(iot_service_ctrl_t const * p_ctrl, char const * p_name[], char *p_value[], uint32_t valLen, uint32_t num) { ssp_err_t status; UINT tstatus; mqtt_client_api_t *p_api; mqtt_client_ctrl_t *p_mqtt_ctrl; UCHAR *msg, *buf, *q; unsigned int i; p_api = p_ctrl->p_mqtt_client->p_api; p_mqtt_ctrl = p_ctrl->p_mqtt_client->p_ctrl; msg = p_mqtt_ctrl->msg; status = tx_mutex_get((TX_MUTEX *)&p_ctrl->prop_mutex, TX_WAIT_FOREVER); //snprintf(p_mqtt_ctrl->msg, sizeof(p_mqtt_ctrl->msg), ""); status = p_api->publish(p_mqtt_ctrl, aws_topics[AWS_IOT_TOPIC_GET], NULL, 0, MQTT_QOS_0); if (status != SSP_SUCCESS) { tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex); return status; } tstatus = tx_semaphore_get((TX_SEMAPHORE *)&p_ctrl->prop_sem, 500); if (tstatus != TX_SUCCESS) { tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex); return SSP_ERR_ABORTED; } /* Check if get was accepted */ status = SSP_ERR_INTERNAL; if (strcmp((char *)p_mqtt_ctrl->topic, aws_topics[AWS_IOT_TOPIC_GET_ACCEPTED]) == 0) { /* Get values */ msg = p_mqtt_ctrl->msg; for (i = 0; i < num; i++) { memset(p_value[i], 0, valLen); buf = (UCHAR *)strstr((const char *)msg, "desired"); if (buf == NULL) break; buf += strlen("desired"); buf = (UCHAR *)strstr((const char *)buf, p_name[i]); if (buf == NULL) continue; buf += strlen(p_name[i]); buf = (UCHAR *)strstr((const char *)buf, ":"); if (buf == NULL) continue; buf += 1; buf = (UCHAR *)strstr((const char *)buf, "\""); buf += 1; q = (UCHAR *)strstr((const char *)buf, "\""); if (q == NULL) continue; strncpy(p_value[i], (const char *)buf, MIN(valLen, (size_t)(q - buf))); status = SSP_SUCCESS; } } tx_mutex_put((TX_MUTEX *)&p_ctrl->prop_mutex); return status; }