From eb87f3c9a28a433ad122e33e3b97bfecf95ed996 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Thu, 2 Apr 2020 13:19:05 +0200 Subject: [PATCH] openvidu-server: fix for the lack of reconnected KurentoClientListener event --- .../server/kurento/kms/KmsManager.java | 126 +++++++++++++++--- 1 file changed, 104 insertions(+), 22 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 97593a21..d9dfbc38 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -23,7 +23,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -43,8 +46,14 @@ import io.openvidu.server.utils.MediaNodeStatusManager; public abstract class KmsManager { + protected static final Logger log = LoggerFactory.getLogger(KmsManager.class); + public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true); + private Map kmsReconnectionLocks = new ConcurrentHashMap<>(); + + private final int DISCONNECTED_INTERVAL_SECONDS = 12; + public class KmsLoad implements Comparable { private Kms kms; @@ -81,8 +90,6 @@ public abstract class KmsManager { } } - protected static final Logger log = LoggerFactory.getLogger(KmsManager.class); - @Autowired protected OpenviduConfig openviduConfig; @@ -146,48 +153,123 @@ public abstract class KmsManager { @Override public void reconnected(boolean sameServer) { + final Kms kms = kmss.get(kmsId); - kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); - if (!sameServer) { - // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) - log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kms.getUri()); - log.warn("Updating all webrtc endpoints for active sessions"); - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); - kms.getKurentoSessions().forEach(kSession -> { - kSession.restartStatusInKurento(timeOfKurentoDisconnection); - }); - } else { - // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but - // not the clients/KMS connections - log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri()); + + log.info("Kurento Client \"reconnected\" event for KMS {} (sameServer: {}) [{}]", kms.getUri(), + sameServer, kms.getKurentoClient().toString()); + + kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); + boolean lockAcquired = false; + try { + if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { + lockAcquired = true; + if (kms.isKurentoClientConnected()) { + // Timer task of disconnected event successfully executed + log.warn( + "Timer task already executed for reconnected Kurento Client [{}] to KMS with uri {}. Skipping event listener execution", + kms.getKurentoClient().toString(), kms.getUri()); + return; + } + kms.setKurentoClientConnected(true); + kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + if (!sameServer) { + // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) + log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", + kms.getUri()); + log.warn("Updating all webrtc endpoints for active sessions"); + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + kms.getKurentoSessions().forEach(kSession -> { + kSession.restartStatusInKurento(timeOfKurentoDisconnection); + }); + } else { + // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but + // not the clients/KMS connections + log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri()); + } + kms.setTimeOfKurentoClientDisconnection(0); + } + } catch (InterruptedException e) { + log.error("InterruptedException when waiting for lock on reconnected event of KMS with uri {}", + kms.getUri()); + } finally { + if (lockAcquired) { + kmsReconnectionLocks.get(kms.getId()).unlock(); + } } - kms.setTimeOfKurentoClientDisconnection(0); } @Override public void disconnected() { final Kms kms = kmss.get(kmsId); + + log.info("Kurento Client \"disconnected\" event for KMS {} [{}]", kms.getUri(), + kms.getKurentoClient().toString()); + kms.setKurentoClientConnected(false); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); - log.warn("Kurento Client disconnected from KMS {} with uri {}", kmsId, kms.getUri()); + + // TODO: this is a fix for the lack of reconnected event + kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + boolean lockAcquired = false; + try { + if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { + lockAcquired = true; + if (kms.isKurentoClientConnected()) { + // reconnected listener already executed + log.warn( + "Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)", + kms.getUri(), kms.getKurentoClient().toString()); + timer.cancel(); + return; + } + kms.getKurentoClient().getServerManager().getInfo(); + log.warn("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", + kms.getUri(), kms.getKurentoClient().toString()); + timer.cancel(); + kms.setKurentoClientConnected(true); + kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + // Here we cannot differentiate between new or old KMS process + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + kms.getKurentoSessions().forEach(kSession -> { + kSession.restartStatusInKurento(timeOfKurentoDisconnection); + }); + kms.setTimeOfKurentoClientDisconnection(0); + } + } catch (Exception e) { + log.error( + "According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}", + kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName()); + } finally { + if (lockAcquired) { + kmsReconnectionLocks.get(kms.getId()).unlock(); + } + } + } + }, DISCONNECTED_INTERVAL_SECONDS * 1000, DISCONNECTED_INTERVAL_SECONDS * 1000); + } @Override public void connectionFailed() { final Kms kms = kmss.get(kmsId); + log.error("Kurento Client \"connectionFailed\" event for KMS {} [{}]", kms.getUri(), + kms.getKurentoClient().toString()); kms.setKurentoClientConnected(false); - log.warn("Kurento Client failed connecting to KMS {} with uri {}", kmsId, kms.getUri()); } @Override public void connected() { final Kms kms = kmss.get(kmsId); - // TODO: This should be done here instead of after KurentoClient.create method - // returns + log.info("Kurento Client \"connected\" event for KMS {} [{}]", kms.getUri(), + kms.getKurentoClient().toString()); + // TODO: This should be done here, not after KurentoClient#create method returns // kms.setKurentoClientConnected(true); // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); - log.info("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri()); } }; }