openvidu-server: fix KurentoClient reconnection timer behavior (one per Kms)

pull/609/head
pabloFuente 2021-01-26 17:22:34 +01:00
parent 0f781f44fd
commit f2582aa7ce
2 changed files with 22 additions and 13 deletions

View File

@ -38,6 +38,7 @@ import com.google.gson.JsonObject;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.UpdatableTimerTask;
/** /**
* Abstraction of a KMS instance: an object of this class corresponds to a KMS * Abstraction of a KMS instance: an object of this class corresponds to a KMS
@ -58,6 +59,7 @@ public class Kms {
private String uri; private String uri;
private String ip; private String ip;
private KurentoClient client; private KurentoClient client;
private UpdatableTimerTask clientReconnectTimer;
private LoadManager loadManager; private LoadManager loadManager;
private QuarantineKiller quarantineKiller; private QuarantineKiller quarantineKiller;
@ -85,10 +87,22 @@ public class Kms {
this.quarantineKiller = quarantineKiller; this.quarantineKiller = quarantineKiller;
} }
public KurentoClient getKurentoClient() {
return this.client;
}
public void setKurentoClient(KurentoClient client) { public void setKurentoClient(KurentoClient client) {
this.client = client; this.client = client;
} }
public UpdatableTimerTask getKurentoClientReconnectTimer() {
return this.clientReconnectTimer;
}
public void setKurentoClientReconnectTimer(UpdatableTimerTask clientReconnectTimer) {
this.clientReconnectTimer = clientReconnectTimer;
}
public String getId() { public String getId() {
return id; return id;
} }
@ -101,10 +115,6 @@ public class Kms {
return ip; return ip;
} }
public KurentoClient getKurentoClient() {
return this.client;
}
public double getLoad() { public double getLoad() {
return loadManager.calculateLoad(this); return loadManager.calculateLoad(this);
} }

View File

@ -60,8 +60,6 @@ public abstract class KmsManager {
public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true); public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true);
public static final int MAX_SECONDS_LOCK_WAIT = 15; public static final int MAX_SECONDS_LOCK_WAIT = 15;
private UpdatableTimerTask kurentoReconnectTimer;
public class KmsLoad implements Comparable<KmsLoad> { public class KmsLoad implements Comparable<KmsLoad> {
private Kms kms; private Kms kms;
@ -204,12 +202,12 @@ public abstract class KmsManager {
final AtomicInteger iteration = new AtomicInteger(loops); final AtomicInteger iteration = new AtomicInteger(loops);
final long intervalWaitMs = 500L; final long intervalWaitMs = 500L;
kurentoReconnectTimer = new UpdatableTimerTask(() -> { final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> {
if (iteration.decrementAndGet() < 0) { if (iteration.decrementAndGet() < 0) {
log.error("KurentoClient [{}] could not reconnect to KMS with uri {} in {} seconds", log.error("KurentoClient [{}] could not reconnect to KMS with uri {} in {} seconds",
kms.getKurentoClient().toString(), kms.getUri(), (intervalWaitMs * 6 / 1000)); kms.getKurentoClient().toString(), kms.getUri(), (intervalWaitMs * 6 / 1000));
kurentoReconnectTimer.cancelTimer(); kms.getKurentoClientReconnectTimer().cancelTimer();
log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(),
kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]"))); .collect(Collectors.joining(",", "[", "]")));
@ -234,7 +232,7 @@ public abstract class KmsManager {
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString()); kms.getUri(), kms.getKurentoClient().toString());
kurentoReconnectTimer.cancelTimer(); kms.getKurentoClientReconnectTimer().cancelTimer();
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
@ -260,7 +258,8 @@ public abstract class KmsManager {
} }
}, () -> intervalWaitMs); // Try 2 times per seconds }, () -> intervalWaitMs); // Try 2 times per seconds
kurentoReconnectTimer.updateTimer(); kms.setKurentoClientReconnectTimer(kurentoClientReconnectTimer);
kurentoClientReconnectTimer.updateTimer();
} }
@Override @Override
@ -314,11 +313,11 @@ public abstract class KmsManager {
@PreDestroy @PreDestroy
public void close() { public void close() {
if (kurentoReconnectTimer != null) {
kurentoReconnectTimer.cancelTimer();
}
log.info("Closing all KurentoClients"); log.info("Closing all KurentoClients");
this.kmss.values().forEach(kms -> { this.kmss.values().forEach(kms -> {
if (kms.getKurentoClientReconnectTimer() != null) {
kms.getKurentoClientReconnectTimer().cancelTimer();
}
kms.getKurentoClient().destroy(); kms.getKurentoClient().destroy();
}); });
} }