From 5cb59193eeed2c3eca8538e2cf0ec6b8b5924828 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Wed, 12 Jun 2019 18:29:41 +0200 Subject: [PATCH] openvidu-server: fixed kmsDisconnectionTime for recordings --- .../server/kurento/core/KurentoParticipant.java | 12 ++++++------ .../server/kurento/core/KurentoSession.java | 10 +++++----- .../kurento/core/KurentoSessionManager.java | 2 +- .../server/kurento/kms/FixedOneKmsManager.java | 5 ++++- .../openvidu/server/kurento/kms/KmsManager.java | 4 ---- .../service/ComposedRecordingService.java | 15 ++++++++++++--- .../recording/service/RecordingManager.java | 10 +++++----- .../service/SingleStreamRecordingService.java | 10 ++++++---- 8 files changed, 39 insertions(+), 29 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index fdc7692b..e697067f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -187,10 +187,10 @@ public class KurentoParticipant extends Participant { return sdpResponse; } - public void unpublishMedia(EndReason reason) { + public void unpublishMedia(EndReason reason, long kmsDisconnectionTime) { log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), this.session.getSessionId()); - releasePublisherEndpoint(reason); + releasePublisherEndpoint(reason, kmsDisconnectionTime); this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(), this.openviduConfig); log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", @@ -298,7 +298,7 @@ public class KurentoParticipant extends Participant { } } - public void close(EndReason reason, boolean definitelyClosed) { + public void close(EndReason reason, boolean definitelyClosed, long kmsDisconnectionTime) { log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId()); if (isClosed()) { log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId()); @@ -319,7 +319,7 @@ public class KurentoParticipant extends Participant { } } this.subscribers.clear(); - releasePublisherEndpoint(reason); + releasePublisherEndpoint(reason, kmsDisconnectionTime); } /** @@ -364,7 +364,7 @@ public class KurentoParticipant extends Participant { session.sendMediaError(this.getParticipantPrivateId(), desc); } - private void releasePublisherEndpoint(EndReason reason) { + private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) { if (publisher != null && publisher.getEndpoint() != null) { // Remove streamId from publisher's map @@ -372,7 +372,7 @@ public class KurentoParticipant extends Participant { if (this.openviduConfig.isRecordingModuleEnabled() && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { - this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId()); + this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), kmsDisconnectionTime); } publisher.unregisterErrorListeners(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 7eaa47de..760adbdb 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -137,7 +137,7 @@ public class KurentoSession extends Session { log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId); this.removeParticipant(participant, reason); - participant.close(reason, true); + participant.close(reason, true, 0); } @Override @@ -146,7 +146,7 @@ public class KurentoSession extends Session { for (Participant participant : participants.values()) { ((KurentoParticipant) participant).releaseAllFilters(); - ((KurentoParticipant) participant).close(reason, true); + ((KurentoParticipant) participant).close(reason, true, 0); } participants.clear(); @@ -288,13 +288,13 @@ public class KurentoSession extends Session { return this.publishedStreamIds.get(streamId); } - public void restartStatusInKurento() { + public void restartStatusInKurento(long kmsDisconnectionTime) { log.info("Reseting process: reseting remote media objects for active session {}", this.sessionId); // Stop recording if session is being recorded if (recordingManager.sessionIsBeingRecorded(this.sessionId)) { - this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect); + this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect, kmsDisconnectionTime); } // Close all MediaEndpoints of participants @@ -302,7 +302,7 @@ public class KurentoSession extends Session { KurentoParticipant kParticipant = (KurentoParticipant) p; final boolean wasStreaming = kParticipant.isStreaming(); kParticipant.releaseAllFilters(); - kParticipant.close(EndReason.mediaServerDisconnect, false); + kParticipant.close(EndReason.mediaServerDisconnect, false, kmsDisconnectionTime); if (wasStreaming) { kurentoSessionHandler.onUnpublishMedia(kParticipant, this.getParticipants(), null, null, null, EndReason.mediaServerDisconnect); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 02feebf5..3700c2f5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -339,7 +339,7 @@ public class KurentoSessionManager extends SessionManager { throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Participant '" + participant.getParticipantPublicId() + "' is not streaming media"); } - kParticipant.unpublishMedia(reason); + kParticipant.unpublishMedia(reason, 0); session.cancelPublisher(participant, reason); Set participants = session.getParticipants(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index b2bbc690..7f456914 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -41,9 +41,12 @@ public class FixedOneKmsManager extends KmsManager { // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); log.warn("Updating all webrtc endpoints for active sessions"); + final Kms kms = ((KurentoSessionManager) sessionManager).getKmsManager().kmss.get(kmsWsUri); + final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); sessionManager.getSessions().forEach(s -> { - ((KurentoSession) s).restartStatusInKurento(); + ((KurentoSession) s).restartStatusInKurento(timeOfKurentoDisconnection); }); + kms.setTimeOfKurentoClientDisconnection(0); } else { // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but // not the clients/KMS connections diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index c37c13fb..121c613b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -117,10 +117,6 @@ public abstract class KmsManager { return this.kmss.get(kms.getUri()).isKurentoClientConnected(); } - public long getTimeOfKurentoClientDisconnection(Kms kms) { - return this.kmss.get(kms.getUri()).getTimeOfKurentoClientDisconnection(); - } - public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) { this.kmss.get(kmsUri).setKurentoClientConnected(isConnected); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index 7d0c5591..b0775f96 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -94,7 +94,15 @@ public class ComposedRecordingService extends RecordingService { if (recording.hasVideo()) { return this.stopRecordingWithVideo(session, recording, reason); } else { - return this.stopRecordingAudioOnly(session, recording, reason); + return this.stopRecordingAudioOnly(session, recording, reason, 0); + } + } + + public Recording stopRecording(Session session, Recording recording, EndReason reason, long kmsDisconnectionTime) { + if (recording.hasVideo()) { + return this.stopRecordingWithVideo(session, recording, reason); + } else { + return this.stopRecordingAudioOnly(session, recording, reason, kmsDisconnectionTime); } } @@ -323,7 +331,8 @@ public class ComposedRecordingService extends RecordingService { return recording; } - private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason) { + private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason, + long kmsDisconnectionTime) { log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(), recording.getSessionId(), reason); @@ -341,7 +350,7 @@ public class ComposedRecordingService extends RecordingService { CompositeWrapper compositeWrapper = this.composites.remove(sessionId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); - compositeWrapper.stopCompositeRecording(stoppedCountDown, ((KurentoSession)session).getKms().getTimeOfKurentoClientDisconnection()); + compositeWrapper.stopCompositeRecording(stoppedCountDown, kmsDisconnectionTime); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index ed0a04f5..729d644d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -213,12 +213,12 @@ public class RecordingManager { return recording; } - public Recording forceStopRecording(Session session, EndReason reason) { + public Recording forceStopRecording(Session session, EndReason reason, long kmsDisconnectionTime) { Recording recording; recording = this.sessionsRecordings.get(session.getSessionId()); switch (recording.getOutputMode()) { case COMPOSED: - recording = this.composedRecordingService.stopRecording(session, recording, reason); + recording = this.composedRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime); if (recording.hasVideo()) { // Evict the recorder participant if composed recording with video this.sessionManager.evictParticipant( @@ -227,7 +227,7 @@ public class RecordingManager { } break; case INDIVIDUAL: - recording = this.singleStreamRecordingService.stopRecording(session, recording, reason); + recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime); break; } this.abortAutomaticRecordingStopThread(session); @@ -257,7 +257,7 @@ public class RecordingManager { } } - public void stopOneIndividualStreamRecording(KurentoSession session, String streamId) { + public void stopOneIndividualStreamRecording(KurentoSession session, String streamId, long kmsDisconnectionTime) { Recording recording = this.sessionsRecordings.get(session.getSessionId()); if (recording == null) { log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId, @@ -269,7 +269,7 @@ public class RecordingManager { streamId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId, - stoppedCountDown, session.getKms().getTimeOfKurentoClientDisconnection()); + stoppedCountDown, kmsDisconnectionTime); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId, diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 12d06cdf..6f7cb140 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -56,7 +56,6 @@ import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; -import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; @@ -127,6 +126,10 @@ public class SingleStreamRecordingService extends RecordingService { @Override public Recording stopRecording(Session session, Recording recording, EndReason reason) { + return this.stopRecording(session, recording, reason, 0); + } + + public Recording stopRecording(Session session, Recording recording, EndReason reason, long kmsDisconnectionTime) { log.info("Stopping individual ({}) recording {} of session {}. Reason: {}", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); @@ -134,11 +137,9 @@ public class SingleStreamRecordingService extends RecordingService { final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size(); final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders); - final long timeOfKurentoClientDisconnection = ((KurentoSession) session).getKms() - .getTimeOfKurentoClientDisconnection(); for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), - stoppedCountDown, timeOfKurentoClientDisconnection); + stoppedCountDown, kmsDisconnectionTime); } try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { @@ -237,6 +238,7 @@ public class SingleStreamRecordingService extends RecordingService { finalWrapper.getRecorder().stop(); } else { if (kmsDisconnectionTime != 0) { + // Stopping recorder endpoint because of a KMS disconnection finalWrapper.setEndTime(kmsDisconnectionTime); generateIndividualMetadataFile(finalWrapper); log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId,