diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index ecf851f8..fec607f3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -507,6 +507,22 @@ public class SessionEventsHandler { } } + public void onFilterEventDispatched(String connectionId, String streamId, String filterType, String eventType, Object data, + Set participants, Set subscribedParticipants) { + JsonObject params = new JsonObject(); + params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_CONNECTIONID_PARAM, connectionId); + params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_STREAMID_PARAM, streamId); + params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_FILTERTYPE_PARAM, filterType); + params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_EVENTTYPE_PARAM, eventType); + params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_DATA_PARAM, data.toString()); + for (Participant p : participants) { + if (subscribedParticipants.contains(p.getParticipantPublicId())) { + rpcNotificationService.sendNotification(p.getParticipantPrivateId(), + ProtocolElements.FILTEREVENTDISPATCHED_METHOD, params); + } + } + } + public void closeRpcSession(String participantPrivateId) { this.rpcNotificationService.closeRpcSession(participantPrivateId); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index c4173739..e7a32a0e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -104,12 +104,18 @@ public abstract class SessionManager { public abstract void applyFilter(Session session, String streamId, String filterType, JsonObject filterOptions, Participant moderator, Integer transactionId, String reason); - public abstract void execFilterMethod(Session session, String streamId, String filterMethod, JsonObject filterParams, - Participant moderator, Integer transactionId, String reason); + public abstract void execFilterMethod(Session session, String streamId, String filterMethod, + JsonObject filterParams, Participant moderator, Integer transactionId, String reason); public abstract void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId, String reason); + public abstract void addFilterEventListener(Session session, Participant subscriber, String streamId, + String eventType); + + public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId, + String eventType); + /** * Returns a Session given its id * diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 2d4912ff..9754ffa7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -25,6 +25,7 @@ import java.util.Set; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; import org.kurento.client.KurentoClient; +import org.kurento.client.ListenerSubscription; import org.kurento.jsonrpc.Props; import org.kurento.jsonrpc.message.Request; import org.slf4j.Logger; @@ -52,6 +53,7 @@ import io.openvidu.server.kurento.KurentoClientProvider; import io.openvidu.server.kurento.KurentoClientSessionInfo; import io.openvidu.server.kurento.KurentoFilter; import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; +import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.utils.JsonUtils; @@ -710,6 +712,83 @@ public class KurentoSessionManager extends SessionManager { } } + @Override + public void addFilterEventListener(Session session, Participant subscriber, String streamId, String eventType) + throws OpenViduException { + String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); + if (participantPrivateId != null) { + Participant participant = this.getParticipant(participantPrivateId); + log.debug("Request [ADD_FILTER_LISTENER] over stream [{}]", streamId); + KurentoParticipant kParticipant = (KurentoParticipant) participant; + if (!participant.isStreaming()) { + log.warn( + "PARTICIPANT {}: Requesting to addFilterEventListener to stream {} " + + "in session {} but user is not streaming media", + subscriber.getParticipantPublicId(), streamId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + + session.getSessionId() + "'"); + } else if (kParticipant.getPublisher().getFilter() == null) { + log.warn( + "PARTICIPANT {}: Requesting to addFilterEventListener to user {} " + + "in session {} but user does NOT have a filter", + subscriber.getParticipantPublicId(), participant.getParticipantPublicId(), + session.getSessionId()); + throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, + "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" + + session.getSessionId() + "'"); + } else { + try { + this.addFilterEventListenerInPublisher(kParticipant, eventType); + kParticipant.getPublisher().addParticipantAsListenerOfFilterEvent(eventType, + participant.getParticipantPublicId()); + } catch (OpenViduException e) { + throw e; + } + } + } + } + + @Override + public void removeFilterEventListener(Session session, Participant subscriber, String streamId, String eventType) + throws OpenViduException { + String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); + if (participantPrivateId != null) { + Participant participant = this.getParticipant(participantPrivateId); + log.debug("Request [REMOVE_FILTER_LISTENER] over stream [{}]", streamId); + KurentoParticipant kParticipant = (KurentoParticipant) participant; + if (!participant.isStreaming()) { + log.warn( + "PARTICIPANT {}: Requesting to removeFilterEventListener to stream {} " + + "in session {} but user is not streaming media", + subscriber.getParticipantPublicId(), streamId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + + session.getSessionId() + "'"); + } else if (kParticipant.getPublisher().getFilter() == null) { + log.warn( + "PARTICIPANT {}: Requesting to removeFilterEventListener to user {} " + + "in session {} but user does NOT have a filter", + subscriber.getParticipantPublicId(), participant.getParticipantPublicId(), + session.getSessionId()); + throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, + "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" + + session.getSessionId() + "'"); + } else { + try { + PublisherEndpoint pub = kParticipant.getPublisher(); + if (pub.removeParticipantAsListenerOfFilterEvent(eventType, participant.getParticipantPublicId())) { + // If there are no more participants listening to the event remove the vent from + // the GenericMediaElement + this.removeFilterEventListenerInPublisher(kParticipant, eventType); + } + } catch (OpenViduException e) { + throw e; + } + } + } + } + private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter) throws OpenViduException { GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(), @@ -736,13 +815,35 @@ public class KurentoSessionManager extends SessionManager { kParticipant.getPublisher().getMediaOptions().setFilter(null); } - /* - * private void addFilterEventListenerInPublisher(KurentoParticipant - * kParticipant) { this.listener = - * kParticipant.getPublisher().getFilter().addEventListener("CodeFound", event - * -> { System.out.println(event.getData()); }); } private void - * removeFilterEventListenerInPublisher(KurentoParticipant kParticipant) { - * kParticipant.getPublisher().getFilter().removeEventListener(this.listener); } - */ + private void addFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType) + throws OpenViduException { + PublisherEndpoint pub = kParticipant.getPublisher(); + if (!pub.isListenerAddedToFilterEvent(eventType)) { + final String connectionId = kParticipant.getParticipantPublicId(); + final String streamId = kParticipant.getPublisherStreamId(); + final String filterType = kParticipant.getPublisherMediaOptions().getFilter().getType(); + try { + ListenerSubscription listener = pub.getFilter().addEventListener(eventType, event -> { + sessionEventsHandler.onFilterEventDispatched(connectionId, streamId, filterType, event.getType(), + event.getData(), kParticipant.getSession().getParticipants(), + kParticipant.getPublisher().getPartipantsListentingToFilterEvent(eventType)); + }); + pub.storeListener(eventType, listener); + } catch (Exception e) { + log.error("Request to addFilterEventListener to stream {} gone wrong. Error: {}", streamId, + e.getMessage()); + throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND, + "Request to addFilterEventListener to stream " + streamId + " gone wrong: " + e.getMessage()); + } + } + } + + private void removeFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType) { + PublisherEndpoint pub = kParticipant.getPublisher(); + if (pub.isListenerAddedToFilterEvent(eventType)) { + GenericMediaElement filter = kParticipant.getPublisher().getFilter(); + filter.removeEventListener(pub.removeListener(eventType)); + } + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index ee3569f3..6b45a28e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -19,9 +19,12 @@ package io.openvidu.server.kurento.endpoint; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.kurento.client.Continuation; @@ -62,6 +65,9 @@ public class PublisherEndpoint extends MediaEndpoint { private ListenerSubscription passThruSubscription = null; private GenericMediaElement filter; + private Map> subscribersToFilterEvents = new ConcurrentHashMap<>(); + private Map filterListeners = new ConcurrentHashMap<>(); + private Map elements = new HashMap(); private LinkedList elementIds = new LinkedList(); private boolean connected = false; @@ -104,6 +110,46 @@ public class PublisherEndpoint extends MediaEndpoint { return this.filter; } + public boolean isListenerAddedToFilterEvent(String eventType) { + return (this.subscribersToFilterEvents.containsKey(eventType) && this.filterListeners.containsKey(eventType)); + } + + public Set getPartipantsListentingToFilterEvent(String eventType) { + return this.subscribersToFilterEvents.get(eventType); + } + + public boolean storeListener(String eventType, ListenerSubscription listener) { + return (this.filterListeners.putIfAbsent(eventType, listener) == null); + } + + public ListenerSubscription removeListener(String eventType) { + // Clean all participant subscriptions to this event + this.subscribersToFilterEvents.remove(eventType); + // Clean ListenerSubscription object for this event + return this.filterListeners.remove(eventType); + } + + public void addParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) { + this.subscribersToFilterEvents.putIfAbsent(eventType, new HashSet<>()); + this.subscribersToFilterEvents.get(eventType).add(participantPublicId); + } + + public boolean removeParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) { + if (!this.subscribersToFilterEvents.containsKey(eventType)) { + String streamId = this.getEndpoint().getTag("name"); + log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added", streamId, + eventType); + throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND, + "Request to removeFilterEventListener to stream " + streamId + " gone wrong: Filter " + eventType + + " has no listener added"); + } + this.subscribersToFilterEvents.computeIfPresent(eventType, (type, subs) -> { + subs.remove(participantPublicId); + return subs; + }); + return this.subscribersToFilterEvents.get(eventType).isEmpty(); + } + /** * Initializes this media endpoint for publishing media and processes the SDP * offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index b5de9f51..3346392e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -142,6 +142,12 @@ public class RpcHandler extends DefaultJsonRpcHandler { case ProtocolElements.REMOVEFILTER_METHOD: removeFilter(rpcConnection, request); break; + case ProtocolElements.ADDFILTEREVENTLISTENER_METHOD: + addFilterEventListener(rpcConnection, request); + break; + case ProtocolElements.REMOVEFILTEREVENTLISTENER_METHOD: + removeFilterEventListener(rpcConnection, request); + break; /* * case ProtocolElements.FORCEAPPLYFILTER_METHOD: * forceApplyFilter(rpcConnection, request); break; case @@ -419,6 +425,46 @@ public class RpcHandler extends DefaultJsonRpcHandler { request.getId(), "removeFilter"); } + private void addFilterEventListener(RpcConnection rpcConnection, Request request) { + Participant participant; + try { + participant = sanityCheckOfSession(rpcConnection, "addFilterEventListener"); + } catch (OpenViduException e) { + return; + } + String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); + String eventType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM); + try { + sessionManager.addFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), participant, + streamId, eventType); + this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject()); + } catch (OpenViduException e) { + this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject(), e); + } + } + + private void removeFilterEventListener(RpcConnection rpcConnection, Request request) { + Participant participant; + try { + participant = sanityCheckOfSession(rpcConnection, "removeFilterEventListener"); + } catch (OpenViduException e) { + return; + } + String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); + String eventType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM); + try { + sessionManager.removeFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), + participant, streamId, eventType); + this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject()); + } catch (OpenViduException e) { + this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject(), e); + } + } + public void leaveRoomAfterConnClosed(String participantPrivateId, String reason) { try { sessionManager.evictParticipant(this.sessionManager.getParticipant(participantPrivateId), null, null,