openvidu-server: KMS track muting refactoring

pull/88/merge
pabloFuente 2018-07-06 11:31:41 +02:00
parent cb59dbcc4d
commit 2571a8a33c
8 changed files with 42 additions and 145 deletions

View File

@ -17,6 +17,6 @@
package io.openvidu.server.kurento;
public enum MutedMediaType {
public enum TrackType {
ALL, VIDEO, AUDIO;
}

View File

@ -49,7 +49,7 @@ import io.openvidu.server.config.InfoHandler;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.MutedMediaType;
import io.openvidu.server.kurento.TrackType;
import io.openvidu.server.kurento.endpoint.KmsEvent;
import io.openvidu.server.kurento.endpoint.MediaEndpoint;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
@ -107,7 +107,7 @@ public class KurentoParticipant extends Participant {
+ RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.getEndpoint().addTag("name", publisherStreamId);
addEndpointListeners(this.publisher);
// Remove streamId from publisher's map
this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId());
@ -353,54 +353,12 @@ public class KurentoParticipant extends Participant {
}
}
public void mutePublishedMedia(MutedMediaType muteType) {
if (muteType == null) {
throw new OpenViduException(Code.MEDIA_MUTE_ERROR_CODE, "Mute type cannot be null");
}
this.getPublisher().mute(muteType);
public void mutePublishedMedia(TrackType trackType) {
this.getPublisher().mute(trackType);
}
public void unmutePublishedMedia() {
if (this.getPublisher().getMuteType() == null) {
log.warn("PARTICIPANT {}: Trying to unmute published media. " + "But media is not muted.",
this.getParticipantPublicId());
} else {
this.getPublisher().unmute();
}
}
public void muteSubscribedMedia(Participant sender, MutedMediaType muteType) {
if (muteType == null) {
throw new OpenViduException(Code.MEDIA_MUTE_ERROR_CODE, "Mute type cannot be null");
}
String senderName = sender.getParticipantPublicId();
SubscriberEndpoint subscriberEndpoint = subscribers.get(senderName);
if (subscriberEndpoint == null || subscriberEndpoint.getEndpoint() == null) {
log.warn("PARTICIPANT {}: Trying to mute incoming media from user {}. "
+ "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName);
} else {
log.debug("PARTICIPANT {}: Mute subscriber endpoint linked to user {}", this.getParticipantPublicId(),
senderName);
subscriberEndpoint.mute(muteType);
}
}
public void unmuteSubscribedMedia(Participant sender) {
String senderName = sender.getParticipantPublicId();
SubscriberEndpoint subscriberEndpoint = subscribers.get(senderName);
if (subscriberEndpoint == null || subscriberEndpoint.getEndpoint() == null) {
log.warn("PARTICIPANT {}: Trying to unmute incoming media from user {}. "
+ "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName);
} else {
if (subscriberEndpoint.getMuteType() == null) {
log.warn("PARTICIPANT {}: Trying to unmute incoming media from user {}. " + "But media is not muted.",
this.getParticipantPublicId(), senderName);
} else {
log.debug("PARTICIPANT {}: Unmute subscriber endpoint linked to user {}", this.getParticipantPublicId(),
senderName);
subscriberEndpoint.unmute();
}
}
public void unmutePublishedMedia(TrackType trackType) {
this.getPublisher().unmute(trackType);
}
public void close(String reason) {
@ -471,7 +429,7 @@ public class KurentoParticipant extends Participant {
private void releasePublisherEndpoint(String reason) {
if (publisher != null && publisher.getEndpoint() != null) {
// Store streamId from publisher's map
this.session.publishedStreamIds.remove(this.getPublisherStreamId());

View File

@ -36,20 +36,20 @@ import com.google.gson.JsonSyntaxException;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.MediaMode;
import io.openvidu.java.client.RecordingLayout;
import io.openvidu.java.client.RecordingMode;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.java.client.MediaMode;
import io.openvidu.java.client.SessionProperties;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.KurentoClientProvider;
import io.openvidu.server.kurento.KurentoClientSessionInfo;
import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo;
import io.openvidu.server.kurento.endpoint.SdpType;
import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
public class KurentoSessionManager extends SessionManager {
@ -105,7 +105,8 @@ public class KurentoSessionManager extends SessionManager {
}
@Override
public synchronized void leaveRoom(Participant participant, Integer transactionId, String reason, boolean closeWebSocket) {
public synchronized void leaveRoom(Participant participant, Integer transactionId, String reason,
boolean closeWebSocket) {
log.debug("Request [LEAVE_ROOM] ({})", participant.getParticipantPublicId());
KurentoParticipant kParticipant = (KurentoParticipant) participant;

View File

@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.MutedMediaType;
import io.openvidu.server.kurento.core.KurentoParticipant;
/**
@ -73,8 +72,6 @@ public abstract class MediaEndpoint {
private final List<IceCandidate> receivedCandidateList = new LinkedList<IceCandidate>();
private LinkedList<IceCandidate> candidates = new LinkedList<IceCandidate>();
private MutedMediaType muteType;
public Map<String, MediaType> flowInMedia = new ConcurrentHashMap<>();
public Map<String, MediaType> flowOutMedia = new ConcurrentHashMap<>();
@ -204,50 +201,6 @@ public abstract class MediaEndpoint {
unregisterElementErrListener(endpoint, endpointSubscription);
}
/**
* Mute the media stream.
*
* @param muteType
* which type of leg to disconnect (audio, video or both)
*/
public abstract void mute(MutedMediaType muteType);
/**
* Reconnect the muted media leg(s).
*/
public abstract void unmute();
public void setMuteType(MutedMediaType muteType) {
this.muteType = muteType;
}
public MutedMediaType getMuteType() {
return this.muteType;
}
protected void resolveCurrentMuteType(MutedMediaType newMuteType) {
MutedMediaType prev = this.getMuteType();
if (prev != null) {
switch (prev) {
case AUDIO:
if (muteType.equals(MutedMediaType.VIDEO)) {
this.setMuteType(MutedMediaType.ALL);
return;
}
break;
case VIDEO:
if (muteType.equals(MutedMediaType.AUDIO)) {
this.setMuteType(MutedMediaType.ALL);
return;
}
break;
case ALL:
return;
}
}
this.setMuteType(newMuteType);
}
/**
* Creates the endpoint (RTP or WebRTC) and any other additional elements (if
* needed).

View File

@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.kurento.MutedMediaType;
import io.openvidu.server.kurento.TrackType;
import io.openvidu.server.kurento.core.KurentoParticipant;
/**
@ -46,6 +46,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant;
* @author <a href="mailto:rvlad@naevatec.com">Radu Tom Vlad</a>
*/
public class PublisherEndpoint extends MediaEndpoint {
private final static Logger log = LoggerFactory.getLogger(PublisherEndpoint.class);
protected MediaOptions mediaOptions;
@ -283,8 +284,7 @@ public class PublisherEndpoint extends MediaEndpoint {
}
}
@Override
public synchronized void mute(MutedMediaType muteType) {
public synchronized void mute(TrackType muteType) {
MediaElement sink = passThru;
if (!elements.isEmpty()) {
String sinkId = elementIds.peekLast();
@ -308,11 +308,9 @@ public class PublisherEndpoint extends MediaEndpoint {
internalSinkDisconnect(this.getEndpoint(), sink, MediaType.VIDEO);
break;
}
resolveCurrentMuteType(muteType);
}
@Override
public synchronized void unmute() {
public synchronized void unmute(TrackType muteType) {
MediaElement sink = passThru;
if (!elements.isEmpty()) {
String sinkId = elementIds.peekLast();
@ -325,8 +323,17 @@ public class PublisherEndpoint extends MediaEndpoint {
} else {
log.debug("Will unmute connection of WebRTC and PassThrough (no other elems)");
}
internalSinkConnect(this.getEndpoint(), sink);
setMuteType(null);
switch (muteType) {
case ALL:
internalSinkConnect(this.getEndpoint(), sink);
break;
case AUDIO:
internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO);
break;
case VIDEO:
internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO);
break;
}
}
private String getNext(String uid) {
@ -466,7 +473,7 @@ public class PublisherEndpoint extends MediaEndpoint {
});
}
}
@Override
public PublisherEndpoint getPublisher() {
return this;

View File

@ -19,12 +19,9 @@ package io.openvidu.server.kurento.endpoint;
import org.json.simple.JSONObject;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.kurento.core.KurentoParticipant;
/**
@ -70,31 +67,6 @@ public class SubscriberEndpoint extends MediaEndpoint {
this.publisher = publisher;
}
@Override
public synchronized void mute(io.openvidu.server.kurento.MutedMediaType muteType) {
if (this.publisher == null) {
throw new OpenViduException(Code.MEDIA_MUTE_ERROR_CODE, "Publisher endpoint not found");
}
switch (muteType) {
case ALL:
this.publisher.disconnectFrom(this.getEndpoint());
break;
case AUDIO:
this.publisher.disconnectFrom(this.getEndpoint(), MediaType.AUDIO);
break;
case VIDEO:
this.publisher.disconnectFrom(this.getEndpoint(), MediaType.VIDEO);
break;
}
resolveCurrentMuteType(muteType);
}
@Override
public synchronized void unmute() {
this.publisher.connect(this.getEndpoint());
setMuteType(null);
}
@SuppressWarnings("unchecked")
@Override
public JSONObject toJSON() {

View File

@ -210,6 +210,14 @@ public class SessionRestController {
}
}
/*
* @RequestMapping(value = "/sessions/{sessionId}/stream/{streamId}", method =
* RequestMethod.PUT) public ResponseEntity<JSONObject>
* muteMedia(@PathVariable("sessionId") String sessionId,
*
* @PathVariable("streamId") String streamId, @RequestBody Map<?, ?> params) { }
*/
@SuppressWarnings("unchecked")
@RequestMapping(value = "/tokens", method = RequestMethod.POST)
public ResponseEntity<JSONObject> newToken(@RequestBody Map<?, ?> params) {

View File

@ -56,7 +56,6 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
RpcNotificationService notificationService;
private ConcurrentMap<String, Boolean> webSocketEOFTransportError = new ConcurrentHashMap<>();
// private ConcurrentMap<String, Boolean> webSocketBrokenPipeTransportError = new ConcurrentHashMap<>();
@Override
public void handleRequest(Transaction transaction, Request<JsonObject> request) throws Exception {
@ -286,17 +285,17 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
sessionManager.unpublishVideo(participant, request.getId(), "unpublish");
}
public void streamPropertyChanged(RpcConnection rpcConnection, Request<JsonObject> request) {
String participantPrivateId = rpcConnection.getParticipantPrivateId();
String sessionId = rpcConnection.getSessionId();
Participant participant = sessionManager.getParticipant(sessionId, participantPrivateId);
String streamId = getStringParam(request, ProtocolElements.STREAMPROPERTYCHANGED_STREAMID_PARAM);
String property = getStringParam(request, ProtocolElements.STREAMPROPERTYCHANGED_PROPERTY_PARAM);
JsonElement newValue = getParam(request, ProtocolElements.STREAMPROPERTYCHANGED_NEWVALUE_PARAM);
String reason = getStringParam(request, ProtocolElements.STREAMPROPERTYCHANGED_REASON_PARAM);
sessionManager.streamPropertyChanged(participant, request.getId(), streamId, property, newValue, reason);
}
@ -360,7 +359,6 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
if ("IOException".equals(exception.getClass().getSimpleName())
&& "Broken pipe".equals(exception.getCause().getMessage())) {
log.warn("Parcipant with private id {} unexpectedly closed the websocket", rpcSession.getSessionId());
// this.webSocketBrokenPipeTransportError.put(rpcSession.getSessionId(), true);
}
if ("EOFException".equals(exception.getClass().getSimpleName())) {
// Store WebSocket connection interrupted exception for this web socket to
@ -402,7 +400,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
}
return request.getParams().get(key).getAsBoolean();
}
public static JsonElement getParam(Request<JsonObject> request, String key) {
if (request.getParams() == null || request.getParams().get(key) == null) {
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod()