openvidu-server: minor refactoring of KmsManager KurentoClient events handler

pull/711/head
pabloFuente 2022-03-25 14:01:10 +01:00
parent 888e45fe4d
commit 2a3e05ef80
1 changed file with 76 additions and 64 deletions

View File

@ -252,52 +252,28 @@ public abstract class KmsManager {
"OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed", "OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000)); (INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000));
mustRemoveMediaNode = !mustRetryReconnection;
nodeCrashedHandler(kms, mustRetryReconnection);
kms.setHasTriggeredNodeCrashedEvent(true);
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
final List<String> affectedSessionIds = kms.getKurentoSessions().stream()
.map(session -> session.getSessionId()).collect(Collectors.toUnmodifiableList());
final List<String> affectedRecordingIds = kms.getActiveRecordings().stream()
.map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList());
// 1. Send nodeCrashed webhook event
String environmentId = getEnvironmentId(kms.getId());
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection,
affectedSessionIds, affectedRecordingIds);
// 2. Remove Media Node from cluster
if (!mustRetryReconnection) {
mustRemoveMediaNode = false;
removeMediaNodeUponCrash(kms.getId());
}
// 3. Close all sessions and recordings with reason "nodeCrashed"
log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}",
kms.getKurentoSessions().size(), kms.getId(), kms.getIp(),
kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]")));
try {
// Flag the thread to skip remote operations to KMS
RemoteOperationUtils.setToSkipRemoteOperations();
sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
} finally {
RemoteOperationUtils.revertToRunRemoteOperations();
}
} else { } else {
log.error( log.error(
"Retry error. OpenVidu Server [{}] could not connect to Media Node {} with IP {} in {} seconds", "Retry error. OpenVidu Server [{}] could not connect to Media Node {} with IP {} in {} seconds",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(), kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000)); (INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000));
} }
if (mustRetryReconnection) { if (mustRetryReconnection) {
log.info( log.info(
"Retrying reconnection to Media Node {} with IP {}. {} seconds consumed of a maximum of {}", "Retrying reconnection to Media Node {} with IP {}. {} seconds consumed of a maximum of {}",
kms.getId(), kms.getIp(), accumulatedTimeout, kms.getId(), kms.getIp(), accumulatedTimeout,
openviduConfig.getReconnectionTimeout()); openviduConfig.getReconnectionTimeout());
disconnectionHandler(kms, accumulatedTimeout); disconnectionHandler(kms, accumulatedTimeout);
} else { } else {
log.warn( log.warn(
"Reconnection process to Media Node {} with IP {} aborted. {} seconds have been consumed and the upper limit is {} seconds", "Reconnection process to Media Node {} with IP {} aborted. {} seconds have been consumed and the upper limit is {} seconds",
kms.getId(), kms.getIp(), accumulatedTimeout, kms.getId(), kms.getIp(), accumulatedTimeout,
@ -305,6 +281,7 @@ public abstract class KmsManager {
if (mustRemoveMediaNode) { if (mustRemoveMediaNode) {
removeMediaNodeUponCrash(kms.getId()); removeMediaNodeUponCrash(kms.getId());
} }
} }
} else { } else {
@ -326,39 +303,7 @@ public abstract class KmsManager {
return; return;
} }
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", nodeRecoveredHandler(kms);
kms.getUri(), kms.getKurentoClient().toString());
kms.getKurentoClientReconnectTimer().cancelTimer();
final boolean mustTriggerNodeRecoveredEvent = kms.hasTriggeredNodeCrashedEvent();
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
kms.setKurentoClientConnected(true, true);
if (kms.getKurentoSessions().isEmpty()) {
log.info("There were no sessions in the KMS with uri {}. Nothing must be done",
kms.getUri());
} else {
if (isNewKms(kms)) {
log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}",
kms.getUri(), kms.getKurentoSessions().size(), kms.getKurentoSessions().stream()
.map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]")));
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurentoAfterReconnectionToNewKms(
timeOfKurentoDisconnection);
});
} else {
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
}
}
if (mustTriggerNodeRecoveredEvent) {
// Send nodeRecovered webhook event
String environmentId = getEnvironmentId(kms.getId());
long timeOfConnection = kms.getTimeOfKurentoClientConnection();
sessionEventsHandler.onMediaNodeRecovered(kms, environmentId, timeOfConnection);
}
} }
}, () -> Long.valueOf(INTERVAL_WAIT_MS)); // Try 2 times per second }, () -> Long.valueOf(INTERVAL_WAIT_MS)); // Try 2 times per second
@ -367,6 +312,73 @@ public abstract class KmsManager {
kurentoClientReconnectTimer.updateTimer(); kurentoClientReconnectTimer.updateTimer();
} }
/**
 * Runs the "node crashed" procedure for a Media Node: marks the crash as
 * reported, emits the nodeCrashed event, optionally removes the node and
 * finally closes every session and recording it hosted.
 *
 * @param kms                   the Media Node being handled
 * @param mustRetryReconnection when {@code false} the node is removed through
 *                              {@code removeMediaNodeUponCrash}; when
 *                              {@code true} that removal is skipped
 */
private void nodeCrashedHandler(Kms kms, boolean mustRetryReconnection) {
    // Flag first, so the Kms records that its crash event has been triggered
    kms.setHasTriggeredNodeCrashedEvent(true);

    // Snapshot the affected resources before anything gets removed or closed
    final long disconnectionTime = kms.getTimeOfKurentoClientDisconnection();
    final List<String> sessionIds = kms.getKurentoSessions().stream()
            .map(s -> s.getSessionId())
            .collect(Collectors.toUnmodifiableList());
    final List<String> recordingIds = kms.getActiveRecordings().stream()
            .map(e -> e.getKey())
            .collect(Collectors.toUnmodifiableList());

    // Step 1: nodeCrashed webhook event
    sessionEventsHandler.onMediaNodeCrashed(kms, getEnvironmentId(kms.getId()), disconnectionTime, sessionIds,
            recordingIds);

    // Step 2: drop the Media Node from the cluster unless reconnection will be retried
    final boolean retriesExhausted = !mustRetryReconnection;
    if (retriesExhausted) {
        removeMediaNodeUponCrash(kms.getId());
    }

    // Step 3: close all sessions and recordings with reason "nodeCrashed"
    final int hostedSessions = kms.getKurentoSessions().size();
    final String nodeId = kms.getId();
    final String nodeIp = kms.getIp();
    final String hostedSessionIds = kms.getKurentoSessions().stream()
            .map(s -> s.getSessionId())
            .collect(Collectors.joining(",", "[", "]"));
    log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", hostedSessions, nodeId, nodeIp,
            hostedSessionIds);

    try {
        // The current thread is flagged to skip remote operations to KMS while closing
        RemoteOperationUtils.setToSkipRemoteOperations();
        sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
    } finally {
        // Always restore the flag, even if closing fails
        RemoteOperationUtils.revertToRunRemoteOperations();
    }
}
/**
 * Runs the "node recovered" procedure once the KurentoClient of a Media Node
 * is connected again: stops the reconnection timer, marks the client as
 * connected, resets the hosted sessions if the KMS is a new process and, if a
 * nodeCrashed event had been triggered for this node, emits nodeRecovered.
 *
 * @param kms the Media Node whose KurentoClient has reconnected
 */
private void nodeRecoveredHandler(Kms kms) {
    log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(),
            kms.getKurentoClient().toString());

    kms.getKurentoClientReconnectTimer().cancelTimer();

    // Both values are read BEFORE flagging the client as connected
    // NOTE(review): presumably setKurentoClientConnected may update them - confirm
    final boolean crashWasReported = kms.hasTriggeredNodeCrashedEvent();
    final long disconnectionTime = kms.getTimeOfKurentoClientDisconnection();

    kms.setKurentoClientConnected(true, true);

    if (kms.getKurentoSessions().isEmpty()) {
        log.info("There were no sessions in the KMS with uri {}. Nothing must be done", kms.getUri());
    } else if (isNewKms(kms)) {
        final String uri = kms.getUri();
        final int sessionCount = kms.getKurentoSessions().size();
        final String sessionIds = kms.getKurentoSessions().stream()
                .map(s -> s.getSessionId())
                .collect(Collectors.joining(",", "[", "]"));
        log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", uri, sessionCount,
                sessionIds);
        // Every hosted session is restarted, receiving the disconnection time
        kms.getKurentoSessions()
                .forEach(s -> s.restartStatusInKurentoAfterReconnectionToNewKms(disconnectionTime));
    } else {
        log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
    }

    if (!crashWasReported) {
        // No nodeCrashed event was triggered, so no nodeRecovered event is sent
        return;
    }

    // Send nodeRecovered webhook event
    sessionEventsHandler.onMediaNodeRecovered(kms, getEnvironmentId(kms.getId()),
            kms.getTimeOfKurentoClientConnection());
}
}; };
} }