diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java index 82fb6340..2063746a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CDREventWebrtcConnection.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; +import io.openvidu.server.kurento.core.KurentoMediaOptions; public class CDREventWebrtcConnection extends CDREventEnd implements Comparable { @@ -63,6 +64,13 @@ public class CDREventWebrtcConnection extends CDREventEnd implements Comparable< json.addProperty("receivingFrom", this.receivingFrom); } else { json.addProperty("connection", "OUTBOUND"); + if (mediaOptions instanceof KurentoMediaOptions) { + KurentoMediaOptions kMediaOptions = (KurentoMediaOptions)mediaOptions; + if (kMediaOptions.rtspUri != null) { + json.addProperty("rtspUri", kMediaOptions.rtspUri); + json.addProperty("adaptativeBitrate", kMediaOptions.adaptativeBitrate); + } + } } if (this.mediaOptions.hasVideo()) { json.addProperty("videoSource", this.mediaOptions.getTypeOfVideo()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java index b20aaada..93f70f69 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java @@ -65,7 +65,7 @@ import io.openvidu.server.webhook.CDRLoggerWebhook; * - receivingFrom: string * - audioEnabled: boolean * - videoEnabled: boolean - * - videoSource: "CAMERA", "SCREEN" + * - videoSource: "CAMERA", "SCREEN", "CUSTOM", "IPCAM" * - videoFramerate: number * - videoDimensions: string * - id: string diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java b/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java index b0a92ab0..14015c4e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java @@ -51,9 +51,12 @@ public class Participant { this.createdAt = System.currentTimeMillis(); } this.token = token; - this.clientMetadata = clientMetadata; - if (!token.getServerMetadata().isEmpty()) + if (clientMetadata != null) { + this.clientMetadata = clientMetadata; + } + if (!token.getServerMetadata().isEmpty()) { this.serverMetadata = token.getServerMetadata(); + } this.location = location; this.platform = platform; } @@ -134,6 +137,10 @@ public class Participant { return closed; } + public boolean isIpcam() { + return this.platform.equals("IPCAM") && this.participantPrivatetId.startsWith("IPCAM-"); + } + public void setStreaming(boolean streaming) { this.streaming = streaming; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java index 24173bd5..4ab3aeed 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java @@ -36,6 +36,7 @@ import io.openvidu.java.client.RecordingLayout; import io.openvidu.java.client.SessionProperties; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.kurento.endpoint.EndpointType; import io.openvidu.server.recording.service.RecordingManager; public class Session implements SessionInterface { @@ -167,7 +168,7 @@ public class Session implements SessionInterface { } @Override - public void join(Participant participant) { + public void join(Participant participant, EndpointType endpointType) { } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionInterface.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionInterface.java index 6645f866..209e9023 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionInterface.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionInterface.java @@ -22,6 +22,7 @@ import java.util.Set; import com.google.gson.JsonObject; import io.openvidu.java.client.SessionProperties; +import io.openvidu.server.kurento.endpoint.EndpointType; public interface SessionInterface { @@ -29,7 +30,7 @@ public interface SessionInterface { SessionProperties getSessionProperties(); - void join(Participant participant); + void join(Participant participant, EndpointType endpointType); void leave(String participantPrivateId, EndReason reason); diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 1432f067..5da69dd7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -51,6 +51,7 @@ import io.openvidu.server.kurento.core.KurentoTokenOptions; import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.utils.FormatChecker; import io.openvidu.server.utils.GeoLocation; +import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.QuarantineKiller; public abstract class SessionManager { @@ -75,6 +76,9 @@ public abstract class SessionManager { @Autowired protected QuarantineKiller quarantineKiller; + @Autowired + protected GeoLocationByIp geoLocationByIp; + public FormatChecker formatChecker = new FormatChecker(); protected ConcurrentMap sessions = new ConcurrentHashMap<>(); @@ -148,6 +152,8 @@ public abstract class SessionManager { public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId, String eventType); + public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions) throws Exception; + public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) throws OpenViduException; @@ -393,6 +399,16 @@ public abstract class SessionManager { } } + public Participant newIpcamParticipant(String sessionId, String ipcamId, Token token, GeoLocation location, String platform) { + if (this.sessionidParticipantpublicidParticipant.get(sessionId) != null) { + Participant p = new Participant(ipcamId, ipcamId, ipcamId, sessionId, token, null, location, platform, null); + this.sessionidParticipantpublicidParticipant.get(sessionId).put(ipcamId, p); + return p; + } else { + throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, sessionId); + } + } + public Token consumeToken(String sessionId, String participantPrivateId, String token) { if (this.sessionidTokenTokenobj.get(sessionId) != null) { Token t = this.sessionidTokenTokenobj.get(sessionId).remove(token); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java index cb3d57b5..d157f0b8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java @@ -17,8 +17,7 @@ package io.openvidu.server.kurento.core; -import org.kurento.client.MediaElement; -import org.kurento.client.MediaType; +import com.google.gson.JsonObject; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.kurento.endpoint.KurentoFilter; @@ -28,21 +27,38 @@ public class KurentoMediaOptions extends MediaOptions { public boolean isOffer; public String sdpOffer; public boolean doLoopback; - public MediaElement loopbackAlternativeSrc; - public MediaType loopbackConnectionType; - public MediaElement[] mediaElements; - public KurentoMediaOptions(boolean isOffer, String sdpOffer, MediaElement loopbackAlternativeSrc, - MediaType loopbackConnectionType, Boolean hasAudio, Boolean hasVideo, Boolean audioActive, - Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, KurentoFilter filter, - boolean doLoopback, MediaElement... mediaElements) { + // IPCAM properties + public String rtspUri; + public Boolean adaptativeBitrate; + + public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo, + Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, + KurentoFilter filter, boolean doLoopback) { super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter); this.isOffer = isOffer; this.sdpOffer = sdpOffer; - this.loopbackAlternativeSrc = loopbackAlternativeSrc; - this.loopbackConnectionType = loopbackConnectionType; this.doLoopback = doLoopback; - this.mediaElements = mediaElements; + } + + public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo, + Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, + KurentoFilter filter, boolean doLoopback, String rtspUri, Boolean adaptativeBitrate) { + super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter); + this.isOffer = isOffer; + this.sdpOffer = sdpOffer; + this.doLoopback = doLoopback; + this.rtspUri = rtspUri; + this.adaptativeBitrate = adaptativeBitrate; + } + + @Override + public JsonObject toJson() { + JsonObject json = super.toJson(); + if (adaptativeBitrate != null) { + json.addProperty("adaptativeBitrate", adaptativeBitrate); + } + return json; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 56b4df27..059a9725 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -26,13 +26,12 @@ import java.util.function.Function; import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.Continuation; +import org.kurento.client.Endpoint; import org.kurento.client.ErrorEvent; import org.kurento.client.Filter; import org.kurento.client.IceCandidate; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; -import org.kurento.client.MediaType; -import org.kurento.client.SdpEndpoint; import org.kurento.client.internal.server.KurentoServerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +47,7 @@ import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; +import io.openvidu.server.kurento.endpoint.EndpointType; import io.openvidu.server.kurento.endpoint.MediaEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SdpType; @@ -61,7 +61,7 @@ public class KurentoParticipant extends Participant { private OpenviduConfig openviduConfig; private RecordingManager recordingManager; - private boolean webParticipant = true; + private EndpointType endpointType; private final KurentoSession session; private KurentoParticipantEndpointConfig endpointConfig; @@ -72,12 +72,13 @@ public class KurentoParticipant extends Participant { private final ConcurrentMap filters = new ConcurrentHashMap<>(); private final ConcurrentMap subscribers = new ConcurrentHashMap(); - public KurentoParticipant(Participant participant, KurentoSession kurentoSession, + public KurentoParticipant(Participant participant, KurentoSession kurentoSession, EndpointType endpointType, KurentoParticipantEndpointConfig endpointConfig, OpenviduConfig openviduConfig, RecordingManager recordingManager) { super(participant.getFinalUserId(), participant.getParticipantPrivateId(), participant.getParticipantPublicId(), kurentoSession.getSessionId(), participant.getToken(), participant.getClientMetadata(), participant.getLocation(), participant.getPlatform(), participant.getCreatedAt()); + this.endpointType = endpointType; this.endpointConfig = endpointConfig; this.openviduConfig = openviduConfig; this.recordingManager = recordingManager; @@ -85,7 +86,7 @@ public class KurentoParticipant extends Participant { if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { // Initialize a PublisherEndpoint - this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), + this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig); } @@ -101,11 +102,11 @@ public class KurentoParticipant extends Participant { public void createPublishingEndpoint(MediaOptions mediaOptions) { + publisher.setMediaOptions(mediaOptions); publisher.createEndpoint(publisherLatch); if (getPublisher().getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint"); } - publisher.setMediaOptions(mediaOptions); String publisherStreamId = this.getParticipantPublicId() + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" @@ -171,14 +172,12 @@ public class KurentoParticipant extends Participant { return session; } - public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, - MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { + public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback) { log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(), this.session.getSessionId(), sdpType); log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString); - String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback, loopbackAlternativeSrc, - loopbackConnectionType); + String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback); this.streaming = true; log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpResponse); @@ -200,7 +199,7 @@ public class KurentoParticipant extends Participant { log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), this.session.getSessionId()); releasePublisherEndpoint(reason, kmsDisconnectionTime); - this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(), + this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(), this.getPipeline(), this.openviduConfig); log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", this.getParticipantPublicId()); @@ -233,7 +232,7 @@ public class KurentoParticipant extends Participant { try { CountDownLatch subscriberLatch = new CountDownLatch(1); - SdpEndpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); + Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); try { if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, @@ -339,7 +338,7 @@ public class KurentoParticipant extends Participant { * @return the endpoint instance */ public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) { - SubscriberEndpoint subscriberEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, + SubscriberEndpoint subscriberEndpoint = new SubscriberEndpoint(endpointType, this, senderPublicId, this.getPipeline(), this.openviduConfig); SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, subscriberEndpoint); @@ -459,7 +458,7 @@ public class KurentoParticipant extends Participant { public void resetPublisherEndpoint() { log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId()); - this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), + this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipantEndpointConfig.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipantEndpointConfig.java index 95f8b552..97bcebfa 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipantEndpointConfig.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipantEndpointConfig.java @@ -17,6 +17,10 @@ package io.openvidu.server.kurento.core; +import org.kurento.client.BaseRtpEndpoint; +import org.kurento.client.Endpoint; +import org.kurento.client.PlayerEndpoint; +import org.kurento.client.WebRtcEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -39,7 +43,140 @@ public class KurentoParticipantEndpointConfig { public void addEndpointListeners(MediaEndpoint endpoint, String typeOfEndpoint) { - endpoint.getWebEndpoint().addMediaFlowInStateChangeListener(event -> { + // WebRtcEndpoint events + if (endpoint.getWebEndpoint() != null) { + + final WebRtcEndpoint finalEndpoint = endpoint.getWebEndpoint(); + + finalEndpoint.addIceGatheringDoneListener(event -> { + String msg = "KMS event [IceGatheringDone] -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | timestamp: " + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + finalEndpoint.addNewCandidatePairSelectedListener(event -> { + endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate(); + endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate(); + String msg = "KMS event [NewCandidatePairSelected]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | local: " + endpoint.selectedLocalIceCandidate + " | remote: " + + endpoint.selectedRemoteIceCandidate + " | timestamp: " + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + finalEndpoint.addIceComponentStateChangeListener(event -> { + String msg = "KMS event [IceComponentStateChange]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | state: " + event.getState().name() + " | componentId: " + + event.getComponentId() + " | streamId: " + event.getStreamId() + " | timestamp: " + + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + finalEndpoint.addDataChannelOpenListener(event -> { + String msg = "KMS event [DataChannelOpenEvent]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | channelId: " + event.getChannelId() + " | timestamp: " + + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + finalEndpoint.addDataChannelCloseListener(event -> { + String msg = "KMS event [DataChannelCloseEvent]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | channelId: " + event.getChannelId() + " | timestamp: " + + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + } + + // PlayerEndpoint events + if (endpoint.getPlayerEndpoint() != null) { + + final PlayerEndpoint finalEndpoint = endpoint.getPlayerEndpoint(); + + finalEndpoint.addEndOfStreamListener(event -> { + String msg = "KMS event [EndOfStreamEvent]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | timestamp: " + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + finalEndpoint.addUriEndpointStateChangedListener(event -> { + String msg = "KMS event [UriEndpointStateChangedEvent]: -> endpoint: " + endpoint.getEndpointName() + + " (" + typeOfEndpoint + ") | state: " + event.getState().name() + " | timestamp: " + + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + } + + // BaseRtpEndpoint events + if (endpoint.getWebEndpoint() != null || endpoint.getRtpEndpoint() != null) { + + final BaseRtpEndpoint finalEndpoint = ((BaseRtpEndpoint) endpoint.getEndpoint()); + + finalEndpoint.addConnectionStateChangedListener(event -> { + String msg = "KMS event [ConnectionStateChanged]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: " + + event.getNewState() + " | timestamp: " + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + finalEndpoint.addMediaStateChangedListener(event -> { + String msg = "KMS event [MediaStateChangedEvent]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: " + + event.getNewState() + " | timestamp: " + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); + + } + + // Endpoint events + final Endpoint finalEndpoint = endpoint.getEndpoint(); + + finalEndpoint.addMediaFlowInStateChangeListener(event -> { String msg = "KMS event [MediaFlowInStateChange] -> endpoint: " + endpoint.getEndpointName() + " (" + typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName() + " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestampMillis(); @@ -51,7 +188,7 @@ public class KurentoParticipantEndpointConfig { log.info(msg); }); - endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener(event -> { + finalEndpoint.addMediaFlowOutStateChangeListener(event -> { String msg = "KMS event [MediaFlowOutStateChange] -> endpoint: " + endpoint.getEndpointName() + " (" + typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName() + " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestampMillis(); @@ -63,71 +200,7 @@ public class KurentoParticipantEndpointConfig { log.info(msg); }); - endpoint.getWebEndpoint().addIceGatheringDoneListener(event -> { - String msg = "KMS event [IceGatheringDone] -> endpoint: " + endpoint.getEndpointName() + " (" - + typeOfEndpoint + ") | timestamp: " + event.getTimestampMillis(); - KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), - endpoint.createdAt()); - endpoint.kmsEvents.add(kmsEvent); - this.CDR.log(kmsEvent); - this.infoHandler.sendInfo(msg); - log.info(msg); - }); - - endpoint.getWebEndpoint().addConnectionStateChangedListener(event -> { - String msg = "KMS event [ConnectionStateChanged]: -> endpoint: " + endpoint.getEndpointName() + " (" - + typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: " + event.getNewState() - + " | timestamp: " + event.getTimestampMillis(); - KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), - endpoint.createdAt()); - endpoint.kmsEvents.add(kmsEvent); - this.CDR.log(kmsEvent); - this.infoHandler.sendInfo(msg); - log.info(msg); - }); - - endpoint.getWebEndpoint().addNewCandidatePairSelectedListener(event -> { - endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate(); - endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate(); - String msg = "KMS event [NewCandidatePairSelected]: -> endpoint: " + endpoint.getEndpointName() + " (" - + typeOfEndpoint + ") | local: " + endpoint.selectedLocalIceCandidate + " | remote: " - + endpoint.selectedRemoteIceCandidate + " | timestamp: " + event.getTimestampMillis(); - KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), - endpoint.createdAt()); - endpoint.kmsEvents.add(kmsEvent); - this.CDR.log(kmsEvent); - this.infoHandler.sendInfo(msg); - log.info(msg); - }); - - endpoint.getEndpoint().addMediaTranscodingStateChangeListener(event -> { - String msg = "KMS event [MediaTranscodingStateChange]: -> endpoint: " + endpoint.getEndpointName() + " (" - + typeOfEndpoint + ") | state: " + event.getState().name() + " | mediaType: " + event.getMediaType() - + " | binName: " + event.getBinName() + " | timestamp: " + event.getTimestampMillis(); - KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), - event.getMediaType(), endpoint.createdAt()); - endpoint.kmsEvents.add(kmsEvent); - this.CDR.log(kmsEvent); - this.infoHandler.sendInfo(msg); - log.info(msg); - }); - - endpoint.getWebEndpoint().addIceComponentStateChangeListener(event -> { - // if (!event.getState().equals(IceComponentState.READY)) { - String msg = "KMS event [IceComponentStateChange]: -> endpoint: " + endpoint.getEndpointName() + " (" - + typeOfEndpoint + ") | state: " + event.getState().name() + " | componentId: " - + event.getComponentId() + " | streamId: " + event.getStreamId() + " | timestamp: " - + event.getTimestampMillis(); - KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), - endpoint.createdAt()); - endpoint.kmsEvents.add(kmsEvent); - this.CDR.log(kmsEvent); - this.infoHandler.sendInfo(msg); - log.info(msg); - // } - }); - - endpoint.getWebEndpoint().addErrorListener(event -> { + finalEndpoint.addErrorListener(event -> { String msg = "KMS event [ERROR]: -> endpoint: " + endpoint.getEndpointName() + " (" + typeOfEndpoint + ") | errorCode: " + event.getErrorCode() + " | description: " + event.getDescription() + " | timestamp: " + event.getTimestampMillis(); @@ -138,6 +211,18 @@ public class KurentoParticipantEndpointConfig { this.infoHandler.sendInfo(msg); log.error(msg); }); + + finalEndpoint.addMediaTranscodingStateChangeListener(event -> { + String msg = "KMS event [MediaTranscodingStateChange]: -> endpoint: " + endpoint.getEndpointName() + " (" + + typeOfEndpoint + ") | state: " + event.getState().name() + " | mediaType: " + event.getMediaType() + + " | binName: " + event.getBinName() + " | timestamp: " + event.getTimestampMillis(); + KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getOwner(), endpoint.getEndpointName(), + event.getMediaType(), endpoint.createdAt()); + endpoint.kmsEvents.add(kmsEvent); + this.CDR.log(kmsEvent); + this.infoHandler.sendInfo(msg); + log.info(msg); + }); } public CallDetailRecord getCdr() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 7b24f7e9..0dec8596 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -36,6 +36,7 @@ import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; +import io.openvidu.server.kurento.endpoint.EndpointType; import io.openvidu.server.kurento.kms.Kms; /** @@ -54,8 +55,6 @@ public class KurentoSession extends Session { private KurentoSessionEventsHandler kurentoSessionHandler; private KurentoParticipantEndpointConfig kurentoEndpointConfig; - private final ConcurrentHashMap filterStates = new ConcurrentHashMap<>(); - private Object pipelineCreateLock = new Object(); private Object pipelineReleaseLock = new Object(); @@ -71,19 +70,14 @@ public class KurentoSession extends Session { } @Override - public void join(Participant participant) { + public void join(Participant participant, EndpointType endpointType) { checkClosed(); createPipeline(); - KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, this.kurentoEndpointConfig, - this.openviduConfig, this.recordingManager); + KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, endpointType, + this.kurentoEndpointConfig, this.openviduConfig, this.recordingManager); participants.put(participant.getParticipantPrivateId(), kurentoParticipant); - filterStates.forEach((filterId, state) -> { - log.info("Adding filter {}", filterId); - kurentoSessionHandler.updateFilter(sessionId, participant, filterId, state); - }); - log.info("SESSION {}: Added participant {}", sessionId, participant); if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index b2d7a3a3..d535cdaf 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -17,12 +17,19 @@ package io.openvidu.server.kurento.core; +import java.io.IOException; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.NoSuchElementException; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; import org.kurento.client.ListenerSubscription; @@ -44,16 +51,20 @@ import io.openvidu.java.client.RecordingMode; import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.SessionProperties; import io.openvidu.server.core.EndReason; +import io.openvidu.server.core.FinalUser; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionManager; +import io.openvidu.server.core.Token; +import io.openvidu.server.kurento.endpoint.EndpointType; import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.rpc.RpcHandler; +import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.JsonUtils; public class KurentoSessionManager extends SessionManager { @@ -111,7 +122,7 @@ public class KurentoSessionManager extends SessionManager { } existingParticipants = getParticipants(sessionId); - kSession.join(participant); + kSession.join(participant, EndpointType.WEBRTC_ENDPOINT); } catch (OpenViduException e) { log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), sessionId, e); @@ -252,9 +263,8 @@ public class KurentoSessionManager extends SessionManager { log.debug( "Request [PUBLISH_MEDIA] isOffer={} sdp={} " - + "loopbackAltSrc={} lpbkConnType={} doLoopback={} mediaElements={} ({})", - kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.loopbackAlternativeSrc, - kurentoOptions.loopbackConnectionType, kurentoOptions.doLoopback, kurentoOptions.mediaElements, + + "loopbackAltSrc={} lpbkConnType={} doLoopback={} rtspUri={} ({})", + kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, kurentoOptions.rtspUri, participant.getParticipantPublicId()); SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; @@ -284,8 +294,7 @@ public class KurentoSessionManager extends SessionManager { } } - sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, - kurentoOptions.loopbackAlternativeSrc, kurentoOptions.loopbackConnectionType); + sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback); if (sdpAnswer == null) { OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, @@ -590,8 +599,8 @@ public class KurentoSessionManager extends SessionManager { boolean doLoopback = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_DOLOOPBACK_PARAM); - return new KurentoMediaOptions(true, sdpOffer, null, null, hasAudio, hasVideo, audioActive, videoActive, - typeOfVideo, frameRate, videoDimensions, kurentoFilter, doLoopback); + return new KurentoMediaOptions(true, sdpOffer, hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, + frameRate, videoDimensions, kurentoFilter, doLoopback); } @Override @@ -838,6 +847,68 @@ public class KurentoSessionManager extends SessionManager { } } + @Override + public Participant publishIpcam(Session session, MediaOptions mediaOptions) throws Exception { + KurentoSession kSession = (KurentoSession) session; + KurentoMediaOptions kMediaOptions = (KurentoMediaOptions) mediaOptions; + + // Generate the location for the IpCam + GeoLocation location = null; + URL url = null; + String protocol = null; + try { + Pattern pattern = Pattern.compile("^(file|rtsp)://"); + Matcher matcher = pattern.matcher(kMediaOptions.rtspUri); + if (matcher.find()) { + protocol = matcher.group(0).replaceAll("://$", ""); + } else { + throw new MalformedURLException(); + } + String parsedUrl = kMediaOptions.rtspUri.replaceAll("^.*?://", "http://"); + url = new URL(parsedUrl); + } catch (Exception e) { + throw new MalformedURLException(); + } + + try { + location = this.geoLocationByIp.getLocationByIp(InetAddress.getByName(url.getHost())); + } catch (IOException e) { + e.printStackTrace(); + location = null; + } catch (Exception e) { + log.warn("Error getting address location: {}", e.getMessage()); + location = null; + } + + final String rtspConnectionId = kMediaOptions.getTypeOfVideo() + "-" + protocol + "-" + + RandomStringUtils.randomAlphanumeric(4).toLowerCase() + "-" + url.getAuthority() + + url.getPath().replaceAll("/", "-").replaceAll("_", "-"); + + // Store a "fake" participant for the IpCam connection + this.newInsecureParticipant(rtspConnectionId); + String token = RandomStringUtils.randomAlphanumeric(16).toLowerCase(); + Token tokenObj = null; + if (this.isTokenValidInSession(token, session.getSessionId(), rtspConnectionId)) { + tokenObj = this.consumeToken(session.getSessionId(), rtspConnectionId, token); + } + Participant ipcamParticipant = this.newIpcamParticipant(session.getSessionId(), rtspConnectionId, tokenObj, + location, mediaOptions.getTypeOfVideo()); + + // Store a "fake" final user for the IpCam connection + final String finalUserId = rtspConnectionId; + this.sessionidFinalUsers.get(session.getSessionId()).computeIfAbsent(finalUserId, k -> { + return new FinalUser(finalUserId, session.getSessionId(), ipcamParticipant); + }).addConnectionIfAbsent(ipcamParticipant); + + // Join the participant to the session + kSession.join(ipcamParticipant, EndpointType.PLAYER_ENDPOINT); + + // Publish the IpCam stream into the session + KurentoParticipant kParticipant = (KurentoParticipant) this.getParticipant(rtspConnectionId); + this.publishVideo(kParticipant, mediaOptions, null); + return kParticipant; + } + @Override public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) { Session session = this.getSession(sessionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/EndpointType.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/EndpointType.java new file mode 100644 index 00000000..a60764e6 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/EndpointType.java @@ -0,0 +1,7 @@ +package io.openvidu.server.kurento.endpoint; + +public enum EndpointType { + + WEBRTC_ENDPOINT, PLAYER_ENDPOINT, RTP_ENDPOINT + +} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index 301bf268..c5d0be7d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import org.kurento.client.Continuation; +import org.kurento.client.Endpoint; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.IceCandidate; @@ -32,6 +33,7 @@ import org.kurento.client.ListenerSubscription; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.OnIceCandidateEvent; +import org.kurento.client.PlayerEndpoint; import org.kurento.client.RtpEndpoint; import org.kurento.client.SdpEndpoint; import org.kurento.client.WebRtcEndpoint; @@ -46,12 +48,14 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.Participant; +import io.openvidu.server.kurento.core.KurentoMediaOptions; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoTokenOptions; /** - * {@link WebRtcEndpoint} wrapper that supports buffering of - * {@link IceCandidate}s until the {@link WebRtcEndpoint} is created. + * {@link Endpoint} wrapper. Can be based on WebRtcEndpoint (that supports + * buffering of {@link IceCandidate}s until the {@link WebRtcEndpoint} is + * created), PlayerEndpoint (to play RTSP or file streams) and RtpEndpoint. * Connections to other peers are opened using the corresponding method of the * internal endpoint. * @@ -61,10 +65,11 @@ public abstract class MediaEndpoint { private static Logger log; private OpenviduConfig openviduConfig; - private boolean web = false; + private EndpointType endpointType; private WebRtcEndpoint webEndpoint = null; private RtpEndpoint endpoint = null; + private PlayerEndpoint playerEndpoint = null; private final int maxRecvKbps; private final int minRecvKbps; @@ -98,14 +103,14 @@ public abstract class MediaEndpoint { * @param pipeline * @param log */ - public MediaEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, - OpenviduConfig openviduConfig, Logger log) { + public MediaEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, + MediaPipeline pipeline, OpenviduConfig openviduConfig, Logger log) { if (log == null) { MediaEndpoint.log = LoggerFactory.getLogger(MediaEndpoint.class); } else { MediaEndpoint.log = log; } - this.web = web; + this.endpointType = endpointType; this.owner = owner; this.setEndpointName(endpointName); this.setMediaPipeline(pipeline); @@ -135,7 +140,11 @@ public abstract class MediaEndpoint { } public boolean isWeb() { - return web; + return EndpointType.WEBRTC_ENDPOINT.equals(this.endpointType); + } + + public boolean isPlayerEndpoint() { + return EndpointType.PLAYER_ENDPOINT.equals(this.endpointType); } /** @@ -148,9 +157,11 @@ public abstract class MediaEndpoint { /** * @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) */ - public SdpEndpoint getEndpoint() { + public Endpoint getEndpoint() { if (this.isWeb()) { return this.webEndpoint; + } else if (this.isPlayerEndpoint()) { + return this.playerEndpoint; } else { return this.endpoint; } @@ -164,10 +175,14 @@ public abstract class MediaEndpoint { return webEndpoint; } - protected RtpEndpoint getRtpEndpoint() { + public RtpEndpoint getRtpEndpoint() { return endpoint; } + public PlayerEndpoint getPlayerEndpoint() { + return playerEndpoint; + } + /** * If this object doesn't have a {@link WebRtcEndpoint}, it is created in a * thread-safe way using the internal {@link MediaPipeline}. Otherwise no @@ -179,8 +194,8 @@ public abstract class MediaEndpoint { * * @return the existing endpoint, if any */ - public synchronized SdpEndpoint createEndpoint(CountDownLatch endpointLatch) { - SdpEndpoint old = this.getEndpoint(); + public synchronized Endpoint createEndpoint(CountDownLatch endpointLatch) { + Endpoint old = this.getEndpoint(); if (old == null) { internalEndpointInitialization(endpointLatch); } else { @@ -271,6 +286,31 @@ public abstract class MediaEndpoint { log.error("EP {}: Failed to create a new WebRtcEndpoint", endpointName, cause); } }); + } else if (this.isPlayerEndpoint()) { + KurentoMediaOptions mediaOptions = (KurentoMediaOptions) this.owner.getPublisherMediaOptions(); + PlayerEndpoint.Builder playerBuilder = new PlayerEndpoint.Builder(pipeline, mediaOptions.rtspUri); + + if (!mediaOptions.adaptativeBitrate) { + playerBuilder = playerBuilder.useEncodedMedia(); + } + + playerBuilder.buildAsync(new Continuation() { + + @Override + public void onSuccess(PlayerEndpoint result) throws Exception { + playerEndpoint = result; + log.trace("EP {}: Created a new PlayerEndpoint", endpointName); + endpointSubscription = registerElemErrListener(playerEndpoint); + playerEndpoint.play(); + endpointLatch.countDown(); + } + + @Override + public void onError(Throwable cause) throws Exception { + endpointLatch.countDown(); + log.error("EP {}: Failed to create a new PlayerEndpoint", endpointName, cause); + } + }); } else { new RtpEndpoint.Builder(pipeline).buildAsync(new Continuation() { @Override @@ -353,6 +393,8 @@ public abstract class MediaEndpoint { "Can't process offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); } return webEndpoint.processOffer(offer); + } else if (this.isPlayerEndpoint()) { + return ""; } else { if (endpoint == null) { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, @@ -362,29 +404,6 @@ public abstract class MediaEndpoint { } } - /** - * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) - * to generate the offer String that can be used to initiate a connection. - * - * @see SdpEndpoint#generateOffer() - * @return the Sdp offer - */ - protected String generateOffer() throws OpenViduException { - if (this.isWeb()) { - if (webEndpoint == null) { - throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, - "Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); - } - return webEndpoint.generateOffer(); - } else { - if (endpoint == null) { - throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, - "Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")"); - } - return endpoint.generateOffer(); - } - } - /** * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * to process the answer String. @@ -400,6 +419,8 @@ public abstract class MediaEndpoint { "Can't process answer when WebRtcEndpoint is null (ep: " + endpointName + ")"); } return webEndpoint.processAnswer(answer); + } else if (this.isPlayerEndpoint()) { + return ""; } else { if (endpoint == null) { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, @@ -489,8 +510,10 @@ public abstract class MediaEndpoint { JsonObject json = new JsonObject(); json.addProperty("createdAt", this.createdAt); json.addProperty("webrtcEndpointName", this.getEndpointName()); - json.addProperty("remoteSdp", this.getEndpoint().getRemoteSessionDescriptor()); - json.addProperty("localSdp", this.getEndpoint().getLocalSessionDescriptor()); + if (!this.isPlayerEndpoint()) { + json.addProperty("remoteSdp", ((SdpEndpoint) this.getEndpoint()).getRemoteSessionDescriptor()); + json.addProperty("localSdp", ((SdpEndpoint) this.getEndpoint()).getLocalSessionDescriptor()); + } json.add("receivedCandidates", new GsonBuilder().create().toJsonTree(this.receivedCandidateList)); json.addProperty("localCandidate", this.selectedLocalIceCandidate); json.addProperty("remoteCandidate", this.selectedRemoteIceCandidate); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 4b1276cb..4827f67f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -46,6 +46,7 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.MediaOptions; +import io.openvidu.server.kurento.core.KurentoMediaOptions; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.utils.JsonUtils; @@ -73,9 +74,9 @@ public class PublisherEndpoint extends MediaEndpoint { private Map elementsErrorSubscriptions = new HashMap(); - public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, - OpenviduConfig openviduConfig) { - super(web, owner, endpointName, pipeline, openviduConfig, log); + public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, + MediaPipeline pipeline, OpenviduConfig openviduConfig) { + super(endpointType, owner, endpointName, pipeline, openviduConfig, log); } @Override @@ -172,15 +173,10 @@ public class PublisherEndpoint extends MediaEndpoint { * @return the SDP response (the answer if processing an offer SDP, otherwise is * the updated offer generated previously by this endpoint) */ - public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback, - MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { + public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback) { registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId()); if (doLoopback) { - if (loopbackAlternativeSrc == null) { - connect(this.getEndpoint(), loopbackConnectionType); - } else { - connectAltLoopbackSrc(loopbackAlternativeSrc, loopbackConnectionType); - } + connect(this.getEndpoint(), null); } else { innerConnect(); } @@ -200,10 +196,6 @@ public class PublisherEndpoint extends MediaEndpoint { return sdpResponse; } - public synchronized String preparePublishConnection() { - return generateOffer(); - } - public synchronized void connect(MediaElement sink) { if (!connected) { innerConnect(); @@ -415,13 +407,6 @@ public class PublisherEndpoint extends MediaEndpoint { return elementIds.get(idx - 1); } - private void connectAltLoopbackSrc(MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { - if (!connected) { - innerConnect(); - } - internalSinkConnect(loopbackAlternativeSrc, this.getEndpoint(), loopbackConnectionType); - } - private void innerConnect() { if (this.getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, @@ -549,6 +534,9 @@ public class PublisherEndpoint extends MediaEndpoint { public JsonObject toJson() { JsonObject json = super.toJson(); json.addProperty("streamId", this.getStreamId()); + if (this.isPlayerEndpoint()) { + json.addProperty("rtspUri", ((KurentoMediaOptions) this.mediaOptions).rtspUri); + } json.add("mediaOptions", this.mediaOptions.toJson()); return json; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index f2f71752..17c1dc44 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -42,9 +42,9 @@ public class SubscriberEndpoint extends MediaEndpoint { private String publisherStreamId; - public SubscriberEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, - OpenviduConfig openviduConfig) { - super(web, owner, endpointName, pipeline, openviduConfig, log); + public SubscriberEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, + MediaPipeline pipeline, OpenviduConfig openviduConfig) { + super(endpointType, owner, endpointName, pipeline, openviduConfig, log); } public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index eaf7fb46..5b7a73cc 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -17,6 +17,7 @@ package io.openvidu.server.rest; +import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -57,6 +58,7 @@ import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionManager; +import io.openvidu.server.kurento.core.KurentoMediaOptions; import io.openvidu.server.kurento.core.KurentoTokenOptions; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.service.RecordingManager; @@ -217,14 +219,14 @@ public class SessionRestController { if (session != null) { this.sessionManager.closeSession(sessionId, EndReason.sessionClosedByServer); return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } + + Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); + if (sessionNotActive != null) { + this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); } else { - Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); - if (sessionNotActive != null) { - this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer); - return new ResponseEntity<>(HttpStatus.NO_CONTENT); - } else { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } } @@ -261,11 +263,21 @@ public class SessionRestController { session = this.sessionManager.getSession(sessionId); if (session != null) { - if (this.sessionManager.unpublishStream(session, streamId, null, null, EndReason.forceUnpublishByServer)) { - return new ResponseEntity<>(HttpStatus.NO_CONTENT); - } else { + + final String participantPrivateId = this.sessionManager.getParticipantPrivateIdFromStreamId(sessionId, + streamId); + + if (participantPrivateId == null) { return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + + Participant participant = this.sessionManager.getParticipant(participantPrivateId); + if (participant.isIpcam()) { + return new ResponseEntity<>(HttpStatus.METHOD_NOT_ALLOWED); + } + + this.sessionManager.unpublishStream(session, streamId, null, null, EndReason.forceUnpublishByServer); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); } else { return new ResponseEntity<>(HttpStatus.NOT_FOUND); } @@ -652,6 +664,62 @@ public class SessionRestController { return new ResponseEntity<>(HttpStatus.OK); } + @RequestMapping(value = "/sessions/{sessionId}/connection", method = RequestMethod.POST) + public ResponseEntity publishIpcam(@PathVariable("sessionId") String sessionId, @RequestBody Map params) { + + if (params == null) { + return this.generateErrorResponse("Error in body parameters. Cannot be empty", "/api/rtsp", + HttpStatus.BAD_REQUEST); + } + + log.info("REST API: POST /api/sessions/{}/connection {}", sessionId, params.toString()); + + Session session = this.sessionManager.getSession(sessionId); + if (session == null) { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + + String type; + String rtspUri; + Boolean adaptativeBitrate; + try { + type = (String) params.get("type"); + rtspUri = (String) params.get("rtspUri"); + adaptativeBitrate = (Boolean) params.get("adaptativeBitrate"); + } catch (ClassCastException e) { + return this.generateErrorResponse("Type error in some parameter", + "/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST); + } + if (rtspUri == null) { + return this.generateErrorResponse("\"rtspUri\" parameter is mandatory", + "/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST); + } + + type = type != null ? type : "IPCAM"; + adaptativeBitrate = adaptativeBitrate != null ? adaptativeBitrate : true; + + boolean hasAudio = true; + boolean hasVideo = true; + boolean audioActive = true; + boolean videoActive = true; + String typeOfVideo = type; + Integer frameRate = null; + String videoDimensions = null; + KurentoMediaOptions mediaOptions = new KurentoMediaOptions(true, null, hasAudio, hasVideo, audioActive, + videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate); + + try { + Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions); + return new ResponseEntity<>(ipcamParticipant.toJson().toString(), getResponseHeaders(), HttpStatus.OK); + } catch (MalformedURLException e) { + return this.generateErrorResponse("\"rtspUri\" parameter is not a valid rtsp uri", + "/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST); + } catch (Exception e) { + return this.generateErrorResponse(e.getMessage(), "/api/sessions/" + sessionId + "/connection", + HttpStatus.INTERNAL_SERVER_ERROR); + } + } + private ResponseEntity generateErrorResponse(String errorMessage, String path, HttpStatus status) { JsonObject responseJson = new JsonObject(); responseJson.addProperty("timestamp", System.currentTimeMillis()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java index a897c40c..38cf4a6f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java @@ -58,8 +58,10 @@ public class RpcNotificationService { public void sendResponse(String participantPrivateId, Integer transactionId, Object result) { Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); if (t == null) { - log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", - transactionId, participantPrivateId, result); + if (!isIpcamParticipant(participantPrivateId)) { + log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", + transactionId, participantPrivateId, result); + } return; } try { @@ -73,8 +75,10 @@ public class RpcNotificationService { OpenViduException error) { Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); if (t == null) { - log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", - transactionId, participantPrivateId, data); + if (!isIpcamParticipant(participantPrivateId)) { + log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", + transactionId, participantPrivateId, data); + } return; } try { @@ -88,8 +92,10 @@ public class RpcNotificationService { public void sendNotification(final String participantPrivateId, final String method, final Object params) { RpcConnection rpcSession = rpcConnections.get(participantPrivateId); if (rpcSession == null || rpcSession.getSession() == null) { - log.error("No rpc session found for private id {}, unable to send notification {}: {}", - participantPrivateId, method, params); + if (!isIpcamParticipant(participantPrivateId)) { + log.error("No rpc session found for private id {}, unable to send notification {}: {}", + participantPrivateId, method, params); + } return; } Session s = rpcSession.getSession(); @@ -105,7 +111,9 @@ public class RpcNotificationService { public RpcConnection closeRpcSession(String participantPrivateId) { RpcConnection rpcSession = rpcConnections.remove(participantPrivateId); if (rpcSession == null || rpcSession.getSession() == null) { - log.error("No session found for private id {}, unable to cleanup", participantPrivateId); + if (!isIpcamParticipant(participantPrivateId)) { + log.error("No session found for private id {}, unable to cleanup", participantPrivateId); + } return null; } Session s = rpcSession.getSession(); @@ -123,7 +131,9 @@ public class RpcNotificationService { private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) { RpcConnection rpcSession = rpcConnections.get(participantPrivateId); if (rpcSession == null) { - log.warn("Invalid WebSocket session id {}", participantPrivateId); + if (!isIpcamParticipant(participantPrivateId)) { + log.warn("Invalid WebSocket session id {}", participantPrivateId); + } return null; } log.trace("#{} - {} transactions", participantPrivateId, rpcSession.getTransactions().size()); @@ -140,4 +150,8 @@ public class RpcNotificationService { return this.rpcConnections.get(participantPrivateId); } + private boolean isIpcamParticipant(String participantPrivateId) { + return participantPrivateId.startsWith("IPCAM-"); + } + }