openvidu-server: improved reconnection polling (reset sessions only if new KMS)

pull/508/head
pabloFuente 2020-06-10 18:07:07 +02:00
parent 34a45ff923
commit ac30d55a1e
2 changed files with 97 additions and 47 deletions

View File

@ -23,10 +23,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -44,7 +43,9 @@ import com.google.gson.JsonObject;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.IdentifierPrefixes;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeStatusManager;
import io.openvidu.server.utils.UpdatableTimerTask;
public abstract class KmsManager {
@ -55,8 +56,6 @@ public abstract class KmsManager {
private Map<String, Lock> kmsReconnectionLocks = new ConcurrentHashMap<>();
private final int DISCONNECTED_INTERVAL_SECONDS = 12;
public class KmsLoad implements Comparable<KmsLoad> {
private Kms kms;
@ -220,10 +219,10 @@ public abstract class KmsManager {
// TODO: this is a fix for the lack of reconnected event
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
final UpdatableTimerTask[] TIMER = new UpdatableTimerTask[1];
final AtomicInteger ITERATION = new AtomicInteger(0);
TIMER[0] = new UpdatableTimerTask(() -> {
boolean lockAcquired = false;
try {
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
@ -234,7 +233,7 @@ public abstract class KmsManager {
log.info(
"Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)",
kms.getUri(), kms.getKurentoClient().toString());
timer.cancel();
TIMER[0].cancelTimer();
return;
}
@ -242,21 +241,36 @@ public abstract class KmsManager {
log.info(
"Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer",
kms.getUri(), kms.getKurentoClient().toString());
timer.cancel();
TIMER[0].cancelTimer();
return;
}
kms.getKurentoClient().getServerManager().getInfo();
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString());
timer.cancel();
TIMER[0].cancelTimer();
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
// Here we cannot differentiate between new or old KMS process
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
if (kms.getKurentoSessions().isEmpty()) {
log.info("There were no sessions in the KMS with uri {}. Nothing must be done",
kms.getUri());
} else {
if (isNewKms(kms)) {
log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}",
kms.getUri(), kms.getKurentoSessions().size(),
kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]")));
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurento(timeOfKurentoDisconnection);
});
} else {
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
}
}
kms.setTimeOfKurentoClientDisconnection(0);
}
} catch (Exception e) {
@ -268,9 +282,9 @@ public abstract class KmsManager {
kmsReconnectionLocks.get(kms.getId()).unlock();
}
}
}
}, DISCONNECTED_INTERVAL_SECONDS * 1000, DISCONNECTED_INTERVAL_SECONDS * 1000);
}, () -> new Long(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000));
TIMER[0].updateTimer();
}
@Override
@ -293,6 +307,36 @@ public abstract class KmsManager {
};
}
/**
 * Decides whether the given KMS should be treated as a new process by probing
 * the first of its Kurento sessions: the session's pipeline is asked for its
 * name and only the success or failure of that call is used.
 *
 * NOTE(review): presumably a failing probe means the pipeline no longer exists
 * in the media server (i.e. the process was restarted) - confirm against the
 * Kurento client behavior.
 *
 * @param kms the KMS whose sessions are probed
 * @return {@code false} if the probe completes or if there is no session to
 *         probe ({@link NoSuchElementException}); {@code true} if the probe
 *         throws any other exception
 */
private boolean isNewKms(Kms kms) {
    boolean newProcess;
    try {
        // The returned name is irrelevant: only whether the call succeeds matters
        kms.getKurentoSessions().iterator().next().getPipeline().getName();
        newProcess = false;
    } catch (NoSuchElementException noSessionToProbe) {
        // No session available to probe: not considered a new KMS
        newProcess = false;
    } catch (Exception probeFailure) {
        // Any other failure while probing is taken as the sign of a new KMS
        newProcess = true;
    }
    return newProcess;
}
/**
 * Returns the number of seconds to wait before the next reconnection attempt,
 * as a function of how many attempts have already been made.
 *
 * Schedule: the first 10 iterations wait 1 second, the next 20 iterations wait
 * 3 seconds, and every iteration after that waits 10 seconds.
 *
 * @param iteration zero-based index of the reconnection loop iteration
 * @return the interval, in seconds, for the given iteration
 */
private int dynamicReconnectLoopSeconds(int iteration) {
    final int fastLoops = 10;
    final int mediumLoops = 20;

    // Iterations [0, 10): poll every second
    if (iteration < fastLoops) {
        return 1;
    }
    // Iterations [10, 30): poll every 3 seconds
    if (iteration < fastLoops + mediumLoops) {
        return 3;
    }
    // Any later iteration: poll every 10 seconds
    return 10;
}
public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception;

View File

@ -54,6 +54,12 @@ public class UpdatableTimerTask extends TimerTask {
}
}
/**
 * Stops this task and the timer it runs on.
 *
 * First cancels this {@code TimerTask}, then cancels the {@code timer} field
 * and finally purges that timer's task queue.
 */
public final void cancelTimer() {
    // Cancel the task itself before touching the timer
    this.cancel();
    // Then cancel the timer and purge its queue
    this.timer.cancel();
    this.timer.purge();
}
@Override
public void run() {
// Protect the inner run method so if any exception is thrown, the following