openvidu-server: RPC reconnect method

pull/391/head
pabloFuente 2020-02-14 22:32:57 +01:00
parent 46f0ad564e
commit 13e3b52ff6
7 changed files with 182 additions and 68 deletions

View File

@ -155,6 +155,9 @@ public abstract class SessionManager {
public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions, String serverMetadata)
throws Exception;
public abstract void reconnectStream(Participant participant, String streamId, String sdpOffer,
Integer transactionId);
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
throws OpenViduException;

View File

@ -34,6 +34,7 @@ import org.kurento.client.Filter;
import org.kurento.client.IceCandidate;
import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline;
import org.kurento.client.PassThrough;
import org.kurento.client.internal.server.KurentoServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,7 +88,7 @@ public class KurentoParticipant extends Participant {
if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) {
// Initialize a PublisherEndpoint
this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig);
this.session.getPipeline(), this.openviduConfig, null);
}
for (Participant other : session.getParticipants()) {
@ -100,24 +101,25 @@ public class KurentoParticipant extends Participant {
}
}
public void createPublishingEndpoint(MediaOptions mediaOptions) {
public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) {
String type = mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO";
final String publisherStreamId = IdentifierPrefixes.STREAM_ID + type.substring(0, Math.min(type.length(), 3))
+ "_" + RandomStringUtils.randomAlphabetic(1).toUpperCase() + RandomStringUtils.randomAlphanumeric(3)
if (streamId == null) {
streamId = IdentifierPrefixes.STREAM_ID + type.substring(0, Math.min(type.length(), 3)) + "_"
+ RandomStringUtils.randomAlphabetic(1).toUpperCase() + RandomStringUtils.randomAlphanumeric(3)
+ "_" + this.getParticipantPublicId();
publisher.setStreamId(publisherStreamId);
publisher.setEndpointName(publisherStreamId);
}
publisher.setStreamId(streamId);
publisher.setEndpointName(streamId);
publisher.setMediaOptions(mediaOptions);
publisher.createEndpoint(publisherLatch);
if (getPublisher().getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint");
}
this.publisher.getEndpoint().setName(publisherStreamId);
publisher.getEndpoint().setName(streamId);
endpointConfig.addEndpointListeners(this.publisher, "publisher");
// Remove streamId from publisher's map
// Put streamId in publisher's map
this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId());
}
@ -155,6 +157,10 @@ public class KurentoParticipant extends Participant {
return this.publisher;
}
public SubscriberEndpoint getSubscriber(String senderPublicId) {
return this.subscribers.get(senderPublicId);
}
public Collection<SubscriberEndpoint> getSubscribers() {
return this.subscribers.values();
}
@ -171,7 +177,7 @@ public class KurentoParticipant extends Participant {
return session;
}
public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback) {
public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, boolean silent) {
log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(),
this.session.getSessionId(), sdpType);
log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString);
@ -188,8 +194,10 @@ public class KurentoParticipant extends Participant {
this.recordingManager.startOneIndividualStreamRecording(session, null, null, this);
}
if (!silent) {
endpointConfig.getCdr().recordNewPublisher(this, session.getSessionId(), publisher.getStreamId(),
publisher.getMediaOptions(), publisher.createdAt());
}
return sdpResponse;
}
@ -199,12 +207,12 @@ public class KurentoParticipant extends Participant {
this.session.getSessionId());
final MediaOptions mediaOptions = this.getPublisher().getMediaOptions();
releasePublisherEndpoint(reason, kmsDisconnectionTime);
resetPublisherEndpoint(mediaOptions);
resetPublisherEndpoint(mediaOptions, null);
log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)",
this.getParticipantPublicId());
}
public String receiveMediaFrom(Participant sender, String sdpOffer) {
public String receiveMediaFrom(Participant sender, String sdpOffer, boolean silent) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
@ -272,7 +280,7 @@ public class KurentoParticipant extends Participant {
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(),
sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt());
}
@ -287,12 +295,12 @@ public class KurentoParticipant extends Participant {
log.error("Exception connecting subscriber endpoint " + "to publisher endpoint", e);
}
this.subscribers.remove(senderName);
releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null);
releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null, false);
}
return null;
}
public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason) {
public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) {
final String senderName = senderKurentoParticipant.getParticipantPublicId();
log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName);
@ -301,7 +309,7 @@ public class KurentoParticipant extends Participant {
log.warn("PARTICIPANT {}: Trying to cancel receiving video from user {}. "
+ "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName);
} else {
releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason);
releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason, silent);
log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
}
@ -321,9 +329,10 @@ public class KurentoParticipant extends Participant {
final SubscriberEndpoint subscriber = entry.getValue();
it.remove();
if (subscriber != null && subscriber.getEndpoint() != null) {
releaseSubscriberEndpoint(
(KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber,
reason);
reason, false);
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
remoteParticipantName);
} else {
@ -413,7 +422,7 @@ public class KurentoParticipant extends Participant {
}
private void releaseSubscriberEndpoint(KurentoParticipant senderKurentoParticipant, SubscriberEndpoint subscriber,
EndReason reason) {
EndReason reason, boolean silent) {
final String senderName = senderKurentoParticipant.getParticipantPublicId();
if (subscriber != null) {
@ -424,6 +433,8 @@ public class KurentoParticipant extends Participant {
releaseElement(senderName, subscriber.getEndpoint());
if (!silent) {
// Stop PlayerEndpoint of IP CAM if last subscriber disconnected
final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher;
if (senderPublisher != null) {
@ -435,7 +446,8 @@ public class KurentoParticipant extends Participant {
if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) {
try {
senderPublisher.getPlayerEndpoint().stop();
log.info("IP Camera stream {} feed is now disabled because there are no subscribers",
log.info(
"IP Camera stream {} feed is now disabled because there are no subscribers",
senderPublisher.getStreamId());
} catch (Exception e) {
log.info("Error while disabling feed for IP camera {}: {}",
@ -451,13 +463,14 @@ public class KurentoParticipant extends Participant {
subscriber.getStreamId(), reason);
}
}
} else {
log.warn("PARTICIPANT {}: Trying to release subscriber endpoint for '{}' but is null",
this.getParticipantPublicId(), senderName);
}
}
private void releaseElement(final String senderName, final MediaElement element) {
void releaseElement(final String senderName, final MediaElement element) {
final String eid = element.getId();
try {
element.release(new Continuation<Void>() {
@ -488,11 +501,12 @@ public class KurentoParticipant extends Participant {
return this.publisher.getStreamId();
}
public void resetPublisherEndpoint(MediaOptions mediaOptions) {
public void resetPublisherEndpoint(MediaOptions mediaOptions, PassThrough passThru) {
log.info("Resetting publisher endpoint for participant {}", this.getParticipantPublicId());
this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig);
this.session.getPipeline(), this.openviduConfig, passThru);
this.publisher.setMediaOptions(mediaOptions);
this.publisherLatch = new CountDownLatch(1);
}
@Override

View File

@ -108,7 +108,7 @@ public class KurentoSession extends Session {
if (participant.equals(subscriber)) {
continue;
}
((KurentoParticipant) subscriber).cancelReceivingMedia((KurentoParticipant) participant, reason);
((KurentoParticipant) subscriber).cancelReceivingMedia((KurentoParticipant) participant, reason, false);
}
log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId,
@ -187,7 +187,7 @@ public class KurentoSession extends Session {
log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId,
participant.getParticipantPublicId());
for (Participant other : participants.values()) {
((KurentoParticipant) other).cancelReceivingMedia(removedParticipant, reason);
((KurentoParticipant) other).cancelReceivingMedia(removedParticipant, reason, false);
}
}
@ -329,8 +329,9 @@ public class KurentoSession extends Session {
}
getParticipants().forEach(p -> {
if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) {
((KurentoParticipant) p)
.resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()));
((KurentoParticipant) p).resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()),
null);
}
});
log.info(

View File

@ -33,6 +33,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.GenericMediaElement;
import org.kurento.client.IceCandidate;
import org.kurento.client.ListenerSubscription;
import org.kurento.client.PassThrough;
import org.kurento.jsonrpc.Props;
import org.kurento.jsonrpc.message.Request;
import org.slf4j.Logger;
@ -281,7 +282,7 @@ public class KurentoSessionManager extends SessionManager {
SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER;
KurentoSession kSession = kParticipant.getSession();
kParticipant.createPublishingEndpoint(mediaOptions);
kParticipant.createPublishingEndpoint(mediaOptions, null);
/*
* for (MediaElement elem : kurentoOptions.mediaElements) {
@ -305,7 +306,7 @@ public class KurentoSessionManager extends SessionManager {
}
}
sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback);
sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, false);
if (sdpAnswer == null) {
OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
@ -416,7 +417,7 @@ public class KurentoSessionManager extends SessionManager {
"User '" + senderName + " not streaming media in session '" + session.getSessionId() + "'");
}
sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer);
sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer, false);
if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId()
@ -448,7 +449,7 @@ public class KurentoSessionManager extends SessionManager {
"User " + senderName + " not found in session " + session.getSessionId());
}
kParticipant.cancelReceivingMedia((KurentoParticipant) sender, EndReason.unsubscribe);
kParticipant.cancelReceivingMedia((KurentoParticipant) sender, EndReason.unsubscribe, false);
sessionEventsHandler.onUnsubscribe(participant, transactionId, null);
}
@ -924,6 +925,57 @@ public class KurentoSessionManager extends SessionManager {
return kParticipant;
}
@Override
public void reconnectStream(Participant participant, String streamId, String sdpOffer, Integer transactionId) {
KurentoParticipant kParticipant = (KurentoParticipant) participant;
KurentoSession kSession = kParticipant.getSession();
if (streamId.equals(participant.getPublisherStreamId())) {
// Reconnect publisher
final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher()
.getMediaOptions();
// 1) Disconnect broken PublisherEndpoint from its PassThrough
PublisherEndpoint publisher = kParticipant.getPublisher();
final PassThrough passThru = publisher.disconnectFromPassThrough();
// 2) Destroy the broken PublisherEndpoint and nothing else
if (publisher.kmsWebrtcStatsThread != null) {
publisher.kmsWebrtcStatsThread.cancel(true);
}
kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint());
// 3) Create a new PublisherEndpoint connecting it to the previous PassThrough
kParticipant.resetPublisherEndpoint(kurentoOptions, passThru);
kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER;
String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true);
sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(),
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
new HashSet<Participant>(), transactionId, null);
} else {
// Reconnect subscriber
String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId);
if (senderPrivateId != null) {
KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId);
kParticipant.cancelReceivingMedia(sender, null, true);
String sdpAnswer = kParticipant.receiveMediaFrom(sender, sdpOffer, true);
if (sdpAnswer == null) {
throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'");
}
sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null);
} else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'");
}
}
}
@Override
public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) {
Session session = this.getSession(sessionId);

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import org.kurento.client.BaseRtpEndpoint;
import org.kurento.client.Continuation;
import org.kurento.client.Endpoint;
import org.kurento.client.ErrorEvent;
@ -32,7 +33,7 @@ import org.kurento.client.IceCandidate;
import org.kurento.client.ListenerSubscription;
import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline;
import org.kurento.client.OnIceCandidateEvent;
import org.kurento.client.PassThrough;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.RtpEndpoint;
import org.kurento.client.SdpEndpoint;
@ -155,7 +156,8 @@ public abstract class MediaEndpoint {
}
/**
* @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
* @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}
* or {@link PlayerEndpoint})
*/
public Endpoint getEndpoint() {
if (this.isWeb()) {
@ -167,6 +169,13 @@ public abstract class MediaEndpoint {
}
}
public BaseRtpEndpoint getBaseRtpEndpoint() {
if (this.isWeb()) {
return this.webEndpoint;
}
return this.endpoint;
}
public long createdAt() {
return this.createdAt;
}

View File

@ -77,16 +77,19 @@ public class PublisherEndpoint extends MediaEndpoint {
public int numberOfSubscribers = 0;
public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
MediaPipeline pipeline, OpenviduConfig openviduConfig) {
MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) {
super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
this.passThru = passThru;
}
@Override
protected void internalEndpointInitialization(final CountDownLatch endpointLatch) {
super.internalEndpointInitialization(endpointLatch);
if (this.passThru == null) {
passThru = new PassThrough.Builder(getPipeline()).build();
passThruSubscription = registerElemErrListener(passThru);
}
}
@Override
public synchronized void unregisterErrorListeners() {
@ -404,6 +407,11 @@ public class PublisherEndpoint extends MediaEndpoint {
}
}
public synchronized PassThrough disconnectFromPassThrough() {
this.internalSinkDisconnect(this.getWebEndpoint(), this.passThru);
return this.passThru;
}
private String getNext(String uid) {
int idx = elementIds.indexOf(uid);
if (idx < 0 || idx + 1 == elementIds.size()) {

View File

@ -162,6 +162,9 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
case ProtocolElements.REMOVEFILTEREVENTLISTENER_METHOD:
removeFilterEventListener(rpcConnection, request);
break;
case ProtocolElements.RECONNECTSTREAM_METHOD:
reconnectStream(rpcConnection, request);
break;
default:
log.error("Unrecognized request {}", request);
break;
@ -602,6 +605,23 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
}
}
private void reconnectStream(RpcConnection rpcConnection, Request<JsonObject> request) {
Participant participant;
try {
participant = sanityCheckOfSession(rpcConnection, "reconnectStream");
} catch (OpenViduException e) {
return;
}
String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM);
String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM);
try {
sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId());
} catch (OpenViduException e) {
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
new JsonObject(), e);
}
}
public void leaveRoomAfterConnClosed(String participantPrivateId, EndReason reason) {
try {
sessionManager.evictParticipant(this.sessionManager.getParticipant(participantPrivateId), null, null,
@ -670,11 +690,17 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
}
}
@Override
public void afterReconnection(Session rpcSession) throws Exception {
log.info("After reconnection for WebSocket session: {}", rpcSession.getSessionId());
}
@Override
public void handleTransportError(Session rpcSession, Throwable exception) throws Exception {
if (rpcSession != null) {
log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(),
exception.getMessage());
if ("IOException".equals(exception.getClass().getSimpleName())
if ("IOException".equals(exception.getClass().getSimpleName()) && exception.getCause() != null
&& "Broken pipe".equals(exception.getCause().getMessage())) {
log.warn("Parcipant with private id {} unexpectedly closed the websocket", rpcSession.getSessionId());
}
@ -684,6 +710,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
this.webSocketEOFTransportError.put(rpcSession.getSessionId(), true);
}
}
}
@Override
public void handleUncaughtException(Session rpcSession, Exception exception) {