Forcibly reconnect subscribers upon publisher reconnection if necessary

pull/648/head
pabloFuente 2021-06-30 16:00:13 +02:00
parent 00d64ded9b
commit 6d93fc3405
10 changed files with 113 additions and 74 deletions

View File

@ -1159,7 +1159,7 @@ export class Session extends EventDispatcher {
return;
}
stream.completeWebRtcPeerReceive(true, event.sdpOffer)
stream.completeWebRtcPeerReceive(true, true, event.sdpOffer)
.then(() => stream.finalResolveForSubscription(true, resolve))
.catch(error => stream.finalRejectForSubscription(true, `Error while forcibly reconnecting remote stream ${event.streamId}: ${error.toString()}`, reject));
} else {

View File

@ -1031,7 +1031,7 @@ export class Stream {
*/
initWebRtcPeerReceiveFromClient(reconnect: boolean): Promise<void> {
return new Promise((resolve, reject) => {
this.completeWebRtcPeerReceive(reconnect).then(response => {
this.completeWebRtcPeerReceive(reconnect, false).then(response => {
this.webRtcPeer.processRemoteAnswer(response.sdpAnswer)
.then(() => resolve()).catch(error => reject(error));
}).catch(error => reject(error));
@ -1048,7 +1048,7 @@ export class Stream {
if (error) {
reject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error)));
} else {
this.completeWebRtcPeerReceive(reconnect, response.sdpOffer)
this.completeWebRtcPeerReceive(reconnect, false, response.sdpOffer)
.then(() => resolve()).catch(error => reject(error));
}
});
@ -1058,7 +1058,7 @@ export class Stream {
/**
* @hidden
*/
completeWebRtcPeerReceive(reconnect: boolean, sdpOfferByServer?: string): Promise<any> {
completeWebRtcPeerReceive(reconnect: boolean, forciblyReconnect: boolean, sdpOfferByServer?: string): Promise<any> {
return new Promise((resolve, reject) => {
logger.debug("'Session.subscribe(Stream)' called");
@ -1075,6 +1075,9 @@ export class Stream {
} else {
params['sdpOffer'] = sdpString;
}
if (reconnect) {
params['forciblyReconnect'] = forciblyReconnect;
}
this.session.openvidu.sendRequest(method, params, (error, response) => {
if (error) {

View File

@ -134,6 +134,7 @@ public class ProtocolElements {
public static final String RECONNECTSTREAM_METHOD = "reconnectStream";
public static final String RECONNECTSTREAM_STREAM_PARAM = "stream";
public static final String RECONNECTSTREAM_SDPSTRING_PARAM = "sdpString";
public static final String RECONNECTSTREAM_FORCIBLYRECONNECT_PARAM = "forciblyReconnect";
// TODO: REMOVE ON 2.18.0
public static final String RECONNECTSTREAM_SDPOFFER_PARAM = "sdpOffer";
// ENDTODO
@ -142,6 +143,11 @@ public class ProtocolElements {
public static final String ECHO_METHOD = "echo";
public static final String FORCIBLYRECONNECTSUBSCRIBER_METHOD = "forciblyReconnectSubscriber";
public static final String FORCIBLYRECONNECTSUBSCRIBER_CONNECTIONID_PARAM = "connectionId";
public static final String FORCIBLYRECONNECTSUBSCRIBER_STREAMID_PARAM = "streamId";
public static final String FORCIBLYRECONNECTSUBSCRIBER_SDPOFFER_PARAM = "sdpOffer";
// ---------------------------- SERVER RESPONSES & EVENTS -----------------
public static final String PARTICIPANTJOINED_METHOD = "participantJoined";

View File

@ -11,7 +11,7 @@ public class WebrtcDebugEvent {
}
public enum WebrtcDebugEventOperation {
publish, subscribe, reconnectPublisher, reconnectSubscriber
publish, subscribe, reconnectPublisher, reconnectSubscriber, forciblyReconnectSubscriber
}
public enum WebrtcDebugEventType {

View File

@ -308,7 +308,7 @@ public class SessionEventsHandler {
}
}
public void onPrepareSubscription(Participant participant, Session session, String sdpOffer, Integer transactionId,
public void onPrepareSubscription(Participant participant, String sdpOffer, Integer transactionId,
OpenViduException error) {
if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);

View File

@ -116,6 +116,8 @@ public abstract class SessionManager {
public abstract void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect,
Integer id);
public abstract String prepareForcedSubscription(Participant participant, String senderPublicId);
public abstract void subscribe(Participant participant, String senderName, String sdpString, Integer transactionId,
boolean initByServer);
@ -177,7 +179,7 @@ public abstract class SessionManager {
Integer transactionId);
public abstract void reconnectSubscriber(Participant participant, String streamId, String sdpString,
Integer transactionId, boolean initByServer);
Integer transactionId, boolean initByServer, boolean forciblyReconnect);
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
throws OpenViduException;

View File

@ -19,9 +19,11 @@ package io.openvidu.server.kurento.core;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent;
@ -285,6 +287,12 @@ public class KurentoSession extends Session {
return this.publishedStreamIds.get(streamId);
}
public Set<Participant> getParticipantsSubscribedToParticipant(String senderPublicId) {
return this.participants.values().stream()
.filter(p -> ((KurentoParticipant) p).getSubscriber(senderPublicId) != null)
.collect(Collectors.toSet());
}
public void restartStatusInKurentoAfterReconnection(Long kmsDisconnectionTime) {
log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId);

View File

@ -32,7 +32,6 @@ public class KurentoSessionEventsHandler extends SessionEventsHandler {
public void onIceCandidate(String roomName, String participantPrivateId, String senderPublicId, String endpointName,
IceCandidate candidate) {
JsonObject params = new JsonObject();
params.addProperty(ProtocolElements.ICECANDIDATE_SENDERCONNECTIONID_PARAM, senderPublicId);
params.addProperty(ProtocolElements.ICECANDIDATE_EPNAME_PARAM, endpointName);
params.addProperty(ProtocolElements.ICECANDIDATE_SDPMLINEINDEX_PARAM, candidate.getSdpMLineIndex());

View File

@ -538,67 +538,80 @@ public class KurentoSessionManager extends SessionManager {
@Override
public void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect,
Integer transactionId) {
String sdpOffer = null;
Session session = null;
try {
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer,
participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;
session = ((KurentoParticipant) participant).getSession();
Participant senderParticipant = session.getParticipantByPublicId(senderPublicId);
if (senderParticipant == null) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user could not be found",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE,
"User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'");
}
if (!senderParticipant.isStreaming()) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user is not streaming media",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'");
}
if (reconnect) {
kParticipant.cancelReceivingMedia(((KurentoParticipant) senderParticipant), null, true);
}
sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant);
final String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOffer, sdpOffer));
boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec();
// Modify server's SDPOffer if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, false, false, isTranscodingAllowed,
forcedVideoCodec);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
if (sdpOffer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '"
+ participant.getParticipantPublicId() + "' to '" + senderPublicId + "'");
}
String sdpOfferByServer = this.commonPrepareSubscription(participant, senderPublicId, reconnect,
WebrtcDebugEventOperation.subscribe);
sessionEventsHandler.onPrepareSubscription(participant, sdpOfferByServer, transactionId, null);
} catch (OpenViduException e) {
log.error("PARTICIPANT {}: Error preparing subscription to {}", participant.getParticipantPublicId(),
senderPublicId, e);
sessionEventsHandler.onPrepareSubscription(participant, session, null, transactionId, e);
sessionEventsHandler.onPrepareSubscription(participant, null, transactionId, e);
}
if (sdpOffer != null) {
sessionEventsHandler.onPrepareSubscription(participant, session, sdpOffer, transactionId, null);
}
@Override
public String prepareForcedSubscription(Participant participant, String senderPublicId) {
return this.commonPrepareSubscription(participant, senderPublicId, true,
WebrtcDebugEventOperation.forciblyReconnectSubscriber);
}
private String commonPrepareSubscription(Participant participant, String senderPublicId, boolean reconnect,
WebrtcDebugEventOperation operation) {
String sdpOffer = null;
Session session = null;
log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer,
participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;
session = ((KurentoParticipant) participant).getSession();
Participant senderParticipant = session.getParticipantByPublicId(senderPublicId);
if (senderParticipant == null) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user could not be found",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE,
"User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'");
}
if (!senderParticipant.isStreaming()) {
log.warn(
"PARTICIPANT {}: Requesting to recv media from user {} "
+ "in session {} but user is not streaming media",
participant.getParticipantPublicId(), senderPublicId, session.getSessionId());
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'");
}
if (reconnect) {
kParticipant.cancelReceivingMedia(((KurentoParticipant) senderParticipant), null, true);
}
sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant);
final String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, operation,
WebrtcDebugEventType.sdpOffer, sdpOffer));
boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed();
VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec();
// Modify server's SDPOffer if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, false, false, isTranscodingAllowed,
forcedVideoCodec);
CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, operation,
WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
if (sdpOffer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '"
+ participant.getParticipantPublicId() + "' to '" + senderPublicId + "'");
}
return sdpOffer;
}
@Override
@ -1188,10 +1201,10 @@ public class KurentoSessionManager extends SessionManager {
@Override
public void reconnectSubscriber(Participant participant, String streamId, String sdpString, Integer transactionId,
boolean initByServer) {
boolean initByServer, boolean forciblyReconnect) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
reconnectSubscriber(kSession, kParticipant, streamId, sdpString, transactionId, initByServer);
reconnectSubscriber(kSession, kParticipant, streamId, sdpString, transactionId, initByServer, forciblyReconnect);
}
private String mungeSdpOffer(Session kSession, Participant participant, String sdpOffer, boolean isPublisher) {
@ -1205,7 +1218,7 @@ public class KurentoSessionManager extends SessionManager {
return null;
}
private void reconnectPublisher(KurentoSession kSession, KurentoParticipant kParticipant, String streamId,
protected void reconnectPublisher(KurentoSession kSession, KurentoParticipant kParticipant, String streamId,
String sdpOffer, Integer transactionId) {
String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, true);
@ -1239,13 +1252,16 @@ public class KurentoSessionManager extends SessionManager {
}
private void reconnectSubscriber(KurentoSession kSession, KurentoParticipant kParticipant, String streamId,
String sdpString, Integer transactionId, boolean initByServer) {
String sdpString, Integer transactionId, boolean initByServer, boolean forciblyReconnect) {
String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId);
if (senderPrivateId != null) {
KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId);
String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender);
WebrtcDebugEventOperation operation = forciblyReconnect
? WebrtcDebugEventOperation.forciblyReconnectSubscriber
: WebrtcDebugEventOperation.reconnectSubscriber;
if (initByServer) {
@ -1254,7 +1270,7 @@ public class KurentoSessionManager extends SessionManager {
final String sdpAnswer = sdpString;
CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
operation, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
kParticipant.receiveMedia(sender, sdpAnswer, true, true);
@ -1270,14 +1286,13 @@ public class KurentoSessionManager extends SessionManager {
String sdpOffer = sdpString;
CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOffer, sdpOffer));
operation, WebrtcDebugEventType.sdpOffer, sdpOffer));
String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, false);
if (sdpOfferMunged != null) {
sdpOffer = sdpOfferMunged;
CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOfferMunged,
sdpOffer));
operation, WebrtcDebugEventType.sdpOfferMunged, sdpOffer));
}
kParticipant.cancelReceivingMedia(sender, null, true);
@ -1291,7 +1306,7 @@ public class KurentoSessionManager extends SessionManager {
kParticipant.getParticipantPublicId(), sdpAnswer);
CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.server,
WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
operation, WebrtcDebugEventType.sdpAnswer, sdpAnswer));
sessionEventsHandler.onSubscribe(kParticipant, kSession, sdpAnswer, transactionId, null);

View File

@ -662,7 +662,13 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
sessionManager.reconnectPublisher(participant, streamId, sdpString, request.getId());
} else {
boolean initByServer = request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM);
sessionManager.reconnectSubscriber(participant, streamId, sdpString, request.getId(), initByServer);
boolean forciblyReconnectSubscriber = false;
if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_FORCIBLYRECONNECT_PARAM)) {
forciblyReconnectSubscriber = getBooleanParam(request,
ProtocolElements.RECONNECTSTREAM_FORCIBLYRECONNECT_PARAM);
}
sessionManager.reconnectSubscriber(participant, streamId, sdpString, request.getId(), initByServer,
forciblyReconnectSubscriber);
}
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),