openvidu-server: add affected sessions and recordings to nodeCrashed event

pull/621/head
pabloFuente 2021-04-19 18:23:59 +02:00
parent fb1a5c1d51
commit c1354b5bb8
4 changed files with 54 additions and 14 deletions

View File

@ -1,5 +1,9 @@
package io.openvidu.server.cdr; package io.openvidu.server.cdr;
import java.util.ArrayList;
import java.util.List;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
@ -7,23 +11,37 @@ import io.openvidu.server.kurento.kms.Kms;
public class CDREventNodeCrashed extends CDREvent { public class CDREventNodeCrashed extends CDREvent {
private Kms kms; private Kms kms;
private List<String> sessionIds = new ArrayList<>();
private List<String> recordingIds = new ArrayList<>();
private String environmentId; private String environmentId;
public CDREventNodeCrashed(CDREventName eventName, Long timeStamp, Kms kms, String environmentId) { public CDREventNodeCrashed(CDREventName eventName, Long timeStamp, Kms kms, String environmentId) {
super(eventName, null, null, timeStamp); super(eventName, null, null, timeStamp);
this.kms = kms; this.kms = kms;
kms.getKurentoSessions().forEach(session -> sessionIds.add(session.getSessionId()));
kms.getActiveRecordings().forEach(entry -> recordingIds.add(entry.getKey()));
this.environmentId = environmentId; this.environmentId = environmentId;
} }
@Override @Override
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = super.toJson(); JsonObject json = super.toJson();
json.addProperty("id", this.kms.getId()); json.addProperty("id", this.kms.getId());
if (this.environmentId != null) { if (this.environmentId != null) {
json.addProperty("environmentId", this.environmentId); json.addProperty("environmentId", this.environmentId);
} }
json.addProperty("ip", this.kms.getIp()); json.addProperty("ip", this.kms.getIp());
json.addProperty("uri", this.kms.getUri()); json.addProperty("uri", this.kms.getUri());
JsonArray sIds = new JsonArray();
this.sessionIds.forEach(sId -> sIds.add(sId));
json.add("sessionIds", sIds);
JsonArray rIds = new JsonArray();
this.recordingIds.forEach(rId -> rIds.add(rId));
json.add("recordingIds", rIds);
return json; return json;
} }

View File

@ -631,6 +631,10 @@ public class SessionEventsHandler {
recordingsToSendClientEvents.put(recording.getSessionId(), recording); recordingsToSendClientEvents.put(recording.getSessionId(), recording);
} }
/**
* This handler must be called before cleaning any sessions or recordings hosted
* by the crashed Media Node
*/
public void onMediaNodeCrashed(Kms kms, long timeOfKurentoDisconnection) { public void onMediaNodeCrashed(Kms kms, long timeOfKurentoDisconnection) {
} }

View File

@ -55,6 +55,7 @@ import io.openvidu.server.cdr.CDREventRecording;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.coturn.CoturnCredentialsService; import io.openvidu.server.coturn.CoturnCredentialsService;
import io.openvidu.server.kurento.endpoint.EndpointType; import io.openvidu.server.kurento.endpoint.EndpointType;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.utils.FormatChecker; import io.openvidu.server.utils.FormatChecker;
import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.GeoLocation;
@ -642,4 +643,33 @@ public abstract class SessionManager {
} }
} }
public void closeAllSessionsAndRecordingsOfKms(Kms kms, EndReason reason) {
// Close all active sessions
kms.getKurentoSessions().forEach(kSession -> {
this.closeSession(kSession.getSessionId(), reason);
});
// Close all non active sessions configured with this Media Node
this.closeNonActiveSessions(sessionNotActive -> {
return (sessionNotActive.getSessionProperties().mediaNode() != null
&& kms.getId().equals(sessionNotActive.getSessionProperties().mediaNode()));
});
// Stop all external recordings
kms.getActiveRecordings().forEach(recordingIdSessionId -> {
final String recordingId = recordingIdSessionId.getKey();
final String sessionId = recordingIdSessionId.getValue();
Session session = this.getSession(sessionId);
if (session != null && !session.isClosed()) {
// This is a recording of a Session hosted on a different Media Node
try {
this.recordingManager.stopRecording(session, null, RecordingManager.finalReason(reason));
} catch (OpenViduException e) {
log.error("Error stopping external recording {} of session {} in Media Node {}: {}", recordingId,
sessionId, kms.getId(), e.getMessage());
}
}
});
}
} }

View File

@ -217,11 +217,11 @@ public abstract class KmsManager {
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
sessionEventsHandler.onMediaNodeCrashed(kms, timeOfKurentoDisconnection); sessionEventsHandler.onMediaNodeCrashed(kms, timeOfKurentoDisconnection);
// Close all session with reason "nodeCrashed" // Close all sessions and recordings with reason "nodeCrashed"
log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(),
kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]"))); .collect(Collectors.joining(",", "[", "]")));
closeAllSessionsInKms(kms, EndReason.nodeCrashed); sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
// Remove Media Node // Remove Media Node
log.warn("Removing Media Node {} after crash", kms.getId()); log.warn("Removing Media Node {} after crash", kms.getId());
@ -315,18 +315,6 @@ public abstract class KmsManager {
} }
} }
public void closeAllSessionsInKms(Kms kms, EndReason reason) {
// Close all active sessions
kms.getKurentoSessions().forEach(kSession -> {
sessionManager.closeSession(kSession.getSessionId(), reason);
});
// Close all non active sessions configured with this Media Node
sessionManager.closeNonActiveSessions(sessionNotActive -> {
return (sessionNotActive.getSessionProperties().mediaNode() != null
&& kms.getId().equals(sessionNotActive.getSessionProperties().mediaNode()));
});
}
public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception; throws Exception;