From a91fc4f87035a3ef9182c1c1d80e4a72b7ffcc36 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 18 Mar 2022 14:06:43 +0100 Subject: [PATCH] openvidu-server: avoid Media Node removal ops if not 1st reconnection attempt --- .../io/openvidu/server/kurento/kms/Kms.java | 9 ++ .../server/kurento/kms/KmsManager.java | 106 ++++++++++-------- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index 9cee3f89..d67e5860 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -67,6 +67,7 @@ public class Kms { private LoadManager loadManager; private QuarantineKiller quarantineKiller; + private boolean isFirstReconnectionAttempt = true; private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); @@ -128,6 +129,14 @@ public class Kms { return true; // loadManager.allowMoreElements(this); } + public boolean isFirstReconnectionAttempt() { + return this.isFirstReconnectionAttempt; + } + + public void setFirstReconnectionAttempt(boolean isFirst) { + this.isFirstReconnectionAttempt = isFirst; + } + public boolean isKurentoClientConnected() { return this.isKurentoClientConnected.get(); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 37000827..2c64d4ab 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -184,28 +184,26 @@ public abstract class KmsManager { @Override public void disconnected() { + final Kms kms = 
kmss.get(kmsId); - // TODO: take a look at this -// if (kms.getTimeOfKurentoClientDisconnection() > 0) { -// log.warn("Event disconnected of KurentoClient {} is already being processed by other thread", -// kms.getKurentoClient().toString()); -// return; -// } + if (kms.getKurentoClient().isDestroyed()) { + log.info( + "Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitly by openvidu-server. No reconnection process", + kms.getUri(), kms.getKurentoClient().toString()); + return; + } else { + log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Reconnecting", kms.getUri(), + kms.getKurentoClient().toString()); + } kms.setKurentoClientConnected(false); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); - if (kms.getKurentoClient().isDestroyed()) { - log.info( - "Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitly by openvidu-server", - kms.getUri(), kms.getKurentoClient().toString()); - return; - } else { - log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection", - kms.getUri(), kms.getKurentoClient().toString()); - } + disconnectionHandler(kms); + } + private void disconnectionHandler(Kms kms) { // 6 attempts, 2 times per second (3 seconds total) final int maxReconnectTimeMillis = 3000; final int intervalWaitMs = 500; @@ -217,49 +215,60 @@ public abstract class KmsManager { final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> { if (iteration.decrementAndGet() < 0) { - log.error( - "OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. 
Media Node crashed", - kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), - (intervalWaitMs * loops / 1000)); kms.getKurentoClientReconnectTimer().cancelTimer(); - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); - final List affectedSessionIds = kms.getKurentoSessions().stream() - .map(session -> session.getSessionId()).collect(Collectors.toUnmodifiableList()); - final List affectedRecordingIds = kms.getActiveRecordings().stream() - .map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList()); + if (kms.isFirstReconnectionAttempt()) { - // 1. Remove Media Node from cluster - log.warn("Removing Media Node {} with IP {} after crash", kms.getId(), kms.getIp()); - String environmentId = removeMediaNodeUponCrash(kms.getId()); + log.error( + "OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed", + kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), + (intervalWaitMs * loops / 1000)); - // 2. Send nodeCrashed webhook event - sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, - affectedSessionIds, affectedRecordingIds); + kms.setFirstReconnectionAttempt(false); - // 3. 
Close all sessions and recordings with reason "nodeCrashed" - log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", - kms.getKurentoSessions().size(), kms.getId(), kms.getIp(), - kms.getKurentoSessions().stream().map(s -> s.getSessionId()) - .collect(Collectors.joining(",", "[", "]"))); - try { - // Flag the thread to skip remote operations to KMS - RemoteOperationUtils.setToSkipRemoteOperations(); - sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed); - } finally { - RemoteOperationUtils.revertToRunRemoteOperations(); + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + final List affectedSessionIds = kms.getKurentoSessions().stream() + .map(session -> session.getSessionId()).collect(Collectors.toUnmodifiableList()); + final List affectedRecordingIds = kms.getActiveRecordings().stream() + .map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList()); + + // 1. Remove Media Node from cluster + log.warn("Removing Media Node {} with IP {} after crash", kms.getId(), kms.getIp()); + String environmentId = removeMediaNodeUponCrash(kms.getId()); + + // 2. Send nodeCrashed webhook event + sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, + affectedSessionIds, affectedRecordingIds); + + // 3. Close all sessions and recordings with reason "nodeCrashed" + log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", + kms.getKurentoSessions().size(), kms.getId(), kms.getIp(), + kms.getKurentoSessions().stream().map(s -> s.getSessionId()) + .collect(Collectors.joining(",", "[", "]"))); + try { + // Flag the thread to skip remote operations to KMS + RemoteOperationUtils.setToSkipRemoteOperations(); + sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed); + } finally { + RemoteOperationUtils.revertToRunRemoteOperations(); + } + } else { + log.error( + "Retry error. 
OpenVidu Server [{}] could not connect to Media Node {} with IP {} in {} seconds", + kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), + (intervalWaitMs * loops / 1000)); } if (infiniteRetry()) { log.info("Retrying reconnection to Media Node {} with IP {}", kms.getId(), kms.getIp()); - disconnected(); + disconnectionHandler(kms); } } else { if ((System.currentTimeMillis() - initTime) > maxReconnectTimeMillis) { - // KurentoClient connection timeout exceeds the limit. This happens if not only - // media server process has crashed, but the instance itself is not reachable + // KurentoClient connection timeout exceeds the limit. This prevents a + // single reconnection attempt from exceeding the total timeout limit iteration.set(0); return; } @@ -275,7 +284,10 @@ public abstract class KmsManager { log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(), kms.getKurentoClient().toString()); + + kms.setFirstReconnectionAttempt(true); kms.getKurentoClientReconnectTimer().cancelTimer(); + kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); @@ -319,13 +331,17 @@ public abstract class KmsManager { final Kms kms = kmss.get(kmsId); log.info("Kurento Client \"connected\" event for KMS {} [{}]", kms.getUri(), kms.getKurentoClient().toString()); - // TODO: This should be done here, not after KurentoClient#create method returns + // TODO: This should be done here, not after KurentoClient#create method + // returns, but it seems that this event is never triggered // kms.setKurentoClientConnected(true); // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); } @Override public void reconnecting() { + final Kms kms = kmss.get(kmsId); + log.info("Kurento Client \"reconnecting\" event for KMS {} [{}]", kms.getUri(), + kms.getKurentoClient().toString()); } };