diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index adfdb53b..6d4cdbb2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -86,6 +86,10 @@ public class FixedOneKmsManager extends KmsManager { } } + @Override + protected void removeMediaNodeUponCrash(String mediaNodeId) { + } + @Override @PostConstruct protected void postConstructInitKurentoClients() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 6918abf9..c723ae9a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -113,7 +113,7 @@ public abstract class KmsManager { final protected Map kmss = new ConcurrentHashMap<>(); - private SessionManager sessionManager; + protected SessionManager sessionManager; public KmsManager(SessionManager sessionManager) { this.sessionManager = sessionManager; @@ -217,13 +217,15 @@ public abstract class KmsManager { final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); sessionEventsHandler.onMediaNodeCrashed(kms, timeOfKurentoDisconnection); + // Close all session with reason "nodeCrashed" log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) .collect(Collectors.joining(",", "[", "]"))); + closeAllSessionsInKms(kms, EndReason.nodeCrashed); - kms.getKurentoSessions().forEach(kSession -> { - sessionManager.closeSession(kSession.getSessionId(), EndReason.nodeCrashed); - }); + // Remove Media Node + log.warn("Removing Media Node {} after crash", kms.getId()); + removeMediaNodeUponCrash(kms.getId()); } else { @@ -313,6 +315,18 @@ public abstract class KmsManager { } } + public void closeAllSessionsInKms(Kms kms, EndReason reason) { + // Close all active sessions + kms.getKurentoSessions().forEach(kSession -> { + sessionManager.closeSession(kSession.getSessionId(), reason); + }); + // Close all non active sessions configured with this Media Node + sessionManager.closeNonActiveSessions(sessionNotActive -> { + return (sessionNotActive.getSessionProperties().mediaNode() != null + && kms.getId().equals(sessionNotActive.getSessionProperties().mediaNode())); + }); + } + public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception; @@ -322,6 +336,8 @@ public abstract class KmsManager { public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId, Session session); + protected abstract void removeMediaNodeUponCrash(String mediaNodeId); + @PostConstruct protected abstract void postConstructInitKurentoClients();