mediasoup refactoring

pull/621/head
pabloFuente 2021-03-16 10:26:39 +01:00
parent 2e40d14432
commit 084cfc49f7
13 changed files with 885 additions and 341 deletions

View File

@ -106,14 +106,6 @@ export class Session extends EventDispatcher {
*/
remoteStreamsCreated: Map<string, boolean> = new Map();
/**
* @hidden
*/
isFirstIonicIosSubscriber = true;
/**
* @hidden
*/
countDownForIonicIosSubscribersActive = true;
/**
* @hidden
*/
@ -735,11 +727,6 @@ export class Session extends EventDispatcher {
streamEvent.callDefaultBehavior();
this.remoteStreamsCreated.delete(stream.streamId);
if (this.remoteStreamsCreated.size === 0) {
this.isFirstIonicIosSubscriber = true;
this.countDownForIonicIosSubscribersActive = true;
}
}
this.remoteConnections.delete(connection.connectionId);
this.ee.emitEvent('connectionDestroyed', [new ConnectionEvent(false, this, 'connectionDestroyed', connection, msg.reason)]);
@ -810,11 +797,6 @@ export class Session extends EventDispatcher {
const streamId: string = connection.stream!.streamId;
this.remoteStreamsCreated.delete(streamId);
if (this.remoteStreamsCreated.size === 0) {
this.isFirstIonicIosSubscriber = true;
this.countDownForIonicIosSubscribersActive = true;
}
connection.removeStream(streamId);
})
.catch(openViduError => {

View File

@ -825,7 +825,7 @@ export class Stream extends EventDispatcher {
simulcast: false
};
const successCallback = (sdpOfferParam) => {
const successOfferCallback = (sdpOfferParam) => {
logger.debug('Sending SDP offer to publish as '
+ this.streamId, sdpOfferParam);
@ -833,7 +833,8 @@ export class Stream extends EventDispatcher {
let params;
if (reconnect) {
params = {
stream: this.streamId
stream: this.streamId,
sdpString: sdpOfferParam
}
} else {
let typeOfVideo = '';
@ -849,10 +850,10 @@ export class Stream extends EventDispatcher {
typeOfVideo,
frameRate: !!this.frameRate ? this.frameRate : -1,
videoDimensions: JSON.stringify(this.videoDimensions),
filter: this.outboundStreamOpts.publisherProperties.filter
filter: this.outboundStreamOpts.publisherProperties.filter,
sdpOffer: sdpOfferParam
}
}
params['sdpOffer'] = sdpOfferParam;
this.session.openvidu.sendRequest(method, params, (error, response) => {
if (error) {
@ -862,7 +863,7 @@ export class Stream extends EventDispatcher {
reject('Error on publishVideo: ' + JSON.stringify(error));
}
} else {
this.webRtcPeer.processAnswer(response.sdpAnswer, false)
this.webRtcPeer.processRemoteAnswer(response.sdpAnswer)
.then(() => {
this.streamId = response.id;
this.creationTime = response.createdAt;
@ -897,10 +898,15 @@ export class Stream extends EventDispatcher {
this.webRtcPeer = new WebRtcPeerSendonly(options);
}
this.webRtcPeer.addIceConnectionStateChangeListener('publisher of ' + this.connection.connectionId);
this.webRtcPeer.generateOffer().then(sdpOffer => {
successCallback(sdpOffer);
this.webRtcPeer.createOffer().then(sdpOffer => {
this.webRtcPeer.processLocalOffer(sdpOffer)
.then(() => {
successOfferCallback(sdpOffer.sdp);
}).catch(error => {
reject(new Error('(publish) SDP offer error: ' + JSON.stringify(error)));
reject(new Error('(publish) SDP process local offer error: ' + JSON.stringify(error)));
});
}).catch(error => {
reject(new Error('(publish) SDP create offer error: ' + JSON.stringify(error)));
});
});
}
@ -909,6 +915,30 @@ export class Stream extends EventDispatcher {
* @hidden
*/
initWebRtcPeerReceive(reconnect: boolean): Promise<void> {
return new Promise((resolve, reject) => {
this.session.openvidu.sendRequest('prepareReceiveVideoFrom', { sender: this.streamId, reconnect }, (error, response) => {
if (error) {
reject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error)));
} else {
this.completeWebRtcPeerReceive(response.sdpOffer, reconnect)
.then(() => {
logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed"));
this.remotePeerSuccessfullyEstablished();
this.initWebRtcStats();
resolve();
})
.catch(error => {
reject(error);
});
}
});
});
}
/**
* @hidden
*/
completeWebRtcPeerReceive(sdpOffer: string, reconnect: boolean): Promise<void> {
return new Promise((resolve, reject) => {
const offerConstraints = {
@ -924,50 +954,41 @@ export class Stream extends EventDispatcher {
simulcast: false
};
const successCallback = (sdpOfferParam) => {
logger.debug('Sending SDP offer to subscribe to '
+ this.streamId, sdpOfferParam);
const successAnswerCallback = (sdpAnswer) => {
logger.debug('Sending SDP answer to subscribe to '
+ this.streamId, sdpAnswer);
const method = reconnect ? 'reconnectStream' : 'receiveVideoFrom';
const params = { sdpOffer: sdpOfferParam };
const params = {};
params[reconnect ? 'stream' : 'sender'] = this.streamId;
params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpAnswer;
this.session.openvidu.sendRequest(method, params, (error, response) => {
if (error) {
reject(new Error('Error on recvVideoFrom: ' + JSON.stringify(error)));
reject(new Error('Error on ' + method + ' : ' + JSON.stringify(error)));
} else {
// Ios Ionic. Limitation: some bug in iosrtc cordova plugin makes it necessary
// to add a timeout before calling PeerConnection#setRemoteDescription during
// some time (400 ms) from the moment first subscriber stream is received
if (this.session.isFirstIonicIosSubscriber) {
this.session.isFirstIonicIosSubscriber = false;
setTimeout(() => {
// After 400 ms Ionic iOS subscribers won't need to run
// PeerConnection#setRemoteDescription after 250 ms timeout anymore
this.session.countDownForIonicIosSubscribersActive = false;
}, 400);
}
const needsTimeoutOnProcessAnswer = this.session.countDownForIonicIosSubscribersActive;
this.webRtcPeer.processAnswer(response.sdpAnswer, needsTimeoutOnProcessAnswer).then(() => {
logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed"));
this.remotePeerSuccessfullyEstablished();
this.initWebRtcStats();
resolve();
}).catch(error => {
reject(error);
});
}
});
};
this.webRtcPeer = new WebRtcPeerRecvonly(options);
this.webRtcPeer.addIceConnectionStateChangeListener(this.streamId);
this.webRtcPeer.generateOffer()
.then(sdpOffer => {
successCallback(sdpOffer);
this.webRtcPeer.processRemoteOffer(sdpOffer)
.then(() => {
this.webRtcPeer.createAnswer().then(sdpAnswer => {
this.webRtcPeer.processLocalAnswer(sdpAnswer)
.then(() => {
successAnswerCallback(sdpAnswer.sdp);
}).catch(error => {
reject(new Error('(subscribe) SDP process local answer error: ' + JSON.stringify(error)));
});
}).catch(error => {
reject(new Error('(subscribe) SDP create answer error: ' + JSON.stringify(error)));
});
})
.catch(error => {
reject(new Error('(subscribe) SDP offer error: ' + JSON.stringify(error)));
reject(new Error('(subscribe) SDP process remote offer error: ' + JSON.stringify(error)));
});
});
}

View File

@ -36,7 +36,7 @@ export interface WebRtcPeerConfiguration {
video: boolean
};
simulcast: boolean;
onicecandidate: (event) => void;
onicecandidate: (event: RTCIceCandidate) => void;
iceServers: RTCIceServer[] | undefined;
mediaStream?: MediaStream;
mode?: 'sendonly' | 'recvonly' | 'sendrecv';
@ -52,8 +52,6 @@ export class WebRtcPeer {
iceCandidateList: RTCIceCandidate[] = [];
private candidategatheringdone = false;
constructor(protected configuration: WebRtcPeerConfiguration) {
platform = PlatformUtils.getInstance();
this.configuration.iceServers = (!!this.configuration.iceServers && this.configuration.iceServers.length > 0) ? this.configuration.iceServers : freeice();
@ -61,15 +59,12 @@ export class WebRtcPeer {
this.pc = new RTCPeerConnection({ iceServers: this.configuration.iceServers });
this.id = !!configuration.id ? configuration.id : this.generateUniqueId();
this.pc.onicecandidate = event => {
if (!!event.candidate) {
this.pc.onicecandidate = (event: RTCPeerConnectionIceEvent) => {
if (event.candidate != null) {
const candidate: RTCIceCandidate = event.candidate;
if (candidate) {
this.configuration.onicecandidate(candidate);
if (candidate.candidate !== '') {
this.localCandidatesQueue.push(<RTCIceCandidate>{ candidate: candidate.candidate });
this.candidategatheringdone = false;
this.configuration.onicecandidate(event.candidate);
} else if (!this.candidategatheringdone) {
this.candidategatheringdone = true;
}
}
};
@ -123,10 +118,10 @@ export class WebRtcPeer {
}
/**
* Function that creates an offer, sets it as local description and returns the offer param
* to send to OpenVidu Server (will be the remote description of other peer)
* Creates an SDP offer from the local RTCPeerConnection to send to the other peer
* Only if the negotiation was initiated by the this peer
*/
generateOffer(): Promise<string> {
createOffer(): Promise<RTCSessionDescriptionInit> {
return new Promise((resolve, reject) => {
let offerAudio, offerVideo = true;
@ -146,7 +141,8 @@ export class WebRtcPeer {
logger.debug('RTCPeerConnection constraints: ' + JSON.stringify(constraints));
if (platform.isSafariBrowser() && !platform.isIonicIos()) {
// Safari (excluding Ionic), at least on iOS just seems to support unified plan, whereas in other browsers is not yet ready and considered experimental
// Safari (excluding Ionic), at least on iOS just seems to support unified plan,
// whereas in other browsers is not yet ready and considered experimental
if (offerAudio) {
this.pc.addTransceiver('audio', {
direction: this.configuration.mode,
@ -159,39 +155,20 @@ export class WebRtcPeer {
});
}
this.pc
.createOffer()
this.pc.createOffer()
.then(offer => {
logger.debug('Created SDP offer');
return this.pc.setLocalDescription(offer);
})
.then(() => {
const localDescription = this.pc.localDescription;
if (!!localDescription) {
logger.debug('Local description set', localDescription.sdp);
resolve(localDescription.sdp);
} else {
reject('Local description is not defined');
}
resolve(offer);
})
.catch(error => reject(error));
} else {
// Rest of platforms
this.pc.createOffer(constraints).then(offer => {
this.pc.createOffer(constraints)
.then(offer => {
logger.debug('Created SDP offer');
return this.pc.setLocalDescription(offer);
})
.then(() => {
const localDescription = this.pc.localDescription;
if (!!localDescription) {
logger.debug('Local description set', localDescription.sdp);
resolve(localDescription.sdp);
} else {
reject('Local description is not defined');
}
resolve(offer);
})
.catch(error => reject(error));
}
@ -199,10 +176,94 @@ export class WebRtcPeer {
}
/**
* Function invoked when a SDP answer is received. Final step in SDP negotiation, the peer
* just needs to set the answer as its remote description
* Creates an SDP answer from the local RTCPeerConnection to send to the other peer
* Only if the negotiation was initiated by the other peer
*/
processAnswer(sdpAnswer: string, needsTimeoutOnProcessAnswer: boolean): Promise<string> {
createAnswer(): Promise<RTCSessionDescriptionInit> {
return new Promise((resolve, reject) => {
let offerAudio, offerVideo = true;
if (!!this.configuration.mediaConstraints) {
offerAudio = (typeof this.configuration.mediaConstraints.audio === 'boolean') ?
this.configuration.mediaConstraints.audio : true;
offerVideo = (typeof this.configuration.mediaConstraints.video === 'boolean') ?
this.configuration.mediaConstraints.video : true;
}
const constraints: RTCOfferOptions = {
offerToReceiveAudio: offerAudio,
offerToReceiveVideo: offerVideo
};
this.pc.createAnswer(constraints).then(sdpAnswer => {
resolve(sdpAnswer);
}).catch(error => {
reject(error);
});
});
}
/**
* This peer initiated negotiation. Step 1/4 of SDP offer-answer protocol
*/
processLocalOffer(offer: RTCSessionDescriptionInit): Promise<void> {
return new Promise((resolve, reject) => {
this.pc.setLocalDescription(offer)
.then(() => {
const localDescription = this.pc.localDescription;
if (!!localDescription) {
logger.debug('Local description set', localDescription.sdp);
resolve();
} else {
reject('Local description is not defined');
}
})
.catch(error => {
reject(error);
});
});
}
/**
* Other peer initiated negotiation. Step 2/4 of SDP offer-answer protocol
*/
processRemoteOffer(sdpOffer: string): Promise<void> {
return new Promise((resolve, reject) => {
const offer: RTCSessionDescriptionInit = {
type: 'offer',
sdp: sdpOffer
};
logger.debug('SDP offer received, setting remote description', offer);
if (this.pc.signalingState === 'closed') {
reject('RTCPeerConnection is closed when trying to set remote description');
}
this.setRemoteDescription(offer)
.then(() => {
resolve();
})
.catch(error => {
reject(error);
});
});
}
/**
* Other peer initiated negotiation. Step 3/4 of SDP offer-answer protocol
*/
processLocalAnswer(answer: RTCSessionDescriptionInit): Promise<void> {
return new Promise((resolve, reject) => {
logger.debug('SDP answer created, setting local description');
if (this.pc.signalingState === 'closed') {
reject('RTCPeerConnection is closed when trying to set local description');
}
this.pc.setLocalDescription(answer)
.then(() => resolve())
.catch(error => reject(error));
});
}
/**
* This peer initiated negotiation. Step 4/4 of SDP offer-answer protocol
*/
processRemoteAnswer(sdpAnswer: string): Promise<void> {
return new Promise((resolve, reject) => {
const answer: RTCSessionDescriptionInit = {
type: 'answer',
@ -211,34 +272,19 @@ export class WebRtcPeer {
logger.debug('SDP answer received, setting remote description');
if (this.pc.signalingState === 'closed') {
reject('RTCPeerConnection is closed');
reject('RTCPeerConnection is closed when trying to set remote description');
}
this.setRemoteDescription(answer, needsTimeoutOnProcessAnswer, resolve, reject);
this.setRemoteDescription(answer)
.then(() => resolve())
.catch(error => reject(error));
});
}
/**
* @hidden
*/
setRemoteDescription(answer: RTCSessionDescriptionInit, needsTimeoutOnProcessAnswer: boolean, resolve: (value?: string | PromiseLike<string> | undefined) => void, reject: (reason?: any) => void) {
if (platform.isIonicIos()) {
// Ionic iOS platform
if (needsTimeoutOnProcessAnswer) {
// 400 ms have not elapsed yet since first remote stream triggered Stream#initWebRtcPeerReceive
setTimeout(() => {
logger.info('setRemoteDescription run after timeout for Ionic iOS device');
this.pc.setRemoteDescription(new RTCSessionDescription(answer)).then(() => resolve()).catch(error => reject(error));
}, 250);
} else {
// 400 ms have elapsed
this.pc.setRemoteDescription(new RTCSessionDescription(answer)).then(() => resolve()).catch(error => reject(error));
}
} else {
// Rest of platforms
this.pc.setRemoteDescription(answer).then(() => resolve()).catch(error => reject(error));
}
async setRemoteDescription(sdp: RTCSessionDescriptionInit): Promise<void> {
return this.pc.setRemoteDescription(sdp);
}
/**

View File

@ -70,6 +70,10 @@ public class ProtocolElements {
public static final String UNPUBLISHVIDEO_METHOD = "unpublishVideo";
public static final String PREPARERECEIVEVIDEO_METHOD = "prepareReceiveVideoFrom";
public static final String PREPARERECEIVEVIDEO_SDPOFFER_PARAM = "sdpOffer";
public static final String PREPARERECEIVEVIDEO_RECONNECT_PARAM = "reconnect";
public static final String RECEIVEVIDEO_METHOD = "receiveVideoFrom";
public static final String RECEIVEVIDEO_SDPOFFER_PARAM = "sdpOffer";
public static final String RECEIVEVIDEO_SENDER_PARAM = "sender";
@ -129,7 +133,10 @@ public class ProtocolElements {
public static final String RECONNECTSTREAM_METHOD = "reconnectStream";
public static final String RECONNECTSTREAM_STREAM_PARAM = "stream";
public static final String RECONNECTSTREAM_SDPSTRING_PARAM = "sdpString";
// TODO: REMOVE ON 2.18.0
public static final String RECONNECTSTREAM_SDPOFFER_PARAM = "sdpOffer";
// ENDTODO
public static final String VIDEODATA_METHOD = "videoData";

View File

@ -295,6 +295,18 @@ public class SessionEventsHandler {
}
}
public void onPrepareSubscription(Participant participant, Session session, String sdpOffer, Integer transactionId,
OpenViduException error) {
if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);
return;
}
JsonObject result = new JsonObject();
result.addProperty(ProtocolElements.PREPARERECEIVEVIDEO_SDPOFFER_PARAM, sdpOffer);
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result);
}
// TODO: REMOVE ON 2.18.0
public void onSubscribe(Participant participant, Session session, String sdpAnswer, Integer transactionId,
OpenViduException error) {
if (error != null) {
@ -312,6 +324,23 @@ public class SessionEventsHandler {
});
}
}
// END TODO
public void onSubscribe(Participant participant, Session session, Integer transactionId, OpenViduException error) {
if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);
return;
}
JsonObject result = new JsonObject();
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result);
if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) {
recordingsToSendClientEvents.computeIfPresent(session.getSessionId(), (key, value) -> {
sendRecordingStartedNotification(session, value);
return null;
});
}
}
public void onUnsubscribe(Participant participant, Integer transactionId, OpenViduException error) {
if (error != null) {
@ -561,8 +590,9 @@ public class SessionEventsHandler {
}
}
public void onFilterEventDispatched(String sessionId, String uniqueSessionId, String connectionId, String streamId, String filterType,
GenericMediaEvent event, Set<Participant> participants, Set<String> subscribedParticipants) {
public void onFilterEventDispatched(String sessionId, String uniqueSessionId, String connectionId, String streamId,
String filterType, GenericMediaEvent event, Set<Participant> participants,
Set<String> subscribedParticipants) {
CDR.recordFilterEventDispatched(sessionId, uniqueSessionId, connectionId, streamId, filterType, event);

View File

@ -109,7 +109,18 @@ public abstract class SessionManager {
public abstract void unpublishVideo(Participant participant, Participant moderator, Integer transactionId,
EndReason reason);
public abstract void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId);
public abstract void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect,
Integer id);
// TODO: REMOVE ON 2.18.0
public abstract void subscribe(Participant participant, String senderName, String sdpAnwser, Integer transactionId,
boolean is2180);
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// public abstract void subscribe(Participant participant, String senderName,
// String sdpAnwser, Integer transactionId);
// END TODO
public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId);
@ -168,6 +179,11 @@ public abstract class SessionManager {
public abstract void reconnectStream(Participant participant, String streamId, String sdpOffer,
Integer transactionId);
// TODO: REMOVE ON 2.18.0
public abstract void reconnectStream2170(Participant participant, String streamId, String sdpOffer,
Integer transactionId);
// END TODO
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
throws OpenViduException;

View File

@ -206,7 +206,153 @@ public class KurentoParticipant extends Participant {
this.getParticipantPublicId());
}
public String receiveMediaFrom(Participant sender, String sdpOffer, boolean silent) {
public String prepareReceiveMediaFrom(Participant sender) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to prepare receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
if (senderName.equals(this.getParticipantPublicId())) {
log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media");
}
KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.streaming && kSender.getPublisher() != null
&& kSender.getPublisher().closingLock.readLock().tryLock()) {
try {
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
senderName);
SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName);
try {
CountDownLatch subscriberLatch = new CountDownLatch(1);
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
try {
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Timeout reached when creating subscriber endpoint");
}
} catch (InterruptedException e) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Interrupted when creating subscriber endpoint: " + e.getMessage());
}
if (oldMediaEndpoint != null) {
log.warn(
"PARTICIPANT {}: Two threads are trying to create at "
+ "the same time a subscriber endpoint for user {}",
this.getParticipantPublicId(), senderName);
return null;
}
if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint");
}
String subscriberEndpointName = calculateSubscriberEndpointName(kSender);
subscriber.setEndpointName(subscriberEndpointName);
subscriber.getEndpoint().setName(subscriberEndpointName);
subscriber.setStreamId(kSender.getPublisherStreamId());
endpointConfig.addEndpointListeners(subscriber, "subscriber");
} catch (OpenViduException e) {
this.subscribers.remove(senderName);
throw e;
}
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(),
senderName);
try {
String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer);
log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}",
this.getParticipantPublicId(), senderName, this.session.getSessionId());
return sdpOffer;
} catch (KurentoServerException e) {
log.error("Exception preparing subscriber endpoint for user {}: {}", this.getParticipantPublicId(),
e.getMessage());
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
return null;
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
}
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
}
}
public void receiveMediaFrom2180(Participant sender, String sdpAnswer, boolean silent) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
log.trace("PARTICIPANT {}: SdpAnswer for {} is {}", this.getParticipantPublicId(), senderName, sdpAnswer);
if (senderName.equals(this.getParticipantPublicId())) {
log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media");
}
KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.streaming && kSender.getPublisher() != null
&& kSender.getPublisher().closingLock.readLock().tryLock()) {
try {
final SubscriberEndpoint subscriber = getSubscriber(senderName);
if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
}
try {
subscriber.subscribe(sdpAnswer, kSender.getPublisher());
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
if (!silent
&& !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(),
sender.getParticipantPublicId(), subscriber.createdAt());
}
} catch (KurentoServerException e) {
// TODO Check object status when KurentoClient sets this info in the object
if (e.getCode() == 40101) {
log.warn(
"Publisher endpoint was already released when trying to connect a subscriber endpoint to it",
e);
} else {
log.error("Exception connecting subscriber endpoint to publisher endpoint", e);
}
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
}
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
}
}
public String receiveMediaFrom2170(Participant sender, String sdpOffer, boolean silent) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),

View File

@ -389,8 +389,8 @@ public class KurentoSessionManager extends SessionManager {
// Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
kurentoOptions.sdpOffer = sdpMunging.forceCodec(participant, kurentoOptions.sdpOffer, kSession, true, false,
isTranscodingAllowed, forcedVideoCodec);
kurentoOptions.sdpOffer = sdpMunging.forceCodec(kurentoOptions.sdpOffer, participant, true, false,
isTranscodingAllowed, forcedVideoCodec, false);
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.publish, WebrtcDebugEventType.sdpOfferMunged, kurentoOptions.sdpOffer));
}
@ -537,12 +537,78 @@ public class KurentoSessionManager extends SessionManager {
}
@Override
public void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId) {
String sdpAnswer = null;
public void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect,
Integer transactionId) {
String sdpOffer = null;
Session session = null;
try {
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer,
participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;
session = ((KurentoParticipant) participant).getSession();
Participant senderParticipant = session.getParticipantByPublicId(senderPublicId);
if (senderParticipant == null) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user could not be found",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE,
"User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'");
}
if (!senderParticipant.isStreaming()) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user is not streaming media",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'");
}
if (reconnect) {
kParticipant.cancelReceivingMedia(((KurentoParticipant) senderParticipant), null, true);
}
sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant);
final String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOffer, sdpOffer));
boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec();
// Modify server's SDPOffer if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, false, false, isTranscodingAllowed,
forcedVideoCodec, true);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
if (sdpOffer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '"
+ participant.getParticipantPublicId() + "' to '" + senderPublicId + "'");
}
} catch (OpenViduException e) {
log.error("PARTICIPANT {}: Error preparing subscription to {}", participant.getParticipantPublicId(),
senderPublicId, e);
sessionEventsHandler.onPrepareSubscription(participant, session, null, transactionId, e);
}
if (sdpOffer != null) {
sessionEventsHandler.onPrepareSubscription(participant, session, sdpOffer, transactionId, null);
}
}
@Override
public void subscribe(Participant participant, String senderName, String sdpAnswer, Integer transactionId,
boolean is2180) {
Session session = null;
try {
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderName, sdpOffer,
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpAnswer={} ({})", senderName, sdpAnswer,
participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;
@ -568,39 +634,53 @@ public class KurentoSessionManager extends SessionManager {
String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant);
// TODO: REMOVE ON 2.18.0
if (is2180) {
// Client's SDPAnswer to the server's SDPOffer
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOffer, sdpOffer));
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
kParticipant.receiveMediaFrom2180(senderParticipant, sdpAnswer, false);
sessionEventsHandler.onSubscribe(participant, session, transactionId, null);
} else {
boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec();
// Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOffer = sdpMunging.forceCodec(participant, sdpOffer, session, false, false, isTranscodingAllowed,
forcedVideoCodec);
sdpAnswer = sdpMunging.forceCodec(sdpAnswer, participant, false, false, isTranscodingAllowed,
forcedVideoCodec, false);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpAnswer));
}
sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer, false);
if (sdpAnswer == null) {
String finalSdpAnswer = kParticipant.receiveMediaFrom2170(senderParticipant, sdpAnswer, false);
if (finalSdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId()
+ "' to '" + senderName + "'");
}
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, finalSdpAnswer));
sessionEventsHandler.onSubscribe(participant, session, finalSdpAnswer, transactionId, null);
}
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
// WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
// String remoteSdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpAnswer, false);
// sessionEventsHandler.onSubscribe(participant, session, transactionId, null);
// END TODO
} catch (OpenViduException e) {
log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e);
sessionEventsHandler.onSubscribe(participant, session, null, transactionId, e);
}
if (sdpAnswer != null) {
log.debug("SDP Answer for subscribing PARTICIPANT {}: {}", participant.getParticipantPublicId(), sdpAnswer);
sessionEventsHandler.onSubscribe(participant, session, sdpAnswer, transactionId, null);
}
}
@Override
@ -1105,8 +1185,9 @@ public class KurentoSessionManager extends SessionManager {
return kParticipant;
}
// TODO: REMOVE ON 2.18.0
@Override
public void reconnectStream(Participant participant, String streamId, String sdpOffer, Integer transactionId) {
public void reconnectStream2170(Participant participant, String streamId, String sdpOffer, Integer transactionId) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
boolean isPublisher = streamId.equals(participant.getPublisherStreamId());
@ -1119,8 +1200,8 @@ public class KurentoSessionManager extends SessionManager {
// Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOfferHasBeenMunged = true;
sdpOffer = sdpMunging.forceCodec(participant, sdpOffer, kSession, isPublisher, true, isTranscodingAllowed,
forcedVideoCodec);
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed,
forcedVideoCodec, false);
}
if (isPublisher) {
@ -1176,8 +1257,7 @@ public class KurentoSessionManager extends SessionManager {
sdpOffer));
}
kParticipant.cancelReceivingMedia(sender, null, true);
String sdpAnswer = kParticipant.receiveMediaFrom(sender, sdpOffer, true);
String sdpAnswer = kParticipant.receiveMediaFrom2170(sender, sdpOffer, true);
if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'");
@ -1196,6 +1276,95 @@ public class KurentoSessionManager extends SessionManager {
}
}
}
// END TODO
@Override
public void reconnectStream(Participant participant, String streamId, String sdpOfferOrAnswer,
Integer transactionId) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
boolean isPublisher = streamId.equals(participant.getPublisherStreamId());
if (isPublisher) {
// Reconnect publisher
String sdpOffer = sdpOfferOrAnswer;
boolean isTranscodingAllowed = kSession.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = kSession.getSessionProperties().forcedVideoCodec();
boolean sdpOfferHasBeenMunged = false;
final String originalSdpOffer = sdpOffer;
// Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOfferHasBeenMunged = true;
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed,
forcedVideoCodec, false);
}
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, originalSdpOffer));
if (sdpOfferHasBeenMunged) {
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
// Reconnect publisher
final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher()
.getMediaOptions();
// 1) Disconnect broken PublisherEndpoint from its PassThrough
PublisherEndpoint publisher = kParticipant.getPublisher();
final PassThrough passThru = publisher.disconnectFromPassThrough();
// 2) Destroy the broken PublisherEndpoint and nothing else
publisher.cancelStatsLoop.set(true);
kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint());
// 3) Create a new PublisherEndpoint connecting it to the previous PassThrough
kParticipant.resetPublisherEndpoint(kurentoOptions, passThru);
kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true);
log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(),
sdpAnswer);
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(),
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
new HashSet<Participant>(), transactionId, null);
} else {
// Reconnect subscriber
final String sdpAnswer = sdpOfferOrAnswer;
String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId);
if (senderPrivateId != null) {
KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId);
String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
kParticipant.receiveMediaFrom2180(sender, sdpAnswer, true);
log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}",
participant.getParticipantPublicId(), sdpAnswer);
sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null);
} else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'");
}
}
}
@Override
public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) {

View File

@ -519,6 +519,24 @@ public abstract class MediaEndpoint {
}
}
protected String generateOffer() throws OpenViduException {
if (this.isWeb()) {
if (webEndpoint == null) {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
"Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")");
}
return webEndpoint.generateOffer();
} else if (this.isPlayerEndpoint()) {
return "";
} else {
if (endpoint == null) {
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
"Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")");
}
return endpoint.generateOffer();
}
}
/**
* If supported, it registers a listener for when a new {@link IceCandidate} is
* gathered by the internal endpoint ({@link WebRtcEndpoint}) and sends it to

View File

@ -45,14 +45,36 @@ public class SubscriberEndpoint extends MediaEndpoint {
super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
}
public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) {
public synchronized String prepareSubscription(PublisherEndpoint publisher) {
registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId());
publisher.connect(this.getEndpoint(), true);
this.createdAt = System.currentTimeMillis();
this.publisherStreamId = publisher.getStreamId();
String sdpOffer = generateOffer();
gatherCandidates();
return sdpOffer;
}
public synchronized String subscribe(String sdpAnswer, PublisherEndpoint publisher) {
// TODO: REMOVE ON 2.18.0
if (this.createdAt == null) {
// 2.17.0
registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId());
this.createdAt = System.currentTimeMillis();
String sdpAnswer = processOffer(sdpOffer);
String realSdpAnswer = processOffer(sdpAnswer);
gatherCandidates();
publisher.connect(this.getEndpoint(), false);
this.publisherStreamId = publisher.getStreamId();
return sdpAnswer;
return realSdpAnswer;
} else {
// 2.18.0
return processAnswer(sdpAnswer);
}
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// processAnswer(sdpAnswer);
// END TODO
}
@Override

View File

@ -127,6 +127,9 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
case ProtocolElements.ONICECANDIDATE_METHOD:
onIceCandidate(rpcConnection, request);
break;
case ProtocolElements.PREPARERECEIVEVIDEO_METHOD:
prepareReceiveVideoFrom(rpcConnection, request);
break;
case ProtocolElements.RECEIVEVIDEO_METHOD:
receiveVideoFrom(rpcConnection, request);
break;
@ -341,31 +344,65 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
}
}
private void receiveVideoFrom(RpcConnection rpcConnection, Request<JsonObject> request) {
private void prepareReceiveVideoFrom(RpcConnection rpcConnection, Request<JsonObject> request) {
Participant participant;
try {
participant = sanityCheckOfSession(rpcConnection, "subscribe");
participant = sanityCheckOfSession(rpcConnection, "prepareReceiveVideoFrom");
} catch (OpenViduException e) {
return;
}
String senderPublicId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId);
boolean reconnect = false;
// Parse sender public id from stream id
if (senderPublicId.startsWith(IdentifierPrefixes.STREAM_ID + "IPC_")
&& senderPublicId.contains(IdentifierPrefixes.IPCAM_ID)) {
// If IPCAM
senderPublicId = senderPublicId.substring(senderPublicId.indexOf("_" + IdentifierPrefixes.IPCAM_ID) + 1,
senderPublicId.length());
} else {
// Not IPCAM
senderPublicId = senderPublicId.substring(
senderPublicId.lastIndexOf(IdentifierPrefixes.PARTICIPANT_PUBLIC_ID), senderPublicId.length());
// TODO: REMOVE ON 2.18.0
if (request.getParams().has(ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM)) {
reconnect = getBooleanParam(request, ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM);
}
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// boolean reconnect = getBooleanParam(request,
// ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM);
// END TODO
sessionManager.prepareSubscription(participant, senderPublicId, reconnect, request.getId());
}
String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM);
private void receiveVideoFrom(RpcConnection rpcConnection, Request<JsonObject> request) {
Participant participant;
try {
participant = sanityCheckOfSession(rpcConnection, "receiveVideoFrom");
} catch (OpenViduException e) {
return;
}
sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId());
String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId);
// TODO: REMOVE ON 2.18.0
if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM)) {
// 2.17.0: initiative held by browser when subscribing
// The request comes with an SDPOffer
String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM);
sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId(), false);
} else if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM)) {
// 2.18.0: initiative held by server when subscribing
// This is the final call after prepareReceiveVidoFrom, comes with SDPAnswer
String sdpAnswer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM);
sessionManager.subscribe(participant, senderPublicId, sdpAnswer, request.getId(), true);
}
// END TODO
// TODO: UNCOMMENT ON 2.18.0
/*
* String sdpAnswer = getStringParam(request,
* ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM);
* sessionManager.subscribe(participant, senderPublicId, sdpAnswer,
* request.getId());
*/
// END TODO
}
private void unsubscribeFromVideo(RpcConnection rpcConnection, Request<JsonObject> request) {
@ -630,13 +667,39 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
return;
}
String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM);
String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM);
// TODO: REMOVE ON 2.18.0
if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM)) {
// 2.17.0
try {
String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM);
sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId());
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
new JsonObject(), e);
}
} else if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM)) {
// 2.18.0
String sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM);
try {
sessionManager.reconnectStream(participant, streamId, sdpString, request.getId());
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
new JsonObject(), e);
}
}
// END TODO
// TODO: UNCOMMENT ON 2.18.0
/*
* String sdpString = getStringParam(request,
* ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); try {
* sessionManager.reconnectStream(participant, streamId, sdpString,
* request.getId()); } catch (OpenViduException e) {
* this.notificationService.sendErrorResponse(participant.
* getParticipantPrivateId(), request.getId(), new JsonObject(), e); }
*/
// END TODO
}
private void updateVideoData(RpcConnection rpcConnection, Request<JsonObject> request) {
@ -822,4 +885,20 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
.equals(this.sessionManager.getParticipantPrivateIdFromStreamId(sessionId, streamId));
}
private String parseSenderPublicIdFromStreamId(String streamId) {
String senderPublicId;
// Parse sender public id from stream id
if (streamId.startsWith(IdentifierPrefixes.STREAM_ID + "IPC_")
&& streamId.contains(IdentifierPrefixes.IPCAM_ID)) {
// If IPCAM
senderPublicId = streamId.substring(streamId.indexOf("_" + IdentifierPrefixes.IPCAM_ID) + 1,
streamId.length());
} else {
// Not IPCAM
senderPublicId = streamId.substring(streamId.lastIndexOf(IdentifierPrefixes.PARTICIPANT_PUBLIC_ID),
streamId.length());
}
return senderPublicId;
}
}

View File

@ -18,6 +18,7 @@ package io.openvidu.server.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -31,7 +32,6 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.java.client.VideoCodec;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
public class SDPMunging {
@ -39,6 +39,11 @@ public class SDPMunging {
private Set<VideoCodec> supportedVideoCodecs = new HashSet<>(Arrays.asList(VideoCodec.VP8, VideoCodec.H264));
private final String PT_PATTERN = "a=rtpmap:(\\d+) %s/90000";
private final String EXTRA_PT_PATTERN = "a=fmtp:(\\d+) apt=%s";
private final List<String> PATTERNS = Collections.unmodifiableList(
Arrays.asList("^a=extmap:%s .+$", "^a=rtpmap:%s .+$", "^a=fmtp:%s .+$", "^a=rtcp-fb:%s .+$"));
/**
* `codec` is a uppercase SDP-style codec name: "VP8", "H264".
*
@ -63,13 +68,14 @@ public class SDPMunging {
* ordering of formats. Browsers (tested with Chrome 84) honor this change and
* use the first codec provided in the answer, so this operation actually works.
*/
public String setCodecPreference(VideoCodec codec, String sdp) throws OpenViduException {
public String setCodecPreference(VideoCodec codec, String sdp, boolean applyHeavyMunging) throws OpenViduException {
String codecStr = codec.name();
log.info("[setCodecPreference] codec: {}", codecStr);
List<String> codecPts = new ArrayList<String>();
List<String> usedCodecPts = new ArrayList<String>();
List<String> unusedCodecPts = new ArrayList<String>();
String[] lines = sdp.split("\\R+");
Pattern ptRegex = Pattern.compile(String.format("a=rtpmap:(\\d+) %s/90000", codecStr));
Pattern ptRegex = Pattern.compile(String.format(PT_PATTERN, codecStr));
for (int sl = 0; sl < lines.length; sl++) {
String sdpLine = lines[sl];
@ -78,10 +84,10 @@ public class SDPMunging {
continue;
}
// m-section found. Prepare an array to store PayloadTypes.
codecPts.clear();
// m-section found. Prepare an array to store PayloadTypes
usedCodecPts.clear();
// Search the m-section to find our codec's PayloadType, if any.
// Search the m-section to find our codec's PayloadType, if any
for (int ml = sl + 1; ml < lines.length; ml++) {
String mediaLine = lines[ml];
@ -92,38 +98,38 @@ public class SDPMunging {
Matcher ptMatch = ptRegex.matcher(mediaLine);
if (ptMatch.find()) {
// PayloadType found.
// PayloadType found
String pt = ptMatch.group(1);
codecPts.add(pt);
usedCodecPts.add(pt);
// Search the m-section to find the APT subtype, if any.
Pattern aptRegex = Pattern.compile(String.format("a=fmtp:(\\d+) apt=%s", pt));
// Search the m-section to find the APT subtype, if any
Pattern aptRegex = Pattern.compile(String.format(EXTRA_PT_PATTERN, pt));
for (int al = sl + 1; al < lines.length; al++) {
String aptLine = lines[al];
// Abort if we reach the next m-section.
// Abort if we reach the next m-section
if (aptLine.startsWith("m=")) {
break;
}
Matcher aptMatch = aptRegex.matcher(aptLine);
if (aptMatch.find()) {
// APT found.
// APT found
String apt = aptMatch.group(1);
codecPts.add(apt);
usedCodecPts.add(apt);
}
}
}
}
if (codecPts.isEmpty()) {
if (usedCodecPts.isEmpty()) {
throw new OpenViduException(Code.FORCED_CODEC_NOT_FOUND_IN_SDPOFFER,
"The specified forced codec " + codecStr + " is not present in the SDP");
}
// Build a new m= line where any PayloadTypes found have been moved
// to the front of the PT list.
// to the front of the PT list
StringBuilder newLine = new StringBuilder(sdpLine.length());
List<String> lineParts = new ArrayList<String>(Arrays.asList(sdpLine.split(" ")));
@ -132,29 +138,35 @@ public class SDPMunging {
continue;
}
// Add "m=video", Port, and Protocol.
// Add "m=video", Port, and Protocol
for (int i = 0; i < 3; i++) {
newLine.append(lineParts.remove(0) + " ");
}
// Add the PayloadTypes that correspond to our preferred codec.
for (String pt : codecPts) {
// Add the PayloadTypes that correspond to our preferred codec
for (String pt : usedCodecPts) {
lineParts.remove(pt);
newLine.append(pt + " ");
}
// Replace the original m= line with the one we just built.
// Collect all codecs to remove
unusedCodecPts.addAll(lineParts);
// Replace the original m= line with the one we just built
lines[sl] = newLine.toString().trim();
}
if (applyHeavyMunging) {
lines = cleanLinesWithRemovedCodecs(unusedCodecPts, lines);
}
return String.join("\r\n", lines) + "\r\n";
}
/**
* Return a SDP modified to force a specific codec
*/
public String forceCodec(Participant participant, String sdp, Session session, boolean isPublisher,
boolean isReconnecting, boolean isTranscodingAllowed, VideoCodec forcedVideoCodec)
public String forceCodec(String sdp, Participant participant, boolean isPublisher, boolean isReconnecting,
boolean isTranscodingAllowed, VideoCodec forcedVideoCodec, boolean applyHeavyMunging)
throws OpenViduException {
try {
if (supportedVideoCodecs.contains(forcedVideoCodec)) {
@ -163,15 +175,15 @@ public class SDPMunging {
log.debug(
"PARTICIPANT '{}' in Session '{}'. Is Publisher: '{}'. Is Subscriber: '{}'. Is Reconnecting '{}'."
+ " SDP before munging: \n {}",
participant.getParticipantPublicId(), session.getSessionId(), isPublisher, !isPublisher,
participant.getParticipantPublicId(), participant.getSessionId(), isPublisher, !isPublisher,
isReconnecting, sdp);
mungedSdpOffer = this.setCodecPreference(forcedVideoCodec, sdp);
mungedSdpOffer = this.setCodecPreference(forcedVideoCodec, sdp, applyHeavyMunging);
log.debug(
"PARTICIPANT '{}' in Session '{}'. Is Publisher: '{}'. Is Subscriber: '{}'."
+ " Is Reconnecting '{}'." + " SDP after munging: \n {}",
participant.getParticipantPublicId(), session.getSessionId(), isPublisher, !isPublisher,
participant.getParticipantPublicId(), participant.getSessionId(), isPublisher, !isPublisher,
isReconnecting, mungedSdpOffer);
return mungedSdpOffer;
@ -183,7 +195,7 @@ public class SDPMunging {
} catch (OpenViduException e) {
String errorMessage = "Error forcing codec: '" + forcedVideoCodec + "', for PARTICIPANT: '"
+ participant.getParticipantPublicId() + "' in Session: '" + session.getSessionId()
+ participant.getParticipantPublicId() + "' in Session: '" + participant.getSessionId()
+ "'. Is publishing: '" + isPublisher + "'. Is Subscriber: '" + !isPublisher
+ "'. Is Reconnecting: '" + isReconnecting + "'.\nException: " + e.getMessage() + "\nSDP:\n" + sdp;
@ -194,11 +206,22 @@ public class SDPMunging {
log.info(
"Codec: '{}' is not supported for PARTICIPANT: '{}' in Session: '{}'. Is publishing: '{}'. "
+ "Is Subscriber: '{}'. Is Reconnecting: '{}'." + " Transcoding will be allowed",
forcedVideoCodec, participant.getParticipantPublicId(), session.getSessionId(), isPublisher,
forcedVideoCodec, participant.getParticipantPublicId(), participant.getSessionId(), isPublisher,
!isPublisher, isReconnecting);
return sdp;
}
}
private String[] cleanLinesWithRemovedCodecs(List<String> removedCodecs, String[] lines) {
List<String> listOfLines = new ArrayList<>(Arrays.asList(lines));
removedCodecs.forEach(unusedPt -> {
for (String pattern : PATTERNS) {
listOfLines.removeIf(Pattern.compile(String.format(pattern, unusedPt)).asPredicate());
}
});
lines = listOfLines.toArray(new String[0]);
return lines;
}
}

View File

@ -1,6 +1,5 @@
package io.openvidu.server.test.unit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -31,29 +30,15 @@ public class SDPMungingTest {
List<String> forceCodecPayloads;
String validSDPH264Files[] = new String[]{
"sdp_kurento_h264.txt",
"sdp_chrome84.txt",
"sdp_firefox79.txt",
"sdp_safari13-1.txt"
};
String validSDPH264Files[] = new String[] { "sdp_kurento_h264.txt", "sdp_chrome84.txt", "sdp_firefox79.txt",
"sdp_safari13-1.txt" };
String validSDPVP8Files[] = new String[]{
"sdp_kurento_h264.txt",
"sdp_chrome84.txt",
"sdp_firefox79.txt",
"sdp_safari13-1.txt"
};
String validSDPVP8Files[] = new String[] { "sdp_kurento_h264.txt", "sdp_chrome84.txt", "sdp_firefox79.txt",
"sdp_safari13-1.txt" };
String validSDPVP9Files[] = new String[] {
"sdp_chrome84.txt",
"sdp_firefox79.txt"
};
String validSDPVP9Files[] = new String[] { "sdp_chrome84.txt", "sdp_firefox79.txt" };
String notValidVP9Files[] = new String[] {
"sdp_kurento_h264.txt",
"sdp_safari13-1.txt"
};
String notValidVP9Files[] = new String[] { "sdp_kurento_h264.txt", "sdp_safari13-1.txt" };
@Test
@DisplayName("[setCodecPreference] Force VP8 Codec prevalence in 'm=video' line")
@ -103,7 +88,7 @@ public class SDPMungingTest {
private void initTestsSetCodecPrevalence(VideoCodec codec, String sdpNameFile) throws IOException {
this.oldSdp = getSdpFile(sdpNameFile);
this.newSdp = this.sdpMungin.setCodecPreference(codec, oldSdp);
this.newSdp = this.sdpMungin.setCodecPreference(codec, oldSdp, false);
this.forceCodecPayloads = new ArrayList<>();
// Get all Payload-Type for video Codec