From fa52c1e76a19c3a1f9ffde787f0b5600efb6bcb9 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Wed, 4 Jul 2018 14:58:05 +0200 Subject: [PATCH] openvidu-server: GET /sessions simple response --- .../io/openvidu/server/core/MediaOptions.java | 33 +++-- .../kurento/core/KurentoParticipant.java | 24 ++-- .../server/kurento/core/KurentoSession.java | 64 ++++++--- .../kurento/endpoint/MediaEndpoint.java | 27 ++-- .../kurento/endpoint/PublisherEndpoint.java | 37 ++++++ .../kurento/endpoint/SubscriberEndpoint.java | 123 ++++++++++-------- 6 files changed, 190 insertions(+), 118 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java index 2376e938..5017cb88 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java @@ -40,23 +40,6 @@ public class MediaOptions { this.videoDimensions = videoDimensions; } - @SuppressWarnings("unchecked") - public JSONObject toJson() { - JSONObject json = new JSONObject(); - json.put("hasAudio", this.hasAudio); - if (hasAudio) - json.put("audioActive", this.audioActive); - json.put("hasVideo", this.hasVideo); - if (hasVideo) - json.put("videoActive", this.videoActive); - if (this.hasVideo && this.videoActive) { - json.put("typeOfVideo", this.typeOfVideo); - json.put("frameRate", this.frameRate); - json.put("videoDimensions", this.videoDimensions); - } - return json; - } - public boolean hasAudio() { return this.hasAudio; } @@ -84,5 +67,21 @@ public class MediaOptions { public String getVideoDimensions() { return this.videoDimensions; } + + @SuppressWarnings("unchecked") + public JSONObject toJSON() { + JSONObject json = new JSONObject(); + json.put("hasAudio", this.hasAudio); + if (hasAudio) + json.put("audioActive", this.audioActive); + json.put("hasVideo", this.hasVideo); + if (hasVideo) { + json.put("videoActive", this.videoActive); + json.put("typeOfVideo", this.typeOfVideo); + json.put("frameRate", this.frameRate); + json.put("videoDimensions", this.videoDimensions); + } + return json; + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index dfccf2d5..4332a2d0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -193,20 +193,21 @@ public class KurentoParticipant extends Participant { return session; } - public boolean isSubscribed() { + public Set getAllConnectedSubscribedEndpoints() { + Set subscribedToSet = new HashSet<>(); for (SubscriberEndpoint se : subscribers.values()) { if (se.isConnectedToPublisher()) { - return true; + subscribedToSet.add(se); } } - return false; + return subscribedToSet; } - public Set getConnectedSubscribedEndpoints() { - Set subscribedToSet = new HashSet(); + public Set getConnectedSubscribedEndpoints(PublisherEndpoint publisher) { + Set subscribedToSet = new HashSet<>(); for (SubscriberEndpoint se : subscribers.values()) { - if (se.isConnectedToPublisher()) { - subscribedToSet.add(se.getEndpointName()); + if (se.isConnectedToPublisher() && se.getPublisher().equals(publisher)) { + subscribedToSet.add(se); } } return subscribedToSet; @@ -431,7 +432,6 @@ public class KurentoParticipant extends Participant { */ public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) { - KurentoParticipant kSender = (KurentoParticipant) this.session.getParticipantByPublicId(senderPublicId); SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, pipeline); SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, sendingEndpoint); @@ -444,8 +444,6 @@ public class KurentoParticipant extends Participant { senderPublicId); } - sendingEndpoint.setMediaOptions(kSender.getPublisherMediaOptions()); - return sendingEndpoint; } @@ -645,7 +643,8 @@ public class KurentoParticipant extends Participant { + " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: " + System.currentTimeMillis(); endpoint.flowInMedia.put(event.getSource().getName(), event.getMediaType()); - if (endpoint.getMediaOptions().hasAudio() && endpoint.getMediaOptions().hasVideo() + if (endpoint.getPublisher().getMediaOptions().hasAudio() + && endpoint.getPublisher().getMediaOptions().hasVideo() && endpoint.flowInMedia.values().size() == 2) { endpoint.kmsEvents.add(new KmsEvent(event)); } else if (endpoint.flowInMedia.values().size() == 1) { @@ -662,7 +661,8 @@ public class KurentoParticipant extends Participant { + " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: " + System.currentTimeMillis(); endpoint.flowOutMedia.put(event.getSource().getName(), event.getMediaType()); - if (endpoint.getMediaOptions().hasAudio() && endpoint.getMediaOptions().hasVideo() + if (endpoint.getPublisher().getMediaOptions().hasAudio() + && endpoint.getPublisher().getMediaOptions().hasVideo() && endpoint.flowOutMedia.values().size() == 2) { endpoint.kmsEvents.add(new KmsEvent(event)); } else if (endpoint.flowOutMedia.values().size() == 1) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index dd8c9cf0..ff14b98e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -43,6 +44,8 @@ import io.openvidu.java.client.SessionProperties; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; +import io.openvidu.server.kurento.endpoint.PublisherEndpoint; +import io.openvidu.server.kurento.endpoint.SubscriberEndpoint; /** * @author Pablo Fuente (pablofuenteperez@gmail.com) @@ -71,11 +74,11 @@ public class KurentoSession implements Session { private Object pipelineReleaseLock = new Object(); private volatile boolean pipelineReleased = false; private boolean destroyKurentoClient; - + private CallDetailRecord CDR; - public KurentoSession(String sessionId, SessionProperties sessionProperties, KurentoClient kurentoClient, KurentoSessionEventsHandler kurentoSessionHandler, - boolean destroyKurentoClient, CallDetailRecord CDR) { + public KurentoSession(String sessionId, SessionProperties sessionProperties, KurentoClient kurentoClient, + KurentoSessionEventsHandler kurentoSessionHandler, boolean destroyKurentoClient, CallDetailRecord CDR) { this.sessionId = sessionId; this.sessionProperties = sessionProperties; this.kurentoClient = kurentoClient; @@ -100,7 +103,8 @@ public class KurentoSession implements Session { checkClosed(); createPipeline(); - KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, getPipeline(), kurentoSessionHandler.getInfoHandler(), this.CDR); + KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, getPipeline(), + kurentoSessionHandler.getInfoHandler(), this.CDR); participants.put(participant.getParticipantPrivateId(), kurentoParticipant); filterStates.forEach((filterId, state) -> { @@ -139,14 +143,14 @@ public class KurentoSession implements Session { continue; } subscriber.cancelReceivingMedia(participant.getParticipantPublicId(), reason); - + } - log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId, participants.values(), - participant.getParticipantPublicId()); + log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId, + participants.values(), participant.getParticipantPublicId()); } - + @Override public void leave(String participantPrivateId, String reason) throws OpenViduException { @@ -154,8 +158,8 @@ public class KurentoSession implements Session { KurentoParticipant participant = participants.get(participantPrivateId); if (participant == null) { - throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, - "Participant with private id " + participantPrivateId + " not found in session '" + sessionId + "'"); + throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Participant with private id " + + participantPrivateId + " not found in session '" + sessionId + "'"); } participant.releaseAllFilters(); @@ -170,19 +174,19 @@ public class KurentoSession implements Session { CDR.recordParticipantLeft(participant, participant.getSession().getSessionId(), reason); } } - + @Override public Set getParticipants() { checkClosed(); return new HashSet(this.participants.values()); } - + @Override public Participant getParticipantByPrivateId(String participantPrivateId) { checkClosed(); return participants.get(participantPrivateId); } - + @Override public Participant getParticipantByPublicId(String participantPublicId) { checkClosed(); @@ -193,7 +197,12 @@ public class KurentoSession implements Session { } return null; } - + + public Set getAllSubscribersForPublisher(PublisherEndpoint publisher) { + return this.participants.values().stream().flatMap(kp -> kp.getConnectedSubscribedEndpoints(publisher).stream()) + .collect(Collectors.toSet()); + } + @Override public boolean close(String reason) { if (!closed) { @@ -246,12 +255,13 @@ public class KurentoSession implements Session { participants.remove(participant.getParticipantPrivateId()); - log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, participant.getParticipantPublicId()); + log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, + participant.getParticipantPublicId()); for (KurentoParticipant other : participants.values()) { other.cancelReceivingMedia(participant.getParticipantPublicId(), reason); } } - + @Override public int getActivePublishers() { return activePublishers.get(); @@ -348,7 +358,7 @@ public class KurentoSession implements Session { kurentoSessionHandler.updateFilter(this.sessionId, participant, filterId, newState); } } - + @Override @SuppressWarnings("unchecked") public JSONObject toJSON() { @@ -357,14 +367,20 @@ public class KurentoSession implements Session { json.put("mediaMode", this.sessionProperties.mediaMode().name()); json.put("recordingMode", this.sessionProperties.recordingMode().name()); json.put("defaultRecordingLayout", this.sessionProperties.defaultRecordingLayout().name()); - if (this.sessionProperties.defaultCustomLayout() != null && !this.sessionProperties.defaultCustomLayout().isEmpty()) { + if (this.sessionProperties.defaultCustomLayout() != null + && !this.sessionProperties.defaultCustomLayout().isEmpty()) { json.put("defaultCustomLayout", this.sessionProperties.defaultCustomLayout()); } JSONArray participants = new JSONArray(); this.participants.values().forEach(p -> { participants.add(p.toJSON()); }); - json.put("connections", participants); + + JSONObject connections = new JSONObject(); + connections.put("count", participants.size()); + connections.put("items", participants); + json.put("connections", connections); + return json; } @@ -376,14 +392,20 @@ public class KurentoSession implements Session { json.put("mediaMode", this.sessionProperties.mediaMode().name()); json.put("recordingMode", this.sessionProperties.recordingMode().name()); json.put("defaultRecordingLayout", this.sessionProperties.defaultRecordingLayout().name()); - if (this.sessionProperties.defaultCustomLayout() != null && !this.sessionProperties.defaultCustomLayout().isEmpty()) { + if (this.sessionProperties.defaultCustomLayout() != null + && !this.sessionProperties.defaultCustomLayout().isEmpty()) { json.put("defaultCustomLayout", this.sessionProperties.defaultCustomLayout()); } JSONArray participants = new JSONArray(); this.participants.values().forEach(p -> { participants.add(p.withStatsToJSON()); }); - json.put("connections", participants); + + JSONObject connections = new JSONObject(); + connections.put("count", participants.size()); + connections.put("items", participants); + json.put("connections", connections); + return json; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index 62878085..471115a2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -18,6 +18,7 @@ package io.openvidu.server.kurento.endpoint; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; -import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.kurento.MutedMediaType; import io.openvidu.server.kurento.core.KurentoParticipant; @@ -70,11 +70,11 @@ public abstract class MediaEndpoint { private MediaPipeline pipeline = null; private ListenerSubscription endpointSubscription = null; + private final List receivedCandidateList = new LinkedList(); private LinkedList candidates = new LinkedList(); private MutedMediaType muteType; - private MediaOptions mediaOptions; public Map flowInMedia = new ConcurrentHashMap<>(); public Map flowOutMedia = new ConcurrentHashMap<>(); @@ -104,14 +104,6 @@ public abstract class MediaEndpoint { this.setMediaPipeline(pipeline); } - public MediaOptions getMediaOptions() { - return mediaOptions; - } - - public void setMediaOptions(MediaOptions mediaOptions) { - this.mediaOptions = mediaOptions; - } - public boolean isWeb() { return web; } @@ -490,6 +482,7 @@ public abstract class MediaEndpoint { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't add existing ICE candidates to null WebRtcEndpoint (ep: " + endpointName + ")"); } + this.receivedCandidateList.add(candidate); this.webEndpoint.addIceCandidate(candidate, new Continuation() { @Override public void onSuccess(Void result) throws Exception { @@ -502,30 +495,30 @@ public abstract class MediaEndpoint { } }); } - - @SuppressWarnings("unchecked") + + public abstract PublisherEndpoint getPublisher(); + public JSONObject toJSON() { JSONObject json = new JSONObject(); - json.put("mediaOptions", this.mediaOptions); return json; } @SuppressWarnings("unchecked") public JSONObject withStatsToJSON() { JSONObject json = new JSONObject(); - json.put("mediaOptions", this.mediaOptions); json.put("webrtcTagName", this.getEndpoint().getTag("name")); + json.put("receivedCandidates", this.receivedCandidateList); json.put("localCandidate", this.selectedLocalIceCandidate); json.put("remoteCandidate", this.selectedRemoteIceCandidate); - + JSONArray jsonArray = new JSONArray(); - + for (KmsEvent event : this.kmsEvents) { JSONObject jsonKmsEvent = new JSONObject(); jsonKmsEvent.put(event.event.getType(), event.timestamp); jsonArray.add(jsonKmsEvent); } - + json.put("events", jsonArray); return json; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 32e7de08..31adeb76 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.json.simple.JSONObject; import org.kurento.client.Continuation; import org.kurento.client.ListenerSubscription; import org.kurento.client.MediaElement; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; +import io.openvidu.server.core.MediaOptions; import io.openvidu.server.kurento.MutedMediaType; import io.openvidu.server.kurento.core.KurentoParticipant; @@ -46,6 +48,8 @@ import io.openvidu.server.kurento.core.KurentoParticipant; public class PublisherEndpoint extends MediaEndpoint { private final static Logger log = LoggerFactory.getLogger(PublisherEndpoint.class); + protected MediaOptions mediaOptions; + private PassThrough passThru = null; private ListenerSubscription passThruSubscription = null; @@ -462,4 +466,37 @@ public class PublisherEndpoint extends MediaEndpoint { }); } } + + @Override + public PublisherEndpoint getPublisher() { + return this; + } + + public MediaOptions getMediaOptions() { + return mediaOptions; + } + + public void setMediaOptions(MediaOptions mediaOptions) { + this.mediaOptions = mediaOptions; + } + + @SuppressWarnings("unchecked") + @Override + public JSONObject toJSON() { + JSONObject json = super.toJSON(); + json.put("streamId", this.getEndpoint().getTag("name")); + json.put("mediaOptions", this.mediaOptions.toJSON()); + return json; + } + + @SuppressWarnings("unchecked") + @Override + public JSONObject withStatsToJSON() { + JSONObject json = super.withStatsToJSON(); + JSONObject toJSON = this.toJSON(); + for (Object key : toJSON.keySet()) { + json.put(key, toJSON.get(key)); + } + return json; + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index 26732cf8..f0c82205 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -17,6 +17,7 @@ package io.openvidu.server.kurento.endpoint; +import org.json.simple.JSONObject; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaType; import org.slf4j.Logger; @@ -32,65 +33,85 @@ import io.openvidu.server.kurento.core.KurentoParticipant; * @author Radu Tom Vlad */ public class SubscriberEndpoint extends MediaEndpoint { - private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class); + private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class); - private boolean connectedToPublisher = false; + private boolean connectedToPublisher = false; - private PublisherEndpoint publisher = null; + private PublisherEndpoint publisher = null; - public SubscriberEndpoint(boolean web, KurentoParticipant owner, String endpointName, - MediaPipeline pipeline) { - super(web, owner, endpointName, pipeline, log); - } + public SubscriberEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline) { + super(web, owner, endpointName, pipeline, log); + } - public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) { - registerOnIceCandidateEventListener(); - String sdpAnswer = processOffer(sdpOffer); - gatherCandidates(); - publisher.connect(this.getEndpoint()); - setConnectedToPublisher(true); - setPublisher(publisher); - return sdpAnswer; - } + public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) { + registerOnIceCandidateEventListener(); + String sdpAnswer = processOffer(sdpOffer); + gatherCandidates(); + publisher.connect(this.getEndpoint()); + setConnectedToPublisher(true); + setPublisher(publisher); + return sdpAnswer; + } - public boolean isConnectedToPublisher() { - return connectedToPublisher; - } + public boolean isConnectedToPublisher() { + return connectedToPublisher; + } - public void setConnectedToPublisher(boolean connectedToPublisher) { - this.connectedToPublisher = connectedToPublisher; - } + public void setConnectedToPublisher(boolean connectedToPublisher) { + this.connectedToPublisher = connectedToPublisher; + } - public PublisherEndpoint getPublisher() { - return publisher; - } + @Override + public PublisherEndpoint getPublisher() { + return this.publisher; + } - public void setPublisher(PublisherEndpoint publisher) { - this.publisher = publisher; - } + public void setPublisher(PublisherEndpoint publisher) { + this.publisher = publisher; + } - @Override - public synchronized void mute(io.openvidu.server.kurento.MutedMediaType muteType) { - if (this.publisher == null) { - throw new OpenViduException(Code.MEDIA_MUTE_ERROR_CODE, "Publisher endpoint not found"); - } - switch (muteType) { - case ALL : - this.publisher.disconnectFrom(this.getEndpoint()); - break; - case AUDIO : - this.publisher.disconnectFrom(this.getEndpoint(), MediaType.AUDIO); - break; - case VIDEO : - this.publisher.disconnectFrom(this.getEndpoint(), MediaType.VIDEO); - break; - } - resolveCurrentMuteType(muteType); - } + @Override + public synchronized void mute(io.openvidu.server.kurento.MutedMediaType muteType) { + if (this.publisher == null) { + throw new OpenViduException(Code.MEDIA_MUTE_ERROR_CODE, "Publisher endpoint not found"); + } + switch (muteType) { + case ALL: + this.publisher.disconnectFrom(this.getEndpoint()); + break; + case AUDIO: + this.publisher.disconnectFrom(this.getEndpoint(), MediaType.AUDIO); + break; + case VIDEO: + this.publisher.disconnectFrom(this.getEndpoint(), MediaType.VIDEO); + break; + } + resolveCurrentMuteType(muteType); + } - @Override - public synchronized void unmute() { - this.publisher.connect(this.getEndpoint()); - setMuteType(null); - } + @Override + public synchronized void unmute() { + this.publisher.connect(this.getEndpoint()); + setMuteType(null); + } + + @SuppressWarnings("unchecked") + @Override + public JSONObject toJSON() { + JSONObject json = super.toJSON(); + json.put("streamId", this.publisher.getEndpoint().getTag("name")); + json.put("publisher", this.publisher.getEndpointName()); + return json; + } + + @SuppressWarnings("unchecked") + @Override + public JSONObject withStatsToJSON() { + JSONObject json = super.withStatsToJSON(); + JSONObject toJSON = this.toJSON(); + for (Object key : toJSON.keySet()) { + json.put(key, toJSON.get(key)); + } + return json; + } }