From 1b98351596d26a25c10846fd7f4307299c7f9723 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Tue, 22 Dec 2020 17:36:40 +0100 Subject: [PATCH] openvidu-server: new behavior upon KurentoClient disconnection --- .../io/openvidu/server/OpenViduServer.java | 24 +-- .../cdr/CDREventMediaServerCrashed.java | 25 +++ .../io/openvidu/server/cdr/CDREventName.java | 2 +- .../openvidu/server/cdr/CallDetailRecord.java | 7 + .../io/openvidu/server/core/EndReason.java | 4 +- .../server/core/SessionEventsHandler.java | 5 + .../openvidu/server/core/SessionManager.java | 2 +- .../kurento/core/KurentoParticipant.java | 56 ++++-- .../kurento/kms/FixedOneKmsManager.java | 13 +- .../server/kurento/kms/KmsManager.java | 185 +++++++----------- .../src/main/resources/application.properties | 2 +- .../config/IntegrationTestConfiguration.java | 3 +- 12 files changed, 170 insertions(+), 158 deletions(-) create mode 100644 openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventMediaServerCrashed.java diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index 202a91dc..cabacc17 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -101,18 +101,6 @@ public class OpenViduServer implements JsonRpcConfigurer { @Autowired OpenviduConfig config; - @Bean - @ConditionalOnMissingBean - @DependsOn({ "openviduConfig", "mediaNodeStatusManager" }) - public KmsManager kmsManager(OpenviduConfig openviduConfig) { - if (openviduConfig.getKmsUris().isEmpty()) { - throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url"); - } - String firstKmsWsUri = openviduConfig.getKmsUris().get(0); - log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); - return new FixedOneKmsManager(); - } - @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") @@ -148,6 +136,18 @@ public class OpenViduServer implements JsonRpcConfigurer { return new KurentoSessionManager(); } + @Bean + @ConditionalOnMissingBean + @DependsOn({ "openviduConfig", "sessionManager", "mediaNodeStatusManager" }) + public KmsManager kmsManager(OpenviduConfig openviduConfig, SessionManager sessionManager) { + if (openviduConfig.getKmsUris().isEmpty()) { + throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url"); + } + String firstKmsWsUri = openviduConfig.getKmsUris().get(0); + log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); + return new FixedOneKmsManager(sessionManager); + } + @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventMediaServerCrashed.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventMediaServerCrashed.java new file mode 100644 index 00000000..46e8c604 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventMediaServerCrashed.java @@ -0,0 +1,25 @@ +package io.openvidu.server.cdr; + +import com.google.gson.JsonObject; + +import io.openvidu.server.kurento.kms.Kms; + +public class CDREventMediaServerCrashed extends CDREvent { + + private Kms kms; + + public CDREventMediaServerCrashed(CDREventName eventName, String sessionId, Long timeStamp, Kms kms) { + super(eventName, sessionId, timeStamp); + this.kms = kms; + } + + @Override + public JsonObject toJson() { + JsonObject json = super.toJson(); + json.addProperty("id", this.kms.getId()); + json.addProperty("ip", this.kms.getIp()); + json.addProperty("uri", this.kms.getUri()); + return json; + } + +} diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventName.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventName.java index 19b515ee..6020e3e1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventName.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventName.java @@ -21,6 +21,6 @@ public enum CDREventName { sessionCreated, sessionDestroyed, participantJoined, participantLeft, webrtcConnectionCreated, webrtcConnectionDestroyed, recordingStarted, recordingStopped, recordingStatusChanged, filterEventDispatched, - signalSent, mediaNodeStatusChanged, autoscaling + signalSent, mediaNodeStatusChanged, autoscaling, mediaServerCrashed } diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java index 06df7084..82ebbefb 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java @@ -36,6 +36,7 @@ import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionManager; import io.openvidu.server.kurento.endpoint.KmsEvent; +import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.summary.SessionSummary; @@ -216,4 +217,10 @@ public class CallDetailRecord { }); } + public void recordMediaServerCrashed(Kms kms, long timeOfKurentoDisconnection) { + CDREvent e = new CDREventMediaServerCrashed(CDREventName.mediaServerCrashed, null, timeOfKurentoDisconnection, + kms); + this.log(e); + } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java b/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java index 34300a83..220a789a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java @@ -20,7 +20,7 @@ package io.openvidu.server.core; public enum EndReason { unsubscribe, unpublish, disconnect, forceUnpublishByUser, forceUnpublishByServer, forceDisconnectByUser, - forceDisconnectByServer, lastParticipantLeft, networkDisconnect, mediaServerDisconnect, openviduServerStopped, - recordingStoppedByServer, automaticStop, sessionClosedByServer + forceDisconnectByServer, lastParticipantLeft, networkDisconnect, mediaServerDisconnect, mediaServerCrashed, + openviduServerStopped, recordingStoppedByServer, automaticStop, sessionClosedByServer } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index 989df39c..a0d6581c 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -42,6 +42,7 @@ import io.openvidu.server.config.OpenviduBuildInfo; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.endpoint.KurentoFilter; +import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.recording.Recording; import io.openvidu.server.rpc.RpcNotificationService; @@ -606,6 +607,10 @@ public class SessionEventsHandler { public void onConnectionPropertyChanged(Participant participant, String property, Object newValue) { } + public void onMediaServerCrashed(Kms kms, long timeOfKurentoDisconnection) { + CDR.recordMediaServerCrashed(kms, timeOfKurentoDisconnection); + } + protected Set filterParticipantsByRole(OpenViduRole[] roles, Set participants) { return participants.stream().filter(part -> { if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(part.getParticipantPublicId())) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 7e066dbc..d5abc2cf 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -459,7 +459,7 @@ public abstract class SessionManager { log.warn("Possible ghost session {}", sessionActive.getSessionId()); } } - }, () -> new Long(openviduConfig.getSessionGarbageInterval() * 1000)); + }, () -> Long.valueOf(openviduConfig.getSessionGarbageInterval() * 1000)); this.sessionGarbageCollectorTimer.updateTimer(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 306e5034..f9cefaf6 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -37,6 +37,7 @@ import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.PassThrough; import org.kurento.client.internal.server.KurentoServerException; +import org.kurento.jsonrpc.JsonRpcException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -370,11 +371,16 @@ public class KurentoParticipant extends Participant { it.remove(); if (subscriber != null && subscriber.getEndpoint() != null) { - releaseSubscriberEndpoint(remoteParticipantName, - (KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber, - reason, false); - log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), - remoteParticipantName); + try { + releaseSubscriberEndpoint(remoteParticipantName, + (KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), + subscriber, reason, false); + log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), + remoteParticipantName); + } catch (JsonRpcException e) { + log.error("Error releasing subscriber endpoint of participant {}: {}", this.participantPublicId, + e.getMessage()); + } } else { log.warn( "PARTICIPANT {}: Trying to close subscriber endpoint to {}. " @@ -452,25 +458,33 @@ public class KurentoParticipant extends Participant { } private void releasePublisherEndpointAux(EndReason reason, Long kmsDisconnectionTime) { - // Remove streamId from publisher's map - this.session.publishedStreamIds.remove(this.getPublisherStreamId()); + try { + // Remove streamId from publisher's map + this.session.publishedStreamIds.remove(this.getPublisherStreamId()); - if (this.openviduConfig.isRecordingModuleEnabled() && this.token.record() - && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { - this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), - kmsDisconnectionTime); + this.streaming = false; + this.session.deregisterPublisher(this); + + if (this.openviduConfig.isRecordingModuleEnabled() && this.token.record() + && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { + this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), + kmsDisconnectionTime); + } + + publisher.cancelStatsLoop.set(true); + + // These operations are all remote + publisher.unregisterErrorListeners(); + for (MediaElement el : publisher.getMediaElements()) { + releaseElement(getParticipantPublicId(), el); + } + releaseElement(getParticipantPublicId(), publisher.getEndpoint()); + + } catch (JsonRpcException e) { + log.error("Error releasing publisher endpoint of participant {}: {}", this.participantPublicId, + e.getMessage()); } - publisher.unregisterErrorListeners(); - publisher.cancelStatsLoop.set(true); - - for (MediaElement el : publisher.getMediaElements()) { - releaseElement(getParticipantPublicId(), el); - } - releaseElement(getParticipantPublicId(), publisher.getEndpoint()); - this.streaming = false; - this.session.deregisterPublisher(this); - endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason); publisher = null; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index f9066343..fd4e23c6 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -26,12 +26,19 @@ import javax.annotation.PostConstruct; import org.kurento.client.KurentoClient; import org.kurento.commons.exception.KurentoException; +import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket; +import org.kurento.jsonrpc.client.JsonRpcWSConnectionListener; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.core.Session; +import io.openvidu.server.core.SessionManager; public class FixedOneKmsManager extends KmsManager { + public FixedOneKmsManager(SessionManager sessionManager) { + super(sessionManager); + } + @Override public List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception { @@ -39,7 +46,11 @@ public class FixedOneKmsManager extends KmsManager { KurentoClient kClient = null; Kms kms = new Kms(firstProps, loadManager, quarantineKiller); try { - kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId())); + JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId()); + JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener); + client.setTryReconnectingMaxTime(0); + client.setTryReconnectingForever(false); + kClient = KurentoClient.createFromJsonRpcClient(client); this.addKms(kms); kms.setKurentoClient(kClient); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 05737706..69cbc80a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -35,7 +34,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.commons.lang3.RandomStringUtils; -import org.kurento.client.KurentoConnectionListener; +import org.kurento.jsonrpc.client.JsonRpcWSConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -44,8 +43,11 @@ import com.google.gson.JsonObject; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.config.OpenviduConfig; +import io.openvidu.server.core.EndReason; import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.core.Session; +import io.openvidu.server.core.SessionEventsHandler; +import io.openvidu.server.core.SessionManager; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.QuarantineKiller; @@ -58,8 +60,6 @@ public abstract class KmsManager { public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true); public static final int MAX_SECONDS_LOCK_WAIT = 15; - private Map kmsReconnectionLocks = new ConcurrentHashMap<>(); - private UpdatableTimerTask kurentoReconnectTimer; public class KmsLoad implements Comparable { @@ -110,8 +110,17 @@ public abstract class KmsManager { @Autowired protected MediaNodeStatusManager mediaNodeStatusManager; + @Autowired + protected SessionEventsHandler sessionEventsHandler; + final protected Map kmss = new ConcurrentHashMap<>(); + private SessionManager sessionManager; + + public KmsManager(SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + public synchronized void addKms(Kms kms) { this.kmss.put(kms.getId(), kms); } @@ -165,55 +174,14 @@ public abstract class KmsManager { return kmsLoads; } - protected KurentoConnectionListener generateKurentoConnectionListener(final String kmsId) { - return new KurentoConnectionListener() { + protected JsonRpcWSConnectionListener generateKurentoConnectionListener(final String kmsId) { + return new JsonRpcWSConnectionListener() { @Override public void reconnected(boolean sameServer) { - final Kms kms = kmss.get(kmsId); - log.info("Kurento Client \"reconnected\" event for KMS {} (sameServer: {}) [{}]", kms.getUri(), sameServer, kms.getKurentoClient().toString()); - - kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); - boolean lockAcquired = false; - try { - if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { - lockAcquired = true; - if (kms.isKurentoClientConnected()) { - // Timer task of disconnected event successfully executed - log.warn( - "Timer task already executed for reconnected Kurento Client [{}] to KMS with uri {}. Skipping event listener execution", - kms.getKurentoClient().toString(), kms.getUri()); - return; - } - kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); - if (!sameServer) { - // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) - log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", - kms.getUri()); - log.warn("Updating all webrtc endpoints for active sessions"); - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); - kms.getKurentoSessions().forEach(kSession -> { - kSession.restartStatusInKurento(timeOfKurentoDisconnection); - }); - } else { - // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but - // not the clients/KMS connections - log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri()); - } - kms.setTimeOfKurentoClientDisconnection(0); - } - } catch (InterruptedException e) { - log.error("InterruptedException when waiting for lock on reconnected event of KMS with uri {}", - kms.getUri()); - } finally { - if (lockAcquired) { - kmsReconnectionLocks.get(kms.getId()).unlock(); - } - } } @Override @@ -232,71 +200,65 @@ public abstract class KmsManager { kms.getUri(), kms.getKurentoClient().toString()); } - // TODO: this is a fix for the lack of reconnected event - kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); - final AtomicInteger ITERATION = new AtomicInteger(0); + final int loops = 6; + final AtomicInteger iteration = new AtomicInteger(loops); + final long intervalWaitMs = 500L; kurentoReconnectTimer = new UpdatableTimerTask(() -> { - boolean lockAcquired = false; - try { - if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { - lockAcquired = true; + if (iteration.decrementAndGet() < 0) { - if (kms.isKurentoClientConnected()) { - // reconnected listener already executed - log.info( - "Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)", - kms.getUri(), kms.getKurentoClient().toString()); - kurentoReconnectTimer.cancelTimer(); - return; - } + log.error("KurentoClient [{}] could not reconnect to KMS with uri {} in {} seconds", + kms.getKurentoClient().toString(), kms.getUri(), (intervalWaitMs * 6 / 1000)); + kurentoReconnectTimer.cancelTimer(); + log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), + kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) + .collect(Collectors.joining(",", "[", "]"))); - if (kms.getKurentoClient().isClosed()) { - log.info( - "Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer", - kms.getUri(), kms.getKurentoClient().toString()); - kurentoReconnectTimer.cancelTimer(); - return; - } + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + sessionEventsHandler.onMediaServerCrashed(kms, timeOfKurentoDisconnection); + kms.getKurentoSessions().forEach(kSession -> { + sessionManager.closeSession(kSession.getSessionId(), EndReason.mediaServerCrashed); + }); + + } else { + + try { kms.getKurentoClient().getServerManager().getInfo(); - log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", - kms.getUri(), kms.getKurentoClient().toString()); - kurentoReconnectTimer.cancelTimer(); - kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + } catch (Exception e) { + log.error( + "According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}", + kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName()); + return; + } - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", + kms.getUri(), kms.getKurentoClient().toString()); + kurentoReconnectTimer.cancelTimer(); + kms.setKurentoClientConnected(true); + kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); - if (kms.getKurentoSessions().isEmpty()) { - log.info("There were no sessions in the KMS with uri {}. Nothing must be done", - kms.getUri()); + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + + if (kms.getKurentoSessions().isEmpty()) { + log.info("There were no sessions in the KMS with uri {}. Nothing must be done", + kms.getUri()); + } else { + if (isNewKms(kms)) { + log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", + kms.getUri(), kms.getKurentoSessions().size(), kms.getKurentoSessions().stream() + .map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]"))); + kms.getKurentoSessions().forEach(kSession -> { + kSession.restartStatusInKurento(timeOfKurentoDisconnection); + }); } else { - if (isNewKms(kms)) { - log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", - kms.getUri(), kms.getKurentoSessions().size(), - kms.getKurentoSessions().stream().map(s -> s.getSessionId()) - .collect(Collectors.joining(",", "[", "]"))); - kms.getKurentoSessions().forEach(kSession -> { - kSession.restartStatusInKurento(timeOfKurentoDisconnection); - }); - } else { - log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); - } + log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); } + } - kms.setTimeOfKurentoClientDisconnection(0); - } - } catch (Exception e) { - log.error( - "According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}", - kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName()); - } finally { - if (lockAcquired) { - kmsReconnectionLocks.get(kms.getId()).unlock(); - } + kms.setTimeOfKurentoClientDisconnection(0); } - }, () -> Long.valueOf(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000)); + }, () -> intervalWaitMs); // Try 2 times per seconds kurentoReconnectTimer.updateTimer(); } @@ -318,6 +280,11 @@ public abstract class KmsManager { // kms.setKurentoClientConnected(true); // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); } + + @Override + public void reconnecting() { + } + }; } @@ -333,24 +300,6 @@ public abstract class KmsManager { } } - private int dynamicReconnectLoopSeconds(int iteration) { - // First 10 loops every second, next 20 loops ever 3s, the rest every 10s - final int[][] intervals = { new int[] { 1, 10 }, new int[] { 3, 20 }, new int[] { 10, Integer.MAX_VALUE } }; - - int accumulatedIntervals = 0; - for (int i = 0; i < intervals.length - 1; i++) { - if ((accumulatedIntervals + intervals[i][1]) > iteration) { - // Interval found for current iteration - return intervals[i][0]; - } else { - // This iteration has already been surpassed - accumulatedIntervals += intervals[i][1]; - } - } - // Return last interval - return intervals[intervals.length - 1][0]; - } - public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception; diff --git a/openvidu-server/src/main/resources/application.properties b/openvidu-server/src/main/resources/application.properties index 6730ecb7..3b7a3446 100644 --- a/openvidu-server/src/main/resources/application.properties +++ b/openvidu-server/src/main/resources/application.properties @@ -24,7 +24,7 @@ OPENVIDU_CDR_PATH=/opt/openvidu/cdr OPENVIDU_WEBHOOK=false OPENVIDU_WEBHOOK_ENDPOINT= OPENVIDU_WEBHOOK_HEADERS=[] -OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","signalSent","mediaNodeStatusChanged","autoscaling"] +OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","signalSent","mediaNodeStatusChanged","autoscaling","mediaServerCrashed"] OPENVIDU_RECORDING=false OPENVIDU_RECORDING_DEBUG=false diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java index 56b022b2..c55fa4d0 100644 --- a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java +++ b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java @@ -18,6 +18,7 @@ import org.powermock.reflect.Whitebox; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import io.openvidu.server.kurento.core.KurentoSessionManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.KmsManager; @@ -33,7 +34,7 @@ public class IntegrationTestConfiguration { @Bean public KmsManager kmsManager() throws Exception { - final KmsManager spy = Mockito.spy(new FixedOneKmsManager()); + final KmsManager spy = Mockito.spy(new FixedOneKmsManager(new KurentoSessionManager())); doAnswer(invocation -> { List successfullyConnectedKmss = new ArrayList<>(); List kmsProperties = invocation.getArgument(0);