diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index abd18100..e1281815 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -175,45 +175,42 @@ public class PublisherEndpoint extends MediaEndpoint { /** * Initializes this media endpoint for publishing media and processes the SDP - * offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it - * first registers an event listener for the ICE candidates and instructs the - * endpoint to start gathering the candidates. If required, it connects to - * itself (after applying the intermediate media elements and the - * {@link PassThrough}) to allow loopback of the media stream. + * offer. If the internal endpoint is an {@link WebRtcEndpoint}, it first + * registers an event listener for the ICE candidates and instructs the endpoint + * to start gathering the candidates. If required, it connects to itself (after + * applying the intermediate media elements and the {@link PassThrough}) to + * allow loopback of the media stream. * - * @param sdpOffer offer from the remote peer - * @param doLoopback loopback flag - * @param loopbackAlternativeSrc alternative loopback source - * @param loopbackConnectionType how to connect the loopback source - * @return the SDP response (the answer if processing an offer SDP, otherwise is - * the updated offer generated previously by this endpoint) + * @param sdpOffer SDP offer from the remote peer + * @param doLoopback loopback flag + * @return the SDP answer */ public synchronized String publish(String sdpOffer, boolean doLoopback) { + String sdpResponse = processOffer(sdpOffer); registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId()); if (doLoopback) { - connect(this.getEndpoint()); + connect(this.getEndpoint(), false); } else { - innerConnect(); + innerConnect(false); } this.createdAt = System.currentTimeMillis(); - String sdpResponse = processOffer(sdpOffer); gatherCandidates(); return sdpResponse; } - public synchronized void connect(MediaElement sink) { + public synchronized void connect(MediaElement sink, boolean blocking) { if (!connected) { - innerConnect(); + innerConnect(blocking); } - internalSinkConnect(passThru, sink); + internalSinkConnect(passThru, sink, blocking); this.enableIpCameraIfNecessary(); } - public synchronized void connect(MediaElement sink, MediaType type) { + public synchronized void connect(MediaElement sink, MediaType type, boolean blocking) { if (!connected) { - innerConnect(); + innerConnect(blocking); } - internalSinkConnect(passThru, sink, type); + internalSinkConnect(passThru, sink, type, blocking); this.enableIpCameraIfNecessary(); } @@ -231,7 +228,7 @@ public class PublisherEndpoint extends MediaEndpoint { } public synchronized void disconnectFrom(MediaElement sink) { - internalSinkDisconnect(passThru, sink); + internalSinkDisconnect(passThru, sink, false); } /** @@ -277,11 +274,11 @@ public class PublisherEndpoint extends MediaEndpoint { } if (connected) { if (first != null) { - internalSinkConnect(first, shaper, type); + internalSinkConnect(first, shaper, type, false); } else { - internalSinkConnect(this.getEndpoint(), shaper, type); + internalSinkConnect(this.getEndpoint(), shaper, type, false); } - internalSinkConnect(shaper, passThru, type); + internalSinkConnect(shaper, passThru, type, false); } elementIds.addFirst(id); elements.put(id, shaper); @@ -331,7 +328,7 @@ public class PublisherEndpoint extends MediaEndpoint { } else { prev = passThru; } - internalSinkConnect(next, prev); + internalSinkConnect(next, prev, false); } elementIds.remove(elementId); if (releaseElement) { @@ -370,13 +367,13 @@ public class PublisherEndpoint extends MediaEndpoint { } switch (muteType) { case ALL: - internalSinkDisconnect(this.getEndpoint(), sink); + internalSinkDisconnect(this.getEndpoint(), sink, false); break; case AUDIO: - internalSinkDisconnect(this.getEndpoint(), sink, MediaType.AUDIO); + internalSinkDisconnect(this.getEndpoint(), sink, MediaType.AUDIO, false); break; case VIDEO: - internalSinkDisconnect(this.getEndpoint(), sink, MediaType.VIDEO); + internalSinkDisconnect(this.getEndpoint(), sink, MediaType.VIDEO, false); break; } } @@ -396,19 +393,19 @@ public class PublisherEndpoint extends MediaEndpoint { } switch (muteType) { case ALL: - internalSinkConnect(this.getEndpoint(), sink); + internalSinkConnect(this.getEndpoint(), sink, false); break; case AUDIO: - internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO); + internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO, false); break; case VIDEO: - internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO); + internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO, false); break; } } public synchronized PassThrough disconnectFromPassThrough() { - this.internalSinkDisconnect(this.getWebEndpoint(), this.passThru); + this.internalSinkDisconnect(this.getWebEndpoint(), this.passThru, false); return this.passThru; } @@ -428,7 +425,7 @@ public class PublisherEndpoint extends MediaEndpoint { return elementIds.get(idx - 1); } - private void innerConnect() { + private void innerConnect(boolean blocking) { if (this.getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Can't connect null endpoint (ep: " + getEndpointName() + ")"); @@ -441,28 +438,32 @@ public class PublisherEndpoint extends MediaEndpoint { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "No media element with id " + prevId + " (ep: " + getEndpointName() + ")"); } - internalSinkConnect(current, prev); + internalSinkConnect(current, prev, blocking); current = prev; prevId = getPrevious(prevId); } - internalSinkConnect(current, passThru); + internalSinkConnect(current, passThru, blocking); connected = true; } - private void internalSinkConnect(final MediaElement source, final MediaElement sink) { - source.connect(sink, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: Elements have been connected (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId()); - } + private void internalSinkConnect(final MediaElement source, final MediaElement sink, boolean blocking) { + if (blocking) { + source.connect(sink); + } else { + source.connect(sink, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: Elements have been connected (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId()); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to connect media elements (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId(), cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to connect media elements (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId(), cause); + } + }); + } } /** @@ -476,42 +477,51 @@ public class PublisherEndpoint extends MediaEndpoint { * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ - private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) { + private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type, + boolean blocking) { if (type == null) { - internalSinkConnect(source, sink); + internalSinkConnect(source, sink, blocking); } else { - source.connect(sink, type, new Continuation() { + if (blocking) { + source.connect(sink, type); + } else { + source.connect(sink, type, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", + getEndpointName(), type, source.getId(), sink.getId()); + } + + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})", getEndpointName(), + type, source.getId(), sink.getId(), cause); + } + }); + } + } + } + + private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, boolean blocking) { + if (blocking) { + source.disconnect(sink); + } else { + source.disconnect(sink, new Continuation() { @Override public void onSuccess(Void result) throws Exception { - log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", getEndpointName(), - type, source.getId(), sink.getId()); + log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId()); } @Override public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})", getEndpointName(), - type, source.getId(), sink.getId(), cause); + log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId(), cause); } }); } } - private void internalSinkDisconnect(final MediaElement source, final MediaElement sink) { - source.disconnect(sink, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId()); - } - - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId(), cause); - } - }); - } - /** * Same as {@link #internalSinkDisconnect(MediaElement, MediaElement)}, but can * specify the type of the media that will be disconnected. @@ -523,23 +533,28 @@ public class PublisherEndpoint extends MediaEndpoint { * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ - private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type) { + private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type, + boolean blocking) { if (type == null) { - internalSinkDisconnect(source, sink); + internalSinkDisconnect(source, sink, blocking); } else { - source.disconnect(sink, type, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})", - getEndpointName(), type, source.getId(), sink.getId()); - } + if (blocking) { + source.disconnect(sink, type); + } else { + source.disconnect(sink, type, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})", + getEndpointName(), type, source.getId(), sink.getId()); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", getEndpointName(), - type, source.getId(), sink.getId(), cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", + getEndpointName(), type, source.getId(), sink.getId(), cause); + } + }); + } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index 215e576c..2e8079b8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -50,7 +50,7 @@ public class SubscriberEndpoint extends MediaEndpoint { this.createdAt = System.currentTimeMillis(); String sdpAnswer = processOffer(sdpOffer); gatherCandidates(); - publisher.connect(this.getEndpoint()); + publisher.connect(this.getEndpoint(), false); this.publisherStreamId = publisher.getStreamId(); return sdpAnswer; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java index 4513f22d..1d66e9af 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java @@ -111,7 +111,7 @@ public class CompositeWrapper { public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException { HubPort hubPort = new HubPort.Builder(composite).build(); - endpoint.connect(hubPort); + endpoint.connect(hubPort, false); String streamId = endpoint.getOwner().getPublisherStreamId(); this.hubPorts.put(streamId, hubPort); this.publisherEndpoints.put(streamId, endpoint); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 913230f2..532c7110 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -392,14 +392,14 @@ public class SingleStreamRecordingService extends RecordingService { MediaProfileSpecType profile) { switch (profile) { case WEBM: - publisherEndpoint.connect(recorder, MediaType.AUDIO); - publisherEndpoint.connect(recorder, MediaType.VIDEO); + publisherEndpoint.connect(recorder, MediaType.AUDIO, false); + publisherEndpoint.connect(recorder, MediaType.VIDEO, false); break; case WEBM_AUDIO_ONLY: - publisherEndpoint.connect(recorder, MediaType.AUDIO); + publisherEndpoint.connect(recorder, MediaType.AUDIO, false); break; case WEBM_VIDEO_ONLY: - publisherEndpoint.connect(recorder, MediaType.VIDEO); + publisherEndpoint.connect(recorder, MediaType.VIDEO, false); break; default: throw new UnsupportedOperationException("Unsupported profile when single stream recording: " + profile);