From f6422c7a40411e35374cd6f81d5273bc6c355bed Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Thu, 22 Apr 2021 18:06:30 +0200 Subject: [PATCH] openvidu-server: skip Kurento remote operations if node crashed --- .../kurento/core/KurentoParticipant.java | 31 ++++--- .../server/kurento/core/KurentoSession.java | 45 ++++----- .../kurento/endpoint/MediaEndpoint.java | 5 +- .../kurento/endpoint/PublisherEndpoint.java | 91 ++++++++++--------- .../server/kurento/kms/KmsManager.java | 12 ++- .../server/recording/CompositeWrapper.java | 34 +++++-- .../recording/service/RecordingManager.java | 16 ++-- .../service/SingleStreamRecordingService.java | 25 +++-- .../server/utils/RemoteOperationUtils.java | 21 +++++ 9 files changed, 177 insertions(+), 103 deletions(-) create mode 100644 openvidu-server/src/main/java/io/openvidu/server/utils/RemoteOperationUtils.java diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 7a9b45e1..6f68b914 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -58,6 +58,7 @@ import io.openvidu.server.kurento.endpoint.MediaEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SubscriberEndpoint; import io.openvidu.server.recording.service.RecordingManager; +import io.openvidu.server.utils.RemoteOperationUtils; public class KurentoParticipant extends Participant { @@ -663,7 +664,9 @@ public class KurentoParticipant extends Participant { senderPublisher.numberOfSubscribers--; if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { try { - senderPublisher.getPlayerEndpoint().stop(); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + senderPublisher.getPlayerEndpoint().stop(); + } log.info( "IP Camera stream {} feed is now disabled because there are no subscribers", senderPublisher.getStreamId()); @@ -691,19 +694,21 @@ public class KurentoParticipant extends Participant { void releaseElement(final String senderName, final MediaElement element) { final String eid = element.getId(); try { - element.release(new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("PARTICIPANT {}: Released successfully media element #{} for {}", - getParticipantPublicId(), eid, senderName); - } + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + element.release(new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("PARTICIPANT {}: Released successfully media element #{} for {}", + getParticipantPublicId(), eid, senderName); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("PARTICIPANT {}: Could not release media element #{} for {}", getParticipantPublicId(), - eid, senderName, cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("PARTICIPANT {}: Could not release media element #{} for {}", getParticipantPublicId(), + eid, senderName, cause); + } + }); + } } catch (Exception e) { log.error("PARTICIPANT {}: Error calling release on elem #{} for {}", getParticipantPublicId(), eid, senderName, e); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 0f18cba2..732ed1e5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -42,6 +42,7 @@ import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.kms.Kms; +import io.openvidu.server.utils.RemoteOperationUtils; /** * @author Pablo Fuente (pablofuenteperez@gmail.com) @@ -252,29 +253,31 @@ public class KurentoSession extends Session { return; } - getPipeline().release(new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("SESSION {}: Released Pipeline", sessionId); - pipeline = null; - pipelineLatch = new CountDownLatch(1); - pipelineCreationErrorCause = null; - if (callback != null) { - callback.run(); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + getPipeline().release(new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("SESSION {}: Released Pipeline", sessionId); + pipeline = null; + pipelineLatch = new CountDownLatch(1); + pipelineCreationErrorCause = null; + if (callback != null) { + callback.run(); + } } - } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause); - pipeline = null; - pipelineLatch = new CountDownLatch(1); - pipelineCreationErrorCause = null; - if (callback != null) { - callback.run(); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause); + pipeline = null; + pipelineLatch = new CountDownLatch(1); + pipelineCreationErrorCause = null; + if (callback != null) { + callback.run(); + } } - } - }); + }); + } } } @@ -282,7 +285,7 @@ public class KurentoSession extends Session { return this.publishedStreamIds.get(streamId); } - public void restartStatusInKurento(Long kmsDisconnectionTime) { + public void restartStatusInKurentoAfterReconnection(Long kmsDisconnectionTime) { log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index edb3312d..2ecebfe6 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -63,6 +63,7 @@ import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.Participant; import io.openvidu.server.kurento.core.KurentoMediaOptions; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.utils.RemoteOperationUtils; /** * {@link Endpoint} wrapper. Can be based on WebRtcEndpoint (that supports @@ -471,7 +472,9 @@ public abstract class MediaEndpoint { if (element == null || subscription == null) { return; } - element.removeErrorListener(subscription); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + element.removeErrorListener(subscription); + } } /** diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 7b5ab029..d2652417 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -52,6 +52,7 @@ import io.openvidu.server.core.MediaOptions; import io.openvidu.server.kurento.core.KurentoMediaOptions; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.utils.JsonUtils; +import io.openvidu.server.utils.RemoteOperationUtils; /** * Publisher aspect of the {@link MediaEndpoint}. @@ -333,17 +334,19 @@ public class PublisherEndpoint extends MediaEndpoint { } elementIds.remove(elementId); if (releaseElement) { - element.release(new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.trace("EP {}: Released media element {}", getEndpointName(), elementId); - } + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + element.release(new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.trace("EP {}: Released media element {}", getEndpointName(), elementId); + } - @Override - public void onError(Throwable cause) throws Exception { - log.error("EP {}: Failed to release media element {}", getEndpointName(), elementId, cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.error("EP {}: Failed to release media element {}", getEndpointName(), elementId, cause); + } + }); + } } this.filter = null; } @@ -504,22 +507,24 @@ public class PublisherEndpoint extends MediaEndpoint { } private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, boolean blocking) { - if (blocking) { - source.disconnect(sink); - } else { - source.disconnect(sink, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId()); - } + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + if (blocking) { + source.disconnect(sink); + } else { + source.disconnect(sink, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId()); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId(), cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId(), cause); + } + }); + } } } @@ -536,25 +541,27 @@ public class PublisherEndpoint extends MediaEndpoint { */ private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type, boolean blocking) { - if (type == null) { - internalSinkDisconnect(source, sink, blocking); - } else { - if (blocking) { - source.disconnect(sink, type); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + if (type == null) { + internalSinkDisconnect(source, sink, blocking); } else { - source.disconnect(sink, type, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})", - getEndpointName(), type, source.getId(), sink.getId()); - } + if (blocking) { + source.disconnect(sink, type); + } else { + source.disconnect(sink, type, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})", + getEndpointName(), type, source.getId(), sink.getId()); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", - getEndpointName(), type, source.getId(), sink.getId(), cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", + getEndpointName(), type, source.getId(), sink.getId(), cause); + } + }); + } } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 5fc07e24..dae5cb19 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -51,6 +51,7 @@ import io.openvidu.server.core.SessionManager; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.RemoteOperationUtils; import io.openvidu.server.utils.UpdatableTimerTask; public abstract class KmsManager { @@ -227,7 +228,14 @@ public abstract class KmsManager { log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) .collect(Collectors.joining(",", "[", "]"))); - sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed); + + try { + // Flag the thread to skip remote operations to KMS + RemoteOperationUtils.setToSkipRemoteOperations(); + sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed); + } finally { + RemoteOperationUtils.revertToRunRemoteOperations(); + } // Remove Media Node log.warn("Removing Media Node {} after crash", kms.getId()); @@ -269,7 +277,7 @@ public abstract class KmsManager { kms.getUri(), kms.getKurentoSessions().size(), kms.getKurentoSessions().stream() .map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]"))); kms.getKurentoSessions().forEach(kSession -> { - kSession.restartStatusInKurento(timeOfKurentoDisconnection); + kSession.restartStatusInKurentoAfterReconnection(timeOfKurentoDisconnection); }); } else { log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java index 1d66e9af..dfa2ee7d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java @@ -38,6 +38,7 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; +import io.openvidu.server.utils.RemoteOperationUtils; public class CompositeWrapper { @@ -87,7 +88,21 @@ public class CompositeWrapper { } public synchronized void stopCompositeRecording(CountDownLatch stopLatch, Long kmsDisconnectionTime) { - if (kmsDisconnectionTime == null) { + + if (kmsDisconnectionTime != null || RemoteOperationUtils.mustSkipRemoteOperation()) { + // Stopping composite endpoint because of a KMS disconnection + String msg; + if (kmsDisconnectionTime != null) { + endTime = kmsDisconnectionTime; + msg = "KMS restart"; + } else { + endTime = System.currentTimeMillis(); + msg = "node crashed"; + } + stopLatch.countDown(); + log.warn("Forcing composed audio-only recording stop after {} in session {}", msg, + this.session.getSessionId()); + } else { this.recorderEndpoint.addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { @@ -100,11 +115,6 @@ public class CompositeWrapper { } }); this.recorderEndpoint.stop(); - } else { - endTime = kmsDisconnectionTime; - stopLatch.countDown(); - log.warn("Forcing composed audio-only recording stop after KMS restart in session {}", - this.session.getSessionId()); } } @@ -145,7 +155,9 @@ public class CompositeWrapper { HubPort hubPort = this.hubPorts.remove(streamId); PublisherEndpoint publisherEndpoint = this.publisherEndpoints.remove(streamId); publisherEndpoint.disconnectFrom(hubPort); - hubPort.release(); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + hubPort.release(); + } log.info("Composite for session {} has now {} connected publishers", this.session.getSessionId(), this.composite.getChildren().size() - 1); } @@ -155,11 +167,15 @@ public class CompositeWrapper { PublisherEndpoint endpoint = this.publisherEndpoints.get(streamId); HubPort hubPort = this.hubPorts.get(streamId); endpoint.disconnectFrom(hubPort); - hubPort.release(); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + hubPort.release(); + } }); this.hubPorts.clear(); this.publisherEndpoints.clear(); - this.composite.release(); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + this.composite.release(); + } } public long getDuration() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 7d69428d..bd47582c 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -31,8 +31,9 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -79,6 +80,7 @@ import io.openvidu.server.utils.JsonUtils; import io.openvidu.server.utils.LocalCustomFileManager; import io.openvidu.server.utils.LocalDockerManager; import io.openvidu.server.utils.RecordingUtils; +import io.openvidu.server.utils.RemoteOperationUtils; public class RecordingManager { @@ -122,8 +124,8 @@ public class RecordingManager { private JsonUtils jsonUtils = new JsonUtils(); - private ScheduledThreadPoolExecutor automaticRecordingStopExecutor = new ScheduledThreadPoolExecutor( - Runtime.getRuntime().availableProcessors()); + private ScheduledExecutorService automaticRecordingStopExecutor = Executors + .newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); public static final String IMAGE_NAME = "openvidu/openvidu-recording"; @@ -809,9 +811,11 @@ public class RecordingManager { throw new OpenViduException(Code.RECORDING_PATH_NOT_VALID, errorMessage); } - recorder.stop(); - recorder.release(); - pipeline.release(); + if (!RemoteOperationUtils.mustSkipRemoteOperation()) { + recorder.stop(); + recorder.release(); + pipeline.release(); + } log.info("Kurento Media Server has write permissions on recording path: {}", openviduRecordingPath); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 532c7110..7a682843 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -68,6 +68,7 @@ import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.utils.CustomFileManager; +import io.openvidu.server.utils.RemoteOperationUtils; public class SingleStreamRecordingService extends RecordingService { @@ -289,7 +290,20 @@ public class SingleStreamRecordingService extends RecordingService { try { if (kParticipant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { try { - if (kmsDisconnectionTime == null) { + + if (kmsDisconnectionTime != null || RemoteOperationUtils.mustSkipRemoteOperation()) { + + // Stopping recorder endpoint because of a KMS disconnection + finalWrapper.setEndTime( + kmsDisconnectionTime != null ? kmsDisconnectionTime : System.currentTimeMillis()); + generateIndividualMetadataFile(finalWrapper); + globalStopLatch.countDown(); + log.warn("Forcing individual recording stop after {} for stream {} in recording {}", + kmsDisconnectionTime != null ? "KMS restart" : "node crashed", streamId, + recordingId); + + } else { + finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { @@ -301,14 +315,7 @@ public class SingleStreamRecordingService extends RecordingService { } }); finalWrapper.getRecorder().stop(); - } else { - // Stopping recorder endpoint because of a KMS disconnection - finalWrapper.setEndTime(kmsDisconnectionTime); - generateIndividualMetadataFile(finalWrapper); - globalStopLatch.countDown(); - log.warn( - "Forcing individual recording stop after KMS restart for stream {} in recording {}", - streamId, recordingId); + } } finally { kParticipant.singleRecordingLock.unlock(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/RemoteOperationUtils.java b/openvidu-server/src/main/java/io/openvidu/server/utils/RemoteOperationUtils.java new file mode 100644 index 00000000..91b74bcd --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/RemoteOperationUtils.java @@ -0,0 +1,21 @@ +package io.openvidu.server.utils; + +public class RemoteOperationUtils { + + private final static String VALUE = "SKIP_REMOTE_OPERATION"; + + private static final ThreadLocal threadLocal = ThreadLocal.withInitial(() -> ""); + + public static void setToSkipRemoteOperations() { + threadLocal.set(VALUE); + } + + public static boolean mustSkipRemoteOperation() { + return VALUE.equals(threadLocal.get()); + } + + public static void revertToRunRemoteOperations() { + threadLocal.remove(); + } + +}