diff --git a/src/peer_signaling.c b/src/peer_signaling.c index 342a579..30da7c0 100644 --- a/src/peer_signaling.c +++ b/src/peer_signaling.c @@ -65,6 +65,10 @@ typedef struct PeerSignaling { static PeerSignaling g_ps = {0}; +static bool g_is_mqtt_reconnected = false; +static MQTTStatus_t g_mqtt_last_status; +static uint64_t g_mqtt_error_state_timeout; + static int peer_signaling_resolve_token(const char* token, char* username, char* password) { char plaintext[TOKEN_MAX_LEN] = {0}; char* colon; @@ -500,6 +504,33 @@ static void peer_signaling_onicecandidate(char* description, void* userdata) { } } +static int peer_signaling_mqtt_connect_subscribe() { + g_mqtt_error_state_timeout = ports_get_epoch_time(); + + if (g_ps.port <= 0) { + LOGE("Invalid MQTT port: %d", g_ps.port); + return -1; + } + + if (g_is_mqtt_reconnected == true) { + peer_signaling_disconnect(); + g_is_mqtt_reconnected = false; + } + + Address resolved_addr; + if (ports_resolve_addr(g_ps.host, &resolved_addr) != 0) { + LOGE("Invalid host or network: %s", g_ps.host); + return -1; + } + + if (peer_signaling_mqtt_connect(g_ps.host, g_ps.port) == 0) { + peer_signaling_mqtt_subscribe(1); + g_is_mqtt_reconnected = true; + } + + return 0; +} + int peer_signaling_connect(const char* url, const char* token, PeerConnection* pc) { char* client_id; @@ -520,9 +551,8 @@ int peer_signaling_connect(const char* url, const char* token, PeerConnection* p snprintf(g_ps.client_id, sizeof(g_ps.client_id), "%s", client_id + 1); snprintf(g_ps.subtopic, sizeof(g_ps.subtopic), "%s/invoke", g_ps.path); snprintf(g_ps.pubtopic, sizeof(g_ps.pubtopic), "%s/result", g_ps.path); - if (peer_signaling_mqtt_connect(g_ps.host, g_ps.port) == 0) { - peer_signaling_mqtt_subscribe(1); - } + + peer_signaling_mqtt_connect_subscribe(); } break; case 1: { // HTTP peer_connection_create_offer(g_ps.pc); @@ -549,7 +579,23 @@ void peer_signaling_disconnect() { } int peer_signaling_loop() { - MQTT_ProcessLoop(&g_ps.mqtt_ctx); + MQTTStatus_t status = MQTT_ProcessLoop(&g_ps.mqtt_ctx); + bool is_different_status = g_mqtt_last_status != status; + if (is_different_status == true && g_mqtt_last_status == MQTTSuccess) { + g_mqtt_error_state_timeout = ports_get_epoch_time(); + } + + g_mqtt_last_status = status; + + if (status != MQTTSuccess && KEEPALIVE_CONNCHECK > 0) { + if ((ports_get_epoch_time() - g_mqtt_error_state_timeout) > KEEPALIVE_CONNCHECK) { + LOGE("MQTT connection status: %d. Reconnecting...", status); + peer_signaling_mqtt_connect_subscribe(); + + } else { + LOGE("MQTT error status: %d.", status); + } + } return 0; } #endif // DISABLE_PEER_SIGNALING diff --git a/src/stun.c b/src/stun.c index 0b17202..8602791 100644 --- a/src/stun.c +++ b/src/stun.c @@ -1,3 +1,4 @@ +#include #include #include #include