mirror of https://github.com/OpenVidu/openvidu.git
openvidu-server: filter events feature
parent
c3d3fa7b15
commit
8156c34a08
|
@ -507,6 +507,22 @@ public class SessionEventsHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onFilterEventDispatched(String connectionId, String streamId, String filterType, String eventType, Object data,
|
||||||
|
Set<Participant> participants, Set<String> subscribedParticipants) {
|
||||||
|
JsonObject params = new JsonObject();
|
||||||
|
params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_CONNECTIONID_PARAM, connectionId);
|
||||||
|
params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_STREAMID_PARAM, streamId);
|
||||||
|
params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_FILTERTYPE_PARAM, filterType);
|
||||||
|
params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_EVENTTYPE_PARAM, eventType);
|
||||||
|
params.addProperty(ProtocolElements.ADDFILTEREVENTLISTENER_DATA_PARAM, data.toString());
|
||||||
|
for (Participant p : participants) {
|
||||||
|
if (subscribedParticipants.contains(p.getParticipantPublicId())) {
|
||||||
|
rpcNotificationService.sendNotification(p.getParticipantPrivateId(),
|
||||||
|
ProtocolElements.FILTEREVENTDISPATCHED_METHOD, params);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void closeRpcSession(String participantPrivateId) {
|
public void closeRpcSession(String participantPrivateId) {
|
||||||
this.rpcNotificationService.closeRpcSession(participantPrivateId);
|
this.rpcNotificationService.closeRpcSession(participantPrivateId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,12 +104,18 @@ public abstract class SessionManager {
|
||||||
public abstract void applyFilter(Session session, String streamId, String filterType, JsonObject filterOptions,
|
public abstract void applyFilter(Session session, String streamId, String filterType, JsonObject filterOptions,
|
||||||
Participant moderator, Integer transactionId, String reason);
|
Participant moderator, Integer transactionId, String reason);
|
||||||
|
|
||||||
public abstract void execFilterMethod(Session session, String streamId, String filterMethod, JsonObject filterParams,
|
public abstract void execFilterMethod(Session session, String streamId, String filterMethod,
|
||||||
Participant moderator, Integer transactionId, String reason);
|
JsonObject filterParams, Participant moderator, Integer transactionId, String reason);
|
||||||
|
|
||||||
public abstract void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId,
|
public abstract void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId,
|
||||||
String reason);
|
String reason);
|
||||||
|
|
||||||
|
public abstract void addFilterEventListener(Session session, Participant subscriber, String streamId,
|
||||||
|
String eventType);
|
||||||
|
|
||||||
|
public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId,
|
||||||
|
String eventType);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a Session given its id
|
* Returns a Session given its id
|
||||||
*
|
*
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Set;
|
||||||
import org.kurento.client.GenericMediaElement;
|
import org.kurento.client.GenericMediaElement;
|
||||||
import org.kurento.client.IceCandidate;
|
import org.kurento.client.IceCandidate;
|
||||||
import org.kurento.client.KurentoClient;
|
import org.kurento.client.KurentoClient;
|
||||||
|
import org.kurento.client.ListenerSubscription;
|
||||||
import org.kurento.jsonrpc.Props;
|
import org.kurento.jsonrpc.Props;
|
||||||
import org.kurento.jsonrpc.message.Request;
|
import org.kurento.jsonrpc.message.Request;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -52,6 +53,7 @@ import io.openvidu.server.kurento.KurentoClientProvider;
|
||||||
import io.openvidu.server.kurento.KurentoClientSessionInfo;
|
import io.openvidu.server.kurento.KurentoClientSessionInfo;
|
||||||
import io.openvidu.server.kurento.KurentoFilter;
|
import io.openvidu.server.kurento.KurentoFilter;
|
||||||
import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo;
|
import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo;
|
||||||
|
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
|
||||||
import io.openvidu.server.kurento.endpoint.SdpType;
|
import io.openvidu.server.kurento.endpoint.SdpType;
|
||||||
import io.openvidu.server.rpc.RpcHandler;
|
import io.openvidu.server.rpc.RpcHandler;
|
||||||
import io.openvidu.server.utils.JsonUtils;
|
import io.openvidu.server.utils.JsonUtils;
|
||||||
|
@ -710,6 +712,83 @@ public class KurentoSessionManager extends SessionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addFilterEventListener(Session session, Participant subscriber, String streamId, String eventType)
|
||||||
|
throws OpenViduException {
|
||||||
|
String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId);
|
||||||
|
if (participantPrivateId != null) {
|
||||||
|
Participant participant = this.getParticipant(participantPrivateId);
|
||||||
|
log.debug("Request [ADD_FILTER_LISTENER] over stream [{}]", streamId);
|
||||||
|
KurentoParticipant kParticipant = (KurentoParticipant) participant;
|
||||||
|
if (!participant.isStreaming()) {
|
||||||
|
log.warn(
|
||||||
|
"PARTICIPANT {}: Requesting to addFilterEventListener to stream {} "
|
||||||
|
+ "in session {} but user is not streaming media",
|
||||||
|
subscriber.getParticipantPublicId(), streamId, session.getSessionId());
|
||||||
|
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
|
||||||
|
"User '" + participant.getParticipantPublicId() + " not streaming media in session '"
|
||||||
|
+ session.getSessionId() + "'");
|
||||||
|
} else if (kParticipant.getPublisher().getFilter() == null) {
|
||||||
|
log.warn(
|
||||||
|
"PARTICIPANT {}: Requesting to addFilterEventListener to user {} "
|
||||||
|
+ "in session {} but user does NOT have a filter",
|
||||||
|
subscriber.getParticipantPublicId(), participant.getParticipantPublicId(),
|
||||||
|
session.getSessionId());
|
||||||
|
throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE,
|
||||||
|
"User '" + participant.getParticipantPublicId() + " has no filter applied in session '"
|
||||||
|
+ session.getSessionId() + "'");
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
this.addFilterEventListenerInPublisher(kParticipant, eventType);
|
||||||
|
kParticipant.getPublisher().addParticipantAsListenerOfFilterEvent(eventType,
|
||||||
|
participant.getParticipantPublicId());
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeFilterEventListener(Session session, Participant subscriber, String streamId, String eventType)
|
||||||
|
throws OpenViduException {
|
||||||
|
String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId);
|
||||||
|
if (participantPrivateId != null) {
|
||||||
|
Participant participant = this.getParticipant(participantPrivateId);
|
||||||
|
log.debug("Request [REMOVE_FILTER_LISTENER] over stream [{}]", streamId);
|
||||||
|
KurentoParticipant kParticipant = (KurentoParticipant) participant;
|
||||||
|
if (!participant.isStreaming()) {
|
||||||
|
log.warn(
|
||||||
|
"PARTICIPANT {}: Requesting to removeFilterEventListener to stream {} "
|
||||||
|
+ "in session {} but user is not streaming media",
|
||||||
|
subscriber.getParticipantPublicId(), streamId, session.getSessionId());
|
||||||
|
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
|
||||||
|
"User '" + participant.getParticipantPublicId() + " not streaming media in session '"
|
||||||
|
+ session.getSessionId() + "'");
|
||||||
|
} else if (kParticipant.getPublisher().getFilter() == null) {
|
||||||
|
log.warn(
|
||||||
|
"PARTICIPANT {}: Requesting to removeFilterEventListener to user {} "
|
||||||
|
+ "in session {} but user does NOT have a filter",
|
||||||
|
subscriber.getParticipantPublicId(), participant.getParticipantPublicId(),
|
||||||
|
session.getSessionId());
|
||||||
|
throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE,
|
||||||
|
"User '" + participant.getParticipantPublicId() + " has no filter applied in session '"
|
||||||
|
+ session.getSessionId() + "'");
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
PublisherEndpoint pub = kParticipant.getPublisher();
|
||||||
|
if (pub.removeParticipantAsListenerOfFilterEvent(eventType, participant.getParticipantPublicId())) {
|
||||||
|
// If there are no more participants listening to the event remove the vent from
|
||||||
|
// the GenericMediaElement
|
||||||
|
this.removeFilterEventListenerInPublisher(kParticipant, eventType);
|
||||||
|
}
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter)
|
private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter)
|
||||||
throws OpenViduException {
|
throws OpenViduException {
|
||||||
GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(),
|
GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(),
|
||||||
|
@ -736,13 +815,35 @@ public class KurentoSessionManager extends SessionManager {
|
||||||
kParticipant.getPublisher().getMediaOptions().setFilter(null);
|
kParticipant.getPublisher().getMediaOptions().setFilter(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
private void addFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType)
|
||||||
* private void addFilterEventListenerInPublisher(KurentoParticipant
|
throws OpenViduException {
|
||||||
* kParticipant) { this.listener =
|
PublisherEndpoint pub = kParticipant.getPublisher();
|
||||||
* kParticipant.getPublisher().getFilter().addEventListener("CodeFound", event
|
if (!pub.isListenerAddedToFilterEvent(eventType)) {
|
||||||
* -> { System.out.println(event.getData()); }); } private void
|
final String connectionId = kParticipant.getParticipantPublicId();
|
||||||
* removeFilterEventListenerInPublisher(KurentoParticipant kParticipant) {
|
final String streamId = kParticipant.getPublisherStreamId();
|
||||||
* kParticipant.getPublisher().getFilter().removeEventListener(this.listener); }
|
final String filterType = kParticipant.getPublisherMediaOptions().getFilter().getType();
|
||||||
*/
|
try {
|
||||||
|
ListenerSubscription listener = pub.getFilter().addEventListener(eventType, event -> {
|
||||||
|
sessionEventsHandler.onFilterEventDispatched(connectionId, streamId, filterType, event.getType(),
|
||||||
|
event.getData(), kParticipant.getSession().getParticipants(),
|
||||||
|
kParticipant.getPublisher().getPartipantsListentingToFilterEvent(eventType));
|
||||||
|
});
|
||||||
|
pub.storeListener(eventType, listener);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Request to addFilterEventListener to stream {} gone wrong. Error: {}", streamId,
|
||||||
|
e.getMessage());
|
||||||
|
throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND,
|
||||||
|
"Request to addFilterEventListener to stream " + streamId + " gone wrong: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType) {
|
||||||
|
PublisherEndpoint pub = kParticipant.getPublisher();
|
||||||
|
if (pub.isListenerAddedToFilterEvent(eventType)) {
|
||||||
|
GenericMediaElement filter = kParticipant.getPublisher().getFilter();
|
||||||
|
filter.removeEventListener(pub.removeListener(eventType));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,12 @@ package io.openvidu.server.kurento.endpoint;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import org.kurento.client.Continuation;
|
import org.kurento.client.Continuation;
|
||||||
|
@ -62,6 +65,9 @@ public class PublisherEndpoint extends MediaEndpoint {
|
||||||
private ListenerSubscription passThruSubscription = null;
|
private ListenerSubscription passThruSubscription = null;
|
||||||
|
|
||||||
private GenericMediaElement filter;
|
private GenericMediaElement filter;
|
||||||
|
private Map<String, Set<String>> subscribersToFilterEvents = new ConcurrentHashMap<>();
|
||||||
|
private Map<String, ListenerSubscription> filterListeners = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private Map<String, MediaElement> elements = new HashMap<String, MediaElement>();
|
private Map<String, MediaElement> elements = new HashMap<String, MediaElement>();
|
||||||
private LinkedList<String> elementIds = new LinkedList<String>();
|
private LinkedList<String> elementIds = new LinkedList<String>();
|
||||||
private boolean connected = false;
|
private boolean connected = false;
|
||||||
|
@ -104,6 +110,46 @@ public class PublisherEndpoint extends MediaEndpoint {
|
||||||
return this.filter;
|
return this.filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isListenerAddedToFilterEvent(String eventType) {
|
||||||
|
return (this.subscribersToFilterEvents.containsKey(eventType) && this.filterListeners.containsKey(eventType));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getPartipantsListentingToFilterEvent(String eventType) {
|
||||||
|
return this.subscribersToFilterEvents.get(eventType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean storeListener(String eventType, ListenerSubscription listener) {
|
||||||
|
return (this.filterListeners.putIfAbsent(eventType, listener) == null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ListenerSubscription removeListener(String eventType) {
|
||||||
|
// Clean all participant subscriptions to this event
|
||||||
|
this.subscribersToFilterEvents.remove(eventType);
|
||||||
|
// Clean ListenerSubscription object for this event
|
||||||
|
return this.filterListeners.remove(eventType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) {
|
||||||
|
this.subscribersToFilterEvents.putIfAbsent(eventType, new HashSet<>());
|
||||||
|
this.subscribersToFilterEvents.get(eventType).add(participantPublicId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean removeParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) {
|
||||||
|
if (!this.subscribersToFilterEvents.containsKey(eventType)) {
|
||||||
|
String streamId = this.getEndpoint().getTag("name");
|
||||||
|
log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added", streamId,
|
||||||
|
eventType);
|
||||||
|
throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND,
|
||||||
|
"Request to removeFilterEventListener to stream " + streamId + " gone wrong: Filter " + eventType
|
||||||
|
+ " has no listener added");
|
||||||
|
}
|
||||||
|
this.subscribersToFilterEvents.computeIfPresent(eventType, (type, subs) -> {
|
||||||
|
subs.remove(participantPublicId);
|
||||||
|
return subs;
|
||||||
|
});
|
||||||
|
return this.subscribersToFilterEvents.get(eventType).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes this media endpoint for publishing media and processes the SDP
|
* Initializes this media endpoint for publishing media and processes the SDP
|
||||||
* offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it
|
* offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it
|
||||||
|
|
|
@ -142,6 +142,12 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
|
||||||
case ProtocolElements.REMOVEFILTER_METHOD:
|
case ProtocolElements.REMOVEFILTER_METHOD:
|
||||||
removeFilter(rpcConnection, request);
|
removeFilter(rpcConnection, request);
|
||||||
break;
|
break;
|
||||||
|
case ProtocolElements.ADDFILTEREVENTLISTENER_METHOD:
|
||||||
|
addFilterEventListener(rpcConnection, request);
|
||||||
|
break;
|
||||||
|
case ProtocolElements.REMOVEFILTEREVENTLISTENER_METHOD:
|
||||||
|
removeFilterEventListener(rpcConnection, request);
|
||||||
|
break;
|
||||||
/*
|
/*
|
||||||
* case ProtocolElements.FORCEAPPLYFILTER_METHOD:
|
* case ProtocolElements.FORCEAPPLYFILTER_METHOD:
|
||||||
* forceApplyFilter(rpcConnection, request); break; case
|
* forceApplyFilter(rpcConnection, request); break; case
|
||||||
|
@ -419,6 +425,46 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
|
||||||
request.getId(), "removeFilter");
|
request.getId(), "removeFilter");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addFilterEventListener(RpcConnection rpcConnection, Request<JsonObject> request) {
|
||||||
|
Participant participant;
|
||||||
|
try {
|
||||||
|
participant = sanityCheckOfSession(rpcConnection, "addFilterEventListener");
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM);
|
||||||
|
String eventType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM);
|
||||||
|
try {
|
||||||
|
sessionManager.addFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()), participant,
|
||||||
|
streamId, eventType);
|
||||||
|
this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(),
|
||||||
|
new JsonObject());
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
|
||||||
|
new JsonObject(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeFilterEventListener(RpcConnection rpcConnection, Request<JsonObject> request) {
|
||||||
|
Participant participant;
|
||||||
|
try {
|
||||||
|
participant = sanityCheckOfSession(rpcConnection, "removeFilterEventListener");
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String streamId = getStringParam(request, ProtocolElements.FILTER_STREAMID_PARAM);
|
||||||
|
String eventType = getStringParam(request, ProtocolElements.FILTER_TYPE_PARAM);
|
||||||
|
try {
|
||||||
|
sessionManager.removeFilterEventListener(sessionManager.getSession(rpcConnection.getSessionId()),
|
||||||
|
participant, streamId, eventType);
|
||||||
|
this.notificationService.sendResponse(participant.getParticipantPrivateId(), request.getId(),
|
||||||
|
new JsonObject());
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),
|
||||||
|
new JsonObject(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void leaveRoomAfterConnClosed(String participantPrivateId, String reason) {
|
public void leaveRoomAfterConnClosed(String participantPrivateId, String reason) {
|
||||||
try {
|
try {
|
||||||
sessionManager.evictParticipant(this.sessionManager.getParticipant(participantPrivateId), null, null,
|
sessionManager.evictParticipant(this.sessionManager.getParticipant(participantPrivateId), null, null,
|
||||||
|
|
Loading…
Reference in New Issue