diff --git a/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java b/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java index b251fc39..3f90bbf1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java +++ b/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java @@ -38,7 +38,6 @@ import java.util.Map.Entry; import javax.annotation.PostConstruct; -import io.openvidu.java.client.IceServerProperties; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.validator.routines.DomainValidator; @@ -59,6 +58,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonSyntaxException; +import io.openvidu.java.client.IceServerProperties; import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.VideoCodec; import io.openvidu.server.OpenViduServer; @@ -469,6 +469,10 @@ public class OpenviduConfig { return RequestMappings.FRONTEND_CE; } + public int getReconnectionTimeout() { + return 2000000000; + } + // Properties management methods public OpenviduConfig deriveWithAdditionalPropertiesSource(Map propertiesSource) { @@ -544,7 +548,7 @@ public class OpenviduConfig { postProcessConfigProps(); userConfigProps = new ArrayList<>(configProps.keySet()); userConfigProps.removeAll(getNonUserProperties()); - for (String notShowEmptyConfigKey: getNonPrintablePropertiesIfEmpty()) { + for (String notShowEmptyConfigKey : getNonPrintablePropertiesIfEmpty()) { String value = configProps.get(notShowEmptyConfigKey); if (value == null || value.isEmpty() || value.equals(new JsonArray().toString())) { userConfigProps.remove(notShowEmptyConfigKey); @@ -902,7 +906,6 @@ public class OpenviduConfig { addError(property, "Cannot be empty"); return false; } - if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) { return Boolean.parseBoolean(value); } else { @@ -914,7 +917,6 @@ public class OpenviduConfig { protected Integer asNonNegativeInteger(String property) { try { Integer integerValue = Integer.parseInt(getValue(property)); - if (integerValue < 0) { addError(property, "Is not a non negative integer"); } @@ -925,6 +927,29 @@ public class OpenviduConfig { } } + protected Integer asOptionalNonNegativeInteger(String property, int min, int max) { + try { + String value = getValue(property); + if (value == null || value.isEmpty()) { + return null; + } + Integer integerValue = Integer.parseInt(getValue(property)); + if (integerValue < 0) { + addError(property, "Is not a non negative integer"); + } + if (integerValue < min) { + addError(property, "Must be >= " + min); + } + if (integerValue >= max) { + addError(property, "Must be < " + max); + } + return integerValue; + } catch (NumberFormatException e) { + addError(property, "Is not a non negative integer"); + return 0; + } + } + /* * This method checks all types of Internet addresses (IPv4, IPv6 and Domains) */ @@ -1191,7 +1216,7 @@ public class OpenviduConfig { private IceServerProperties.Builder readIceServer(String property, String iceServerString) { String url = null, username = null, credential = null, staticAuthSecret = null; String[] iceServerPropList = iceServerString.split(","); - for (String iceServerProp: iceServerPropList) { + for (String iceServerProp : iceServerPropList) { if (iceServerProp.startsWith("url=")) { url = StringUtils.substringAfter(iceServerProp, "url="); } else if (iceServerProp.startsWith("username=")) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 8f624866..b7ab4414 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -44,7 +44,7 @@ public class FixedOneKmsManager extends KmsManager { throws Exception { KmsProperties firstProps = kmsProperties.get(0); KurentoClient kClient = null; - Kms kms = new Kms(firstProps, loadManager, mediaNodeManager); + Kms kms = new Kms(firstProps, loadManager, this); try { JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId()); JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener); @@ -90,15 +90,10 @@ public class FixedOneKmsManager extends KmsManager { } @Override - protected String removeMediaNodeUponCrash(String mediaNodeId) { + protected String removeMediaNodeUponCrash(String mediaNodeId, boolean followedByReconnection, String message) { return null; } - @Override - protected boolean infiniteRetry() { - return true; - } - @Override @PostConstruct protected void postConstructInitKurentoClients() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index 850e9d2e..738c2eb2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -39,7 +39,6 @@ import com.google.gson.JsonObject; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.core.MediaServer; import io.openvidu.server.kurento.core.KurentoSession; -import io.openvidu.server.utils.MediaNodeManager; import io.openvidu.server.utils.RecordingUtils; import io.openvidu.server.utils.UpdatableTimerTask; @@ -65,7 +64,7 @@ public class Kms { private MediaServer mediaServer; private UpdatableTimerTask clientReconnectTimer; private LoadManager loadManager; - private MediaNodeManager mediaNodeManager; + private KmsManager kmsManager; private boolean isFirstReconnectionAttempt = true; private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); @@ -76,7 +75,7 @@ public class Kms { private Map activeRecordings = new ConcurrentHashMap<>(); private AtomicLong activeComposedRecordings = new AtomicLong(); - public Kms(KmsProperties props, LoadManager loadManager, MediaNodeManager mediaNodeManager) { + public Kms(KmsProperties props, LoadManager loadManager, KmsManager kmsManager) { this.id = props.getId(); this.uri = props.getUri(); @@ -90,7 +89,7 @@ public class Kms { this.ip = url.getHost(); this.loadManager = loadManager; - this.mediaNodeManager = mediaNodeManager; + this.kmsManager = kmsManager; } public KurentoClient getKurentoClient() { @@ -146,13 +145,13 @@ public class Kms { this.isKurentoClientConnected.set(isConnected); if (isConnected) { this.setTimeOfKurentoClientConnection(timestamp); - this.mediaNodeManager.mediaNodeUsageRegistration(this, timestamp); + kmsManager.getMediaNodeManager().mediaNodeUsageRegistration(this, timestamp, kmsManager.getKmss()); if (this.mediaServer == null) { this.fetchMediaServerType(); } } else { this.setTimeOfKurentoClientDisconnection(timestamp); - this.mediaNodeManager.mediaNodeUsageDeregistration(this, timestamp); + kmsManager.getMediaNodeManager().mediaNodeUsageDeregistration(this, timestamp); } } @@ -201,7 +200,7 @@ public class Kms { if (RecordingUtils.IS_COMPOSED(properties.outputMode())) { this.activeComposedRecordings.decrementAndGet(); } - this.mediaNodeManager.dropIdleMediaNode(this.id); + kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id); } public JsonObject toJson() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 1274cf57..273a20cd 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -109,6 +109,11 @@ public abstract class KmsManager { protected SessionManager sessionManager; protected LoadManager loadManager; + // Media Node reconnection cycle: 6 attempts, 2 times per second (3s total) + final int MAX_RECONNECT_TIME_MILLIS = 3000; + final int INTERVAL_WAIT_MS = 500; + final int RECONNECTION_LOOPS = MAX_RECONNECT_TIME_MILLIS / INTERVAL_WAIT_MS; + public KmsManager(SessionManager sessionManager, LoadManager loadManager) { this.sessionManager = sessionManager; this.loadManager = loadManager; @@ -167,6 +172,10 @@ public abstract class KmsManager { return kmsLoads; } + public MediaNodeManager getMediaNodeManager() { + return this.mediaNodeManager; + } + protected JsonRpcWSConnectionListener generateKurentoConnectionListener(final String kmsId) { return new JsonRpcWSConnectionListener() { @@ -218,29 +227,28 @@ public abstract class KmsManager { kms.setKurentoClientConnected(false); - disconnectionHandler(kms); + disconnectionHandler(kms, 0); } - private void disconnectionHandler(Kms kms) { - // 6 attempts, 2 times per second (3 seconds total) - final int maxReconnectTimeMillis = 3000; - final int intervalWaitMs = 500; - final int loops = maxReconnectTimeMillis / intervalWaitMs; - final AtomicInteger iteration = new AtomicInteger(loops); + private void disconnectionHandler(Kms kms, int reconnectionSecondsConsumed) { + final AtomicInteger iteration = new AtomicInteger(RECONNECTION_LOOPS); final long initTime = System.currentTimeMillis(); + final int accumulatedTimeout = reconnectionSecondsConsumed + (MAX_RECONNECT_TIME_MILLIS / 1000); + final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> { if (iteration.decrementAndGet() < 0) { kms.getKurentoClientReconnectTimer().cancelTimer(); + boolean mustRetryReconnection = accumulatedTimeout < openviduConfig.getReconnectionTimeout(); if (kms.isFirstReconnectionAttempt()) { log.error( "OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed", kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), - (intervalWaitMs * loops / 1000)); + (INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000)); kms.setFirstReconnectionAttempt(false); @@ -251,8 +259,8 @@ public abstract class KmsManager { .map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList()); // 1. Remove Media Node from cluster - log.warn("Removing Media Node {} with IP {} after crash", kms.getId(), kms.getIp()); - String environmentId = removeMediaNodeUponCrash(kms.getId()); + String environmentId = removeMediaNodeUponCrash(kms.getId(), mustRetryReconnection, + "Removing Media Node " + kms.getId() + " after node crash"); // 2. Send nodeCrashed webhook event sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, @@ -274,17 +282,27 @@ public abstract class KmsManager { log.error( "Retry error. OpenVidu Server [{}] could not connect to Media Node {} with IP {} in {} seconds", kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), - (intervalWaitMs * loops / 1000)); + (INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000)); } - if (infiniteRetry()) { - log.info("Retrying reconnection to Media Node {} with IP {}", kms.getId(), kms.getIp()); - disconnectionHandler(kms); + if (mustRetryReconnection) { + log.info( + "Retrying reconnection to Media Node {} with IP {}. {} seconds consumed of a maximum of {}", + kms.getId(), kms.getIp(), accumulatedTimeout, + openviduConfig.getReconnectionTimeout()); + disconnectionHandler(kms, accumulatedTimeout); + } else { + log.warn( + "Reconnection process to Media Node {} with IP {} aborted. {} seconds have been consumed and the upper limit is {} seconds", + kms.getId(), kms.getIp(), accumulatedTimeout, + openviduConfig.getReconnectionTimeout()); + removeMediaNodeUponCrash(kms.getId(), mustRetryReconnection, "Removing Media Node " + + kms.getId() + " with IP " + kms.getIp() + " after reconnection abort"); } } else { - if ((System.currentTimeMillis() - initTime) > maxReconnectTimeMillis) { + if ((System.currentTimeMillis() - initTime) > MAX_RECONNECT_TIME_MILLIS) { // KurentoClient connection timeout exceeds the limit. This prevents a // single reconnection attempt to exceed the total timeout limit iteration.set(0); @@ -329,7 +347,7 @@ public abstract class KmsManager { kms.setTimeOfKurentoClientDisconnection(0); } - }, () -> Long.valueOf(intervalWaitMs)); // Try 2 times per seconds + }, () -> Long.valueOf(INTERVAL_WAIT_MS)); // Try 2 times per second kms.setKurentoClientReconnectTimer(kurentoClientReconnectTimer); kurentoClientReconnectTimer.updateTimer(); @@ -359,13 +377,12 @@ public abstract class KmsManager { public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId, Session session); - protected abstract String removeMediaNodeUponCrash(String mediaNodeId); + protected abstract String removeMediaNodeUponCrash(String mediaNodeId, boolean followedByReconnection, + String message); @PostConstruct protected abstract void postConstructInitKurentoClients(); - protected abstract boolean infiniteRetry(); - public void closeAllKurentoClients() { log.info("Closing all KurentoClients"); this.kmss.values().forEach(kms -> { diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java index 08035553..ca77dc6e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java @@ -1,10 +1,12 @@ package io.openvidu.server.utils; +import java.util.Collection; + import io.openvidu.server.kurento.kms.Kms; public interface MediaNodeManager { - public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection); + public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection, Collection existingKmss); public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection); diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java index 230aa14e..62857c02 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java @@ -1,11 +1,13 @@ package io.openvidu.server.utils; +import java.util.Collection; + import io.openvidu.server.kurento.kms.Kms; public class MediaNodeManagerDummy implements MediaNodeManager { @Override - public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection) { + public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection, Collection existingKmss) { } @Override