From 6ee258966d945a4de7c422143488c507e76a786c Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 25 Mar 2022 15:21:08 +0100 Subject: [PATCH] openvidu-server: some protected methods to public. KmsManager refactoring of handler --- .../openvidu/server/cdr/CallDetailRecord.java | 2 +- .../kurento/kms/FixedOneKmsManager.java | 2 +- .../server/kurento/kms/KmsManager.java | 136 +++++++++--------- 3 files changed, 70 insertions(+), 70 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java index f4f05b24..6de9c2e1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java @@ -181,7 +181,7 @@ public class CallDetailRecord { this.log(new CDREventSignal(sessionId, uniqueSessionId, from, to, type, data)); } - protected void log(CDREvent event) { + public void log(CDREvent event) { this.loggers.forEach(logger -> { logger.log(event); }); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 823e2ecd..3f80b2af 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -92,7 +92,7 @@ public class FixedOneKmsManager extends KmsManager { } @Override - protected void removeMediaNodeUponCrash(String mediaNodeId) { + public void removeMediaNodeUponCrash(String mediaNodeId) { } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index ab7c80b6..4c1068df 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -312,73 +312,6 @@ public abstract class KmsManager { kurentoClientReconnectTimer.updateTimer(); } - private void nodeCrashedHandler(Kms kms, boolean mustRemoveMediaNode) { - - kms.setHasTriggeredNodeCrashedEvent(true); - - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); - final List affectedSessionIds = kms.getKurentoSessions().stream() - .map(session -> session.getSessionId()).collect(Collectors.toUnmodifiableList()); - final List affectedRecordingIds = kms.getActiveRecordings().stream() - .map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList()); - - // 1. Send nodeCrashed webhook event - String environmentId = getEnvironmentId(kms.getId()); - sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, - affectedSessionIds, affectedRecordingIds); - - // 2. Remove Media Node from cluster if necessary - if (mustRemoveMediaNode) { - removeMediaNodeUponCrash(kms.getId()); - } - - // 3. Close all sessions and recordings with reason "nodeCrashed" - log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", kms.getKurentoSessions().size(), - kms.getId(), kms.getIp(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) - .collect(Collectors.joining(",", "[", "]"))); - try { - // Flag the thread to skip remote operations to KMS - RemoteOperationUtils.setToSkipRemoteOperations(); - sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed); - } finally { - RemoteOperationUtils.revertToRunRemoteOperations(); - } - } - - private void nodeRecoveredHandler(Kms kms) { - log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(), - kms.getKurentoClient().toString()); - - kms.getKurentoClientReconnectTimer().cancelTimer(); - - final boolean mustTriggerNodeRecoveredEvent = kms.hasTriggeredNodeCrashedEvent(); - final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); - - kms.setKurentoClientConnected(true, true); - - if (kms.getKurentoSessions().isEmpty()) { - log.info("There were no sessions in the KMS with uri {}. Nothing must be done", kms.getUri()); - } else { - if (isNewKms(kms)) { - log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", kms.getUri(), - kms.getKurentoSessions().size(), kms.getKurentoSessions().stream() - .map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]"))); - kms.getKurentoSessions().forEach(kSession -> { - kSession.restartStatusInKurentoAfterReconnectionToNewKms(timeOfKurentoDisconnection); - }); - } else { - log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); - } - } - - if (mustTriggerNodeRecoveredEvent) { - // Send nodeRecovered webhook event - String environmentId = getEnvironmentId(kms.getId()); - long timeOfConnection = kms.getTimeOfKurentoClientConnection(); - sessionEventsHandler.onMediaNodeRecovered(kms, environmentId, timeOfConnection); - } - } - }; } @@ -403,7 +336,7 @@ public abstract class KmsManager { public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId, Session session); - protected abstract void removeMediaNodeUponCrash(String mediaNodeId); + public abstract void removeMediaNodeUponCrash(String mediaNodeId); protected abstract String getEnvironmentId(String mediaNodeId); @@ -425,4 +358,71 @@ public abstract class KmsManager { + RandomStringUtils.randomAlphanumeric(7); } + public void nodeCrashedHandler(Kms kms, boolean mustRemoveMediaNode) { + + kms.setHasTriggeredNodeCrashedEvent(true); + + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + final List affectedSessionIds = kms.getKurentoSessions().stream().map(session -> session.getSessionId()) + .collect(Collectors.toUnmodifiableList()); + final List affectedRecordingIds = kms.getActiveRecordings().stream().map(entry -> entry.getKey()) + .collect(Collectors.toUnmodifiableList()); + + // 1. Send nodeCrashed webhook event + String environmentId = getEnvironmentId(kms.getId()); + sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, affectedSessionIds, + affectedRecordingIds); + + // 2. Remove Media Node from cluster if necessary + if (mustRemoveMediaNode) { + removeMediaNodeUponCrash(kms.getId()); + } + + // 3. Close all sessions and recordings with reason "nodeCrashed" + log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", kms.getKurentoSessions().size(), + kms.getId(), kms.getIp(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) + .collect(Collectors.joining(",", "[", "]"))); + try { + // Flag the thread to skip remote operations to KMS + RemoteOperationUtils.setToSkipRemoteOperations(); + sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed); + } finally { + RemoteOperationUtils.revertToRunRemoteOperations(); + } + } + + public void nodeRecoveredHandler(Kms kms) { + log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(), + kms.getKurentoClient().toString()); + + kms.getKurentoClientReconnectTimer().cancelTimer(); + + final boolean mustTriggerNodeRecoveredEvent = kms.hasTriggeredNodeCrashedEvent(); + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); + + kms.setKurentoClientConnected(true, true); + + if (kms.getKurentoSessions().isEmpty()) { + log.info("There were no sessions in the KMS with uri {}. Nothing must be done", kms.getUri()); + } else { + if (isNewKms(kms)) { + log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", kms.getUri(), + kms.getKurentoSessions().size(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) + .collect(Collectors.joining(",", "[", "]"))); + kms.getKurentoSessions().forEach(kSession -> { + kSession.restartStatusInKurentoAfterReconnectionToNewKms(timeOfKurentoDisconnection); + }); + } else { + log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); + } + } + + if (mustTriggerNodeRecoveredEvent) { + // Send nodeRecovered webhook event + String environmentId = getEnvironmentId(kms.getId()); + long timeOfConnection = kms.getTimeOfKurentoClientConnection(); + sessionEventsHandler.onMediaNodeRecovered(kms, environmentId, timeOfConnection); + } + } + }