openvidu-server: mediasoup refactoring

pull/600/head
pabloFuente 2020-09-01 15:17:17 +02:00
parent 0d3d54351e
commit 3086b4aaf2
12 changed files with 277 additions and 157 deletions

View File

@ -830,7 +830,8 @@ export class Stream extends EventDispatcher {
let params;
if (reconnect) {
params = {
stream: this.streamId
stream: this.streamId,
sdpString: sdpOfferParam
}
} else {
let typeOfVideo = '';
@ -846,10 +847,10 @@ export class Stream extends EventDispatcher {
typeOfVideo,
frameRate: !!this.frameRate ? this.frameRate : -1,
videoDimensions: JSON.stringify(this.videoDimensions),
filter: this.outboundStreamOpts.publisherProperties.filter
filter: this.outboundStreamOpts.publisherProperties.filter,
sdpOffer: sdpOfferParam
}
}
params['sdpOffer'] = sdpOfferParam;
this.session.openvidu.sendRequest(method, params, (error, response) => {
if (error) {
@ -955,8 +956,9 @@ export class Stream extends EventDispatcher {
+ this.streamId, sdpAnswer);
const method = reconnect ? 'reconnectStream' : 'receiveVideoFrom';
const params = { sdpAnswer };
const params = {};
params[reconnect ? 'stream' : 'sender'] = this.streamId;
params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpAnswer;
this.session.openvidu.sendRequest(method, params, (error, response) => {
if (error) {

View File

@ -70,6 +70,9 @@ public class ProtocolElements {
public static final String UNPUBLISHVIDEO_METHOD = "unpublishVideo";
public static final String PREPARERECEIVEVIDEO_METHOD = "prepareReceiveVideFrom";
public static final String PREPARERECEIVEVIDEO_SDPOFFER_PARAM = "sdpOffer";
public static final String RECEIVEVIDEO_METHOD = "receiveVideoFrom";
public static final String RECEIVEVIDEO_SDPOFFER_PARAM = "sdpOffer";
public static final String RECEIVEVIDEO_SENDER_PARAM = "sender";
@ -120,7 +123,7 @@ public class ProtocolElements {
public static final String RECONNECTSTREAM_METHOD = "reconnectStream";
public static final String RECONNECTSTREAM_STREAM_PARAM = "stream";
public static final String RECONNECTSTREAM_SDPOFFER_PARAM = "sdpOffer";
public static final String RECONNECTSTREAM_SDPSTRING_PARAM = "sdpString";
// ---------------------------- SERVER RESPONSES & EVENTS -----------------

View File

@ -279,14 +279,24 @@ public class SessionEventsHandler {
}
}
public void onSubscribe(Participant participant, Session session, String sdpAnswer, Integer transactionId,
public void onPrepareSubscription(Participant participant, Session session, String sdpOffer, Integer transactionId,
OpenViduException error) {
if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);
return;
}
JsonObject result = new JsonObject();
result.addProperty(ProtocolElements.PREPARERECEIVEVIDEO_SDPOFFER_PARAM, sdpOffer);
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result);
}
public void onSubscribe(Participant participant, Session session, Integer transactionId,
OpenViduException error) {
if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);
return;
}
JsonObject result = new JsonObject();
result.addProperty(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM, sdpAnswer);
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result);
if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) {

View File

@ -106,7 +106,9 @@ public abstract class SessionManager {
public abstract void unpublishVideo(Participant participant, Participant moderator, Integer transactionId,
EndReason reason);
public abstract void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId);
public abstract void prepareSubscription(Participant participant, String senderPublicId, Integer id);
public abstract void subscribe(Participant participant, String senderName, String sdpAnwser, Integer transactionId);
public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId);

View File

@ -54,7 +54,6 @@ import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.endpoint.MediaEndpoint;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SdpType;
import io.openvidu.server.kurento.endpoint.SubscriberEndpoint;
import io.openvidu.server.recording.service.RecordingManager;
@ -169,15 +168,15 @@ public class KurentoParticipant extends Participant {
return session;
}
public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, boolean silent) {
log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(),
this.session.getSessionId(), sdpType);
log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString);
public String publishToRoom(String sdpOffer, boolean doLoopback, boolean silent) {
log.info("PARTICIPANT {}: Request to publish video in room {})", this.getParticipantPublicId(),
this.session.getSessionId());
log.trace("PARTICIPANT {}: Publishing Sdp Offer is {}", this.getParticipantPublicId(), sdpOffer);
String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback);
String sdpAnswer = this.getPublisher().publish(sdpOffer, doLoopback);
this.streaming = true;
log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpResponse);
log.trace("PARTICIPANT {}: Publishing Sdp Answer is {}", this.getParticipantPublicId(), sdpAnswer);
log.info("PARTICIPANT {}: Is now publishing video in room {}", this.getParticipantPublicId(),
this.session.getSessionId());
@ -191,7 +190,7 @@ public class KurentoParticipant extends Participant {
publisher.getMediaOptions(), publisher.createdAt());
}
return sdpResponse;
return sdpAnswer;
}
public void unpublishMedia(EndReason reason, long kmsDisconnectionTime) {
@ -204,12 +203,11 @@ public class KurentoParticipant extends Participant {
this.getParticipantPublicId());
}
public String receiveMediaFrom(Participant sender, String sdpOffer, boolean silent) {
public String prepareReceiveMediaFrom(Participant sender) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
log.info("PARTICIPANT {}: Request to prepare receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
log.trace("PARTICIPANT {}: SdpOffer for {} is {}", this.getParticipantPublicId(), senderName, sdpOffer);
if (senderName.equals(this.getParticipantPublicId())) {
log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId());
@ -269,8 +267,56 @@ public class KurentoParticipant extends Participant {
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(),
senderName);
try {
String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer);
String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer);
log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}",
this.getParticipantPublicId(), senderName, this.session.getSessionId());
return sdpOffer;
} catch (KurentoServerException e) {
log.error("Exception preparing subscriber endpoint for user {}: {}", this.getParticipantPublicId(),
e.getMessage());
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
return null;
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
}
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
}
}
public void receiveMediaFrom(Participant sender, String sdpAnswer, boolean silent) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
log.trace("PARTICIPANT {}: SdpAnswer for {} is {}", this.getParticipantPublicId(), senderName, sdpAnswer);
if (senderName.equals(this.getParticipantPublicId())) {
log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media");
}
KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.streaming && kSender.getPublisher() != null
&& kSender.getPublisher().closingLock.readLock().tryLock()) {
try {
final SubscriberEndpoint subscriber = getSubscriber(senderName);
if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
}
try {
subscriber.subscribe(sdpAnswer, kSender.getPublisher());
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
@ -279,8 +325,6 @@ public class KurentoParticipant extends Participant {
endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(),
sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt());
}
return sdpAnswer;
} catch (KurentoServerException e) {
// TODO Check object status when KurentoClient sets this info in the object
if (e.getCode() == 40101) {
@ -292,7 +336,6 @@ public class KurentoParticipant extends Participant {
}
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
return null;
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();

View File

@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.openvidu.java.client.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.GenericMediaElement;
import org.kurento.client.IceCandidate;
@ -48,6 +47,12 @@ import com.google.gson.JsonObject;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.MediaMode;
import io.openvidu.java.client.Recording;
import io.openvidu.java.client.RecordingLayout;
import io.openvidu.java.client.RecordingMode;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.java.client.SessionProperties;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.FinalUser;
import io.openvidu.server.core.IdentifierPrefixes;
@ -58,7 +63,6 @@ import io.openvidu.server.core.SessionManager;
import io.openvidu.server.core.Token;
import io.openvidu.server.kurento.endpoint.KurentoFilter;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SdpType;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.rpc.RpcHandler;
@ -368,7 +372,6 @@ public class KurentoSessionManager extends SessionManager {
kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, kurentoOptions.rtspUri,
participant.getParticipantPublicId());
SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER;
KurentoSession kSession = kParticipant.getSession();
kParticipant.createPublishingEndpoint(mediaOptions, null);
@ -395,7 +398,7 @@ public class KurentoSessionManager extends SessionManager {
}
}
sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, false);
sdpAnswer = kParticipant.publishToRoom(kurentoOptions.sdpOffer, kurentoOptions.doLoopback, false);
if (sdpAnswer == null) {
OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
@ -502,11 +505,54 @@ public class KurentoSessionManager extends SessionManager {
}
@Override
public void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId) {
String sdpAnswer = null;
public void prepareSubscription(Participant participant, String senderPublicId, Integer transactionId) {
String sdpOffer = null;
Session session = null;
try {
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderName, sdpOffer,
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer,
participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;
session = ((KurentoParticipant) participant).getSession();
Participant senderParticipant = session.getParticipantByPublicId(senderPublicId);
if (senderParticipant == null) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user could not be found",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE,
"User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'");
}
if (!senderParticipant.isStreaming()) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user is not streaming media",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'");
}
sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant);
if (sdpOffer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '"
+ participant.getParticipantPublicId() + "' to '" + senderPublicId + "'");
}
} catch (OpenViduException e) {
log.error("PARTICIPANT {}: Error preparing subscription to {}", participant.getParticipantPublicId(),
senderPublicId, e);
sessionEventsHandler.onPrepareSubscription(participant, session, null, transactionId, e);
}
if (sdpOffer != null) {
sessionEventsHandler.onPrepareSubscription(participant, session, sdpOffer, transactionId, null);
}
}
@Override
public void subscribe(Participant participant, String senderName, String sdpAnswer, Integer transactionId) {
Session session = null;
try {
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpAnswer={} ({})", senderName, sdpAnswer,
participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;
@ -530,18 +576,11 @@ public class KurentoSessionManager extends SessionManager {
"User '" + senderName + " not streaming media in session '" + session.getSessionId() + "'");
}
sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer, false);
if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId()
+ "' to '" + senderName + "'");
}
kParticipant.receiveMediaFrom(senderParticipant, sdpAnswer, false);
sessionEventsHandler.onSubscribe(participant, session, transactionId, null);
} catch (OpenViduException e) {
log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e);
sessionEventsHandler.onSubscribe(participant, session, null, transactionId, e);
}
if (sdpAnswer != null) {
sessionEventsHandler.onSubscribe(participant, session, sdpAnswer, transactionId, null);
sessionEventsHandler.onSubscribe(participant, session, transactionId, e);
}
}
@ -1046,7 +1085,7 @@ public class KurentoSessionManager extends SessionManager {
}
@Override
public void reconnectStream(Participant participant, String streamId, String sdpOffer, Integer transactionId) {
public void reconnectStream(Participant participant, String streamId, String sdpString, Integer transactionId) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
@ -1067,8 +1106,7 @@ public class KurentoSessionManager extends SessionManager {
// 3) Create a new PublisherEndpoint connecting it to the previous PassThrough
kParticipant.resetPublisherEndpoint(kurentoOptions, passThru);
kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER;
String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true);
String sdpAnswer = kParticipant.publishToRoom(sdpString, kurentoOptions.doLoopback, true);
sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(),
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
@ -1081,12 +1119,8 @@ public class KurentoSessionManager extends SessionManager {
if (senderPrivateId != null) {
KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId);
kParticipant.cancelReceivingMedia(sender, null, true);
String sdpAnswer = kParticipant.receiveMediaFrom(sender, sdpOffer, true);
if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'");
}
sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null);
kParticipant.receiveMediaFrom(sender, sdpString, true);
sessionEventsHandler.onSubscribe(participant, kSession, transactionId, null);
} else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'");

View File

@ -516,6 +516,24 @@ public abstract class MediaEndpoint {
}
}
protected String generateOffer() throws OpenViduException {
if (this.isWeb()) {
if (webEndpoint == null) {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
"Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")");
}
return webEndpoint.generateOffer();
} else if (this.isPlayerEndpoint()) {
return "";
} else {
if (endpoint == null) {
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
"Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")");
}
return endpoint.generateOffer();
}
}
/**
* If supported, it registers a listener for when a new {@link IceCandidate} is
* gathered by the internal endpoint ({@link WebRtcEndpoint}) and sends it to

View File

@ -175,57 +175,42 @@ public class PublisherEndpoint extends MediaEndpoint {
/**
* Initializes this media endpoint for publishing media and processes the SDP
* offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it
* first registers an event listener for the ICE candidates and instructs the
* endpoint to start gathering the candidates. If required, it connects to
* itself (after applying the intermediate media elements and the
* {@link PassThrough}) to allow loopback of the media stream.
* offer. If the internal endpoint is an {@link WebRtcEndpoint}, it first
* registers an event listener for the ICE candidates and instructs the endpoint
* to start gathering the candidates. If required, it connects to itself (after
* applying the intermediate media elements and the {@link PassThrough}) to
* allow loopback of the media stream.
*
* @param sdpType indicates the type of the sdpString (offer or
* answer)
* @param sdpString offer or answer from the remote peer
* @param sdpOffer SDP offer from the remote peer
* @param doLoopback loopback flag
* @param loopbackAlternativeSrc alternative loopback source
* @param loopbackConnectionType how to connect the loopback source
* @return the SDP response (the answer if processing an offer SDP, otherwise is
* the updated offer generated previously by this endpoint)
* @return the SDP answer
*/
public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback) {
public synchronized String publish(String sdpOffer, boolean doLoopback) {
String sdpResponse = processOffer(sdpOffer);
registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId());
if (doLoopback) {
connect(this.getEndpoint());
connect(this.getEndpoint(), false);
} else {
innerConnect();
innerConnect(false);
}
this.createdAt = System.currentTimeMillis();
String sdpResponse = null;
switch (sdpType) {
case ANSWER:
sdpResponse = processAnswer(sdpString);
break;
case OFFER:
sdpResponse = processOffer(sdpString);
break;
default:
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Sdp type not supported: " + sdpType);
}
gatherCandidates();
return sdpResponse;
}
public synchronized void connect(MediaElement sink) {
public synchronized void connect(MediaElement sink, boolean blocking) {
if (!connected) {
innerConnect();
innerConnect(blocking);
}
internalSinkConnect(passThru, sink);
internalSinkConnect(passThru, sink, blocking);
this.enableIpCameraIfNecessary();
}
public synchronized void connect(MediaElement sink, MediaType type) {
public synchronized void connect(MediaElement sink, MediaType type, boolean blocking) {
if (!connected) {
innerConnect();
innerConnect(blocking);
}
internalSinkConnect(passThru, sink, type);
internalSinkConnect(passThru, sink, type, blocking);
this.enableIpCameraIfNecessary();
}
@ -289,11 +274,11 @@ public class PublisherEndpoint extends MediaEndpoint {
}
if (connected) {
if (first != null) {
internalSinkConnect(first, shaper, type);
internalSinkConnect(first, shaper, type, false);
} else {
internalSinkConnect(this.getEndpoint(), shaper, type);
internalSinkConnect(this.getEndpoint(), shaper, type, false);
}
internalSinkConnect(shaper, passThru, type);
internalSinkConnect(shaper, passThru, type, false);
}
elementIds.addFirst(id);
elements.put(id, shaper);
@ -343,7 +328,7 @@ public class PublisherEndpoint extends MediaEndpoint {
} else {
prev = passThru;
}
internalSinkConnect(next, prev);
internalSinkConnect(next, prev, false);
}
elementIds.remove(elementId);
if (releaseElement) {
@ -408,13 +393,13 @@ public class PublisherEndpoint extends MediaEndpoint {
}
switch (muteType) {
case ALL:
internalSinkConnect(this.getEndpoint(), sink);
internalSinkConnect(this.getEndpoint(), sink, false);
break;
case AUDIO:
internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO);
internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO, false);
break;
case VIDEO:
internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO);
internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO, false);
break;
}
}
@ -440,7 +425,7 @@ public class PublisherEndpoint extends MediaEndpoint {
return elementIds.get(idx - 1);
}
private void innerConnect() {
private void innerConnect(boolean blocking) {
if (this.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Can't connect null endpoint (ep: " + getEndpointName() + ")");
@ -453,15 +438,18 @@ public class PublisherEndpoint extends MediaEndpoint {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"No media element with id " + prevId + " (ep: " + getEndpointName() + ")");
}
internalSinkConnect(current, prev);
internalSinkConnect(current, prev, blocking);
current = prev;
prevId = getPrevious(prevId);
}
internalSinkConnect(current, passThru);
internalSinkConnect(current, passThru, blocking);
connected = true;
}
private void internalSinkConnect(final MediaElement source, final MediaElement sink) {
private void internalSinkConnect(final MediaElement source, final MediaElement sink, boolean blocking) {
if (blocking) {
source.connect(sink);
} else {
source.connect(sink, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
@ -476,6 +464,7 @@ public class PublisherEndpoint extends MediaEndpoint {
}
});
}
}
/**
* Same as {@link #internalSinkConnect(MediaElement, MediaElement)}, but can
@ -488,15 +477,19 @@ public class PublisherEndpoint extends MediaEndpoint {
* be used instead
* @see #internalSinkConnect(MediaElement, MediaElement)
*/
private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) {
private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type,
boolean blocking) {
if (type == null) {
internalSinkConnect(source, sink);
internalSinkConnect(source, sink, blocking);
} else {
if (blocking) {
source.connect(sink, type);
} else {
source.connect(sink, type, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", getEndpointName(),
type, source.getId(), sink.getId());
log.debug("EP {}: {} media elements have been connected (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId());
}
@Override
@ -507,6 +500,7 @@ public class PublisherEndpoint extends MediaEndpoint {
});
}
}
}
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink) {
source.disconnect(sink, new Continuation<Void>() {

View File

@ -18,7 +18,6 @@
package io.openvidu.server.kurento.endpoint;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.kurento.client.MediaPipeline;
import org.slf4j.Logger;
@ -38,8 +37,6 @@ import io.openvidu.server.kurento.core.KurentoParticipant;
public class SubscriberEndpoint extends MediaEndpoint {
private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class);
private AtomicBoolean connectedToPublisher = new AtomicBoolean(false);
private String publisherStreamId;
public SubscriberEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
@ -47,23 +44,18 @@ public class SubscriberEndpoint extends MediaEndpoint {
super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
}
public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) {
public synchronized String prepareSubscription(PublisherEndpoint publisher) {
registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId());
publisher.connect(this.getEndpoint(), true);
this.createdAt = System.currentTimeMillis();
String sdpAnswer = processOffer(sdpOffer);
gatherCandidates();
publisher.connect(this.getEndpoint());
setConnectedToPublisher(true);
this.publisherStreamId = publisher.getStreamId();
return sdpAnswer;
String sdpOffer = generateOffer();
gatherCandidates();
return sdpOffer;
}
public boolean isConnectedToPublisher() {
return connectedToPublisher.get();
}
public void setConnectedToPublisher(boolean connectedToPublisher) {
this.connectedToPublisher.set(connectedToPublisher);
public synchronized void subscribe(String sdpAnswer, PublisherEndpoint publisher) {
processAnswer(sdpAnswer);
}
@Override

View File

@ -111,7 +111,7 @@ public class CompositeWrapper {
public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException {
HubPort hubPort = new HubPort.Builder(composite).build();
endpoint.connect(hubPort);
endpoint.connect(hubPort, false);
String streamId = endpoint.getOwner().getPublisherStreamId();
this.hubPorts.put(streamId, hubPort);
this.publisherEndpoints.put(streamId, endpoint);

View File

@ -391,14 +391,14 @@ public class SingleStreamRecordingService extends RecordingService {
MediaProfileSpecType profile) {
switch (profile) {
case WEBM:
publisherEndpoint.connect(recorder, MediaType.AUDIO);
publisherEndpoint.connect(recorder, MediaType.VIDEO);
publisherEndpoint.connect(recorder, MediaType.AUDIO, false);
publisherEndpoint.connect(recorder, MediaType.VIDEO, false);
break;
case WEBM_AUDIO_ONLY:
publisherEndpoint.connect(recorder, MediaType.AUDIO);
publisherEndpoint.connect(recorder, MediaType.AUDIO, false);
break;
case WEBM_VIDEO_ONLY:
publisherEndpoint.connect(recorder, MediaType.VIDEO);
publisherEndpoint.connect(recorder, MediaType.VIDEO, false);
break;
default:
throw new UnsupportedOperationException("Unsupported profile when single stream recording: " + profile);

View File

@ -126,6 +126,9 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
case ProtocolElements.ONICECANDIDATE_METHOD:
onIceCandidate(rpcConnection, request);
break;
case ProtocolElements.PREPARERECEIVEVIDEO_METHOD:
prepareReceiveVideoFrom(rpcConnection, request);
break;
case ProtocolElements.RECEIVEVIDEO_METHOD:
receiveVideoFrom(rpcConnection, request);
break;
@ -333,6 +336,20 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
}
}
private void prepareReceiveVideoFrom(RpcConnection rpcConnection, Request<JsonObject> request) {
Participant participant;
try {
participant = sanityCheckOfSession(rpcConnection, "subscribe");
} catch (OpenViduException e) {
return;
}
String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId);
sessionManager.prepareSubscription(participant, senderPublicId, request.getId());
}
private void receiveVideoFrom(RpcConnection rpcConnection, Request<JsonObject> request) {
Participant participant;
try {
@ -341,23 +358,12 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
return;
}
String senderPublicId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String sdpAnswer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM);
// Parse sender public id from stream id
if (senderPublicId.startsWith(IdentifierPrefixes.STREAM_ID + "IPC_")
&& senderPublicId.contains(IdentifierPrefixes.IPCAM_ID)) {
// If IPCAM
senderPublicId = senderPublicId.substring(senderPublicId.indexOf("_" + IdentifierPrefixes.IPCAM_ID) + 1,
senderPublicId.length());
} else {
// Not IPCAM
senderPublicId = senderPublicId.substring(
senderPublicId.lastIndexOf(IdentifierPrefixes.PARTICIPANT_PUBLIC_ID), senderPublicId.length());
}
String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId);
String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM);
sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId());
sessionManager.subscribe(participant, senderPublicId, sdpAnswer, request.getId());
}
private void unsubscribeFromVideo(RpcConnection rpcConnection, Request<JsonObject> request) {
@ -622,9 +628,9 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
return;
}
String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM);
String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM);
String sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM);
try {
sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId());
sessionManager.reconnectStream(participant, streamId, sdpString, request.getId());
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
new JsonObject(), e);
@ -800,4 +806,20 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
.equals(this.sessionManager.getParticipantPrivateIdFromStreamId(sessionId, streamId));
}
private String parseSenderPublicIdFromStreamId(String streamId) {
String senderPublicId;
// Parse sender public id from stream id
if (streamId.startsWith(IdentifierPrefixes.STREAM_ID + "IPC_")
&& streamId.contains(IdentifierPrefixes.IPCAM_ID)) {
// If IPCAM
senderPublicId = streamId.substring(streamId.indexOf("_" + IdentifierPrefixes.IPCAM_ID) + 1,
streamId.length());
} else {
// Not IPCAM
senderPublicId = streamId.substring(streamId.lastIndexOf(IdentifierPrefixes.PARTICIPANT_PUBLIC_ID),
streamId.length());
}
return senderPublicId;
}
}