diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index b661b1db..d868fc2a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -23,10 +23,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -44,7 +43,9 @@ import com.google.gson.JsonObject; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.IdentifierPrefixes; +import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.utils.MediaNodeStatusManager; +import io.openvidu.server.utils.UpdatableTimerTask; public abstract class KmsManager { @@ -55,8 +56,6 @@ public abstract class KmsManager { private Map kmsReconnectionLocks = new ConcurrentHashMap<>(); - private final int DISCONNECTED_INTERVAL_SECONDS = 12; - public class KmsLoad implements Comparable { private Kms kms; @@ -220,57 +219,72 @@ public abstract class KmsManager { // TODO: this is a fix for the lack of reconnected event kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - boolean lockAcquired = false; - try { - if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { - lockAcquired = true; + final UpdatableTimerTask[] TIMER = new UpdatableTimerTask[1]; + final AtomicInteger ITERATION = new AtomicInteger(0); - if (kms.isKurentoClientConnected()) { - // reconnected listener already executed - log.info( - "Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)", - kms.getUri(), kms.getKurentoClient().toString()); - timer.cancel(); - return; - } + TIMER[0] = new UpdatableTimerTask(() -> { + boolean lockAcquired = false; + try { + if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { + lockAcquired = true; - if (kms.getKurentoClient().isClosed()) { - log.info( - "Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer", - kms.getUri(), kms.getKurentoClient().toString()); - timer.cancel(); - return; - } - - kms.getKurentoClient().getServerManager().getInfo(); - log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", + if (kms.isKurentoClientConnected()) { + // reconnected listener already executed + log.info( + "Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)", kms.getUri(), kms.getKurentoClient().toString()); - timer.cancel(); - kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); - // Here we cannot differentiate between new or old KMS process - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); - kms.getKurentoSessions().forEach(kSession -> { - kSession.restartStatusInKurento(timeOfKurentoDisconnection); - }); - kms.setTimeOfKurentoClientDisconnection(0); + TIMER[0].cancelTimer(); + return; } - } catch (Exception e) { - log.error( - "According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}", - kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName()); - } finally { - if (lockAcquired) { - kmsReconnectionLocks.get(kms.getId()).unlock(); + + if (kms.getKurentoClient().isClosed()) { + log.info( + "Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer", + kms.getUri(), kms.getKurentoClient().toString()); + TIMER[0].cancelTimer(); + return; } + + kms.getKurentoClient().getServerManager().getInfo(); + log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", + kms.getUri(), kms.getKurentoClient().toString()); + TIMER[0].cancelTimer(); + kms.setKurentoClientConnected(true); + kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + + if (kms.getKurentoSessions().isEmpty()) { + log.info("There were no sessions in the KMS with uri {}. Nothing must be done", + kms.getUri()); + } else { + if (isNewKms(kms)) { + log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", + kms.getUri(), kms.getKurentoSessions().size(), + kms.getKurentoSessions().stream().map(s -> s.getSessionId()) + .collect(Collectors.joining(",", "[", "]"))); + kms.getKurentoSessions().forEach(kSession -> { + kSession.restartStatusInKurento(timeOfKurentoDisconnection); + }); + } else { + log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); + } + } + + kms.setTimeOfKurentoClientDisconnection(0); + } + } catch (Exception e) { + log.error( + "According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}", + kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName()); + } finally { + if (lockAcquired) { + kmsReconnectionLocks.get(kms.getId()).unlock(); } } - }, DISCONNECTED_INTERVAL_SECONDS * 1000, DISCONNECTED_INTERVAL_SECONDS * 1000); + }, () -> new Long(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000)); + TIMER[0].updateTimer(); } @Override @@ -293,6 +307,36 @@ public abstract class KmsManager { }; } + private boolean isNewKms(Kms kms) { + try { + KurentoSession kSession = kms.getKurentoSessions().iterator().next(); + kSession.getPipeline().getName(); + return false; + } catch (NoSuchElementException e) { + return false; + } catch (Exception e) { + return true; + } + } + + private int dynamicReconnectLoopSeconds(int iteration) { + // First 10 loops every second, next 20 loops ever 3s, the rest every 10s + final int[][] intervals = { new int[] { 1, 10 }, new int[] { 3, 20 }, new int[] { 10, Integer.MAX_VALUE } }; + + int accumulatedIntervals = 0; + for (int i = 0; i < intervals.length - 1; i++) { + if ((accumulatedIntervals + intervals[i][1]) > iteration) { + // Interval found for current iteration + return intervals[i][0]; + } else { + // This iteration has already been surpassed + accumulatedIntervals += intervals[i][1]; + } + } + // Return last interval + return intervals[intervals.length - 1][0]; + } + public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception; diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/UpdatableTimerTask.java b/openvidu-server/src/main/java/io/openvidu/server/utils/UpdatableTimerTask.java index 93e703b6..d94122e5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/UpdatableTimerTask.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/UpdatableTimerTask.java @@ -54,6 +54,12 @@ public class UpdatableTimerTask extends TimerTask { } } + public final void cancelTimer() { + cancel(); + timer.cancel(); + timer.purge(); + } + @Override public void run() { // Protect the inner run method so if any exception is thrown, the following