diff --git a/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java b/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java index de7b0f2d..83b19aa7 100644 --- a/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java +++ b/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java @@ -112,11 +112,11 @@ public class ProtocolElements { public static final String REMOVEFILTEREVENTLISTENER_METHOD = "removeFilterEventListener"; public static final String FILTEREVENTDISPATCHED_METHOD = "filterEventDispatched"; - public static final String ADDFILTEREVENTLISTENER_CONNECTIONID_PARAM = "connectionId"; - public static final String ADDFILTEREVENTLISTENER_STREAMID_PARAM = "streamId"; - public static final String ADDFILTEREVENTLISTENER_FILTERTYPE_PARAM = "filterType"; - public static final String ADDFILTEREVENTLISTENER_EVENTTYPE_PARAM = "eventType"; - public static final String ADDFILTEREVENTLISTENER_DATA_PARAM = "data"; + public static final String FILTEREVENTLISTENER_CONNECTIONID_PARAM = "connectionId"; + public static final String FILTEREVENTLISTENER_STREAMID_PARAM = "streamId"; + public static final String FILTEREVENTLISTENER_FILTERTYPE_PARAM = "filterType"; + public static final String FILTEREVENTLISTENER_EVENTTYPE_PARAM = "eventType"; + public static final String FILTEREVENTLISTENER_DATA_PARAM = "data"; // ---------------------------- SERVER RESPONSES & EVENTS ----------------- diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index fec607f3..41d42fda 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -454,15 +454,15 @@ public class SessionEventsHandler { Set participants, String streamId, KurentoFilter filter, OpenViduException error, String reason) { boolean isRpcFromModerator = transactionId != null && moderator != null; - boolean isRpcFromOwner = transactionId != null && moderator == null; if (isRpcFromModerator) { + // A moderator forced the application of the filter if (error != null) { rpcNotificationService.sendErrorResponse(moderator.getParticipantPrivateId(), transactionId, null, error); return; } - rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); + rpcNotificationService.sendResponse(moderator.getParticipantPrivateId(), transactionId, new JsonObject()); } JsonObject params = new JsonObject(); @@ -484,11 +484,13 @@ public class SessionEventsHandler { for (Participant p : participants) { if (p.getParticipantPrivateId().equals(participant.getParticipantPrivateId())) { - // Send response to the affected participant - if (!isRpcFromOwner) { + // Affected participant + if (isRpcFromModerator) { + // Force by moderator. Send notification to affected participant rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.STREAMPROPERTYCHANGED_METHOD, params); } else { + // Send response to participant if (error != null) { rpcNotificationService.sendErrorResponse(p.getParticipantPrivateId(), transactionId, null, error); @@ -498,8 +500,8 @@ public class SessionEventsHandler { } } else { // Send response to every other user in the session different than the affected - // participant - if (error == null) { + // participant or the moderator + if (error == null && (moderator == null || !p.getParticipantPrivateId().equals(moderator.getParticipantPrivateId()))) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.STREAMPROPERTYCHANGED_METHOD, params); } @@ -510,11 +512,11 @@ public class SessionEventsHandler { public void onFilterEventDispatched(String connectionId, String streamId, String filterType, String eventType, Object data, Set participants, Set subscribedParticipants) { JsonObject params = new JsonObject(); - params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_CONNECTIONID_PARAM, connectionId); - params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_STREAMID_PARAM, streamId); - params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_FILTERTYPE_PARAM, filterType); - params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_EVENTTYPE_PARAM, eventType); - params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_DATA_PARAM, data.toString()); + params.addProperty(ProtocolElements.FILTEREVENTLISTENER_CONNECTIONID_PARAM, connectionId); + params.addProperty(ProtocolElements.FILTEREVENTLISTENER_STREAMID_PARAM, streamId); + params.addProperty(ProtocolElements.FILTEREVENTLISTENER_FILTERTYPE_PARAM, filterType); + params.addProperty(ProtocolElements.FILTEREVENTLISTENER_EVENTTYPE_PARAM, eventType); + params.addProperty(ProtocolElements.FILTEREVENTLISTENER_DATA_PARAM, data.toString()); for (Participant p : participants) { if (subscribedParticipants.contains(p.getParticipantPublicId())) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index e7a32a0e..4095bea0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -115,6 +115,8 @@ public abstract class SessionManager { public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId, String eventType); + + public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) throws OpenViduException; /** * Returns a Session given its id diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 9754ffa7..b95f678d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -589,42 +589,93 @@ public class KurentoSessionManager extends SessionManager { Participant moderator, Integer transactionId, String reason) { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { - Participant participant = this.getParticipant(participantPrivateId); + Participant publisher = this.getParticipant(participantPrivateId); + moderator = (moderator != null + && publisher.getParticipantPublicId().equals(moderator.getParticipantPublicId())) ? null + : moderator; log.debug("Request [APPLY_FILTER] over stream [{}] for reason [{}]", streamId, reason); + KurentoParticipant kParticipantPublisher = (KurentoParticipant) publisher; + if (!publisher.isStreaming()) { + log.warn( + "PARTICIPANT {}: Requesting to applyFilter to user {} " + + "in session {} but user is not streaming media", + moderator != null ? moderator.getParticipantPublicId() : publisher.getParticipantPublicId(), + publisher.getParticipantPublicId(), session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "User '" + publisher.getParticipantPublicId() + " not streaming media in session '" + + session.getSessionId() + "'"); + } else if (kParticipantPublisher.getPublisher().getFilter() != null) { + log.warn( + "PARTICIPANT {}: Requesting to applyFilter to user {} " + + "in session {} but user already has a filter", + moderator != null ? moderator.getParticipantPublicId() : publisher.getParticipantPublicId(), + publisher.getParticipantPublicId(), session.getSessionId()); + throw new OpenViduException(Code.EXISTING_FILTER_ALREADY_APPLIED_ERROR_CODE, + "User '" + publisher.getParticipantPublicId() + " already has a filter applied in session '" + + session.getSessionId() + "'"); + } else { + try { + KurentoFilter filter = new KurentoFilter(filterType, filterOptions); + this.applyFilterInPublisher(kParticipantPublisher, filter); + Set participants = kParticipantPublisher.getSession().getParticipants(); + sessionEventsHandler.onFilterChanged(publisher, moderator, transactionId, participants, streamId, + filter, null, reason); + } catch (OpenViduException e) { + log.warn("PARTICIPANT {}: Error applying filter", publisher.getParticipantPublicId(), e); + sessionEventsHandler.onFilterChanged(publisher, moderator, transactionId, new HashSet<>(), streamId, + null, e, ""); + } + } + + log.info("State of filter for participant {}: {}", publisher.getParticipantPublicId(), + ((KurentoParticipant) publisher).getPublisher().filterCollectionsToString()); + + } else { + log.warn("PARTICIPANT {}: Requesting to applyFilter to stream {} " + + "in session {} but the owner cannot be found", streamId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, + "Owner of stream '" + streamId + "' not found in session '" + session.getSessionId() + "'"); + } + } + + @Override + public void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId, + String reason) { + String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); + if (participantPrivateId != null) { + Participant participant = this.getParticipant(participantPrivateId); + log.debug("Request [REMOVE_FILTER] over stream [{}] for reason [{}]", streamId, reason); KurentoParticipant kParticipant = (KurentoParticipant) participant; if (!participant.isStreaming()) { log.warn( - "PARTICIPANT {}: Requesting to applyFilter to user {} " + "PARTICIPANT {}: Requesting to removeFilter to user {} " + "in session {} but user is not streaming media", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); - } else if (kParticipant.getPublisher().getFilter() != null) { + } else if (kParticipant.getPublisher().getFilter() == null) { log.warn( - "PARTICIPANT {}: Requesting to applyFilter to user {} " - + "in session {} but user already has a filter", + "PARTICIPANT {}: Requesting to removeFilter to user {} " + + "in session {} but user does NOT have a filter", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); - throw new OpenViduException(Code.EXISTING_FILTER_ALREADY_APPLIED_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " already has a filter applied in session '" + throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, + "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" + session.getSessionId() + "'"); } else { - try { - KurentoFilter filter = new KurentoFilter(filterType, filterOptions); - this.applyFilterInPublisher(kParticipant, filter); - Set participants = kParticipant.getSession().getParticipants(); - sessionEventsHandler.onFilterChanged(participant, moderator, transactionId, participants, streamId, - filter, null, reason); - } catch (OpenViduException e) { - log.warn("PARTICIPANT {}: Error applying filter", participant.getParticipantPublicId(), e); - sessionEventsHandler.onFilterChanged(participant, moderator, transactionId, new HashSet<>(), - streamId, null, e, ""); - } + this.removeFilterInPublisher(kParticipant); + Set participants = kParticipant.getSession().getParticipants(); + sessionEventsHandler.onFilterChanged(participant, moderator, transactionId, participants, streamId, + null, null, reason); } + + log.info("State of filter for participant {}: {}", kParticipant.getParticipantPublicId(), + kParticipant.getPublisher().filterCollectionsToString()); + } else { - log.warn("PARTICIPANT {}: Requesting to applyFilter to stream {} " + log.warn("PARTICIPANT {}: Requesting to removeFilter to stream {} " + "in session {} but the owner cannot be found", streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Owner of stream '" + streamId + "' not found in session '" + session.getSessionId() + "'"); @@ -673,79 +724,46 @@ public class KurentoSessionManager extends SessionManager { } @Override - public void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId, - String reason) { - String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); - if (participantPrivateId != null) { - Participant participant = this.getParticipant(participantPrivateId); - log.debug("Request [REMOVE_FILTER] over stream [{}] for reason [{}]", streamId, reason); - KurentoParticipant kParticipant = (KurentoParticipant) participant; - if (!participant.isStreaming()) { - log.warn( - "PARTICIPANT {}: Requesting to removeFilter to user {} " - + "in session {} but user is not streaming media", - moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), - participant.getParticipantPublicId(), session.getSessionId()); - throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " not streaming media in session '" - + session.getSessionId() + "'"); - } else if (kParticipant.getPublisher().getFilter() == null) { - log.warn( - "PARTICIPANT {}: Requesting to removeFilter to user {} " - + "in session {} but user does NOT have a filter", - moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), - participant.getParticipantPublicId(), session.getSessionId()); - throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" - + session.getSessionId() + "'"); - } else { - this.removeFilterInPublisher(kParticipant); - Set participants = kParticipant.getSession().getParticipants(); - sessionEventsHandler.onFilterChanged(participant, moderator, transactionId, participants, streamId, - null, null, reason); - } - } else { - log.warn("PARTICIPANT {}: Requesting to removeFilter to stream {} " - + "in session {} but the owner cannot be found", streamId, session.getSessionId()); - throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, - "Owner of stream '" + streamId + "' not found in session '" + session.getSessionId() + "'"); - } - } - - @Override - public void addFilterEventListener(Session session, Participant subscriber, String streamId, String eventType) + public void addFilterEventListener(Session session, Participant userSubscribing, String streamId, String eventType) throws OpenViduException { - String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); - if (participantPrivateId != null) { - Participant participant = this.getParticipant(participantPrivateId); + String publisherPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); + if (publisherPrivateId != null) { log.debug("Request [ADD_FILTER_LISTENER] over stream [{}]", streamId); - KurentoParticipant kParticipant = (KurentoParticipant) participant; - if (!participant.isStreaming()) { + KurentoParticipant kParticipantPublishing = (KurentoParticipant) this.getParticipant(publisherPrivateId); + KurentoParticipant kParticipantSubscribing = (KurentoParticipant) userSubscribing; + if (!kParticipantPublishing.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to addFilterEventListener to stream {} " - + "in session {} but user is not streaming media", - subscriber.getParticipantPublicId(), streamId, session.getSessionId()); + + "in session {} but the publisher is not streaming media", + userSubscribing.getParticipantPublicId(), streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + "User '" + kParticipantPublishing.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); - } else if (kParticipant.getPublisher().getFilter() == null) { + } else if (kParticipantPublishing.getPublisher().getFilter() == null) { log.warn( "PARTICIPANT {}: Requesting to addFilterEventListener to user {} " + "in session {} but user does NOT have a filter", - subscriber.getParticipantPublicId(), participant.getParticipantPublicId(), - session.getSessionId()); + kParticipantSubscribing.getParticipantPublicId(), + kParticipantPublishing.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" - + session.getSessionId() + "'"); + "User '" + kParticipantPublishing.getParticipantPublicId() + + " has no filter applied in session '" + session.getSessionId() + "'"); } else { try { - this.addFilterEventListenerInPublisher(kParticipant, eventType); - kParticipant.getPublisher().addParticipantAsListenerOfFilterEvent(eventType, - participant.getParticipantPublicId()); + this.addFilterEventListenerInPublisher(kParticipantPublishing, eventType); + kParticipantPublishing.getPublisher().addParticipantAsListenerOfFilterEvent(eventType, + userSubscribing.getParticipantPublicId()); } catch (OpenViduException e) { throw e; } } + + log.info("State of filter for participant {}: {}", kParticipantPublishing.getParticipantPublicId(), + kParticipantPublishing.getPublisher().filterCollectionsToString()); + + } else { + throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, + "Not user found for streamId '" + streamId + "' in session '" + session.getSessionId() + "'"); } } @@ -754,41 +772,51 @@ public class KurentoSessionManager extends SessionManager { throws OpenViduException { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { - Participant participant = this.getParticipant(participantPrivateId); log.debug("Request [REMOVE_FILTER_LISTENER] over stream [{}]", streamId); - KurentoParticipant kParticipant = (KurentoParticipant) participant; - if (!participant.isStreaming()) { + Participant participantPublishing = this.getParticipant(participantPrivateId); + KurentoParticipant kParticipantPublishing = (KurentoParticipant) participantPublishing; + if (!participantPublishing.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to removeFilterEventListener to stream {} " + "in session {} but user is not streaming media", subscriber.getParticipantPublicId(), streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + "User '" + participantPublishing.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); - } else if (kParticipant.getPublisher().getFilter() == null) { + } else if (kParticipantPublishing.getPublisher().getFilter() == null) { log.warn( "PARTICIPANT {}: Requesting to removeFilterEventListener to user {} " + "in session {} but user does NOT have a filter", - subscriber.getParticipantPublicId(), participant.getParticipantPublicId(), + subscriber.getParticipantPublicId(), participantPublishing.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, - "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" - + session.getSessionId() + "'"); + "User '" + participantPublishing.getParticipantPublicId() + + " has no filter applied in session '" + session.getSessionId() + "'"); } else { try { - PublisherEndpoint pub = kParticipant.getPublisher(); - if (pub.removeParticipantAsListenerOfFilterEvent(eventType, participant.getParticipantPublicId())) { - // If there are no more participants listening to the event remove the vent from - // the GenericMediaElement - this.removeFilterEventListenerInPublisher(kParticipant, eventType); + PublisherEndpoint pub = kParticipantPublishing.getPublisher(); + if (pub.removeParticipantAsListenerOfFilterEvent(eventType, subscriber.getParticipantPublicId())) { + // If there are no more participants listening to the event remove the event + // from the GenericMediaElement + this.removeFilterEventListenerInPublisher(kParticipantPublishing, eventType); } } catch (OpenViduException e) { throw e; } } + + log.info("State of filter for participant {}: {}", kParticipantPublishing.getParticipantPublicId(), + kParticipantPublishing.getPublisher().filterCollectionsToString()); + } } + @Override + public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) { + Session session = this.getSession(sessionId); + return ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); + } + private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter) throws OpenViduException { GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(), @@ -801,6 +829,12 @@ public class KurentoSessionManager extends SessionManager { kParticipant.getPublisher().getMediaOptions().setFilter(filter); } + private void removeFilterInPublisher(KurentoParticipant kParticipant) { + kParticipant.getPublisher().cleanAllFilterListeners(); + kParticipant.getPublisher().revert(kParticipant.getPublisher().getFilter()); + kParticipant.getPublisher().getMediaOptions().setFilter(null); + } + private KurentoFilter execFilterMethodInPublisher(KurentoParticipant kParticipant, String method, JsonObject params) { kParticipant.getPublisher().execMethod(method, params); @@ -810,11 +844,6 @@ public class KurentoSessionManager extends SessionManager { return updatedFilter; } - private void removeFilterInPublisher(KurentoParticipant kParticipant) { - kParticipant.getPublisher().revert(kParticipant.getPublisher().getFilter()); - kParticipant.getPublisher().getMediaOptions().setFilter(null); - } - private void addFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType) throws OpenViduException { PublisherEndpoint pub = kParticipant.getPublisher(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 6b45a28e..fba20d74 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -137,8 +137,8 @@ public class PublisherEndpoint extends MediaEndpoint { public boolean removeParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) { if (!this.subscribersToFilterEvents.containsKey(eventType)) { String streamId = this.getEndpoint().getTag("name"); - log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added", streamId, - eventType); + log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added", + streamId, eventType); throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND, "Request to removeFilterEventListener to stream " + streamId + " gone wrong: Filter " + eventType + " has no listener added"); @@ -150,6 +150,12 @@ public class PublisherEndpoint extends MediaEndpoint { return this.subscribersToFilterEvents.get(eventType).isEmpty(); } + public void cleanAllFilterListeners() { + for (String eventType : this.subscribersToFilterEvents.keySet()) { + this.removeListener(eventType); + } + } + /** * Initializes this media endpoint for publishing media and processes the SDP * offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it @@ -572,4 +578,9 @@ public class PublisherEndpoint extends MediaEndpoint { } return json; } + + public String filterCollectionsToString() { + return "{filter: " + ((this.filter != null) ? this.filter.getName() : "null") + ", listener: " + this.filterListeners.toString() + + ", subscribers: " + this.subscribersToFilterEvents.toString() + "}"; + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index adf1bbb7..354e9867 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -148,14 +148,6 @@ public class RpcHandler extends DefaultJsonRpcHandler { case ProtocolElements.REMOVEFILTEREVENTLISTENER_METHOD: removeFilterEventListener(rpcConnection, request); break; - /* - * case ProtocolElements.FORCEAPPLYFILTER_METHOD: - * forceApplyFilter(rpcConnection, request); break; case - * ProtocolElements.FORCEEXECFILTERMETHOD_METHOD: - * forceExecFilterMethod(rpcConnection, request); break; case - * ProtocolElements.FORCEREMOVEFILTER_METHOD: forceRemoveFilter(rpcConnection, - * request); break; - */ default: log.error("Unrecognized request {}", request); break; @@ -380,7 +372,14 @@ public class RpcHandler extends DefaultJsonRpcHandler { } String filterType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM); - if (participant.getToken().getKurentoTokenOptions().isFilterAllowed(filterType)) { + String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); + boolean isModerator = this.sessionManager.isModeratorInSession(rpcConnection.getSessionId(), participant); + + // Allow applying filter if the user is a MODERATOR (owning the stream or other + // user's stream) or if the user is the owner of the stream and has a token + // configured with this specific filter + if (isModerator || (this.userIsStreamOwner(rpcConnection.getSessionId(), participant, streamId) + && participant.getToken().getKurentoTokenOptions().isFilterAllowed(filterType))) { JsonObject filterOptions; try { filterOptions = new JsonParser().parse(getStringParam(request, ProtocolElements.FILTER_OPTIONS_PARAM)) @@ -389,9 +388,9 @@ public class RpcHandler extends DefaultJsonRpcHandler { throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "'options' parameter is not a JSON object: " + e.getMessage()); } - String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); + Participant moderator = isModerator ? participant : null; sessionManager.applyFilter(sessionManager.getSession(rpcConnection.getSessionId()), streamId, filterType, - filterOptions, null, request.getId(), "applyFilter"); + filterOptions, moderator, request.getId(), "applyFilter"); } else { log.error("Error: participant {} is not a moderator", participant.getParticipantPublicId()); throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, @@ -399,29 +398,53 @@ public class RpcHandler extends DefaultJsonRpcHandler { } } - private void execFilterMethod(RpcConnection rpcConnection, Request request) { + private void removeFilter(RpcConnection rpcConnection, Request request) { + Participant participant; try { - sanityCheckOfSession(rpcConnection, "applyFilter"); + participant = sanityCheckOfSession(rpcConnection, "removeFilter"); } catch (OpenViduException e) { return; } + String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); + boolean isModerator = this.sessionManager.isModeratorInSession(rpcConnection.getSessionId(), participant); + + // Allow removing filter if the user is a MODERATOR (owning the stream or other + // user's stream) or if the user is the owner of the stream + if (isModerator || this.userIsStreamOwner(rpcConnection.getSessionId(), participant, streamId)) { + Participant moderator = isModerator ? participant : null; + sessionManager.removeFilter(sessionManager.getSession(rpcConnection.getSessionId()), streamId, moderator, + request.getId(), "removeFilter"); + } else { + log.error("Error: participant {} is not a moderator", participant.getParticipantPublicId()); + throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, + "Unable to remove filter. The user does not have a valid token"); + } + } + + private void execFilterMethod(RpcConnection rpcConnection, Request request) { + Participant participant; + try { + participant = sanityCheckOfSession(rpcConnection, "execFilterMethod"); + } catch (OpenViduException e) { + return; + } + String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); String filterMethod = getStringParam(request, ProtocolElements.FILTER_METHOD_PARAM); JsonObject filterParams = new JsonParser().parse(getStringParam(request, ProtocolElements.FILTER_PARAMS_PARAM)) .getAsJsonObject(); - String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); - sessionManager.execFilterMethod(sessionManager.getSession(rpcConnection.getSessionId()), streamId, filterMethod, - filterParams, null, request.getId(), "execFilterMethod"); - } + boolean isModerator = this.sessionManager.isModeratorInSession(rpcConnection.getSessionId(), participant); - private void removeFilter(RpcConnection rpcConnection, Request request) { - try { - sanityCheckOfSession(rpcConnection, "removeFilter"); - } catch (OpenViduException e) { - return; + // Allow executing filter method if the user is a MODERATOR (owning the stream + // or other user's stream) or if the user is the owner of the stream + if (isModerator || this.userIsStreamOwner(rpcConnection.getSessionId(), participant, streamId)) { + Participant moderator = isModerator ? participant : null; + sessionManager.execFilterMethod(sessionManager.getSession(rpcConnection.getSessionId()), streamId, + filterMethod, filterParams, moderator, request.getId(), "execFilterMethod"); + } else { + log.error("Error: participant {} is not a moderator", participant.getParticipantPublicId()); + throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, + "Unable to execute filter method. The user does not have a valid token"); } - String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); - sessionManager.removeFilter(sessionManager.getSession(rpcConnection.getSessionId()), streamId, null, - request.getId(), "removeFilter"); } private void addFilterEventListener(RpcConnection rpcConnection, Request request) { @@ -432,15 +455,25 @@ public class RpcHandler extends DefaultJsonRpcHandler { return; } String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); - String eventType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM); - try { - sessionManager.addFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), participant, - streamId, eventType); - this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(), - new JsonObject()); - } catch (OpenViduException e) { - this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), - new JsonObject(), e); + String eventType = getStringParam(request, ProtocolElements.FILTEREVENTLISTENER_EVENTTYPE_PARAM); + boolean isModerator = this.sessionManager.isModeratorInSession(rpcConnection.getSessionId(), participant); + + // Allow adding a filter event listener if the user is a MODERATOR (owning the + // stream or other user's stream) or if the user is the owner of the stream + if (isModerator || this.userIsStreamOwner(rpcConnection.getSessionId(), participant, streamId)) { + try { + sessionManager.addFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), + participant, streamId, eventType); + this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject()); + } catch (OpenViduException e) { + this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject(), e); + } + } else { + log.error("Error: participant {} is not a moderator", participant.getParticipantPublicId()); + throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, + "Unable to add filter event listener. The user does not have a valid token"); } } @@ -452,15 +485,25 @@ public class RpcHandler extends DefaultJsonRpcHandler { return; } String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM); - String eventType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM); - try { - sessionManager.removeFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), - participant, streamId, eventType); - this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(), - new JsonObject()); - } catch (OpenViduException e) { - this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), - new JsonObject(), e); + String eventType = getStringParam(request, ProtocolElements.FILTEREVENTLISTENER_EVENTTYPE_PARAM); + boolean isModerator = this.sessionManager.isModeratorInSession(rpcConnection.getSessionId(), participant); + + // Allow removing a filter event listener if the user is a MODERATOR (owning the + // stream or other user's stream) or if the user is the owner of the stream + if (isModerator || this.userIsStreamOwner(rpcConnection.getSessionId(), participant, streamId)) { + try { + sessionManager.removeFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), + participant, streamId, eventType); + this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject()); + } catch (OpenViduException e) { + this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject(), e); + } + } else { + log.error("Error: participant {} is not a moderator", participant.getParticipantPublicId()); + throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, + "Unable to remove filter event listener. The user does not have a valid token"); } } @@ -604,4 +647,9 @@ public class RpcHandler extends DefaultJsonRpcHandler { } } + private boolean userIsStreamOwner(String sessionId, Participant participant, String streamId) { + return participant.getParticipantPrivateId() + .equals(this.sessionManager.getParticipantPrivateIdFromStreamId(sessionId, streamId)); + } + }