openvidu-server: prepare parameterized Media Node reconnection timeout

pull/707/head
pabloFuente 2022-03-22 10:59:47 +01:00
parent fbf04bc6a2
commit 594d9f92f9
6 changed files with 80 additions and 40 deletions

View File

@ -38,7 +38,6 @@ import java.util.Map.Entry;
import javax.annotation.PostConstruct;
import io.openvidu.java.client.IceServerProperties;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.DomainValidator;
@ -59,6 +58,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import io.openvidu.java.client.IceServerProperties;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.java.client.VideoCodec;
import io.openvidu.server.OpenViduServer;
@ -469,6 +469,10 @@ public class OpenviduConfig {
return RequestMappings.FRONTEND_CE;
}
/**
 * Upper limit, in seconds, of the accumulated time OpenVidu Server may spend
 * trying to reconnect to a Media Node before the reconnection process is
 * aborted.
 * <p>
 * The value is currently a fixed placeholder rather than a value read from the
 * configuration properties. It is large enough that, in practice, the
 * reconnection process never reaches the limit.
 *
 * @return the reconnection timeout in seconds
 */
public int getReconnectionTimeout() {
    return 2_000_000_000;
}
// Properties management methods
public OpenviduConfig deriveWithAdditionalPropertiesSource(Map<String, ?> propertiesSource) {
@ -544,7 +548,7 @@ public class OpenviduConfig {
postProcessConfigProps();
userConfigProps = new ArrayList<>(configProps.keySet());
userConfigProps.removeAll(getNonUserProperties());
for (String notShowEmptyConfigKey: getNonPrintablePropertiesIfEmpty()) {
for (String notShowEmptyConfigKey : getNonPrintablePropertiesIfEmpty()) {
String value = configProps.get(notShowEmptyConfigKey);
if (value == null || value.isEmpty() || value.equals(new JsonArray().toString())) {
userConfigProps.remove(notShowEmptyConfigKey);
@ -902,7 +906,6 @@ public class OpenviduConfig {
addError(property, "Cannot be empty");
return false;
}
if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
return Boolean.parseBoolean(value);
} else {
@ -914,7 +917,6 @@ public class OpenviduConfig {
protected Integer asNonNegativeInteger(String property) {
try {
Integer integerValue = Integer.parseInt(getValue(property));
if (integerValue < 0) {
addError(property, "Is not a non negative integer");
}
@ -925,6 +927,29 @@ public class OpenviduConfig {
}
}
/**
 * Reads an optional integer configuration property and validates it against
 * the range {@code [min, max)}.
 * <p>
 * Validation failures do not throw: each one is registered through
 * {@code addError(property, message)} and the parsed value is still returned.
 *
 * @param property name of the configuration property to read
 * @param min      inclusive lower bound for the value
 * @param max      exclusive upper bound for the value
 * @return {@code null} if the property is not set or is empty; the parsed
 *         value if it is a valid integer (even when it is out of range, in
 *         which case errors have been registered); {@code 0} if the property
 *         cannot be parsed as an integer
 */
protected Integer asOptionalNonNegativeInteger(String property, int min, int max) {
    try {
        final String value = getValue(property);
        if (value == null || value.isEmpty()) {
            // An unset optional property is not an error
            return null;
        }
        // Parse the string that was just checked instead of fetching the
        // property a second time
        final int integerValue = Integer.parseInt(value);
        if (integerValue < 0) {
            addError(property, "Is not a non negative integer");
        }
        if (integerValue < min) {
            addError(property, "Must be >= " + min);
        }
        if (integerValue >= max) {
            addError(property, "Must be < " + max);
        }
        return integerValue;
    } catch (NumberFormatException e) {
        addError(property, "Is not a non negative integer");
        return 0;
    }
}
/*
* This method checks all types of Internet addresses (IPv4, IPv6 and Domains)
*/
@ -1191,7 +1216,7 @@ public class OpenviduConfig {
private IceServerProperties.Builder readIceServer(String property, String iceServerString) {
String url = null, username = null, credential = null, staticAuthSecret = null;
String[] iceServerPropList = iceServerString.split(",");
for (String iceServerProp: iceServerPropList) {
for (String iceServerProp : iceServerPropList) {
if (iceServerProp.startsWith("url=")) {
url = StringUtils.substringAfter(iceServerProp, "url=");
} else if (iceServerProp.startsWith("username=")) {

View File

@ -44,7 +44,7 @@ public class FixedOneKmsManager extends KmsManager {
throws Exception {
KmsProperties firstProps = kmsProperties.get(0);
KurentoClient kClient = null;
Kms kms = new Kms(firstProps, loadManager, mediaNodeManager);
Kms kms = new Kms(firstProps, loadManager, this);
try {
JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId());
JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener);
@ -90,15 +90,10 @@ public class FixedOneKmsManager extends KmsManager {
}
@Override
protected String removeMediaNodeUponCrash(String mediaNodeId) {
protected String removeMediaNodeUponCrash(String mediaNodeId, boolean followedByReconnection, String message) {
return null;
}
@Override
protected boolean infiniteRetry() {
return true;
}
@Override
@PostConstruct
protected void postConstructInitKurentoClients() {

View File

@ -39,7 +39,6 @@ import com.google.gson.JsonObject;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.core.MediaServer;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeManager;
import io.openvidu.server.utils.RecordingUtils;
import io.openvidu.server.utils.UpdatableTimerTask;
@ -65,7 +64,7 @@ public class Kms {
private MediaServer mediaServer;
private UpdatableTimerTask clientReconnectTimer;
private LoadManager loadManager;
private MediaNodeManager mediaNodeManager;
private KmsManager kmsManager;
private boolean isFirstReconnectionAttempt = true;
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
@ -76,7 +75,7 @@ public class Kms {
private Map<String, String> activeRecordings = new ConcurrentHashMap<>();
private AtomicLong activeComposedRecordings = new AtomicLong();
public Kms(KmsProperties props, LoadManager loadManager, MediaNodeManager mediaNodeManager) {
public Kms(KmsProperties props, LoadManager loadManager, KmsManager kmsManager) {
this.id = props.getId();
this.uri = props.getUri();
@ -90,7 +89,7 @@ public class Kms {
this.ip = url.getHost();
this.loadManager = loadManager;
this.mediaNodeManager = mediaNodeManager;
this.kmsManager = kmsManager;
}
public KurentoClient getKurentoClient() {
@ -146,13 +145,13 @@ public class Kms {
this.isKurentoClientConnected.set(isConnected);
if (isConnected) {
this.setTimeOfKurentoClientConnection(timestamp);
this.mediaNodeManager.mediaNodeUsageRegistration(this, timestamp);
kmsManager.getMediaNodeManager().mediaNodeUsageRegistration(this, timestamp, kmsManager.getKmss());
if (this.mediaServer == null) {
this.fetchMediaServerType();
}
} else {
this.setTimeOfKurentoClientDisconnection(timestamp);
this.mediaNodeManager.mediaNodeUsageDeregistration(this, timestamp);
kmsManager.getMediaNodeManager().mediaNodeUsageDeregistration(this, timestamp);
}
}
@ -201,7 +200,7 @@ public class Kms {
if (RecordingUtils.IS_COMPOSED(properties.outputMode())) {
this.activeComposedRecordings.decrementAndGet();
}
this.mediaNodeManager.dropIdleMediaNode(this.id);
kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id);
}
public JsonObject toJson() {

View File

@ -109,6 +109,11 @@ public abstract class KmsManager {
protected SessionManager sessionManager;
protected LoadManager loadManager;
// Media Node reconnection cycle: 6 attempts, 2 times per second (3s total)
// Duration of one reconnection cycle in milliseconds. disconnectionHandler also
// uses it to cut a cycle short when its elapsed time exceeds this value, and
// (divided by 1000) to add the cycle's seconds to the accumulated timeout.
final int MAX_RECONNECT_TIME_MILLIS = 3000;
// Period in milliseconds of the reconnection timer task
final int INTERVAL_WAIT_MS = 500;
// Number of reconnection attempts in one cycle (initial value of the iteration
// counter in disconnectionHandler)
final int RECONNECTION_LOOPS = MAX_RECONNECT_TIME_MILLIS / INTERVAL_WAIT_MS;
public KmsManager(SessionManager sessionManager, LoadManager loadManager) {
this.sessionManager = sessionManager;
this.loadManager = loadManager;
@ -167,6 +172,10 @@ public abstract class KmsManager {
return kmsLoads;
}
/**
 * Gives access to the {@link MediaNodeManager} held by this KmsManager.
 *
 * @return the MediaNodeManager instance stored in this KmsManager
 */
public MediaNodeManager getMediaNodeManager() {
    return mediaNodeManager;
}
protected JsonRpcWSConnectionListener generateKurentoConnectionListener(final String kmsId) {
return new JsonRpcWSConnectionListener() {
@ -218,29 +227,28 @@ public abstract class KmsManager {
kms.setKurentoClientConnected(false);
disconnectionHandler(kms);
disconnectionHandler(kms, 0);
}
private void disconnectionHandler(Kms kms) {
// 6 attempts, 2 times per second (3 seconds total)
final int maxReconnectTimeMillis = 3000;
final int intervalWaitMs = 500;
final int loops = maxReconnectTimeMillis / intervalWaitMs;
final AtomicInteger iteration = new AtomicInteger(loops);
private void disconnectionHandler(Kms kms, int reconnectionSecondsConsumed) {
final AtomicInteger iteration = new AtomicInteger(RECONNECTION_LOOPS);
final long initTime = System.currentTimeMillis();
final int accumulatedTimeout = reconnectionSecondsConsumed + (MAX_RECONNECT_TIME_MILLIS / 1000);
final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> {
if (iteration.decrementAndGet() < 0) {
kms.getKurentoClientReconnectTimer().cancelTimer();
boolean mustRetryReconnection = accumulatedTimeout < openviduConfig.getReconnectionTimeout();
if (kms.isFirstReconnectionAttempt()) {
log.error(
"OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(intervalWaitMs * loops / 1000));
(INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000));
kms.setFirstReconnectionAttempt(false);
@ -251,8 +259,8 @@ public abstract class KmsManager {
.map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList());
// 1. Remove Media Node from cluster
log.warn("Removing Media Node {} with IP {} after crash", kms.getId(), kms.getIp());
String environmentId = removeMediaNodeUponCrash(kms.getId());
String environmentId = removeMediaNodeUponCrash(kms.getId(), mustRetryReconnection,
"Removing Media Node " + kms.getId() + " after node crash");
// 2. Send nodeCrashed webhook event
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection,
@ -274,17 +282,27 @@ public abstract class KmsManager {
log.error(
"Retry error. OpenVidu Server [{}] could not connect to Media Node {} with IP {} in {} seconds",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(intervalWaitMs * loops / 1000));
(INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000));
}
if (infiniteRetry()) {
log.info("Retrying reconnection to Media Node {} with IP {}", kms.getId(), kms.getIp());
disconnectionHandler(kms);
if (mustRetryReconnection) {
log.info(
"Retrying reconnection to Media Node {} with IP {}. {} seconds consumed of a maximum of {}",
kms.getId(), kms.getIp(), accumulatedTimeout,
openviduConfig.getReconnectionTimeout());
disconnectionHandler(kms, accumulatedTimeout);
} else {
log.warn(
"Reconnection process to Media Node {} with IP {} aborted. {} seconds have been consumed and the upper limit is {} seconds",
kms.getId(), kms.getIp(), accumulatedTimeout,
openviduConfig.getReconnectionTimeout());
removeMediaNodeUponCrash(kms.getId(), mustRetryReconnection, "Removing Media Node "
+ kms.getId() + " with IP " + kms.getIp() + " after reconnection abort");
}
} else {
if ((System.currentTimeMillis() - initTime) > maxReconnectTimeMillis) {
if ((System.currentTimeMillis() - initTime) > MAX_RECONNECT_TIME_MILLIS) {
// KurentoClient connection timeout exceeds the limit. This prevents a
// single reconnection attempt to exceed the total timeout limit
iteration.set(0);
@ -329,7 +347,7 @@ public abstract class KmsManager {
kms.setTimeOfKurentoClientDisconnection(0);
}
}, () -> Long.valueOf(intervalWaitMs)); // Try 2 times per seconds
}, () -> Long.valueOf(INTERVAL_WAIT_MS)); // Try 2 times per second
kms.setKurentoClientReconnectTimer(kurentoClientReconnectTimer);
kurentoClientReconnectTimer.updateTimer();
@ -359,13 +377,12 @@ public abstract class KmsManager {
public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId,
Session session);
protected abstract String removeMediaNodeUponCrash(String mediaNodeId);
protected abstract String removeMediaNodeUponCrash(String mediaNodeId, boolean followedByReconnection,
String message);
@PostConstruct
protected abstract void postConstructInitKurentoClients();
protected abstract boolean infiniteRetry();
public void closeAllKurentoClients() {
log.info("Closing all KurentoClients");
this.kmss.values().forEach(kms -> {

View File

@ -1,10 +1,12 @@
package io.openvidu.server.utils;
import java.util.Collection;
import io.openvidu.server.kurento.kms.Kms;
public interface MediaNodeManager {
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection);
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection, Collection<Kms> existingKmss);
public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection);

View File

@ -1,11 +1,13 @@
package io.openvidu.server.utils;
import java.util.Collection;
import io.openvidu.server.kurento.kms.Kms;
public class MediaNodeManagerDummy implements MediaNodeManager {
@Override
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection) {
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection, Collection<Kms> existingKmss) {
}
@Override