openvidu-server: skip Kurento remote operations if node crashed

pull/621/head
pabloFuente 2021-04-22 18:06:30 +02:00
parent 63b312227f
commit f6422c7a40
9 changed files with 177 additions and 103 deletions

View File

@ -58,6 +58,7 @@ import io.openvidu.server.kurento.endpoint.MediaEndpoint;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SubscriberEndpoint;
import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.utils.RemoteOperationUtils;
public class KurentoParticipant extends Participant {
@ -663,7 +664,9 @@ public class KurentoParticipant extends Participant {
senderPublisher.numberOfSubscribers--;
if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) {
try {
senderPublisher.getPlayerEndpoint().stop();
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
senderPublisher.getPlayerEndpoint().stop();
}
log.info(
"IP Camera stream {} feed is now disabled because there are no subscribers",
senderPublisher.getStreamId());
@ -691,19 +694,21 @@ public class KurentoParticipant extends Participant {
void releaseElement(final String senderName, final MediaElement element) {
final String eid = element.getId();
try {
element.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("PARTICIPANT {}: Released successfully media element #{} for {}",
getParticipantPublicId(), eid, senderName);
}
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
element.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("PARTICIPANT {}: Released successfully media element #{} for {}",
getParticipantPublicId(), eid, senderName);
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release media element #{} for {}", getParticipantPublicId(),
eid, senderName, cause);
}
});
@Override
public void onError(Throwable cause) throws Exception {
log.warn("PARTICIPANT {}: Could not release media element #{} for {}", getParticipantPublicId(),
eid, senderName, cause);
}
});
}
} catch (Exception e) {
log.error("PARTICIPANT {}: Error calling release on elem #{} for {}", getParticipantPublicId(), eid,
senderName, e);

View File

@ -42,6 +42,7 @@ import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.utils.RemoteOperationUtils;
/**
* @author Pablo Fuente (pablofuenteperez@gmail.com)
@ -252,29 +253,31 @@ public class KurentoSession extends Session {
return;
}
getPipeline().release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("SESSION {}: Released Pipeline", sessionId);
pipeline = null;
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
getPipeline().release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("SESSION {}: Released Pipeline", sessionId);
pipeline = null;
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
}
}
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause);
pipeline = null;
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
@Override
public void onError(Throwable cause) throws Exception {
log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause);
pipeline = null;
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
}
}
}
});
});
}
}
}
@ -282,7 +285,7 @@ public class KurentoSession extends Session {
return this.publishedStreamIds.get(streamId);
}
public void restartStatusInKurento(Long kmsDisconnectionTime) {
public void restartStatusInKurentoAfterReconnection(Long kmsDisconnectionTime) {
log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId);

View File

@ -63,6 +63,7 @@ import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.core.KurentoMediaOptions;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.utils.RemoteOperationUtils;
/**
* {@link Endpoint} wrapper. Can be based on WebRtcEndpoint (that supports
@ -471,7 +472,9 @@ public abstract class MediaEndpoint {
if (element == null || subscription == null) {
return;
}
element.removeErrorListener(subscription);
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
element.removeErrorListener(subscription);
}
}
/**

View File

@ -52,6 +52,7 @@ import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.kurento.core.KurentoMediaOptions;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.utils.JsonUtils;
import io.openvidu.server.utils.RemoteOperationUtils;
/**
* Publisher aspect of the {@link MediaEndpoint}.
@ -333,17 +334,19 @@ public class PublisherEndpoint extends MediaEndpoint {
}
elementIds.remove(elementId);
if (releaseElement) {
element.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("EP {}: Released media element {}", getEndpointName(), elementId);
}
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
element.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("EP {}: Released media element {}", getEndpointName(), elementId);
}
@Override
public void onError(Throwable cause) throws Exception {
log.error("EP {}: Failed to release media element {}", getEndpointName(), elementId, cause);
}
});
@Override
public void onError(Throwable cause) throws Exception {
log.error("EP {}: Failed to release media element {}", getEndpointName(), elementId, cause);
}
});
}
}
this.filter = null;
}
@ -504,22 +507,24 @@ public class PublisherEndpoint extends MediaEndpoint {
}
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, boolean blocking) {
if (blocking) {
source.disconnect(sink);
} else {
source.disconnect(sink, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId());
}
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
if (blocking) {
source.disconnect(sink);
} else {
source.disconnect(sink, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId());
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId(), cause);
}
});
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId(), cause);
}
});
}
}
}
@ -536,25 +541,27 @@ public class PublisherEndpoint extends MediaEndpoint {
*/
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type,
boolean blocking) {
if (type == null) {
internalSinkDisconnect(source, sink, blocking);
} else {
if (blocking) {
source.disconnect(sink, type);
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
if (type == null) {
internalSinkDisconnect(source, sink, blocking);
} else {
source.disconnect(sink, type, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId());
}
if (blocking) {
source.disconnect(sink, type);
} else {
source.disconnect(sink, type, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId());
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId(), cause);
}
});
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId(), cause);
}
});
}
}
}
}

View File

@ -51,6 +51,7 @@ import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeStatusManager;
import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.RemoteOperationUtils;
import io.openvidu.server.utils.UpdatableTimerTask;
public abstract class KmsManager {
@ -227,7 +228,14 @@ public abstract class KmsManager {
log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(),
kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]")));
sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
try {
// Flag the thread to skip remote operations to KMS
RemoteOperationUtils.setToSkipRemoteOperations();
sessionManager.closeAllSessionsAndRecordingsOfKms(kms, EndReason.nodeCrashed);
} finally {
RemoteOperationUtils.revertToRunRemoteOperations();
}
// Remove Media Node
log.warn("Removing Media Node {} after crash", kms.getId());
@ -269,7 +277,7 @@ public abstract class KmsManager {
kms.getUri(), kms.getKurentoSessions().size(), kms.getKurentoSessions().stream()
.map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]")));
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurento(timeOfKurentoDisconnection);
kSession.restartStatusInKurentoAfterReconnection(timeOfKurentoDisconnection);
});
} else {
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());

View File

@ -38,6 +38,7 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.utils.RemoteOperationUtils;
public class CompositeWrapper {
@ -87,7 +88,21 @@ public class CompositeWrapper {
}
public synchronized void stopCompositeRecording(CountDownLatch stopLatch, Long kmsDisconnectionTime) {
if (kmsDisconnectionTime == null) {
if (kmsDisconnectionTime != null || RemoteOperationUtils.mustSkipRemoteOperation()) {
// Stopping composite endpoint because of a KMS disconnection
String msg;
if (kmsDisconnectionTime != null) {
endTime = kmsDisconnectionTime;
msg = "KMS restart";
} else {
endTime = System.currentTimeMillis();
msg = "node crashed";
}
stopLatch.countDown();
log.warn("Forcing composed audio-only recording stop after {} in session {}", msg,
this.session.getSessionId());
} else {
this.recorderEndpoint.addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
@ -100,11 +115,6 @@ public class CompositeWrapper {
}
});
this.recorderEndpoint.stop();
} else {
endTime = kmsDisconnectionTime;
stopLatch.countDown();
log.warn("Forcing composed audio-only recording stop after KMS restart in session {}",
this.session.getSessionId());
}
}
@ -145,7 +155,9 @@ public class CompositeWrapper {
HubPort hubPort = this.hubPorts.remove(streamId);
PublisherEndpoint publisherEndpoint = this.publisherEndpoints.remove(streamId);
publisherEndpoint.disconnectFrom(hubPort);
hubPort.release();
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
hubPort.release();
}
log.info("Composite for session {} has now {} connected publishers", this.session.getSessionId(),
this.composite.getChildren().size() - 1);
}
@ -155,11 +167,15 @@ public class CompositeWrapper {
PublisherEndpoint endpoint = this.publisherEndpoints.get(streamId);
HubPort hubPort = this.hubPorts.get(streamId);
endpoint.disconnectFrom(hubPort);
hubPort.release();
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
hubPort.release();
}
});
this.hubPorts.clear();
this.publisherEndpoints.clear();
this.composite.release();
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
this.composite.release();
}
}
public long getDuration() {

View File

@ -31,8 +31,9 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -79,6 +80,7 @@ import io.openvidu.server.utils.JsonUtils;
import io.openvidu.server.utils.LocalCustomFileManager;
import io.openvidu.server.utils.LocalDockerManager;
import io.openvidu.server.utils.RecordingUtils;
import io.openvidu.server.utils.RemoteOperationUtils;
public class RecordingManager {
@ -122,8 +124,8 @@ public class RecordingManager {
private JsonUtils jsonUtils = new JsonUtils();
private ScheduledThreadPoolExecutor automaticRecordingStopExecutor = new ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors());
private ScheduledExecutorService automaticRecordingStopExecutor = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
public static final String IMAGE_NAME = "openvidu/openvidu-recording";
@ -809,9 +811,11 @@ public class RecordingManager {
throw new OpenViduException(Code.RECORDING_PATH_NOT_VALID, errorMessage);
}
recorder.stop();
recorder.release();
pipeline.release();
if (!RemoteOperationUtils.mustSkipRemoteOperation()) {
recorder.stop();
recorder.release();
pipeline.release();
}
log.info("Kurento Media Server has write permissions on recording path: {}", openviduRecordingPath);

View File

@ -68,6 +68,7 @@ import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.RemoteOperationUtils;
public class SingleStreamRecordingService extends RecordingService {
@ -289,7 +290,20 @@ public class SingleStreamRecordingService extends RecordingService {
try {
if (kParticipant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) {
try {
if (kmsDisconnectionTime == null) {
if (kmsDisconnectionTime != null || RemoteOperationUtils.mustSkipRemoteOperation()) {
// Stopping recorder endpoint because of a KMS disconnection
finalWrapper.setEndTime(
kmsDisconnectionTime != null ? kmsDisconnectionTime : System.currentTimeMillis());
generateIndividualMetadataFile(finalWrapper);
globalStopLatch.countDown();
log.warn("Forcing individual recording stop after {} for stream {} in recording {}",
kmsDisconnectionTime != null ? "KMS restart" : "node crashed", streamId,
recordingId);
} else {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
@ -301,14 +315,7 @@ public class SingleStreamRecordingService extends RecordingService {
}
});
finalWrapper.getRecorder().stop();
} else {
// Stopping recorder endpoint because of a KMS disconnection
finalWrapper.setEndTime(kmsDisconnectionTime);
generateIndividualMetadataFile(finalWrapper);
globalStopLatch.countDown();
log.warn(
"Forcing individual recording stop after KMS restart for stream {} in recording {}",
streamId, recordingId);
}
} finally {
kParticipant.singleRecordingLock.unlock();

View File

@ -0,0 +1,21 @@
package io.openvidu.server.utils;
public class RemoteOperationUtils {
private final static String VALUE = "SKIP_REMOTE_OPERATION";
private static final ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "");
public static void setToSkipRemoteOperations() {
threadLocal.set(VALUE);
}
public static boolean mustSkipRemoteOperation() {
return VALUE.equals(threadLocal.get());
}
public static void revertToRunRemoteOperations() {
threadLocal.remove();
}
}