openvidu-server: fix for the lack of reconnected KurentoClientListener event

pull/419/head
pabloFuente 2020-04-02 13:19:05 +02:00
parent 31dd5d6db9
commit eb87f3c9a2
1 changed files with 104 additions and 22 deletions

View File

@ -23,7 +23,10 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -43,8 +46,14 @@ import io.openvidu.server.utils.MediaNodeStatusManager;
public abstract class KmsManager { public abstract class KmsManager {
protected static final Logger log = LoggerFactory.getLogger(KmsManager.class);
public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true); public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true);
private Map<String, Lock> kmsReconnectionLocks = new ConcurrentHashMap<>();
private final int DISCONNECTED_INTERVAL_SECONDS = 12;
public class KmsLoad implements Comparable<KmsLoad> { public class KmsLoad implements Comparable<KmsLoad> {
private Kms kms; private Kms kms;
@ -81,8 +90,6 @@ public abstract class KmsManager {
} }
} }
protected static final Logger log = LoggerFactory.getLogger(KmsManager.class);
@Autowired @Autowired
protected OpenviduConfig openviduConfig; protected OpenviduConfig openviduConfig;
@ -146,12 +153,30 @@ public abstract class KmsManager {
@Override @Override
public void reconnected(boolean sameServer) { public void reconnected(boolean sameServer) {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
log.info("Kurento Client \"reconnected\" event for KMS {} (sameServer: {}) [{}]", kms.getUri(),
sameServer, kms.getKurentoClient().toString());
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
boolean lockAcquired = false;
try {
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
lockAcquired = true;
if (kms.isKurentoClientConnected()) {
// Timer task of disconnected event successfully executed
log.warn(
"Timer task already executed for reconnected Kurento Client [{}] to KMS with uri {}. Skipping event listener execution",
kms.getKurentoClient().toString(), kms.getUri());
return;
}
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
if (!sameServer) { if (!sameServer) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kms.getUri()); log.warn("Kurento Client reconnected to a different KMS instance, with uri {}",
kms.getUri());
log.warn("Updating all webrtc endpoints for active sessions"); log.warn("Updating all webrtc endpoints for active sessions");
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
kms.getKurentoSessions().forEach(kSession -> { kms.getKurentoSessions().forEach(kSession -> {
@ -164,30 +189,87 @@ public abstract class KmsManager {
} }
kms.setTimeOfKurentoClientDisconnection(0); kms.setTimeOfKurentoClientDisconnection(0);
} }
} catch (InterruptedException e) {
log.error("InterruptedException when waiting for lock on reconnected event of KMS with uri {}",
kms.getUri());
} finally {
if (lockAcquired) {
kmsReconnectionLocks.get(kms.getId()).unlock();
}
}
}
@Override @Override
public void disconnected() { public void disconnected() {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
log.info("Kurento Client \"disconnected\" event for KMS {} [{}]", kms.getUri(),
kms.getKurentoClient().toString());
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
log.warn("Kurento Client disconnected from KMS {} with uri {}", kmsId, kms.getUri());
// TODO: this is a fix for the lack of reconnected event
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
boolean lockAcquired = false;
try {
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
lockAcquired = true;
if (kms.isKurentoClientConnected()) {
// reconnected listener already executed
log.warn(
"Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)",
kms.getUri(), kms.getKurentoClient().toString());
timer.cancel();
return;
}
kms.getKurentoClient().getServerManager().getInfo();
log.warn("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString());
timer.cancel();
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
// Here we cannot differentiate between new or old KMS process
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurento(timeOfKurentoDisconnection);
});
kms.setTimeOfKurentoClientDisconnection(0);
}
} catch (Exception e) {
log.error(
"According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}",
kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName());
} finally {
if (lockAcquired) {
kmsReconnectionLocks.get(kms.getId()).unlock();
}
}
}
}, DISCONNECTED_INTERVAL_SECONDS * 1000, DISCONNECTED_INTERVAL_SECONDS * 1000);
} }
@Override @Override
public void connectionFailed() { public void connectionFailed() {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
log.error("Kurento Client \"connectionFailed\" event for KMS {} [{}]", kms.getUri(),
kms.getKurentoClient().toString());
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
log.warn("Kurento Client failed connecting to KMS {} with uri {}", kmsId, kms.getUri());
} }
@Override @Override
public void connected() { public void connected() {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
// TODO: This should be done here instead of after KurentoClient.create method log.info("Kurento Client \"connected\" event for KMS {} [{}]", kms.getUri(),
// returns kms.getKurentoClient().toString());
// TODO: This should be done here, not after KurentoClient#create method returns
// kms.setKurentoClientConnected(true); // kms.setKurentoClientConnected(true);
// kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
log.info("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri());
} }
}; };
} }