mirror of https://github.com/OpenVidu/openvidu.git
openvidu-server: some protected methods to public. KmsManager refactoring of handler
parent
44363dbde0
commit
6ee258966d
|
@ -181,7 +181,7 @@ public class CallDetailRecord {
|
|||
this.log(new CDREventSignal(sessionId, uniqueSessionId, from, to, type, data));
|
||||
}
|
||||
|
||||
protected void log(CDREvent event) {
|
||||
public void log(CDREvent event) {
|
||||
this.loggers.forEach(logger -> {
|
||||
logger.log(event);
|
||||
});
|
||||
|
|
|
@ -92,7 +92,7 @@ public class FixedOneKmsManager extends KmsManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void removeMediaNodeUponCrash(String mediaNodeId) {
|
||||
public void removeMediaNodeUponCrash(String mediaNodeId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -312,73 +312,6 @@ public abstract class KmsManager {
|
|||
kurentoClientReconnectTimer.updateTimer();
|
||||
}
|
||||
|
||||
private void nodeCrashedHandler(Kms kms, boolean mustRemoveMediaNode) {
|
||||
|
||||
kms.setHasTriggeredNodeCrashedEvent(true);
|
||||
|
||||
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
|
||||
final List<String> affectedSessionIds = kms.getKurentoSessions().stream()
|
||||
.map(session -> session.getSessionId()).collect(Collectors.toUnmodifiableList());
|
||||
final List<String> affectedRecordingIds = kms.getActiveRecordings().stream()
|
||||
.map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList());
|
||||
|
||||
// 1. Send nodeCrashed webhook event
|
||||
String environmentId = getEnvironmentId(kms.getId());
|
||||
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection,
|
||||
affectedSessionIds, affectedRecordingIds);
|
||||
|
||||
// 2. Remove Media Node from cluster if necessary
|
||||
if (mustRemoveMediaNode) {
|
||||
removeMediaNodeUponCrash(kms.getId());
|
||||
}
|
||||
|
||||
// 3. Close all sessions and recordings with reason "nodeCrashed"
|
||||
log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", kms.getKurentoSessions().size(),
|
||||
kms.getId(), kms.getIp(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
|
||||
.collect(Collectors.joining(",", "[", "]")));
|
||||
try {
|
||||
// Flag the thread to skip remote operations to KMS
|
||||
RemoteOperationUtils.setToSkipRemoteOperations();
|
||||
sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
|
||||
} finally {
|
||||
RemoteOperationUtils.revertToRunRemoteOperations();
|
||||
}
|
||||
}
|
||||
|
||||
private void nodeRecoveredHandler(Kms kms) {
|
||||
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(),
|
||||
kms.getKurentoClient().toString());
|
||||
|
||||
kms.getKurentoClientReconnectTimer().cancelTimer();
|
||||
|
||||
final boolean mustTriggerNodeRecoveredEvent = kms.hasTriggeredNodeCrashedEvent();
|
||||
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
|
||||
|
||||
kms.setKurentoClientConnected(true, true);
|
||||
|
||||
if (kms.getKurentoSessions().isEmpty()) {
|
||||
log.info("There were no sessions in the KMS with uri {}. Nothing must be done", kms.getUri());
|
||||
} else {
|
||||
if (isNewKms(kms)) {
|
||||
log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", kms.getUri(),
|
||||
kms.getKurentoSessions().size(), kms.getKurentoSessions().stream()
|
||||
.map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]")));
|
||||
kms.getKurentoSessions().forEach(kSession -> {
|
||||
kSession.restartStatusInKurentoAfterReconnectionToNewKms(timeOfKurentoDisconnection);
|
||||
});
|
||||
} else {
|
||||
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
|
||||
}
|
||||
}
|
||||
|
||||
if (mustTriggerNodeRecoveredEvent) {
|
||||
// Send nodeRecovered webhook event
|
||||
String environmentId = getEnvironmentId(kms.getId());
|
||||
long timeOfConnection = kms.getTimeOfKurentoClientConnection();
|
||||
sessionEventsHandler.onMediaNodeRecovered(kms, environmentId, timeOfConnection);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -403,7 +336,7 @@ public abstract class KmsManager {
|
|||
public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId,
|
||||
Session session);
|
||||
|
||||
protected abstract void removeMediaNodeUponCrash(String mediaNodeId);
|
||||
public abstract void removeMediaNodeUponCrash(String mediaNodeId);
|
||||
|
||||
protected abstract String getEnvironmentId(String mediaNodeId);
|
||||
|
||||
|
@ -425,4 +358,71 @@ public abstract class KmsManager {
|
|||
+ RandomStringUtils.randomAlphanumeric(7);
|
||||
}
|
||||
|
||||
public void nodeCrashedHandler(Kms kms, boolean mustRemoveMediaNode) {
|
||||
|
||||
kms.setHasTriggeredNodeCrashedEvent(true);
|
||||
|
||||
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
|
||||
final List<String> affectedSessionIds = kms.getKurentoSessions().stream().map(session -> session.getSessionId())
|
||||
.collect(Collectors.toUnmodifiableList());
|
||||
final List<String> affectedRecordingIds = kms.getActiveRecordings().stream().map(entry -> entry.getKey())
|
||||
.collect(Collectors.toUnmodifiableList());
|
||||
|
||||
// 1. Send nodeCrashed webhook event
|
||||
String environmentId = getEnvironmentId(kms.getId());
|
||||
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, affectedSessionIds,
|
||||
affectedRecordingIds);
|
||||
|
||||
// 2. Remove Media Node from cluster if necessary
|
||||
if (mustRemoveMediaNode) {
|
||||
removeMediaNodeUponCrash(kms.getId());
|
||||
}
|
||||
|
||||
// 3. Close all sessions and recordings with reason "nodeCrashed"
|
||||
log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}", kms.getKurentoSessions().size(),
|
||||
kms.getId(), kms.getIp(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
|
||||
.collect(Collectors.joining(",", "[", "]")));
|
||||
try {
|
||||
// Flag the thread to skip remote operations to KMS
|
||||
RemoteOperationUtils.setToSkipRemoteOperations();
|
||||
sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
|
||||
} finally {
|
||||
RemoteOperationUtils.revertToRunRemoteOperations();
|
||||
}
|
||||
}
|
||||
|
||||
public void nodeRecoveredHandler(Kms kms) {
|
||||
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(),
|
||||
kms.getKurentoClient().toString());
|
||||
|
||||
kms.getKurentoClientReconnectTimer().cancelTimer();
|
||||
|
||||
final boolean mustTriggerNodeRecoveredEvent = kms.hasTriggeredNodeCrashedEvent();
|
||||
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
|
||||
|
||||
kms.setKurentoClientConnected(true, true);
|
||||
|
||||
if (kms.getKurentoSessions().isEmpty()) {
|
||||
log.info("There were no sessions in the KMS with uri {}. Nothing must be done", kms.getUri());
|
||||
} else {
|
||||
if (isNewKms(kms)) {
|
||||
log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", kms.getUri(),
|
||||
kms.getKurentoSessions().size(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
|
||||
.collect(Collectors.joining(",", "[", "]")));
|
||||
kms.getKurentoSessions().forEach(kSession -> {
|
||||
kSession.restartStatusInKurentoAfterReconnectionToNewKms(timeOfKurentoDisconnection);
|
||||
});
|
||||
} else {
|
||||
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
|
||||
}
|
||||
}
|
||||
|
||||
if (mustTriggerNodeRecoveredEvent) {
|
||||
// Send nodeRecovered webhook event
|
||||
String environmentId = getEnvironmentId(kms.getId());
|
||||
long timeOfConnection = kms.getTimeOfKurentoClientConnection();
|
||||
sessionEventsHandler.onMediaNodeRecovered(kms, environmentId, timeOfConnection);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue