From b41036ead355e91e2baac52cca6aa91bf939510e Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Wed, 30 Oct 2019 16:12:56 +0100 Subject: [PATCH] openvidu-server: automatic stop and play of PlayerEndpoint (onlyPlayWithSubscribers) --- .../server/cdr/CDREventWebrtcConnection.java | 1 + .../io/openvidu/server/core/Participant.java | 6 +-- .../kurento/core/KurentoMediaOptions.java | 8 +++- .../kurento/core/KurentoParticipant.java | 43 +++++++++++++++---- .../kurento/endpoint/MediaEndpoint.java | 19 +++++++- .../kurento/endpoint/PublisherEndpoint.java | 21 +++++++-- .../server/rest/SessionRestController.java | 5 ++- 7 files changed, 81 insertions(+), 22 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java index 2063746a..44b41421 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java @@ -69,6 +69,7 @@ public class CDREventWebrtcConnection extends CDREventEnd implements Comparable< if (kMediaOptions.rtspUri != null) { json.addProperty("rtspUri", kMediaOptions.rtspUri); json.addProperty("adaptativeBitrate", kMediaOptions.adaptativeBitrate); + json.addProperty("onlyPlayWithSubscribers", kMediaOptions.onlyPlayWithSubscribers); } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java b/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java index 74560b82..44fb4fc7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java @@ -132,7 +132,7 @@ public class Participant { public void setPlatform(String platform) { this.platform = platform; } - + public EndpointType getEndpointType() { return this.endpointType; } @@ -149,10 +149,6 @@ public class Participant { return this.platform.equals("IPCAM") && this.participantPrivatetId.startsWith("IPCAM-"); } - public void setStreaming(boolean streaming) { - this.streaming = streaming; - } - public String getPublisherStreamId() { return null; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java index d157f0b8..52b80daa 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java @@ -31,6 +31,7 @@ public class KurentoMediaOptions extends MediaOptions { // IPCAM properties public String rtspUri; public Boolean adaptativeBitrate; + public Boolean onlyPlayWithSubscribers; public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo, Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, @@ -43,13 +44,15 @@ public class KurentoMediaOptions extends MediaOptions { public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo, Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, - KurentoFilter filter, boolean doLoopback, String rtspUri, Boolean adaptativeBitrate) { + KurentoFilter filter, boolean doLoopback, String rtspUri, Boolean adaptativeBitrate, + Boolean onlyPlayWithSubscribers) { super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter); this.isOffer = isOffer; this.sdpOffer = sdpOffer; this.doLoopback = doLoopback; this.rtspUri = rtspUri; this.adaptativeBitrate = adaptativeBitrate; + this.onlyPlayWithSubscribers = onlyPlayWithSubscribers; } @Override @@ -58,6 +61,9 @@ public class KurentoMediaOptions extends MediaOptions { if (adaptativeBitrate != null) { json.addProperty("adaptativeBitrate", adaptativeBitrate); } + if (onlyPlayWithSubscribers != null) { + json.addProperty("onlyPlayWithSubscribers", onlyPlayWithSubscribers); + } return json; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index e1fe4d83..78e559c4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -18,6 +18,8 @@ package io.openvidu.server.kurento.core; import java.util.Collection; +import java.util.Iterator; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -98,6 +100,11 @@ public class KurentoParticipant extends Participant { } public void createPublishingEndpoint(MediaOptions mediaOptions) { + final String publisherStreamId = this.getParticipantPublicId() + "_" + + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + + RandomStringUtils.random(5, true, false).toUpperCase(); + this.publisher.setStreamId(publisherStreamId); + this.publisher.setEndpointName(publisherStreamId); publisher.setMediaOptions(mediaOptions); publisher.createEndpoint(publisherLatch); @@ -105,13 +112,7 @@ public class KurentoParticipant extends Participant { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint"); } - String publisherStreamId = this.getParticipantPublicId() + "_" - + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" - + RandomStringUtils.random(5, true, false).toUpperCase(); - - this.publisher.setEndpointName(publisherStreamId); this.publisher.getEndpoint().setName(publisherStreamId); - this.publisher.setStreamId(publisherStreamId); endpointConfig.addEndpointListeners(this.publisher, "publisher"); @@ -310,8 +311,12 @@ public class KurentoParticipant extends Participant { return; } this.closed = definitelyClosed; - for (String remoteParticipantName : subscribers.keySet()) { - SubscriberEndpoint subscriber = this.subscribers.get(remoteParticipantName); + Iterator> it = subscribers.entrySet().iterator(); + while (it.hasNext()) { + final Entry entry = it.next(); + final String remoteParticipantName = entry.getKey(); + final SubscriberEndpoint subscriber = entry.getValue(); + it.remove(); if (subscriber != null && subscriber.getEndpoint() != null) { releaseSubscriberEndpoint(remoteParticipantName, subscriber, reason); log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), @@ -323,7 +328,6 @@ public class KurentoParticipant extends Participant { this.getParticipantPublicId(), remoteParticipantName); } } - this.subscribers.clear(); releasePublisherEndpoint(reason, kmsDisconnectionTime); } @@ -411,6 +415,27 @@ public class KurentoParticipant extends Participant { releaseElement(senderName, subscriber.getEndpoint()); + // Stop PlayerEndpoint of IP CAM if last subscriber disconnected + final KurentoParticipant sender = (KurentoParticipant) this.session.getParticipantByPublicId(senderName); + if (sender != null && ((KurentoMediaOptions) sender.getPublisherMediaOptions()).onlyPlayWithSubscribers) { + final PublisherEndpoint publisher = sender.publisher; + if (publisher != null) { + synchronized (publisher) { + publisher.numberOfSubscribers--; + if (publisher.isPlayerEndpoint() && publisher.numberOfSubscribers == 0) { + try { + publisher.getPlayerEndpoint().stop(); + log.info("IP Camera stream {} feed is now disabled because there are no subscribers", + publisher.getStreamId()); + } catch (Exception e) { + log.info("Error while disabling feed for IP camera {}: {}", publisher.getStreamId(), + e.getMessage()); + } + } + } + } + } + if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { endpointConfig.getCdr().stopSubscriber(this.getParticipantPublicId(), senderName, subscriber.getStreamId(), reason); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index c5d0be7d..5f10bae1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -287,7 +287,7 @@ public abstract class MediaEndpoint { } }); } else if (this.isPlayerEndpoint()) { - KurentoMediaOptions mediaOptions = (KurentoMediaOptions) this.owner.getPublisherMediaOptions(); + final KurentoMediaOptions mediaOptions = (KurentoMediaOptions) this.owner.getPublisherMediaOptions(); PlayerEndpoint.Builder playerBuilder = new PlayerEndpoint.Builder(pipeline, mediaOptions.rtspUri); if (!mediaOptions.adaptativeBitrate) { @@ -299,9 +299,24 @@ public abstract class MediaEndpoint { @Override public void onSuccess(PlayerEndpoint result) throws Exception { playerEndpoint = result; + + if (!mediaOptions.onlyPlayWithSubscribers) { + playerEndpoint.play(new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.info("IP Camera stream {} feed is now enabled", streamId); + } + + @Override + public void onError(Throwable cause) throws Exception { + log.info("Error while enabling feed for IP camera {}: {}", streamId, + cause.getMessage()); + } + }); + } + log.trace("EP {}: Created a new PlayerEndpoint", endpointName); endpointSubscription = registerElemErrListener(playerEndpoint); - playerEndpoint.play(); endpointLatch.countDown(); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 4827f67f..2d14974a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -74,6 +74,8 @@ public class PublisherEndpoint extends MediaEndpoint { private Map elementsErrorSubscriptions = new HashMap(); + public int numberOfSubscribers = 0; + public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, OpenviduConfig openviduConfig) { super(endpointType, owner, endpointName, pipeline, openviduConfig, log); @@ -201,6 +203,7 @@ public class PublisherEndpoint extends MediaEndpoint { innerConnect(); } internalSinkConnect(passThru, sink); + this.enableIpCameraIfNecessary(); } public synchronized void connect(MediaElement sink, MediaType type) { @@ -208,16 +211,26 @@ public class PublisherEndpoint extends MediaEndpoint { innerConnect(); } internalSinkConnect(passThru, sink, type); + this.enableIpCameraIfNecessary(); + } + + private void enableIpCameraIfNecessary() { + numberOfSubscribers++; + if (this.isPlayerEndpoint() && ((KurentoMediaOptions) this.mediaOptions).onlyPlayWithSubscribers + && numberOfSubscribers == 1) { + try { + this.getPlayerEndpoint().play(); + log.info("IP Camera stream {} feed is now enabled", streamId); + } catch (Exception e) { + log.info("Error while enabling feed for IP camera {}: {}", streamId, e.getMessage()); + } + } } public synchronized void disconnectFrom(MediaElement sink) { internalSinkDisconnect(passThru, sink); } - public synchronized void disconnectFrom(MediaElement sink, MediaType type) { - internalSinkDisconnect(passThru, sink, type); - } - /** * Changes the media passing through a chain of media elements by applying the * specified element/shaper. The element is plugged into the stream only if the diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index 90d60d5f..cfb9fd6f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -682,10 +682,12 @@ public class SessionRestController { String type; String rtspUri; Boolean adaptativeBitrate; + Boolean onlyPlayWithSubscribers; try { type = (String) params.get("type"); rtspUri = (String) params.get("rtspUri"); adaptativeBitrate = (Boolean) params.get("adaptativeBitrate"); + onlyPlayWithSubscribers = (Boolean) params.get("onlyPlayWithSubscribers"); } catch (ClassCastException e) { return this.generateErrorResponse("Type error in some parameter", "/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST); @@ -697,6 +699,7 @@ public class SessionRestController { type = type != null ? type : "IPCAM"; adaptativeBitrate = adaptativeBitrate != null ? adaptativeBitrate : true; + onlyPlayWithSubscribers = onlyPlayWithSubscribers != null ? onlyPlayWithSubscribers : true; boolean hasAudio = true; boolean hasVideo = true; @@ -706,7 +709,7 @@ public class SessionRestController { Integer frameRate = null; String videoDimensions = null; KurentoMediaOptions mediaOptions = new KurentoMediaOptions(true, null, hasAudio, hasVideo, audioActive, - videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate); + videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate, onlyPlayWithSubscribers); try { Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions);