openvidu-server: automatic stop and play of PlayerEndpoint (onlyPlayWithSubscribers)

pull/370/head
pabloFuente 2019-10-30 16:12:56 +01:00
parent 407b710d10
commit b41036ead3
7 changed files with 81 additions and 22 deletions

View File

@ -69,6 +69,7 @@ public class CDREventWebrtcConnection extends CDREventEnd implements Comparable<
if (kMediaOptions.rtspUri != null) {
json.addProperty("rtspUri", kMediaOptions.rtspUri);
json.addProperty("adaptativeBitrate", kMediaOptions.adaptativeBitrate);
json.addProperty("onlyPlayWithSubscribers", kMediaOptions.onlyPlayWithSubscribers);
}
}
}

View File

@ -132,7 +132,7 @@ public class Participant {
public void setPlatform(String platform) {
this.platform = platform;
}
public EndpointType getEndpointType() {
return this.endpointType;
}
@ -149,10 +149,6 @@ public class Participant {
return this.platform.equals("IPCAM") && this.participantPrivatetId.startsWith("IPCAM-");
}
public void setStreaming(boolean streaming) {
this.streaming = streaming;
}
public String getPublisherStreamId() {
return null;
}

View File

@ -31,6 +31,7 @@ public class KurentoMediaOptions extends MediaOptions {
// IPCAM properties
public String rtspUri;
public Boolean adaptativeBitrate;
public Boolean onlyPlayWithSubscribers;
public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo,
Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions,
@ -43,13 +44,15 @@ public class KurentoMediaOptions extends MediaOptions {
public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo,
Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions,
KurentoFilter filter, boolean doLoopback, String rtspUri, Boolean adaptativeBitrate) {
KurentoFilter filter, boolean doLoopback, String rtspUri, Boolean adaptativeBitrate,
Boolean onlyPlayWithSubscribers) {
super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter);
this.isOffer = isOffer;
this.sdpOffer = sdpOffer;
this.doLoopback = doLoopback;
this.rtspUri = rtspUri;
this.adaptativeBitrate = adaptativeBitrate;
this.onlyPlayWithSubscribers = onlyPlayWithSubscribers;
}
@Override
@ -58,6 +61,9 @@ public class KurentoMediaOptions extends MediaOptions {
if (adaptativeBitrate != null) {
json.addProperty("adaptativeBitrate", adaptativeBitrate);
}
if (onlyPlayWithSubscribers != null) {
json.addProperty("onlyPlayWithSubscribers", onlyPlayWithSubscribers);
}
return json;
}

View File

@ -18,6 +18,8 @@
package io.openvidu.server.kurento.core;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@ -98,6 +100,11 @@ public class KurentoParticipant extends Participant {
}
public void createPublishingEndpoint(MediaOptions mediaOptions) {
final String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_"
+ RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.setStreamId(publisherStreamId);
this.publisher.setEndpointName(publisherStreamId);
publisher.setMediaOptions(mediaOptions);
publisher.createEndpoint(publisherLatch);
@ -105,13 +112,7 @@ public class KurentoParticipant extends Participant {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint");
}
String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_"
+ RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.setEndpointName(publisherStreamId);
this.publisher.getEndpoint().setName(publisherStreamId);
this.publisher.setStreamId(publisherStreamId);
endpointConfig.addEndpointListeners(this.publisher, "publisher");
@ -310,8 +311,12 @@ public class KurentoParticipant extends Participant {
return;
}
this.closed = definitelyClosed;
for (String remoteParticipantName : subscribers.keySet()) {
SubscriberEndpoint subscriber = this.subscribers.get(remoteParticipantName);
Iterator<Entry<String, SubscriberEndpoint>> it = subscribers.entrySet().iterator();
while (it.hasNext()) {
final Entry<String, SubscriberEndpoint> entry = it.next();
final String remoteParticipantName = entry.getKey();
final SubscriberEndpoint subscriber = entry.getValue();
it.remove();
if (subscriber != null && subscriber.getEndpoint() != null) {
releaseSubscriberEndpoint(remoteParticipantName, subscriber, reason);
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
@ -323,7 +328,6 @@ public class KurentoParticipant extends Participant {
this.getParticipantPublicId(), remoteParticipantName);
}
}
this.subscribers.clear();
releasePublisherEndpoint(reason, kmsDisconnectionTime);
}
@ -411,6 +415,27 @@ public class KurentoParticipant extends Participant {
releaseElement(senderName, subscriber.getEndpoint());
// Stop PlayerEndpoint of IP CAM if last subscriber disconnected
final KurentoParticipant sender = (KurentoParticipant) this.session.getParticipantByPublicId(senderName);
if (sender != null && ((KurentoMediaOptions) sender.getPublisherMediaOptions()).onlyPlayWithSubscribers) {
final PublisherEndpoint publisher = sender.publisher;
if (publisher != null) {
synchronized (publisher) {
publisher.numberOfSubscribers--;
if (publisher.isPlayerEndpoint() && publisher.numberOfSubscribers == 0) {
try {
publisher.getPlayerEndpoint().stop();
log.info("IP Camera stream {} feed is now disabled because there are no subscribers",
publisher.getStreamId());
} catch (Exception e) {
log.info("Error while disabling feed for IP camera {}: {}", publisher.getStreamId(),
e.getMessage());
}
}
}
}
}
if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
endpointConfig.getCdr().stopSubscriber(this.getParticipantPublicId(), senderName,
subscriber.getStreamId(), reason);

View File

@ -287,7 +287,7 @@ public abstract class MediaEndpoint {
}
});
} else if (this.isPlayerEndpoint()) {
KurentoMediaOptions mediaOptions = (KurentoMediaOptions) this.owner.getPublisherMediaOptions();
final KurentoMediaOptions mediaOptions = (KurentoMediaOptions) this.owner.getPublisherMediaOptions();
PlayerEndpoint.Builder playerBuilder = new PlayerEndpoint.Builder(pipeline, mediaOptions.rtspUri);
if (!mediaOptions.adaptativeBitrate) {
@ -299,9 +299,24 @@ public abstract class MediaEndpoint {
@Override
public void onSuccess(PlayerEndpoint result) throws Exception {
playerEndpoint = result;
if (!mediaOptions.onlyPlayWithSubscribers) {
playerEndpoint.play(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.info("IP Camera stream {} feed is now enabled", streamId);
}
@Override
public void onError(Throwable cause) throws Exception {
log.info("Error while enabling feed for IP camera {}: {}", streamId,
cause.getMessage());
}
});
}
log.trace("EP {}: Created a new PlayerEndpoint", endpointName);
endpointSubscription = registerElemErrListener(playerEndpoint);
playerEndpoint.play();
endpointLatch.countDown();
}

View File

@ -74,6 +74,8 @@ public class PublisherEndpoint extends MediaEndpoint {
private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>();
public int numberOfSubscribers = 0;
public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
MediaPipeline pipeline, OpenviduConfig openviduConfig) {
super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
@ -201,6 +203,7 @@ public class PublisherEndpoint extends MediaEndpoint {
innerConnect();
}
internalSinkConnect(passThru, sink);
this.enableIpCameraIfNecessary();
}
public synchronized void connect(MediaElement sink, MediaType type) {
@ -208,16 +211,26 @@ public class PublisherEndpoint extends MediaEndpoint {
innerConnect();
}
internalSinkConnect(passThru, sink, type);
this.enableIpCameraIfNecessary();
}
private void enableIpCameraIfNecessary() {
numberOfSubscribers++;
if (this.isPlayerEndpoint() && ((KurentoMediaOptions) this.mediaOptions).onlyPlayWithSubscribers
&& numberOfSubscribers == 1) {
try {
this.getPlayerEndpoint().play();
log.info("IP Camera stream {} feed is now enabled", streamId);
} catch (Exception e) {
log.info("Error while enabling feed for IP camera {}: {}", streamId, e.getMessage());
}
}
}
public synchronized void disconnectFrom(MediaElement sink) {
internalSinkDisconnect(passThru, sink);
}
public synchronized void disconnectFrom(MediaElement sink, MediaType type) {
internalSinkDisconnect(passThru, sink, type);
}
/**
* Changes the media passing through a chain of media elements by applying the
* specified element/shaper. The element is plugged into the stream only if the

View File

@ -682,10 +682,12 @@ public class SessionRestController {
String type;
String rtspUri;
Boolean adaptativeBitrate;
Boolean onlyPlayWithSubscribers;
try {
type = (String) params.get("type");
rtspUri = (String) params.get("rtspUri");
adaptativeBitrate = (Boolean) params.get("adaptativeBitrate");
onlyPlayWithSubscribers = (Boolean) params.get("onlyPlayWithSubscribers");
} catch (ClassCastException e) {
return this.generateErrorResponse("Type error in some parameter",
"/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST);
@ -697,6 +699,7 @@ public class SessionRestController {
type = type != null ? type : "IPCAM";
adaptativeBitrate = adaptativeBitrate != null ? adaptativeBitrate : true;
onlyPlayWithSubscribers = onlyPlayWithSubscribers != null ? onlyPlayWithSubscribers : true;
boolean hasAudio = true;
boolean hasVideo = true;
@ -706,7 +709,7 @@ public class SessionRestController {
Integer frameRate = null;
String videoDimensions = null;
KurentoMediaOptions mediaOptions = new KurentoMediaOptions(true, null, hasAudio, hasVideo, audioActive,
videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate);
videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate, onlyPlayWithSubscribers);
try {
Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions);