openvidu-server: blocking and non-blocking versions of endpoint connection

pull/621/head
pabloFuente 2021-03-12 11:25:13 +01:00
parent 17ea55ad07
commit 2e40d14432
4 changed files with 106 additions and 91 deletions

View File

@ -175,45 +175,42 @@ public class PublisherEndpoint extends MediaEndpoint {
/** /**
* Initializes this media endpoint for publishing media and processes the SDP * Initializes this media endpoint for publishing media and processes the SDP
* offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it * offer. If the internal endpoint is an {@link WebRtcEndpoint}, it first
* first registers an event listener for the ICE candidates and instructs the * registers an event listener for the ICE candidates and instructs the endpoint
* endpoint to start gathering the candidates. If required, it connects to * to start gathering the candidates. If required, it connects to itself (after
* itself (after applying the intermediate media elements and the * applying the intermediate media elements and the {@link PassThrough}) to
* {@link PassThrough}) to allow loopback of the media stream. * allow loopback of the media stream.
* *
* @param sdpOffer offer from the remote peer * @param sdpOffer SDP offer from the remote peer
* @param doLoopback loopback flag * @param doLoopback loopback flag
* @param loopbackAlternativeSrc alternative loopback source * @return the SDP answer
* @param loopbackConnectionType how to connect the loopback source
* @return the SDP response (the answer if processing an offer SDP, otherwise is
* the updated offer generated previously by this endpoint)
*/ */
public synchronized String publish(String sdpOffer, boolean doLoopback) { public synchronized String publish(String sdpOffer, boolean doLoopback) {
String sdpResponse = processOffer(sdpOffer);
registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId()); registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId());
if (doLoopback) { if (doLoopback) {
connect(this.getEndpoint()); connect(this.getEndpoint(), false);
} else { } else {
innerConnect(); innerConnect(false);
} }
this.createdAt = System.currentTimeMillis(); this.createdAt = System.currentTimeMillis();
String sdpResponse = processOffer(sdpOffer);
gatherCandidates(); gatherCandidates();
return sdpResponse; return sdpResponse;
} }
public synchronized void connect(MediaElement sink) { public synchronized void connect(MediaElement sink, boolean blocking) {
if (!connected) { if (!connected) {
innerConnect(); innerConnect(blocking);
} }
internalSinkConnect(passThru, sink); internalSinkConnect(passThru, sink, blocking);
this.enableIpCameraIfNecessary(); this.enableIpCameraIfNecessary();
} }
public synchronized void connect(MediaElement sink, MediaType type) { public synchronized void connect(MediaElement sink, MediaType type, boolean blocking) {
if (!connected) { if (!connected) {
innerConnect(); innerConnect(blocking);
} }
internalSinkConnect(passThru, sink, type); internalSinkConnect(passThru, sink, type, blocking);
this.enableIpCameraIfNecessary(); this.enableIpCameraIfNecessary();
} }
@ -231,7 +228,7 @@ public class PublisherEndpoint extends MediaEndpoint {
} }
public synchronized void disconnectFrom(MediaElement sink) { public synchronized void disconnectFrom(MediaElement sink) {
internalSinkDisconnect(passThru, sink); internalSinkDisconnect(passThru, sink, false);
} }
/** /**
@ -277,11 +274,11 @@ public class PublisherEndpoint extends MediaEndpoint {
} }
if (connected) { if (connected) {
if (first != null) { if (first != null) {
internalSinkConnect(first, shaper, type); internalSinkConnect(first, shaper, type, false);
} else { } else {
internalSinkConnect(this.getEndpoint(), shaper, type); internalSinkConnect(this.getEndpoint(), shaper, type, false);
} }
internalSinkConnect(shaper, passThru, type); internalSinkConnect(shaper, passThru, type, false);
} }
elementIds.addFirst(id); elementIds.addFirst(id);
elements.put(id, shaper); elements.put(id, shaper);
@ -331,7 +328,7 @@ public class PublisherEndpoint extends MediaEndpoint {
} else { } else {
prev = passThru; prev = passThru;
} }
internalSinkConnect(next, prev); internalSinkConnect(next, prev, false);
} }
elementIds.remove(elementId); elementIds.remove(elementId);
if (releaseElement) { if (releaseElement) {
@ -370,13 +367,13 @@ public class PublisherEndpoint extends MediaEndpoint {
} }
switch (muteType) { switch (muteType) {
case ALL: case ALL:
internalSinkDisconnect(this.getEndpoint(), sink); internalSinkDisconnect(this.getEndpoint(), sink, false);
break; break;
case AUDIO: case AUDIO:
internalSinkDisconnect(this.getEndpoint(), sink, MediaType.AUDIO); internalSinkDisconnect(this.getEndpoint(), sink, MediaType.AUDIO, false);
break; break;
case VIDEO: case VIDEO:
internalSinkDisconnect(this.getEndpoint(), sink, MediaType.VIDEO); internalSinkDisconnect(this.getEndpoint(), sink, MediaType.VIDEO, false);
break; break;
} }
} }
@ -396,19 +393,19 @@ public class PublisherEndpoint extends MediaEndpoint {
} }
switch (muteType) { switch (muteType) {
case ALL: case ALL:
internalSinkConnect(this.getEndpoint(), sink); internalSinkConnect(this.getEndpoint(), sink, false);
break; break;
case AUDIO: case AUDIO:
internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO); internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO, false);
break; break;
case VIDEO: case VIDEO:
internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO); internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO, false);
break; break;
} }
} }
public synchronized PassThrough disconnectFromPassThrough() { public synchronized PassThrough disconnectFromPassThrough() {
this.internalSinkDisconnect(this.getWebEndpoint(), this.passThru); this.internalSinkDisconnect(this.getWebEndpoint(), this.passThru, false);
return this.passThru; return this.passThru;
} }
@ -428,7 +425,7 @@ public class PublisherEndpoint extends MediaEndpoint {
return elementIds.get(idx - 1); return elementIds.get(idx - 1);
} }
private void innerConnect() { private void innerConnect(boolean blocking) {
if (this.getEndpoint() == null) { if (this.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Can't connect null endpoint (ep: " + getEndpointName() + ")"); "Can't connect null endpoint (ep: " + getEndpointName() + ")");
@ -441,15 +438,18 @@ public class PublisherEndpoint extends MediaEndpoint {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"No media element with id " + prevId + " (ep: " + getEndpointName() + ")"); "No media element with id " + prevId + " (ep: " + getEndpointName() + ")");
} }
internalSinkConnect(current, prev); internalSinkConnect(current, prev, blocking);
current = prev; current = prev;
prevId = getPrevious(prevId); prevId = getPrevious(prevId);
} }
internalSinkConnect(current, passThru); internalSinkConnect(current, passThru, blocking);
connected = true; connected = true;
} }
private void internalSinkConnect(final MediaElement source, final MediaElement sink) { private void internalSinkConnect(final MediaElement source, final MediaElement sink, boolean blocking) {
if (blocking) {
source.connect(sink);
} else {
source.connect(sink, new Continuation<Void>() { source.connect(sink, new Continuation<Void>() {
@Override @Override
public void onSuccess(Void result) throws Exception { public void onSuccess(Void result) throws Exception {
@ -464,6 +464,7 @@ public class PublisherEndpoint extends MediaEndpoint {
} }
}); });
} }
}
/** /**
* Same as {@link #internalSinkConnect(MediaElement, MediaElement)}, but can * Same as {@link #internalSinkConnect(MediaElement, MediaElement)}, but can
@ -476,15 +477,19 @@ public class PublisherEndpoint extends MediaEndpoint {
* be used instead * be used instead
* @see #internalSinkConnect(MediaElement, MediaElement) * @see #internalSinkConnect(MediaElement, MediaElement)
*/ */
private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) { private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type,
boolean blocking) {
if (type == null) { if (type == null) {
internalSinkConnect(source, sink); internalSinkConnect(source, sink, blocking);
} else {
if (blocking) {
source.connect(sink, type);
} else { } else {
source.connect(sink, type, new Continuation<Void>() { source.connect(sink, type, new Continuation<Void>() {
@Override @Override
public void onSuccess(Void result) throws Exception { public void onSuccess(Void result) throws Exception {
log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", getEndpointName(), log.debug("EP {}: {} media elements have been connected (source {} -> sink {})",
type, source.getId(), sink.getId()); getEndpointName(), type, source.getId(), sink.getId());
} }
@Override @Override
@ -495,8 +500,12 @@ public class PublisherEndpoint extends MediaEndpoint {
}); });
} }
} }
}
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink) { private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, boolean blocking) {
if (blocking) {
source.disconnect(sink);
} else {
source.disconnect(sink, new Continuation<Void>() { source.disconnect(sink, new Continuation<Void>() {
@Override @Override
public void onSuccess(Void result) throws Exception { public void onSuccess(Void result) throws Exception {
@ -511,6 +520,7 @@ public class PublisherEndpoint extends MediaEndpoint {
} }
}); });
} }
}
/** /**
* Same as {@link #internalSinkDisconnect(MediaElement, MediaElement)}, but can * Same as {@link #internalSinkDisconnect(MediaElement, MediaElement)}, but can
@ -523,9 +533,13 @@ public class PublisherEndpoint extends MediaEndpoint {
* be used instead * be used instead
* @see #internalSinkConnect(MediaElement, MediaElement) * @see #internalSinkConnect(MediaElement, MediaElement)
*/ */
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type) { private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type,
boolean blocking) {
if (type == null) { if (type == null) {
internalSinkDisconnect(source, sink); internalSinkDisconnect(source, sink, blocking);
} else {
if (blocking) {
source.disconnect(sink, type);
} else { } else {
source.disconnect(sink, type, new Continuation<Void>() { source.disconnect(sink, type, new Continuation<Void>() {
@Override @Override
@ -536,12 +550,13 @@ public class PublisherEndpoint extends MediaEndpoint {
@Override @Override
public void onError(Throwable cause) throws Exception { public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", getEndpointName(), log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})",
type, source.getId(), sink.getId(), cause); getEndpointName(), type, source.getId(), sink.getId(), cause);
} }
}); });
} }
} }
}
public MediaOptions getMediaOptions() { public MediaOptions getMediaOptions() {
return mediaOptions; return mediaOptions;

View File

@ -50,7 +50,7 @@ public class SubscriberEndpoint extends MediaEndpoint {
this.createdAt = System.currentTimeMillis(); this.createdAt = System.currentTimeMillis();
String sdpAnswer = processOffer(sdpOffer); String sdpAnswer = processOffer(sdpOffer);
gatherCandidates(); gatherCandidates();
publisher.connect(this.getEndpoint()); publisher.connect(this.getEndpoint(), false);
this.publisherStreamId = publisher.getStreamId(); this.publisherStreamId = publisher.getStreamId();
return sdpAnswer; return sdpAnswer;
} }

View File

@ -111,7 +111,7 @@ public class CompositeWrapper {
public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException { public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException {
HubPort hubPort = new HubPort.Builder(composite).build(); HubPort hubPort = new HubPort.Builder(composite).build();
endpoint.connect(hubPort); endpoint.connect(hubPort, false);
String streamId = endpoint.getOwner().getPublisherStreamId(); String streamId = endpoint.getOwner().getPublisherStreamId();
this.hubPorts.put(streamId, hubPort); this.hubPorts.put(streamId, hubPort);
this.publisherEndpoints.put(streamId, endpoint); this.publisherEndpoints.put(streamId, endpoint);

View File

@ -392,14 +392,14 @@ public class SingleStreamRecordingService extends RecordingService {
MediaProfileSpecType profile) { MediaProfileSpecType profile) {
switch (profile) { switch (profile) {
case WEBM: case WEBM:
publisherEndpoint.connect(recorder, MediaType.AUDIO); publisherEndpoint.connect(recorder, MediaType.AUDIO, false);
publisherEndpoint.connect(recorder, MediaType.VIDEO); publisherEndpoint.connect(recorder, MediaType.VIDEO, false);
break; break;
case WEBM_AUDIO_ONLY: case WEBM_AUDIO_ONLY:
publisherEndpoint.connect(recorder, MediaType.AUDIO); publisherEndpoint.connect(recorder, MediaType.AUDIO, false);
break; break;
case WEBM_VIDEO_ONLY: case WEBM_VIDEO_ONLY:
publisherEndpoint.connect(recorder, MediaType.VIDEO); publisherEndpoint.connect(recorder, MediaType.VIDEO, false);
break; break;
default: default:
throw new UnsupportedOperationException("Unsupported profile when single stream recording: " + profile); throw new UnsupportedOperationException("Unsupported profile when single stream recording: " + profile);