Support subscription with different client-server negotiation order

pull/630/head
pabloFuente 2021-05-31 15:20:54 +02:00
parent 845a088e69
commit 21615755f9
8 changed files with 352 additions and 487 deletions

View File

@ -197,7 +197,7 @@ export class Session extends EventDispatcher {
* #### Events dispatched * #### Events dispatched
* *
* The [[Session]] object of the local participant will dispatch a `sessionDisconnected` event. * The [[Session]] object of the local participant will dispatch a `sessionDisconnected` event.
* This event will automatically unsubscribe the leaving participant from every Subscriber object of the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) * This event will automatically unsubscribe the leaving participant from every Subscriber object of the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks)
* and also deletes any HTML video element associated to each Subscriber (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)). * and also deletes any HTML video element associated to each Subscriber (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)).
* For every video removed, each Subscriber object will dispatch a `videoElementDestroyed` event. * For every video removed, each Subscriber object will dispatch a `videoElementDestroyed` event.
* Call `event.preventDefault()` upon event `sessionDisconnected` to avoid this behavior and take care of disposing and cleaning all the Subscriber objects yourself. * Call `event.preventDefault()` upon event `sessionDisconnected` to avoid this behavior and take care of disposing and cleaning all the Subscriber objects yourself.
@ -210,7 +210,7 @@ export class Session extends EventDispatcher {
* or/and `Session.disconnect()` in the previous session). See [[StreamEvent]] and [[VideoElementEvent]] to learn more. * or/and `Session.disconnect()` in the previous session). See [[StreamEvent]] and [[VideoElementEvent]] to learn more.
* *
* The [[Session]] object of every other participant connected to the session will dispatch a `streamDestroyed` event if the disconnected participant was publishing. * The [[Session]] object of every other participant connected to the session will dispatch a `streamDestroyed` event if the disconnected participant was publishing.
* This event will automatically unsubscribe the Subscriber object from the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) * This event will automatically unsubscribe the Subscriber object from the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks)
* and also deletes any HTML video element associated to that Subscriber (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)). * and also deletes any HTML video element associated to that Subscriber (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)).
* For every video removed, the Subscriber object will dispatch a `videoElementDestroyed` event. * For every video removed, the Subscriber object will dispatch a `videoElementDestroyed` event.
* Call `event.preventDefault()` upon event `streamDestroyed` to avoid this default behavior and take care of disposing and cleaning the Subscriber object yourself. * Call `event.preventDefault()` upon event `streamDestroyed` to avoid this default behavior and take care of disposing and cleaning the Subscriber object yourself.
@ -437,7 +437,7 @@ export class Session extends EventDispatcher {
* Call `event.preventDefault()` upon event `streamDestroyed` if you want to clean the Publisher object on your own or re-publish it in a different Session. * Call `event.preventDefault()` upon event `streamDestroyed` if you want to clean the Publisher object on your own or re-publish it in a different Session.
* *
* The [[Session]] object of every other participant connected to the session will dispatch a `streamDestroyed` event. * The [[Session]] object of every other participant connected to the session will dispatch a `streamDestroyed` event.
* This event will automatically unsubscribe the Subscriber object from the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) and * This event will automatically unsubscribe the Subscriber object from the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks) and
* delete any HTML video element associated to it (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)). * delete any HTML video element associated to it (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)).
* For every video removed, the Subscriber object will dispatch a `videoElementDestroyed` event. * For every video removed, the Subscriber object will dispatch a `videoElementDestroyed` event.
* Call `event.preventDefault()` upon event `streamDestroyed` to avoid this default behavior and take care of disposing and cleaning the Subscriber object on your own. * Call `event.preventDefault()` upon event `streamDestroyed` to avoid this default behavior and take care of disposing and cleaning the Subscriber object on your own.

View File

@ -281,6 +281,19 @@ export class Stream {
} }
/**
* Recreates the media connection with the server. This entails the disposal of the previous RTCPeerConnection and the re-negotiation
* of a new one, that will apply the same properties.
*
* This method can be useful in those situations were there the media connection breaks and OpenVidu is not able to recover on its own
* for any kind of unanticipated reason (see [Automatic reconnection](/en/latest/advanced-features/automatic-reconnection/)).
*
* @returns A Promise (to which you can optionally subscribe to) that is resolved if the reconnection operation was successful and rejected with an Error object if not
*/
public reconnect(): Promise<void> {
return this.reconnectStream('API');
}
/** /**
* Applies an audio/video filter to the stream. * Applies an audio/video filter to the stream.
* *
@ -465,11 +478,12 @@ export class Stream {
* @hidden * @hidden
*/ */
disposeWebRtcPeer(): void { disposeWebRtcPeer(): void {
const webrtcId: string = this.webRtcPeer.id;
if (!!this.webRtcPeer) { if (!!this.webRtcPeer) {
this.webRtcPeer.dispose(); this.webRtcPeer.dispose();
this.stopWebRtcStats(); this.stopWebRtcStats();
} }
logger.info((!!this.outboundStreamOpts ? 'Outbound ' : 'Inbound ') + "WebRTCPeer from 'Stream' with id [" + this.streamId + '] is now closed'); logger.info((!!this.outboundStreamOpts ? 'Outbound ' : 'Inbound ') + "RTCPeerConnection with id [" + webrtcId + "] from 'Stream' with id [" + this.streamId + '] is now closed');
} }
/** /**
@ -967,6 +981,9 @@ export class Stream {
} }
const finalResolve = () => { const finalResolve = () => {
logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed"));
this.remotePeerSuccessfullyEstablished(reconnect);
this.initWebRtcStats();
if (reconnect) { if (reconnect) {
this.reconnectionEventEmitter?.emitEvent('success'); this.reconnectionEventEmitter?.emitEvent('success');
delete this.reconnectionEventEmitter; delete this.reconnectionEventEmitter;
@ -982,20 +999,50 @@ export class Stream {
reject(error); reject(error);
} }
if (this.session.openvidu.mediaServer === 'mediasoup') {
// Server initiates negotiation
this.initWebRtcPeerReceiveFromServer(reconnect)
.then(() => finalResolve())
.catch(error => finalReject(error));
} else {
// Client initiates negotiation
this.initWebRtcPeerReceiveFromClient(reconnect)
.then(() => finalResolve())
.catch(error => finalReject(error));
}
});
}
/**
* @hidden
*/
initWebRtcPeerReceiveFromClient(reconnect: boolean): Promise<void> {
return new Promise((resolve, reject) => {
this.completeWebRtcPeerReceive(reconnect).then(response => {
this.webRtcPeer.processRemoteAnswer(response.sdpAnswer)
.then(() => resolve()).catch(error => reject(error));
}).catch(error => reject(error));
});
}
/**
* @hidden
*/
initWebRtcPeerReceiveFromServer(reconnect: boolean): Promise<void> {
return new Promise((resolve, reject) => {
// Server initiates negotiation
this.session.openvidu.sendRequest('prepareReceiveVideoFrom', { sender: this.streamId, reconnect }, (error, response) => { this.session.openvidu.sendRequest('prepareReceiveVideoFrom', { sender: this.streamId, reconnect }, (error, response) => {
if (error) { if (error) {
finalReject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error))); reject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error)));
} else { } else {
this.completeWebRtcPeerReceive(response.sdpOffer, reconnect) this.completeWebRtcPeerReceive(reconnect, response.sdpOffer)
.then(() => { .then(() => resolve()).catch(error => reject(error));
logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed"));
this.remotePeerSuccessfullyEstablished(reconnect);
this.initWebRtcStats();
finalResolve();
})
.catch(error => {
finalReject(error);
});
} }
}); });
}); });
@ -1004,7 +1051,7 @@ export class Stream {
/** /**
* @hidden * @hidden
*/ */
completeWebRtcPeerReceive(sdpOffer: string, reconnect: boolean): Promise<void> { completeWebRtcPeerReceive(reconnect: boolean, sdpOfferByServer?: string): Promise<any> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const offerConstraints = { const offerConstraints = {
@ -1021,20 +1068,24 @@ export class Stream {
simulcast: false simulcast: false
}; };
const successAnswerCallback = (sdpAnswer) => { const sendSdpToServer = (sdpString: string) => {
logger.debug('Sending SDP answer to subscribe to '
+ this.streamId, sdpAnswer); logger.debug(`Sending local SDP ${(!!sdpOfferByServer ? 'answer' : 'offer')} to subscribe to ${this.streamId}`, sdpString);
const method = reconnect ? 'reconnectStream' : 'receiveVideoFrom'; const method = reconnect ? 'reconnectStream' : 'receiveVideoFrom';
const params = {}; const params = {};
params[reconnect ? 'stream' : 'sender'] = this.streamId; params[reconnect ? 'stream' : 'sender'] = this.streamId;
params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpAnswer; if (!!sdpOfferByServer) {
params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpString;
} else {
params['sdpOffer'] = sdpString;
}
this.session.openvidu.sendRequest(method, params, (error, response) => { this.session.openvidu.sendRequest(method, params, (error, response) => {
if (error) { if (error) {
reject(new Error('Error on ' + method + ' : ' + JSON.stringify(error))); reject(new Error('Error on ' + method + ' : ' + JSON.stringify(error)));
} else { } else {
resolve(); resolve(response);
} }
}); });
}; };
@ -1044,22 +1095,36 @@ export class Stream {
} }
this.webRtcPeer = new WebRtcPeerRecvonly(options); this.webRtcPeer = new WebRtcPeerRecvonly(options);
this.webRtcPeer.addIceConnectionStateChangeListener(this.streamId); this.webRtcPeer.addIceConnectionStateChangeListener(this.streamId);
this.webRtcPeer.processRemoteOffer(sdpOffer)
.then(() => { if (!!sdpOfferByServer) {
this.webRtcPeer.processRemoteOffer(sdpOfferByServer).then(() => {
this.webRtcPeer.createAnswer().then(sdpAnswer => { this.webRtcPeer.createAnswer().then(sdpAnswer => {
this.webRtcPeer.processLocalAnswer(sdpAnswer) this.webRtcPeer.processLocalAnswer(sdpAnswer).then(() => {
.then(() => { sendSdpToServer(sdpAnswer.sdp!);
successAnswerCallback(sdpAnswer.sdp); }).catch(error => {
}).catch(error => { reject(new Error('(subscribe) SDP process local answer error: ' + JSON.stringify(error)));
reject(new Error('(subscribe) SDP process local answer error: ' + JSON.stringify(error))); });
});
}).catch(error => { }).catch(error => {
reject(new Error('(subscribe) SDP create answer error: ' + JSON.stringify(error))); reject(new Error('(subscribe) SDP create answer error: ' + JSON.stringify(error)));
}); });
}) }).catch(error => {
.catch(error => {
reject(new Error('(subscribe) SDP process remote offer error: ' + JSON.stringify(error))); reject(new Error('(subscribe) SDP process remote offer error: ' + JSON.stringify(error)));
}); });
} else {
this.webRtcPeer.createOffer().then(sdpOffer => {
this.webRtcPeer.processLocalOffer(sdpOffer).then(() => {
sendSdpToServer(sdpOffer.sdp!);
}).catch(error => {
reject(new Error('(subscribe) SDP process local offer error: ' + JSON.stringify(error)));
});
}).catch(error => {
reject(new Error('(subscribe) SDP create offer error: ' + JSON.stringify(error)));
});
}
}); });
} }
@ -1198,7 +1263,7 @@ export class Stream {
} }
} }
public async reconnectStream(event: string): Promise<void> { private async reconnectStream(event: string) {
const isWsConnected = await this.isWebsocketConnected(event, 3000); const isWsConnected = await this.isWebsocketConnected(event, 3000);
if (isWsConnected) { if (isWsConnected) {
// There is connection to openvidu-server. The RTCPeerConnection is the only one broken // There is connection to openvidu-server. The RTCPeerConnection is the only one broken
@ -1241,7 +1306,7 @@ export class Stream {
}); });
} }
async awaitWebRtcPeerConnectionState(timeout: number): Promise<RTCIceConnectionState> { private async awaitWebRtcPeerConnectionState(timeout: number): Promise<RTCIceConnectionState> {
let state = this.getRTCPeerConnection().iceConnectionState; let state = this.getRTCPeerConnection().iceConnectionState;
const interval = 150; const interval = 150;
const intervals = Math.ceil(timeout / interval); const intervals = Math.ceil(timeout / interval);

View File

@ -60,14 +60,14 @@ export abstract class Event {
/** /**
* Prevents the default behavior of the event. The following events have a default behavior: * Prevents the default behavior of the event. The following events have a default behavior:
* *
* - `sessionDisconnected`: dispatched by [[Session]] object, automatically unsubscribes the leaving participant from every Subscriber object of the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) * - `sessionDisconnected`: dispatched by [[Session]] object, automatically unsubscribes the leaving participant from every Subscriber object of the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks)
* and also deletes any HTML video element associated to each Subscriber (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` in method [[Session.subscribe]] or * and also deletes any HTML video element associated to each Subscriber (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` in method [[Session.subscribe]] or
* by calling [[Subscriber.createVideoElement]]). For every video removed, each Subscriber object will also dispatch a `videoElementDestroyed` event. * by calling [[Subscriber.createVideoElement]]). For every video removed, each Subscriber object will also dispatch a `videoElementDestroyed` event.
* *
* - `streamDestroyed`: * - `streamDestroyed`:
* - If dispatched by a [[Publisher]] (*you* have unpublished): automatically stops all media tracks and deletes any HTML video element associated to it (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` * - If dispatched by a [[Publisher]] (*you* have unpublished): automatically stops all media tracks and deletes any HTML video element associated to it (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement`
* in method [[OpenVidu.initPublisher]] or by calling [[Publisher.createVideoElement]]). For every video removed, the Publisher object will also dispatch a `videoElementDestroyed` event. * in method [[OpenVidu.initPublisher]] or by calling [[Publisher.createVideoElement]]). For every video removed, the Publisher object will also dispatch a `videoElementDestroyed` event.
* - If dispatched by [[Session]] (*other user* has unpublished): automatically unsubscribes the proper Subscriber object from the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) * - If dispatched by [[Session]] (*other user* has unpublished): automatically unsubscribes the proper Subscriber object from the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks)
* and also deletes any HTML video element associated to that Subscriber (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` in method [[Session.subscribe]] or * and also deletes any HTML video element associated to that Subscriber (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` in method [[Session.subscribe]] or
* by calling [[Subscriber.createVideoElement]]). For every video removed, the Subscriber object will also dispatch a `videoElementDestroyed` event. * by calling [[Subscriber.createVideoElement]]). For every video removed, the Subscriber object will also dispatch a `videoElementDestroyed` event.
*/ */

View File

@ -116,15 +116,8 @@ public abstract class SessionManager {
public abstract void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect, public abstract void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect,
Integer id); Integer id);
// TODO: REMOVE ON 2.18.0 public abstract void subscribe(Participant participant, String senderName, String sdpString, Integer transactionId,
public abstract void subscribe(Participant participant, String senderName, String sdpAnwser, Integer transactionId, boolean initByServer);
boolean is2180);
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// public abstract void subscribe(Participant participant, String senderName,
// String sdpAnwser, Integer transactionId);
// END TODO
public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId); public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId);
@ -180,13 +173,11 @@ public abstract class SessionManager {
public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions, public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions,
ConnectionProperties connectionProperties) throws Exception; ConnectionProperties connectionProperties) throws Exception;
public abstract void reconnectStream(Participant participant, String streamId, String sdpOffer, public abstract void reconnectPublisher(Participant participant, String streamId, String sdpOffer,
Integer transactionId); Integer transactionId);
// TODO: REMOVE ON 2.18.0 public abstract void reconnectSubscriber(Participant participant, String streamId, String sdpString,
public abstract void reconnectStream2170(Participant participant, String streamId, String sdpOffer, Integer transactionId, boolean initByServer);
Integer transactionId);
// END TODO
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
throws OpenViduException; throws OpenViduException;

View File

@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.Lock;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
@ -220,240 +220,100 @@ public class KurentoParticipant extends Participant {
} }
KurentoParticipant kSender = (KurentoParticipant) sender; KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.streaming && kSender.getPublisher() != null) {
if (kSender.streaming && kSender.getPublisher() != null final Lock closingReadLock = kSender.getPublisher().closingLock.readLock();
&& kSender.getPublisher().closingLock.readLock().tryLock()) { if (closingReadLock.tryLock()) {
try {
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
senderName);
SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName);
try { try {
CountDownLatch subscriberLatch = new CountDownLatch(1);
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); SubscriberEndpoint subscriber = initializeSubscriberEndpoint(kSender);
try { try {
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher());
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(),
"Timeout reached when creating subscriber endpoint"); sdpOffer);
} log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}",
} catch (InterruptedException e) { this.getParticipantPublicId(), senderName, this.session.getSessionId());
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, return sdpOffer;
"Interrupted when creating subscriber endpoint: " + e.getMessage()); } catch (KurentoServerException e) {
} log.error("Exception preparing subscriber endpoint for user {}: {}",
if (oldMediaEndpoint != null) { this.getParticipantPublicId(), e.getMessage());
log.warn( this.subscribers.remove(senderName);
"PARTICIPANT {}: Two threads are trying to create at " releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
+ "the same time a subscriber endpoint for user {}",
this.getParticipantPublicId(), senderName);
return null; return null;
} }
if (subscriber.getEndpoint() == null) { } finally {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, closingReadLock.unlock();
"Unable to create subscriber endpoint");
}
String subscriberEndpointName = calculateSubscriberEndpointName(kSender);
subscriber.setEndpointName(subscriberEndpointName);
subscriber.getEndpoint().setName(subscriberEndpointName);
subscriber.setStreamId(kSender.getPublisherStreamId());
endpointConfig.addEndpointListeners(subscriber, "subscriber");
} catch (OpenViduException e) {
this.subscribers.remove(senderName);
throw e;
} }
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(),
senderName);
try {
String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer);
log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}",
this.getParticipantPublicId(), senderName, this.session.getSessionId());
return sdpOffer;
} catch (KurentoServerException e) {
log.error("Exception preparing subscriber endpoint for user {}: {}", this.getParticipantPublicId(),
e.getMessage());
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
return null;
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
} }
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
} }
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName + "is closed");
} }
public void receiveMediaFrom2180(Participant sender, String sdpAnswer, boolean silent) { public String receiveMedia(Participant sender, String sdpString, boolean silent, boolean initByServer) {
final String senderName = sender.getParticipantPublicId(); final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId()); senderName, this.session.getSessionId());
log.trace("PARTICIPANT {}: SdpAnswer for {} is {}", this.getParticipantPublicId(), senderName, sdpAnswer); log.trace("PARTICIPANT {}: Sdp string for {} is {}", this.getParticipantPublicId(), senderName, sdpString);
if (senderName.equals(this.getParticipantPublicId())) { if (senderName.equals(this.getParticipantPublicId())) {
log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId()); log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media"); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media");
} }
KurentoParticipant kSender = (KurentoParticipant) sender; KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.streaming && kSender.getPublisher() != null) {
if (kSender.streaming && kSender.getPublisher() != null final Lock closingReadLock = kSender.getPublisher().closingLock.readLock();
&& kSender.getPublisher().closingLock.readLock().tryLock()) { if (closingReadLock.tryLock()) {
try {
final SubscriberEndpoint subscriber = getSubscriber(senderName);
if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
}
try { try {
subscriber.subscribe(sdpAnswer, kSender.getPublisher());
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
if (!silent // If initialized by server SubscriberEndpoint was created on
&& !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { // prepareReceiveMediaFrom. If initialized by client must be created now
endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(), final SubscriberEndpoint subscriber = initByServer ? getSubscriber(senderName)
sender.getParticipantPublicId(), subscriber.createdAt()); : initializeSubscriberEndpoint(kSender);
}
} catch (KurentoServerException e) {
// TODO Check object status when KurentoClient sets this info in the object
if (e.getCode() == 40101) {
log.warn(
"Publisher endpoint was already released when trying to connect a subscriber endpoint to it",
e);
} else {
log.error("Exception connecting subscriber endpoint to publisher endpoint", e);
}
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
}
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
}
}
public String receiveMediaFrom2170(Participant sender, String sdpOffer, boolean silent) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
log.trace("PARTICIPANT {}: SdpOffer for {} is {}", this.getParticipantPublicId(), senderName, sdpOffer);
if (senderName.equals(this.getParticipantPublicId())) {
log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media");
}
KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.streaming && kSender.getPublisher() != null
&& kSender.getPublisher().closingLock.readLock().tryLock()) {
try {
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
senderName);
SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName);
try {
CountDownLatch subscriberLatch = new CountDownLatch(1);
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
try {
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Timeout reached when creating subscriber endpoint");
}
} catch (InterruptedException e) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Interrupted when creating subscriber endpoint: " + e.getMessage());
}
if (oldMediaEndpoint != null) {
log.warn(
"PARTICIPANT {}: Two threads are trying to create at "
+ "the same time a subscriber endpoint for user {}",
this.getParticipantPublicId(), senderName);
return null;
}
if (subscriber.getEndpoint() == null) { if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint"); "Unable to create subscriber endpoint");
} }
try {
String subscriberEndpointName = calculateSubscriberEndpointName(kSender); String sdpAnswer = subscriber.subscribe(sdpString, kSender.getPublisher());
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}",
subscriber.setEndpointName(subscriberEndpointName); this.getParticipantPublicId(), senderName, this.session.getSessionId());
subscriber.getEndpoint().setName(subscriberEndpointName); if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID
subscriber.setStreamId(kSender.getPublisherStreamId()); .equals(this.getParticipantPublicId())) {
endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(),
endpointConfig.addEndpointListeners(subscriber, "subscriber"); sender.getParticipantPublicId(), subscriber.createdAt());
}
} catch (OpenViduException e) { return sdpAnswer;
this.subscribers.remove(senderName); } catch (KurentoServerException e) {
throw e; // TODO Check object status when KurentoClient sets this info in the object
if (e.getCode() == 40101) {
log.warn(
"Publisher endpoint was already released when trying to connect a subscriber endpoint to it",
e);
} else {
log.error("Exception connecting subscriber endpoint to publisher endpoint", e);
}
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
return null;
}
} finally {
closingReadLock.unlock();
} }
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(),
senderName);
try {
String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer);
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
if (!silent
&& !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(),
sender.getParticipantPublicId(), subscriber.createdAt());
}
return sdpAnswer;
} catch (KurentoServerException e) {
// TODO Check object status when KurentoClient sets this info in the object
if (e.getCode() == 40101) {
log.warn(
"Publisher endpoint was already released when trying to connect a subscriber endpoint to it",
e);
} else {
log.error("Exception connecting subscriber endpoint to publisher endpoint", e);
}
this.subscribers.remove(senderName);
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
return null;
}
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
} }
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
} }
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName + "is closed");
} }
public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) { public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) {
@ -461,7 +321,8 @@ public class KurentoParticipant extends Participant {
final PublisherEndpoint pub = senderKurentoParticipant.publisher; final PublisherEndpoint pub = senderKurentoParticipant.publisher;
if (pub != null) { if (pub != null) {
try { try {
if (pub.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { final Lock closingWriteLock = pub.closingLock.writeLock();
if (closingWriteLock.tryLock(15, TimeUnit.SECONDS)) {
try { try {
log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(),
senderName); senderName);
@ -478,7 +339,7 @@ public class KurentoParticipant extends Participant {
this.getParticipantPublicId(), senderName, this.session.getSessionId()); this.getParticipantPublicId(), senderName, this.session.getSessionId());
} }
} finally { } finally {
pub.closingLock.writeLock().unlock(); closingWriteLock.unlock();
} }
} else { } else {
log.error( log.error(
@ -585,15 +446,66 @@ public class KurentoParticipant extends Participant {
return this.getParticipantPublicId() + "_" + senderParticipant.getPublisherStreamId(); return this.getParticipantPublicId() + "_" + senderParticipant.getPublisherStreamId();
} }
private SubscriberEndpoint initializeSubscriberEndpoint(Participant kSender) {
String senderName = kSender.getParticipantPublicId();
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
senderName);
SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName);
try {
CountDownLatch subscriberLatch = new CountDownLatch(1);
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
try {
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Timeout reached when creating subscriber endpoint");
}
} catch (InterruptedException e) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Interrupted when creating subscriber endpoint: " + e.getMessage());
}
if (oldMediaEndpoint != null) {
log.warn(
"PARTICIPANT {}: Two threads are trying to create at "
+ "the same time a subscriber endpoint for user {}",
this.getParticipantPublicId(), senderName);
return null;
}
if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
}
String subscriberEndpointName = calculateSubscriberEndpointName(kSender);
subscriber.setEndpointName(subscriberEndpointName);
subscriber.getEndpoint().setName(subscriberEndpointName);
subscriber.setStreamId(kSender.getPublisherStreamId());
endpointConfig.addEndpointListeners(subscriber, "subscriber");
} catch (OpenViduException e) {
this.subscribers.remove(senderName);
throw e;
}
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName);
return subscriber;
}
private void releasePublisherEndpoint(EndReason reason, Long kmsDisconnectionTime) { private void releasePublisherEndpoint(EndReason reason, Long kmsDisconnectionTime) {
if (publisher != null && publisher.getEndpoint() != null) { if (publisher != null && publisher.getEndpoint() != null) {
final ReadWriteLock closingLock = publisher.closingLock;
try { try {
if (closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { final Lock closingWriteLock = publisher.closingLock.writeLock();
if (closingWriteLock.tryLock(15, TimeUnit.SECONDS)) {
try { try {
this.releasePublisherEndpointAux(reason, kmsDisconnectionTime); this.releasePublisherEndpointAux(reason, kmsDisconnectionTime);
} finally { } finally {
closingLock.writeLock().unlock(); closingWriteLock.unlock();
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -596,12 +596,12 @@ public class KurentoSessionManager extends SessionManager {
} }
@Override @Override
public void subscribe(Participant participant, String senderName, String sdpAnswer, Integer transactionId, public void subscribe(Participant participant, String senderName, String sdpString, Integer transactionId,
boolean is2180) { boolean initByServer) {
Session session = null; Session session = null;
try { try {
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpAnswer={} ({})", senderName, sdpAnswer, log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpString={} ({})", senderName, sdpString,
participant.getParticipantPublicId()); participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoParticipant kParticipant = (KurentoParticipant) participant;
@ -627,48 +627,44 @@ public class KurentoSessionManager extends SessionManager {
String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant); String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant);
// TODO: REMOVE ON 2.18.0 if (initByServer) {
if (is2180) {
// Server initiated negotiation. sdpString is the SDP Answer of the client
// Client's SDPAnswer to the server's SDPOffer
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpString));
kParticipant.receiveMediaFrom2180(senderParticipant, sdpAnswer, false); kParticipant.receiveMedia(senderParticipant, sdpString, false, true);
sessionEventsHandler.onSubscribe(participant, session, transactionId, null); sessionEventsHandler.onSubscribe(participant, session, transactionId, null);
} else { } else {
// Client initiated negotiation. sdpString is the SDP Offer of the client
boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed(); boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec(); VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec();
String sdpOffer = sdpString;
// Modify sdp if forced codec is defined // Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpAnswer = sdpMunging.forceCodec(sdpAnswer, participant, false, false, isTranscodingAllowed, sdpOffer = sdpMunging.forceCodec(sdpString, participant, false, false, isTranscodingAllowed,
forcedVideoCodec, false); forcedVideoCodec, false);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpAnswer)); WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
} }
String finalSdpAnswer = kParticipant.receiveMediaFrom2170(senderParticipant, sdpAnswer, false); String sdpAnswer = kParticipant.receiveMedia(senderParticipant, sdpOffer, false, false);
if (finalSdpAnswer == null) { if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId() "Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId()
+ "' to '" + senderName + "'"); + "' to '" + senderName + "'");
} }
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, finalSdpAnswer)); WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onSubscribe(participant, session, finalSdpAnswer, transactionId, null); sessionEventsHandler.onSubscribe(participant, session, sdpAnswer, transactionId, null);
} }
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
// WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
// String remoteSdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpAnswer, false);
// sessionEventsHandler.onSubscribe(participant, session, transactionId, null);
// END TODO
} catch (OpenViduException e) { } catch (OpenViduException e) {
log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e); log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e);
@ -1178,184 +1174,127 @@ public class KurentoSessionManager extends SessionManager {
return kParticipant; return kParticipant;
} }
// TODO: REMOVE ON 2.18.0
@Override @Override
public void reconnectStream2170(Participant participant, String streamId, String sdpOffer, Integer transactionId) { public void reconnectPublisher(Participant participant, String streamId, String sdpString, Integer transactionId) {
KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession(); KurentoSession kSession = kParticipant.getSession();
boolean isPublisher = streamId.equals(participant.getPublisherStreamId()); reconnectPublisher(kSession, kParticipant, streamId, sdpString, transactionId);
}
@Override
public void reconnectSubscriber(Participant participant, String streamId, String sdpString, Integer transactionId,
boolean initByServer) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
reconnectSubscriber(kSession, kParticipant, streamId, sdpString, transactionId, initByServer);
}
private String mungeSdpOffer(Session kSession, Participant participant, String sdpOffer, boolean isPublisher) {
boolean isTranscodingAllowed = kSession.getSessionProperties().isTranscodingAllowed(); boolean isTranscodingAllowed = kSession.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = kSession.getSessionProperties().forcedVideoCodec(); VideoCodec forcedVideoCodec = kSession.getSessionProperties().forcedVideoCodec();
boolean sdpOfferHasBeenMunged = false;
String originalSdpOffer = sdpOffer;
// Modify sdp if forced codec is defined // Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOfferHasBeenMunged = true; return sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed,
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed,
forcedVideoCodec, false); forcedVideoCodec, false);
} }
return null;
}
if (isPublisher) { private void reconnectPublisher(KurentoSession kSession, KurentoParticipant kParticipant, String streamId,
String sdpOffer, Integer transactionId) {
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client, String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, true);
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, originalSdpOffer));
if (sdpOfferHasBeenMunged) {
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
// Reconnect publisher CDR.log(new WebrtcDebugEvent(kParticipant, streamId, WebrtcDebugEventIssuer.client,
final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher() WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, sdpOffer));
.getMediaOptions(); if (sdpOfferMunged != null) {
sdpOffer = sdpOfferMunged;
CDR.log(new WebrtcDebugEvent(kParticipant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
// Reconnect publisher
final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher().getMediaOptions();
// 1) Disconnect broken PublisherEndpoint from its PassThrough
PublisherEndpoint publisher = kParticipant.getPublisher();
final PassThrough passThru = publisher.disconnectFromPassThrough();
// 2) Destroy the broken PublisherEndpoint and nothing else
publisher.cancelStatsLoop.set(true);
kParticipant.releaseElement(kParticipant.getParticipantPublicId(), publisher.getEndpoint());
// 3) Create a new PublisherEndpoint connecting it to the previous PassThrough
kParticipant.resetPublisherEndpoint(kurentoOptions, passThru);
kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true);
log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", kParticipant.getParticipantPublicId(),
sdpAnswer);
CDR.log(new WebrtcDebugEvent(kParticipant, streamId, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onPublishMedia(kParticipant, kParticipant.getPublisherStreamId(),
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
new HashSet<Participant>(), transactionId, null);
}
// 1) Disconnect broken PublisherEndpoint from its PassThrough private void reconnectSubscriber(KurentoSession kSession, KurentoParticipant kParticipant, String streamId,
PublisherEndpoint publisher = kParticipant.getPublisher(); String sdpString, Integer transactionId, boolean initByServer) {
final PassThrough passThru = publisher.disconnectFromPassThrough();
// 2) Destroy the broken PublisherEndpoint and nothing else String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId);
publisher.cancelStatsLoop.set(true); if (senderPrivateId != null) {
kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint());
// 3) Create a new PublisherEndpoint connecting it to the previous PassThrough KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId);
kParticipant.resetPublisherEndpoint(kurentoOptions, passThru); String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender);
kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true);
log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(),
sdpAnswer);
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.server, if (initByServer) {
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), // Server initiated negotiation
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
new HashSet<Participant>(), transactionId, null);
} else { final String sdpAnswer = sdpString;
// Reconnect subscriber CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId); WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
if (senderPrivateId != null) {
KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); kParticipant.receiveMedia(sender, sdpAnswer, true, true);
String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}",
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOffer, kParticipant.getParticipantPublicId(), sdpAnswer);
originalSdpOffer));
if (sdpOfferHasBeenMunged) { sessionEventsHandler.onSubscribe(kParticipant, kSession, sdpAnswer, transactionId, null);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
} else {
// Client initiated negotiation
String sdpOffer = sdpString;
CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOffer, sdpOffer));
String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, false);
if (sdpOfferMunged != null) {
sdpOffer = sdpOfferMunged;
CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOfferMunged, WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOfferMunged,
sdpOffer)); sdpOffer));
} }
String sdpAnswer = kParticipant.receiveMediaFrom2170(sender, sdpOffer, true); kParticipant.cancelReceivingMedia(sender, null, true);
String sdpAnswer = kParticipant.receiveMedia(sender, sdpOffer, true, false);
if (sdpAnswer == null) { if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'"); "Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'");
} }
log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}",
participant.getParticipantPublicId(), sdpAnswer); kParticipant.getParticipantPublicId(), sdpAnswer);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); sessionEventsHandler.onSubscribe(kParticipant, kSession, sdpAnswer, transactionId, null);
} else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'");
} }
}
}
// END TODO
@Override
public void reconnectStream(Participant participant, String streamId, String sdpOfferOrAnswer,
Integer transactionId) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
boolean isPublisher = streamId.equals(participant.getPublisherStreamId());
if (isPublisher) {
// Reconnect publisher
String sdpOffer = sdpOfferOrAnswer;
boolean isTranscodingAllowed = kSession.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = kSession.getSessionProperties().forcedVideoCodec();
boolean sdpOfferHasBeenMunged = false;
final String originalSdpOffer = sdpOffer;
// Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOfferHasBeenMunged = true;
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed,
forcedVideoCodec, false);
}
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, originalSdpOffer));
if (sdpOfferHasBeenMunged) {
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
// Reconnect publisher
final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher()
.getMediaOptions();
// 1) Disconnect broken PublisherEndpoint from its PassThrough
PublisherEndpoint publisher = kParticipant.getPublisher();
final PassThrough passThru = publisher.disconnectFromPassThrough();
// 2) Destroy the broken PublisherEndpoint and nothing else
publisher.cancelStatsLoop.set(true);
kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint());
// 3) Create a new PublisherEndpoint connecting it to the previous PassThrough
kParticipant.resetPublisherEndpoint(kurentoOptions, passThru);
kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true);
log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(),
sdpAnswer);
CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(),
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
new HashSet<Participant>(), transactionId, null);
} else { } else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
// Reconnect subscriber "Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'");
final String sdpAnswer = sdpOfferOrAnswer;
String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId);
if (senderPrivateId != null) {
KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId);
String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
kParticipant.receiveMediaFrom2180(sender, sdpAnswer, true);
log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}",
participant.getParticipantPublicId(), sdpAnswer);
sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null);
} else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'");
}
} }
} }

View File

@ -62,26 +62,20 @@ public class SubscriberEndpoint extends MediaEndpoint {
return sdpOffer; return sdpOffer;
} }
public synchronized String subscribe(String sdpAnswer, PublisherEndpoint publisher) { public synchronized String subscribe(String sdpString, PublisherEndpoint publisher) {
// TODO: REMOVE ON 2.18.0 if (this.publisherStreamId == null) {
if (this.createdAt == null) { // Client initiated negotiation
// 2.17.0
registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId()); registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId());
this.createdAt = System.currentTimeMillis(); this.createdAt = System.currentTimeMillis();
String realSdpAnswer = processOffer(sdpAnswer); String realSdpAnswer = processOffer(sdpString);
gatherCandidates(); gatherCandidates();
publisher.connect(this.getEndpoint(), false); publisher.connect(this.getEndpoint(), false);
this.publisherStreamId = publisher.getStreamId(); this.publisherStreamId = publisher.getStreamId();
return realSdpAnswer; return realSdpAnswer;
} else { } else {
// 2.18.0 // Server initiated negotiation
return processAnswer(sdpAnswer); return processAnswer(sdpString);
} }
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// processAnswer(sdpAnswer);
// END TODO
} }
@Override @Override

View File

@ -358,18 +358,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId); String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId);
boolean reconnect = false; boolean reconnect = getBooleanParam(request, ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM);
// TODO: REMOVE ON 2.18.0
if (request.getParams().has(ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM)) {
reconnect = getBooleanParam(request, ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM);
}
// END TODO
// TODO: UNCOMMENT ON 2.18.0
// boolean reconnect = getBooleanParam(request,
// ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM);
// END TODO
sessionManager.prepareSubscription(participant, senderPublicId, reconnect, request.getId()); sessionManager.prepareSubscription(participant, senderPublicId, reconnect, request.getId());
} }
@ -385,28 +374,15 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM);
String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId); String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId);
// TODO: REMOVE ON 2.18.0
if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM)) { if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM)) {
// 2.17.0: initiative held by browser when subscribing // Client initiated negotiation (comes with SDP Offer)
// The request comes with an SDPOffer
String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM); String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM);
sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId(), false); sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId(), false);
} else if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM)) { } else if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM)) {
// 2.18.0: initiative held by server when subscribing // Server initiated negotiation (comes with SDP Answer)
// This is the final call after prepareReceiveVidoFrom, comes with SDPAnswer
String sdpAnswer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM); String sdpAnswer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM);
sessionManager.subscribe(participant, senderPublicId, sdpAnswer, request.getId(), true); sessionManager.subscribe(participant, senderPublicId, sdpAnswer, request.getId(), true);
} }
// END TODO
// TODO: UNCOMMENT ON 2.18.0
/*
* String sdpAnswer = getStringParam(request,
* ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM);
* sessionManager.subscribe(participant, senderPublicId, sdpAnswer,
* request.getId());
*/
// END TODO
} }
private void unsubscribeFromVideo(RpcConnection rpcConnection, Request<JsonObject> request) { private void unsubscribeFromVideo(RpcConnection rpcConnection, Request<JsonObject> request) {
@ -670,40 +646,28 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
} catch (OpenViduException e) { } catch (OpenViduException e) {
return; return;
} }
String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM); String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM);
boolean isPublisher = streamId.equals(participant.getPublisherStreamId());
// TODO: REMOVE ON 2.18.0 String sdpString = null;
if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM)) { if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM)) {
// 2.17.0 sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM);
try {
String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM);
sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId());
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
new JsonObject(), e);
}
} else if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM)) { } else if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM)) {
// 2.18.0 sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM);
String sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM);
try {
sessionManager.reconnectStream(participant, streamId, sdpString, request.getId());
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
new JsonObject(), e);
}
} }
// END TODO
// TODO: UNCOMMENT ON 2.18.0 try {
/* if (isPublisher) {
* String sdpString = getStringParam(request, sessionManager.reconnectPublisher(participant, streamId, sdpString, request.getId());
* ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); try { } else {
* sessionManager.reconnectStream(participant, streamId, sdpString, boolean initByServer = request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM);
* request.getId()); } catch (OpenViduException e) { sessionManager.reconnectSubscriber(participant, streamId, sdpString, request.getId(), initByServer);
* this.notificationService.sendErrorResponse(participant. }
* getParticipantPrivateId(), request.getId(), new JsonObject(), e); } } catch (OpenViduException e) {
*/ this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
// END TODO new JsonObject(), e);
}
} }
private void updateVideoData(RpcConnection rpcConnection, Request<JsonObject> request) { private void updateVideoData(RpcConnection rpcConnection, Request<JsonObject> request) {