openvidu-server: avoid Media Node removal ops if not 1st reconnection attempt

pull/707/head
pabloFuente 2022-03-18 14:06:43 +01:00
parent a940708f8d
commit a91fc4f870
2 changed files with 70 additions and 45 deletions

View File

@ -67,6 +67,7 @@ public class Kms {
private LoadManager loadManager; private LoadManager loadManager;
private QuarantineKiller quarantineKiller; private QuarantineKiller quarantineKiller;
private boolean isFirstReconnectionAttempt = true;
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0);
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
@ -128,6 +129,14 @@ public class Kms {
return true; // loadManager.allowMoreElements(this); return true; // loadManager.allowMoreElements(this);
} }
/**
 * Tells whether the current reconnection round to this KMS is the first one
 * since the connection was lost.
 *
 * In the reconnection timer shown in this change, the value is read once the
 * retry iterations are exhausted: when {@code true} the "Media Node crashed"
 * branch runs; when {@code false} only a "Retry error" message is logged.
 *
 * NOTE(review): the backing field is a plain {@code boolean}, unlike the
 * neighbouring AtomicBoolean/AtomicLong fields. If the timer task runs on a
 * different thread than other callers, visibility is not guaranteed - confirm.
 *
 * @return {@code true} if no reconnection round has failed yet for the
 *         current disconnection
 */
public boolean isFirstReconnectionAttempt() {
return this.isFirstReconnectionAttempt;
}
/**
 * Updates the "first reconnection attempt" flag of this KMS.
 *
 * In the reconnection timer shown in this change, the flag is set to
 * {@code false} after the first round of retries fails, and set back to
 * {@code true} when the timer detects that the KMS is reconnected.
 *
 * @param isFirst {@code true} to mark the next reconnection round as the
 *                first one, {@code false} to mark it as a retry
 */
public void setFirstReconnectionAttempt(boolean isFirst) {
this.isFirstReconnectionAttempt = isFirst;
}
public boolean isKurentoClientConnected() { public boolean isKurentoClientConnected() {
return this.isKurentoClientConnected.get(); return this.isKurentoClientConnected.get();
} }

View File

@ -184,28 +184,26 @@ public abstract class KmsManager {
@Override @Override
public void disconnected() { public void disconnected() {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
// TODO: take a look at this if (kms.getKurentoClient().isDestroyed()) {
// if (kms.getTimeOfKurentoClientDisconnection() > 0) { log.info(
// log.warn("Event disconnected of KurentoClient {} is already being processed by other thread", "Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitly by openvidu-server. No reconnection process",
// kms.getKurentoClient().toString()); kms.getUri(), kms.getKurentoClient().toString());
// return; return;
// } } else {
log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Reconnecting", kms.getUri(),
kms.getKurentoClient().toString());
}
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
if (kms.getKurentoClient().isDestroyed()) { disconnectionHandler(kms);
log.info(
"Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitly by openvidu-server",
kms.getUri(), kms.getKurentoClient().toString());
return;
} else {
log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection",
kms.getUri(), kms.getKurentoClient().toString());
} }
private void disconnectionHandler(Kms kms) {
// 6 attempts, 2 times per second (3 seconds total) // 6 attempts, 2 times per second (3 seconds total)
final int maxReconnectTimeMillis = 3000; final int maxReconnectTimeMillis = 3000;
final int intervalWaitMs = 500; final int intervalWaitMs = 500;
@ -217,11 +215,16 @@ public abstract class KmsManager {
final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> { final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> {
if (iteration.decrementAndGet() < 0) { if (iteration.decrementAndGet() < 0) {
kms.getKurentoClientReconnectTimer().cancelTimer();
if (kms.isFirstReconnectionAttempt()) {
log.error( log.error(
"OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed", "OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(intervalWaitMs * loops / 1000)); (intervalWaitMs * loops / 1000));
kms.getKurentoClientReconnectTimer().cancelTimer();
kms.setFirstReconnectionAttempt(false);
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
final List<String> affectedSessionIds = kms.getKurentoSessions().stream() final List<String> affectedSessionIds = kms.getKurentoSessions().stream()
@ -249,17 +252,23 @@ public abstract class KmsManager {
} finally { } finally {
RemoteOperationUtils.revertToRunRemoteOperations(); RemoteOperationUtils.revertToRunRemoteOperations();
} }
} else {
log.error(
"Retry error. OpenVidu Server [{}] could not connect to Media Node {} with IP {} in {} seconds",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(intervalWaitMs * loops / 1000));
}
if (infiniteRetry()) { if (infiniteRetry()) {
log.info("Retrying reconnection to Media Node {} with IP {}", kms.getId(), kms.getIp()); log.info("Retrying reconnection to Media Node {} with IP {}", kms.getId(), kms.getIp());
disconnected(); disconnectionHandler(kms);
} }
} else { } else {
if ((System.currentTimeMillis() - initTime) > maxReconnectTimeMillis) { if ((System.currentTimeMillis() - initTime) > maxReconnectTimeMillis) {
// KurentoClient connection timeout exceeds the limit. This happens if not only // KurentoClient connection timeout exceeds the limit. This prevents a
// media server process has crashed, but the instance itself is not reachable // single reconnection attempt to exceed the total timeout limit
iteration.set(0); iteration.set(0);
return; return;
} }
@ -275,7 +284,10 @@ public abstract class KmsManager {
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString()); kms.getUri(), kms.getKurentoClient().toString());
kms.setFirstReconnectionAttempt(true);
kms.getKurentoClientReconnectTimer().cancelTimer(); kms.getKurentoClientReconnectTimer().cancelTimer();
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
@ -319,13 +331,17 @@ public abstract class KmsManager {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
log.info("Kurento Client \"connected\" event for KMS {} [{}]", kms.getUri(), log.info("Kurento Client \"connected\" event for KMS {} [{}]", kms.getUri(),
kms.getKurentoClient().toString()); kms.getKurentoClient().toString());
// TODO: This should be done here, not after KurentoClient#create method returns // TODO: This should be done here, not after KurentoClient#create method
// returns, but it seems that this event is never triggered
// kms.setKurentoClientConnected(true); // kms.setKurentoClientConnected(true);
// kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
} }
@Override @Override
public void reconnecting() { public void reconnecting() {
final Kms kms = kmss.get(kmsId);
log.info("Kurento Client \"reconnecting\" event for KMS {} [{}]", kms.getUri(),
kms.getKurentoClient().toString());
} }
}; };