Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libpeer's mqtt network recovery and reconnection #187

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions src/peer_signaling.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ typedef struct PeerSignaling {

static PeerSignaling g_ps = {0};

static bool g_is_mqtt_reconnected = false;
static MQTTStatus_t g_mqtt_last_status;
static uint64_t g_mqtt_error_state_timeout;

static int peer_signaling_resolve_token(const char* token, char* username, char* password) {
char plaintext[TOKEN_MAX_LEN] = {0};
char* colon;
Expand Down Expand Up @@ -500,6 +504,33 @@ static void peer_signaling_onicecandidate(char* description, void* userdata) {
}
}

static int peer_signaling_mqtt_connect_subscribe() {
g_mqtt_error_state_timeout = ports_get_epoch_time();

if (g_ps.port <= 0) {
LOGE("Invalid MQTT port: %d", g_ps.port);
return -1;
}

if (g_is_mqtt_reconnected == true) {
peer_signaling_disconnect();
g_is_mqtt_reconnected = false;
}

Address resolved_addr;
if (ports_resolve_addr(g_ps.host, &resolved_addr) != 0) {
LOGE("Invalid host or network: %s", g_ps.host);
return -1;
}

if (peer_signaling_mqtt_connect(g_ps.host, g_ps.port) == 0) {
peer_signaling_mqtt_subscribe(1);
g_is_mqtt_reconnected = true;
}

return 0;
}

int peer_signaling_connect(const char* url, const char* token, PeerConnection* pc) {
char* client_id;

Expand All @@ -520,9 +551,8 @@ int peer_signaling_connect(const char* url, const char* token, PeerConnection* p
snprintf(g_ps.client_id, sizeof(g_ps.client_id), "%s", client_id + 1);
snprintf(g_ps.subtopic, sizeof(g_ps.subtopic), "%s/invoke", g_ps.path);
snprintf(g_ps.pubtopic, sizeof(g_ps.pubtopic), "%s/result", g_ps.path);
if (peer_signaling_mqtt_connect(g_ps.host, g_ps.port) == 0) {
peer_signaling_mqtt_subscribe(1);
}

peer_signaling_mqtt_connect_subscribe();
} break;
case 1: { // HTTP
peer_connection_create_offer(g_ps.pc);
Expand All @@ -549,7 +579,23 @@ void peer_signaling_disconnect() {
}

int peer_signaling_loop() {
MQTT_ProcessLoop(&g_ps.mqtt_ctx);
MQTTStatus_t status = MQTT_ProcessLoop(&g_ps.mqtt_ctx);
bool is_different_status = g_mqtt_last_status != status;
if (is_different_status == true && g_mqtt_last_status == MQTTSuccess) {
g_mqtt_error_state_timeout = ports_get_epoch_time();
}

g_mqtt_last_status = status;

if (status != MQTTSuccess && KEEPALIVE_CONNCHECK > 0) {
if ((ports_get_epoch_time() - g_mqtt_error_state_timeout) > KEEPALIVE_CONNCHECK) {
LOGE("MQTT connection status: %d. Reconnecting...", status);
peer_signaling_mqtt_connect_subscribe();

} else {
LOGE("MQTT error status: %d.", status);
}
}
return 0;
}
#endif // DISABLE_PEER_SIGNALING
1 change: 1 addition & 0 deletions src/stun.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down
Loading