openvidu-server: GET /sessions, streamId refactoring

pull/88/merge
pabloFuente 2018-06-15 16:21:53 +02:00
parent c7dd18c21c
commit a9fb7b0739
15 changed files with 589 additions and 520 deletions

View File

@ -50,6 +50,7 @@ public class ProtocolElements {
public static final String PUBLISHVIDEO_SDPOFFER_PARAM = "sdpOffer"; public static final String PUBLISHVIDEO_SDPOFFER_PARAM = "sdpOffer";
public static final String PUBLISHVIDEO_DOLOOPBACK_PARAM = "doLoopback"; public static final String PUBLISHVIDEO_DOLOOPBACK_PARAM = "doLoopback";
public static final String PUBLISHVIDEO_SDPANSWER_PARAM = "sdpAnswer"; public static final String PUBLISHVIDEO_SDPANSWER_PARAM = "sdpAnswer";
public static final String PUBLISHVIDEO_STREAMID_PARAM = "id";
public static final String PUBLISHVIDEO_AUDIOACTIVE_PARAM = "audioActive"; public static final String PUBLISHVIDEO_AUDIOACTIVE_PARAM = "audioActive";
public static final String PUBLISHVIDEO_VIDEOACTIVE_PARAM = "videoActive"; public static final String PUBLISHVIDEO_VIDEOACTIVE_PARAM = "videoActive";
public static final String PUBLISHVIDEO_TYPEOFVIDEO_PARAM = "typeOfVideo"; public static final String PUBLISHVIDEO_TYPEOFVIDEO_PARAM = "typeOfVideo";

View File

@ -36,15 +36,17 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
protected void configure(HttpSecurity http) throws Exception { protected void configure(HttpSecurity http) throws Exception {
// Security for API REST // Security for API REST
ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry conf = http.csrf().disable() ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry conf = http.cors().and()
.authorizeRequests().antMatchers(HttpMethod.POST, "/api/sessions").authenticated() .csrf().disable().authorizeRequests().antMatchers(HttpMethod.POST, "/api/sessions").authenticated()
.antMatchers(HttpMethod.POST, "/api/sessions/**").authenticated()
.antMatchers(HttpMethod.POST, "/api/tokens").authenticated() .antMatchers(HttpMethod.POST, "/api/tokens").authenticated()
.antMatchers(HttpMethod.POST, "/api/recordings/start").authenticated() .antMatchers(HttpMethod.POST, "/api/recordings/start").authenticated()
.antMatchers(HttpMethod.POST, "/api/recordings/stop").authenticated() .antMatchers(HttpMethod.POST, "/api/recordings/stop").authenticated()
.antMatchers(HttpMethod.GET, "/api/recordings").authenticated() .antMatchers(HttpMethod.GET, "/api/recordings").authenticated()
.antMatchers(HttpMethod.GET, "/api/recordings/**").authenticated() .antMatchers(HttpMethod.GET, "/api/recordings/**").authenticated()
.antMatchers(HttpMethod.DELETE, "/api/recordings/**").authenticated() .antMatchers(HttpMethod.DELETE, "/api/recordings/**").authenticated()
.antMatchers(HttpMethod.GET, "/config/**").authenticated().antMatchers("/").authenticated(); .antMatchers(HttpMethod.GET, "/config/openvidu-publicurl").anonymous()
.antMatchers(HttpMethod.GET, "/config/**").authenticated();
// Security for layouts // Security for layouts
conf.antMatchers("/layouts/*").authenticated(); conf.antMatchers("/layouts/*").authenticated();

View File

@ -17,6 +17,8 @@
package io.openvidu.server.core; package io.openvidu.server.core;
import org.json.simple.JSONObject;
public class Participant { public class Participant {
private String participantPrivatetId; // ID to identify the user on server (org.kurento.jsonrpc.Session.id) private String participantPrivatetId; // ID to identify the user on server (org.kurento.jsonrpc.Session.id)
@ -128,6 +130,10 @@ public class Participant {
this.frameRate = frameRate; this.frameRate = frameRate;
} }
public String getPublisherStremId() {
return null;
}
public String getFullMetadata() { public String getFullMetadata() {
String fullMetadata; String fullMetadata;
if ((!this.clientMetadata.isEmpty()) && (!this.serverMetadata.isEmpty())) { if ((!this.clientMetadata.isEmpty()) && (!this.serverMetadata.isEmpty())) {
@ -194,4 +200,12 @@ public class Participant {
return builder.toString(); return builder.toString();
} }
@SuppressWarnings("unchecked")
public JSONObject toJSON() {
JSONObject json = new JSONObject();
json.put("connectionId", this.participantPublicId);
json.put("token", this.token.toJSON());
return json;
}
} }

View File

@ -19,6 +19,8 @@ package io.openvidu.server.core;
import java.util.Set; import java.util.Set;
import org.json.simple.JSONObject;
import io.openvidu.java.client.SessionProperties; import io.openvidu.java.client.SessionProperties;
public interface Session { public interface Session {
@ -43,4 +45,6 @@ public interface Session {
int getActivePublishers(); int getActivePublishers();
JSONObject toJSON();
} }

View File

@ -94,19 +94,9 @@ public class SessionEventsHandler {
existingParticipant.getFullMetadata()); existingParticipant.getFullMetadata());
if (existingParticipant.isStreaming()) { if (existingParticipant.isStreaming()) {
String streamId = "";
if ("SCREEN".equals(existingParticipant.getTypeOfVideo())) {
streamId = "SCREEN";
} else if (existingParticipant.isVideoActive()) {
streamId = "CAMERA";
} else if (existingParticipant.isAudioActive()) {
streamId = "MICRO";
}
JsonObject stream = new JsonObject(); JsonObject stream = new JsonObject();
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMID_PARAM, stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMID_PARAM,
existingParticipant.getParticipantPublicId() + "_" + streamId); existingParticipant.getPublisherStremId());
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMAUDIOACTIVE_PARAM, stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMAUDIOACTIVE_PARAM,
existingParticipant.isAudioActive()); existingParticipant.isAudioActive());
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMVIDEOACTIVE_PARAM, stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMVIDEOACTIVE_PARAM,
@ -178,7 +168,7 @@ public class SessionEventsHandler {
} }
} }
public void onPublishMedia(Participant participant, String sessionId, MediaOptions mediaOptions, String sdpAnswer, public void onPublishMedia(Participant participant, String streamId, String sessionId, MediaOptions mediaOptions, String sdpAnswer,
Set<Participant> participants, Integer transactionId, OpenViduException error) { Set<Participant> participants, Integer transactionId, OpenViduException error) {
if (error != null) { if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);
@ -186,23 +176,14 @@ public class SessionEventsHandler {
} }
JsonObject result = new JsonObject(); JsonObject result = new JsonObject();
result.addProperty(ProtocolElements.PUBLISHVIDEO_SDPANSWER_PARAM, sdpAnswer); result.addProperty(ProtocolElements.PUBLISHVIDEO_SDPANSWER_PARAM, sdpAnswer);
result.addProperty(ProtocolElements.PUBLISHVIDEO_STREAMID_PARAM, streamId);
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result);
JsonObject params = new JsonObject(); JsonObject params = new JsonObject();
params.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_USER_PARAM, participant.getParticipantPublicId()); params.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_USER_PARAM, participant.getParticipantPublicId());
JsonObject stream = new JsonObject(); JsonObject stream = new JsonObject();
String streamId = ""; stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_STREAMID_PARAM, streamId);
if ("SCREEN".equals(mediaOptions.typeOfVideo)) {
streamId = "SCREEN";
} else if (mediaOptions.videoActive) {
streamId = "CAMERA";
} else if (mediaOptions.audioActive) {
streamId = "MICRO";
}
stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_STREAMID_PARAM,
participant.getParticipantPublicId() + "_" + streamId);
stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_AUDIOACTIVE_PARAM, mediaOptions.audioActive); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_AUDIOACTIVE_PARAM, mediaOptions.audioActive);
stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_VIDEOACTIVE_PARAM, mediaOptions.videoActive); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_VIDEOACTIVE_PARAM, mediaOptions.videoActive);
stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_TYPEOFVIDEO_PARAM, mediaOptions.typeOfVideo); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_TYPEOFVIDEO_PARAM, mediaOptions.typeOfVideo);

View File

@ -17,6 +17,7 @@
package io.openvidu.server.core; package io.openvidu.server.core;
import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -116,6 +117,15 @@ public abstract class SessionManager {
return new HashSet<String>(sessions.keySet()); return new HashSet<String>(sessions.keySet());
} }
/**
* Returns all currently active (opened) sessions.
*
* @return set of the session's identifiers
*/
public Collection<Session> getSessionObjects() {
return sessions.values();
}
/** /**
* Returns all the participants inside a session. * Returns all the participants inside a session.
* *
@ -193,81 +203,6 @@ public abstract class SessionManager {
public String newToken(String sessionId, ParticipantRole role, String serverMetadata) throws OpenViduException { public String newToken(String sessionId, ParticipantRole role, String serverMetadata) throws OpenViduException {
/*if (!isMetadataFormatCorrect(serverMetadata)) {
log.error("Data invalid format. Max length allowed is 10000 chars");
throw new OpenViduException(Code.GENERIC_ERROR_CODE,
"Data invalid format. Max length allowed is 10000 chars");
}
String token = OpenViduServer.publicUrl;
token += "?sessionId=" + sessionId;
token += "&token=" + this.generateRandomChain();
token += "&role=" + role.name();
TurnCredentials turnCredentials = null;
if (this.coturnCredentialsService.isCoturnAvailable()) {
turnCredentials = coturnCredentialsService.createUser();
if (turnCredentials != null) {
if (turnCredentials != null) {
token += "&turnUsername=" + turnCredentials.getUsername();
token += "&turnCredential=" + turnCredentials.getCredential();
}
}
}
Token t = new Token(token, role, serverMetadata, turnCredentials);
final String finalToken = token;
ConcurrentHashMap<String, Token> tok = this.sessionidTokenTokenobj.computeIfPresent(sessionId, (key, value) -> {
value.putIfAbsent(finalToken, t);
return value;
});
if (tok == null) {
log.error("sessionId [" + sessionId + "] is not valid");
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "sessionId [" + sessionId + "] not found");
} else {
return tok.get(token).getToken();
}*/
/*if (!isMetadataFormatCorrect(serverMetadata)) {
log.error("Data invalid format. Max length allowed is 10000 chars");
throw new OpenViduException(Code.GENERIC_ERROR_CODE,
"Data invalid format. Max length allowed is 10000 chars");
}
final String[] tokenArray = {""};
try {
sessionidTokenTokenobj.computeIfPresent(sessionId, (key, value) -> {
String token = OpenViduServer.publicUrl;
token += "?sessionId=" + sessionId;
token += "&token=" + this.generateRandomChain();
token += "&role=" + role.name();
TurnCredentials turnCredentials = null;
if (this.coturnCredentialsService.isCoturnAvailable()) {
turnCredentials = coturnCredentialsService.createUser();
if (turnCredentials != null) {
token += "&turnUsername=" + turnCredentials.getUsername();
token += "&turnCredential=" + turnCredentials.getCredential();
}
}
Token t = new Token(token, role, serverMetadata, turnCredentials);
value.putIfAbsent(token, t);
tokenArray[0] = token;
throw new RuntimeException();
});
} catch(RuntimeException e) {
log.info("Token succesfully created");
}
if (tokenArray[0].isEmpty()) {
log.error("sessionId [" + sessionId + "] is not valid");
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "sessionId [" + sessionId + "] not found");
}
return tokenArray[0];*/
ConcurrentHashMap<String, Token> map = this.sessionidTokenTokenobj.putIfAbsent(sessionId, new ConcurrentHashMap<>()); ConcurrentHashMap<String, Token> map = this.sessionidTokenTokenobj.putIfAbsent(sessionId, new ConcurrentHashMap<>());
if (map != null) { if (map != null) {

View File

@ -17,14 +17,16 @@
package io.openvidu.server.core; package io.openvidu.server.core;
import org.json.simple.JSONObject;
import io.openvidu.server.coturn.TurnCredentials; import io.openvidu.server.coturn.TurnCredentials;
public class Token { public class Token {
String token; private String token;
ParticipantRole role; private ParticipantRole role;
String serverMetadata = ""; private String serverMetadata = "";
TurnCredentials turnCredentials; private TurnCredentials turnCredentials;
public Token(String token) { public Token(String token) {
this.token = token; this.token = token;
@ -61,4 +63,13 @@ public class Token {
return this.token; return this.token;
} }
@SuppressWarnings("unchecked")
public JSONObject toJSON() {
JSONObject json = new JSONObject();
json.put("token", this.token);
json.put("role", this.role.name());
json.put("data", this.serverMetadata);
return json;
}
} }

View File

@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.kurento.client.Continuation; import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent; import org.kurento.client.ErrorEvent;
import org.kurento.client.Filter; import org.kurento.client.Filter;
@ -85,15 +88,18 @@ public class KurentoParticipant extends Participant {
} }
public void createPublishingEndpoint(MediaOptions mediaOptions) { public void createPublishingEndpoint(MediaOptions mediaOptions) {
publisher.createEndpoint(endPointLatch); publisher.createEndpoint(endPointLatch);
if (getPublisher().getEndpoint() == null) { if (getPublisher().getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint"); throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint");
} }
this.publisher.getEndpoint().addTag("name", "PUBLISHER " + this.getParticipantPublicId());
String publisherStreamId = this.getParticipantPublicId() + "_" +
(mediaOptions.videoActive ? mediaOptions.typeOfVideo : "MICRO") + "_" +
RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.getEndpoint().addTag("name", publisherStreamId);
addEndpointListeners(this.publisher); addEndpointListeners(this.publisher);
CDR.recordNewPublisher(this, this.session.getSessionId(), mediaOptions); CDR.recordNewPublisher(this, this.session.getSessionId(), mediaOptions);
} }
@ -279,8 +285,9 @@ public class KurentoParticipant extends Participant {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
} }
subscriber.getEndpoint().addTag("name", String subscriberStreamId = this.getParticipantPublicId() + "_" + kSender.getPublisherStremId();
"SUBSCRIBER " + senderName + " for user " + this.getParticipantPublicId());
subscriber.getEndpoint().addTag("name", subscriberStreamId);
addEndpointListeners(subscriber); addEndpointListeners(subscriber);
@ -515,7 +522,7 @@ public class KurentoParticipant extends Participant {
* System.out.println(msg); this.infoHandler.sendInfo(msg); }); * System.out.println(msg); this.infoHandler.sendInfo(msg); });
*/ */
endpoint.getWebEndpoint().addErrorListener((event) -> { /*endpoint.getWebEndpoint().addErrorListener((event) -> {
String msg = " Error (PUBLISHER) -> " + "ERRORCODE: " + event.getErrorCode() String msg = " Error (PUBLISHER) -> " + "ERRORCODE: " + event.getErrorCode()
+ " | DESCRIPTION: " + event.getDescription() + " | TIMESTAMP: " + System.currentTimeMillis(); + " | DESCRIPTION: " + event.getDescription() + " | TIMESTAMP: " + System.currentTimeMillis();
log.debug(msg); log.debug(msg);
@ -620,8 +627,42 @@ public class KurentoParticipant extends Participant {
+ " | TIMESTAMP: " + System.currentTimeMillis(); + " | TIMESTAMP: " + System.currentTimeMillis();
log.debug(msg); log.debug(msg);
this.infoHandler.sendInfo(msg); this.infoHandler.sendInfo(msg);
}); });*/
endpoint.getWebEndpoint().addNewCandidatePairSelectedListener((event) -> {
endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate();
endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate();
String msg = "ICE CANDIDATE SELECTED (" + endpoint.getEndpoint().getTag("name")
+ "): LOCAL CANDIDATE: " + endpoint.selectedLocalIceCandidate +
" | REMOTE CANDIDATE: " + endpoint.selectedRemoteIceCandidate +
" | TIMESTAMP: " + System.currentTimeMillis();
log.warn(msg);
this.infoHandler.sendInfo(msg);
});
}
@Override
@SuppressWarnings("unchecked")
public JSONObject toJSON() {
JSONObject json = super.toJSON();
JSONArray publisherEnpoints = new JSONArray();
if (this.streaming && this.publisher.getEndpoint() != null) {
publisherEnpoints.add(this.publisher.toJSON());
}
JSONArray subscriberEndpoints = new JSONArray();
for (MediaEndpoint sub : this.subscribers.values()) {
if (sub.getEndpoint() != null) {
subscriberEndpoints.add(sub.toJSON());
}
}
json.put("publishers", publisherEnpoints);
json.put("subscribers", subscriberEndpoints);
return json;
}
@Override
public String getPublisherStremId() {
return this.publisher.getEndpoint().getTag("name");
} }
} }

View File

@ -25,6 +25,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.kurento.client.Continuation; import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent; import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener; import org.kurento.client.EventListener;
@ -347,4 +349,23 @@ public class KurentoSession implements Session {
} }
} }
@Override
@SuppressWarnings("unchecked")
public JSONObject toJSON() {
JSONObject json = new JSONObject();
json.put("sessionId", this.sessionId);
json.put("mediaMode", this.sessionProperties.mediaMode().name());
json.put("recordingMode", this.sessionProperties.recordingMode().name());
json.put("defaultRecordingLayout", this.sessionProperties.defaultRecordingLayout().name());
if (this.sessionProperties.defaultCustomLayout() != null && !this.sessionProperties.defaultCustomLayout().isEmpty()) {
json.put("defaultCustomLayout", this.sessionProperties.defaultCustomLayout());
}
JSONArray participants = new JSONArray();
this.participants.values().forEach(p -> {
participants.add(p.toJSON());
});
json.put("connections", participants);
return json;
}
} }

View File

@ -248,7 +248,7 @@ public class KurentoSessionManager extends SessionManager {
OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
"Error generating SDP response for publishing user " + participant.getParticipantPublicId()); "Error generating SDP response for publishing user " + participant.getParticipantPublicId());
log.error("PARTICIPANT {}: Error publishing media", participant.getParticipantPublicId(), e); log.error("PARTICIPANT {}: Error publishing media", participant.getParticipantPublicId(), e);
sessionEventsHandler.onPublishMedia(participant, session.getSessionId(), mediaOptions, sdpAnswer, sessionEventsHandler.onPublishMedia(participant, null, session.getSessionId(), mediaOptions, sdpAnswer,
participants, transactionId, e); participants, transactionId, e);
} }
@ -276,7 +276,7 @@ public class KurentoSessionManager extends SessionManager {
participants = kurentoParticipant.getSession().getParticipants(); participants = kurentoParticipant.getSession().getParticipants();
if (sdpAnswer != null) { if (sdpAnswer != null) {
sessionEventsHandler.onPublishMedia(participant, session.getSessionId(), mediaOptions, sdpAnswer, sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStremId(), session.getSessionId(), mediaOptions, sdpAnswer,
participants, transactionId, null); participants, transactionId, null);
} }
} }

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.json.simple.JSONObject;
import org.kurento.client.Continuation; import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent; import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener; import org.kurento.client.EventListener;
@ -44,437 +45,456 @@ import io.openvidu.server.kurento.MutedMediaType;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
/** /**
* {@link WebRtcEndpoint} wrapper that supports buffering of {@link IceCandidate}s until the * {@link WebRtcEndpoint} wrapper that supports buffering of
* {@link WebRtcEndpoint} is created. Connections to other peers are opened using the corresponding * {@link IceCandidate}s until the {@link WebRtcEndpoint} is created.
* method of the internal endpoint. * Connections to other peers are opened using the corresponding method of the
* internal endpoint.
* *
* @author Pablo Fuente (pablofuenteperez@gmail.com) * @author Pablo Fuente (pablofuenteperez@gmail.com)
*/ */
public abstract class MediaEndpoint { public abstract class MediaEndpoint {
private static Logger log; private static Logger log;
private boolean web = false; private boolean web = false;
private WebRtcEndpoint webEndpoint = null; private WebRtcEndpoint webEndpoint = null;
private RtpEndpoint endpoint = null; private RtpEndpoint endpoint = null;
private KurentoParticipant owner; private KurentoParticipant owner;
private String endpointName; private String endpointName;
private MediaPipeline pipeline = null; private MediaPipeline pipeline = null;
private ListenerSubscription endpointSubscription = null; private ListenerSubscription endpointSubscription = null;
private LinkedList<IceCandidate> candidates = new LinkedList<IceCandidate>(); private LinkedList<IceCandidate> candidates = new LinkedList<IceCandidate>();
private MutedMediaType muteType; private MutedMediaType muteType;
public Map<String, MediaObject> flowInMedia = new ConcurrentHashMap<>(); public Map<String, MediaObject> flowInMedia = new ConcurrentHashMap<>();
public Map<String, MediaObject> flowOutMedia = new ConcurrentHashMap<>(); public Map<String, MediaObject> flowOutMedia = new ConcurrentHashMap<>();
/** public String selectedLocalIceCandidate;
* Constructor to set the owner, the endpoint's name and the media pipeline. public String selectedRemoteIceCandidate;
*
* @param web
* @param owner
* @param endpointName
* @param pipeline
* @param log
*/
public MediaEndpoint(boolean web, KurentoParticipant owner, String endpointName,
MediaPipeline pipeline, Logger log) {
if (log == null) {
MediaEndpoint.log = LoggerFactory.getLogger(MediaEndpoint.class);
} else {
MediaEndpoint.log = log;
}
this.web = web;
this.owner = owner;
this.setEndpointName(endpointName);
this.setMediaPipeline(pipeline);
}
public boolean isWeb() { /**
return web; * Constructor to set the owner, the endpoint's name and the media pipeline.
} *
* @param web
* @param owner
* @param endpointName
* @param pipeline
* @param log
*/
public MediaEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline,
Logger log) {
if (log == null) {
MediaEndpoint.log = LoggerFactory.getLogger(MediaEndpoint.class);
} else {
MediaEndpoint.log = log;
}
this.web = web;
this.owner = owner;
this.setEndpointName(endpointName);
this.setMediaPipeline(pipeline);
}
/** public boolean isWeb() {
* @return the user session that created this endpoint return web;
*/ }
public Participant getOwner() {
return owner;
}
/** /**
* @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * @return the user session that created this endpoint
*/ */
public SdpEndpoint getEndpoint() { public Participant getOwner() {
if (this.isWeb()) { return owner;
return this.webEndpoint; }
} else {
return this.endpoint;
}
}
public WebRtcEndpoint getWebEndpoint() { /**
return webEndpoint; * @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
} */
public SdpEndpoint getEndpoint() {
if (this.isWeb()) {
return this.webEndpoint;
} else {
return this.endpoint;
}
}
protected RtpEndpoint getRtpEndpoint() { public WebRtcEndpoint getWebEndpoint() {
return endpoint; return webEndpoint;
} }
/** protected RtpEndpoint getRtpEndpoint() {
* If this object doesn't have a {@link WebRtcEndpoint}, it is created in a thread-safe way using return endpoint;
* the internal {@link MediaPipeline}. Otherwise no actions are taken. It also registers an error }
* listener for the endpoint and for any additional media elements.
*
* @param endpointLatch
* latch whose countdown is performed when the asynchronous call to build the
* {@link WebRtcEndpoint} returns
*
* @return the existing endpoint, if any
*/
public synchronized SdpEndpoint createEndpoint(CountDownLatch endpointLatch) {
SdpEndpoint old = this.getEndpoint();
if (old == null) {
internalEndpointInitialization(endpointLatch);
} else {
endpointLatch.countDown();
}
if (this.isWeb()) {
while (!candidates.isEmpty()) {
internalAddIceCandidate(candidates.removeFirst());
}
}
return old;
}
/** /**
* @return the pipeline * If this object doesn't have a {@link WebRtcEndpoint}, it is created in a
*/ * thread-safe way using the internal {@link MediaPipeline}. Otherwise no
public MediaPipeline getPipeline() { * actions are taken. It also registers an error listener for the endpoint and
return this.pipeline; * for any additional media elements.
} *
* @param endpointLatch
* latch whose countdown is performed when the asynchronous call to
* build the {@link WebRtcEndpoint} returns
*
* @return the existing endpoint, if any
*/
public synchronized SdpEndpoint createEndpoint(CountDownLatch endpointLatch) {
SdpEndpoint old = this.getEndpoint();
if (old == null) {
internalEndpointInitialization(endpointLatch);
} else {
endpointLatch.countDown();
}
if (this.isWeb()) {
while (!candidates.isEmpty()) {
internalAddIceCandidate(candidates.removeFirst());
}
}
return old;
}
/** /**
* Sets the {@link MediaPipeline} used to create the internal {@link WebRtcEndpoint}. * @return the pipeline
* */
* @param pipeline public MediaPipeline getPipeline() {
* the {@link MediaPipeline} return this.pipeline;
*/ }
public void setMediaPipeline(MediaPipeline pipeline) {
this.pipeline = pipeline;
}
/** /**
* @return name of this endpoint (as indicated by the browser) * Sets the {@link MediaPipeline} used to create the internal
*/ * {@link WebRtcEndpoint}.
public String getEndpointName() { *
return endpointName; * @param pipeline
} * the {@link MediaPipeline}
*/
public void setMediaPipeline(MediaPipeline pipeline) {
this.pipeline = pipeline;
}
/** /**
* Sets the endpoint's name (as indicated by the browser). * @return name of this endpoint (as indicated by the browser)
* */
* @param endpointName public String getEndpointName() {
* the name return endpointName;
*/ }
public void setEndpointName(String endpointName) {
this.endpointName = endpointName;
}
/** /**
* Unregisters all error listeners created for media elements owned by this instance. * Sets the endpoint's name (as indicated by the browser).
*/ *
public synchronized void unregisterErrorListeners() { * @param endpointName
unregisterElementErrListener(endpoint, endpointSubscription); * the name
} */
public void setEndpointName(String endpointName) {
this.endpointName = endpointName;
}
/** /**
* Mute the media stream. * Unregisters all error listeners created for media elements owned by this
* * instance.
* @param muteType */
* which type of leg to disconnect (audio, video or both) public synchronized void unregisterErrorListeners() {
*/ unregisterElementErrListener(endpoint, endpointSubscription);
public abstract void mute(MutedMediaType muteType); }
/** /**
* Reconnect the muted media leg(s). * Mute the media stream.
*/ *
public abstract void unmute(); * @param muteType
* which type of leg to disconnect (audio, video or both)
*/
public abstract void mute(MutedMediaType muteType);
public void setMuteType(MutedMediaType muteType) { /**
this.muteType = muteType; * Reconnect the muted media leg(s).
} */
public abstract void unmute();
public MutedMediaType getMuteType() { public void setMuteType(MutedMediaType muteType) {
return this.muteType; this.muteType = muteType;
} }
protected void resolveCurrentMuteType(MutedMediaType newMuteType) { public MutedMediaType getMuteType() {
MutedMediaType prev = this.getMuteType(); return this.muteType;
if (prev != null) { }
switch (prev) {
case AUDIO :
if (muteType.equals(MutedMediaType.VIDEO)) {
this.setMuteType(MutedMediaType.ALL);
return;
}
break;
case VIDEO :
if (muteType.equals(MutedMediaType.AUDIO)) {
this.setMuteType(MutedMediaType.ALL);
return;
}
break;
case ALL :
return;
}
}
this.setMuteType(newMuteType);
}
/** protected void resolveCurrentMuteType(MutedMediaType newMuteType) {
* Creates the endpoint (RTP or WebRTC) and any other additional elements (if needed). MutedMediaType prev = this.getMuteType();
* if (prev != null) {
* @param endpointLatch switch (prev) {
*/ case AUDIO:
protected void internalEndpointInitialization(final CountDownLatch endpointLatch) { if (muteType.equals(MutedMediaType.VIDEO)) {
if (this.isWeb()) { this.setMuteType(MutedMediaType.ALL);
WebRtcEndpoint.Builder builder = new WebRtcEndpoint.Builder(pipeline); return;
/*if (this.dataChannels) { }
builder.useDataChannels(); break;
}*/ case VIDEO:
builder.buildAsync(new Continuation<WebRtcEndpoint>() { if (muteType.equals(MutedMediaType.AUDIO)) {
@Override this.setMuteType(MutedMediaType.ALL);
public void onSuccess(WebRtcEndpoint result) throws Exception { return;
webEndpoint = result; }
break;
case ALL:
return;
}
}
this.setMuteType(newMuteType);
}
webEndpoint.setMaxVideoRecvBandwidth(600); /**
webEndpoint.setMinVideoRecvBandwidth(300); * Creates the endpoint (RTP or WebRTC) and any other additional elements (if
webEndpoint.setMaxVideoSendBandwidth(600); * needed).
webEndpoint.setMinVideoSendBandwidth(300); *
* @param endpointLatch
*/
protected void internalEndpointInitialization(final CountDownLatch endpointLatch) {
if (this.isWeb()) {
WebRtcEndpoint.Builder builder = new WebRtcEndpoint.Builder(pipeline);
/*
* if (this.dataChannels) { builder.useDataChannels(); }
*/
builder.buildAsync(new Continuation<WebRtcEndpoint>() {
@Override
public void onSuccess(WebRtcEndpoint result) throws Exception {
webEndpoint = result;
endpointLatch.countDown(); webEndpoint.setMaxVideoRecvBandwidth(600);
log.trace("EP {}: Created a new WebRtcEndpoint", endpointName); webEndpoint.setMinVideoRecvBandwidth(300);
endpointSubscription = registerElemErrListener(webEndpoint); webEndpoint.setMaxVideoSendBandwidth(600);
} webEndpoint.setMinVideoSendBandwidth(300);
@Override endpointLatch.countDown();
public void onError(Throwable cause) throws Exception { log.trace("EP {}: Created a new WebRtcEndpoint", endpointName);
endpointLatch.countDown(); endpointSubscription = registerElemErrListener(webEndpoint);
log.error("EP {}: Failed to create a new WebRtcEndpoint", endpointName, cause); }
}
});
} else {
new RtpEndpoint.Builder(pipeline).buildAsync(new Continuation<RtpEndpoint>() {
@Override
public void onSuccess(RtpEndpoint result) throws Exception {
endpoint = result;
endpointLatch.countDown();
log.trace("EP {}: Created a new RtpEndpoint", endpointName);
endpointSubscription = registerElemErrListener(endpoint);
}
@Override @Override
public void onError(Throwable cause) throws Exception { public void onError(Throwable cause) throws Exception {
endpointLatch.countDown(); endpointLatch.countDown();
log.error("EP {}: Failed to create a new RtpEndpoint", endpointName, cause); log.error("EP {}: Failed to create a new WebRtcEndpoint", endpointName, cause);
} }
}); });
} } else {
} new RtpEndpoint.Builder(pipeline).buildAsync(new Continuation<RtpEndpoint>() {
@Override
public void onSuccess(RtpEndpoint result) throws Exception {
endpoint = result;
endpointLatch.countDown();
log.trace("EP {}: Created a new RtpEndpoint", endpointName);
endpointSubscription = registerElemErrListener(endpoint);
}
/** @Override
* Add a new {@link IceCandidate} received gathered by the remote peer of this public void onError(Throwable cause) throws Exception {
* {@link WebRtcEndpoint}. endpointLatch.countDown();
* log.error("EP {}: Failed to create a new RtpEndpoint", endpointName, cause);
* @param candidate }
* the remote candidate });
*/ }
public synchronized void addIceCandidate(IceCandidate candidate) throws OpenViduException { }
if (!this.isWeb()) {
throw new OpenViduException(Code.MEDIA_NOT_A_WEB_ENDPOINT_ERROR_CODE, "Operation not supported");
}
if (webEndpoint == null) {
candidates.addLast(candidate);
} else {
internalAddIceCandidate(candidate);
}
}
/** /**
* Registers a listener for when the {@link MediaElement} triggers an {@link ErrorEvent}. Notifies * Add a new {@link IceCandidate} received gathered by the remote peer of this
* the owner with the error. * {@link WebRtcEndpoint}.
* *
* @param element * @param candidate
* the {@link MediaElement} * the remote candidate
* @return {@link ListenerSubscription} that can be used to deregister the listener */
*/ public synchronized void addIceCandidate(IceCandidate candidate) throws OpenViduException {
protected ListenerSubscription registerElemErrListener(MediaElement element) { if (!this.isWeb()) {
return element.addErrorListener(new EventListener<ErrorEvent>() { throw new OpenViduException(Code.MEDIA_NOT_A_WEB_ENDPOINT_ERROR_CODE, "Operation not supported");
@Override }
public void onEvent(ErrorEvent event) { if (webEndpoint == null) {
owner.sendMediaError(event); candidates.addLast(candidate);
} } else {
}); internalAddIceCandidate(candidate);
} }
}
/** /**
* Unregisters the error listener from the media element using the provided subscription. * Registers a listener for when the {@link MediaElement} triggers an
* * {@link ErrorEvent}. Notifies the owner with the error.
* @param element *
* the {@link MediaElement} * @param element
* @param subscription * the {@link MediaElement}
* the associated {@link ListenerSubscription} * @return {@link ListenerSubscription} that can be used to deregister the
*/ * listener
protected void unregisterElementErrListener(MediaElement element, */
final ListenerSubscription subscription) { protected ListenerSubscription registerElemErrListener(MediaElement element) {
if (element == null || subscription == null) { return element.addErrorListener(new EventListener<ErrorEvent>() {
return; @Override
} public void onEvent(ErrorEvent event) {
element.removeErrorListener(subscription); owner.sendMediaError(event);
} }
});
}
/** /**
* Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) to process the * Unregisters the error listener from the media element using the provided
* offer String. * subscription.
* *
* @see SdpEndpoint#processOffer(String) * @param element
* @param offer * the {@link MediaElement}
* String with the Sdp offer * @param subscription
* @return the Sdp answer * the associated {@link ListenerSubscription}
*/ */
protected String processOffer(String offer) throws OpenViduException { protected void unregisterElementErrListener(MediaElement element, final ListenerSubscription subscription) {
if (this.isWeb()) { if (element == null || subscription == null) {
if (webEndpoint == null) { return;
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, }
"Can't process offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); element.removeErrorListener(subscription);
} }
return webEndpoint.processOffer(offer);
} else {
if (endpoint == null) {
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
"Can't process offer when RtpEndpoint is null (ep: " + endpointName + ")");
}
return endpoint.processOffer(offer);
}
}
/** /**
* Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) to generate the * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
* offer String that can be used to initiate a connection. * to process the offer String.
* *
* @see SdpEndpoint#generateOffer() * @see SdpEndpoint#processOffer(String)
* @return the Sdp offer * @param offer
*/ * String with the Sdp offer
protected String generateOffer() throws OpenViduException { * @return the Sdp answer
if (this.isWeb()) { */
if (webEndpoint == null) { protected String processOffer(String offer) throws OpenViduException {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, if (this.isWeb()) {
"Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); if (webEndpoint == null) {
} throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
return webEndpoint.generateOffer(); "Can't process offer when WebRtcEndpoint is null (ep: " + endpointName + ")");
} else { }
if (endpoint == null) { return webEndpoint.processOffer(offer);
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, } else {
"Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")"); if (endpoint == null) {
} throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
return endpoint.generateOffer(); "Can't process offer when RtpEndpoint is null (ep: " + endpointName + ")");
} }
} return endpoint.processOffer(offer);
}
}
/** /**
* Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) to process the * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
* answer String. * to generate the offer String that can be used to initiate a connection.
* *
* @see SdpEndpoint#processAnswer(String) * @see SdpEndpoint#generateOffer()
* @param answer * @return the Sdp offer
* String with the Sdp answer from remote */
* @return the updated Sdp offer, based on the received answer protected String generateOffer() throws OpenViduException {
*/ if (this.isWeb()) {
protected String processAnswer(String answer) throws OpenViduException { if (webEndpoint == null) {
if (this.isWeb()) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
if (webEndpoint == null) { "Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")");
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, }
"Can't process answer when WebRtcEndpoint is null (ep: " + endpointName + ")"); return webEndpoint.generateOffer();
} } else {
return webEndpoint.processAnswer(answer); if (endpoint == null) {
} else { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
if (endpoint == null) { "Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")");
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, }
"Can't process answer when RtpEndpoint is null (ep: " + endpointName + ")"); return endpoint.generateOffer();
} }
return endpoint.processAnswer(answer); }
}
}
/** /**
* If supported, it registers a listener for when a new {@link IceCandidate} is gathered by the * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
* internal endpoint ({@link WebRtcEndpoint}) and sends it to the remote User Agent as a * to process the answer String.
* notification using the messaging capabilities of the {@link Participant}. *
* * @see SdpEndpoint#processAnswer(String)
* @see WebRtcEndpoint#addOnIceCandidateListener(org.kurento.client.EventListener) * @param answer
* @see Participant#sendIceCandidate(String, IceCandidate) * String with the Sdp answer from remote
* @throws OpenViduException * @return the updated Sdp offer, based on the received answer
* if thrown, unable to register the listener */
*/ protected String processAnswer(String answer) throws OpenViduException {
protected void registerOnIceCandidateEventListener() throws OpenViduException { if (this.isWeb()) {
if (!this.isWeb()) { if (webEndpoint == null) {
return; throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
} "Can't process answer when WebRtcEndpoint is null (ep: " + endpointName + ")");
if (webEndpoint == null) { }
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, return webEndpoint.processAnswer(answer);
"Can't register event listener for null WebRtcEndpoint (ep: " + endpointName + ")"); } else {
} if (endpoint == null) {
webEndpoint.addOnIceCandidateListener(new EventListener<OnIceCandidateEvent>() { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
@Override "Can't process answer when RtpEndpoint is null (ep: " + endpointName + ")");
public void onEvent(OnIceCandidateEvent event) { }
owner.sendIceCandidate(endpointName, event.getCandidate()); return endpoint.processAnswer(answer);
} }
}); }
}
/** /**
* If supported, it instructs the internal endpoint to start gathering {@link IceCandidate}s. * If supported, it registers a listener for when a new {@link IceCandidate} is
*/ * gathered by the internal endpoint ({@link WebRtcEndpoint}) and sends it to
protected void gatherCandidates() throws OpenViduException { * the remote User Agent as a notification using the messaging capabilities of
if (!this.isWeb()) { * the {@link Participant}.
return; *
} * @see WebRtcEndpoint#addOnIceCandidateListener(org.kurento.client.EventListener)
if (webEndpoint == null) { * @see Participant#sendIceCandidate(String, IceCandidate)
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, * @throws OpenViduException
"Can't start gathering ICE candidates on null WebRtcEndpoint (ep: " + endpointName + ")"); * if thrown, unable to register the listener
} */
webEndpoint.gatherCandidates(new Continuation<Void>() { protected void registerOnIceCandidateEventListener() throws OpenViduException {
@Override if (!this.isWeb()) {
public void onSuccess(Void result) throws Exception { return;
log.trace("EP {}: Internal endpoint started to gather candidates", endpointName); }
} if (webEndpoint == null) {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
"Can't register event listener for null WebRtcEndpoint (ep: " + endpointName + ")");
}
webEndpoint.addOnIceCandidateListener(new EventListener<OnIceCandidateEvent>() {
@Override
public void onEvent(OnIceCandidateEvent event) {
owner.sendIceCandidate(endpointName, event.getCandidate());
}
});
}
@Override /**
public void onError(Throwable cause) throws Exception { * If supported, it instructs the internal endpoint to start gathering
log.warn("EP {}: Internal endpoint failed to start gathering candidates", endpointName, * {@link IceCandidate}s.
cause); */
} protected void gatherCandidates() throws OpenViduException {
}); if (!this.isWeb()) {
} return;
}
if (webEndpoint == null) {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
"Can't start gathering ICE candidates on null WebRtcEndpoint (ep: " + endpointName + ")");
}
webEndpoint.gatherCandidates(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("EP {}: Internal endpoint started to gather candidates", endpointName);
}
private void internalAddIceCandidate(IceCandidate candidate) throws OpenViduException { @Override
if (webEndpoint == null) { public void onError(Throwable cause) throws Exception {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, log.warn("EP {}: Internal endpoint failed to start gathering candidates", endpointName, cause);
"Can't add existing ICE candidates to null WebRtcEndpoint (ep: " + endpointName + ")"); }
} });
this.webEndpoint.addIceCandidate(candidate, new Continuation<Void>() { }
@Override
public void onSuccess(Void result) throws Exception {
log.trace("Ice candidate added to the internal endpoint");
}
@Override private void internalAddIceCandidate(IceCandidate candidate) throws OpenViduException {
public void onError(Throwable cause) throws Exception { if (webEndpoint == null) {
log.warn("EP {}: Failed to add ice candidate to the internal endpoint", endpointName, cause); throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
} "Can't add existing ICE candidates to null WebRtcEndpoint (ep: " + endpointName + ")");
}); }
} this.webEndpoint.addIceCandidate(candidate, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.trace("Ice candidate added to the internal endpoint");
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to add ice candidate to the internal endpoint", endpointName, cause);
}
});
}
@SuppressWarnings("unchecked")
public JSONObject toJSON() {
JSONObject json = new JSONObject();
json.put("webrtcTagName", this.getEndpoint().getTag("name"));
json.put("localCandidate", this.selectedLocalIceCandidate);
json.put("remoteCandidate", this.selectedRemoteIceCandidate);
return json;
}
} }

View File

@ -140,7 +140,7 @@ public class ComposedRecordingService {
envs.add("RECORDING_JSON=" + recording.toJson().toJSONString()); envs.add("RECORDING_JSON=" + recording.toJson().toJSONString());
log.info(recording.toJson().toJSONString()); log.info(recording.toJson().toJSONString());
log.debug("Recorder connecting to url {}", layoutUrl); log.info("Recorder connecting to url {}", layoutUrl);
String containerId = this.runRecordingContainer(envs, "recording_" + recordingId); String containerId = this.runRecordingContainer(envs, "recording_" + recordingId);

View File

@ -30,7 +30,7 @@ import io.openvidu.server.config.OpenviduConfig;
* @author Pablo Fuente Pérez * @author Pablo Fuente Pérez
*/ */
@RestController @RestController
@CrossOrigin(origins = "*") @CrossOrigin
@RequestMapping("/config") @RequestMapping("/config")
public class ConfigRestController { public class ConfigRestController {

View File

@ -20,7 +20,6 @@ package io.openvidu.server.rest;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.json.simple.JSONArray; import org.json.simple.JSONArray;
@ -54,7 +53,7 @@ import io.openvidu.server.recording.ComposedRecordingService;
* @author Pablo Fuente Pérez * @author Pablo Fuente Pérez
*/ */
@RestController @RestController
@CrossOrigin(origins = "*") @CrossOrigin
@RequestMapping("/api") @RequestMapping("/api")
public class SessionRestController { public class SessionRestController {
@ -67,11 +66,6 @@ public class SessionRestController {
@Autowired @Autowired
private OpenviduConfig openviduConfig; private OpenviduConfig openviduConfig;
@RequestMapping(value = "/sessions", method = RequestMethod.GET)
public Set<String> getAllSessions() {
return sessionManager.getSessions();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@RequestMapping(value = "/sessions", method = RequestMethod.POST) @RequestMapping(value = "/sessions", method = RequestMethod.POST)
public ResponseEntity<JSONObject> getSessionId(@RequestBody(required = false) Map<?, ?> params) { public ResponseEntity<JSONObject> getSessionId(@RequestBody(required = false) Map<?, ?> params) {
@ -136,9 +130,34 @@ public class SessionRestController {
sessionManager.storeSessionId(sessionId, sessionProperties); sessionManager.storeSessionId(sessionId, sessionProperties);
JSONObject responseJson = new JSONObject(); JSONObject responseJson = new JSONObject();
responseJson.put("id", sessionId); responseJson.put("id", sessionId);
return new ResponseEntity<>(responseJson, HttpStatus.OK); return new ResponseEntity<>(responseJson, HttpStatus.OK);
} }
@RequestMapping(value = "/sessions/{sessionId}", method = RequestMethod.GET)
public ResponseEntity<JSONObject> getSession(@PathVariable("sessionId") String sessionId) {
Session session = this.sessionManager.getSession(sessionId);
if (session != null) {
return new ResponseEntity<>(session.toJSON(), HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
}
@SuppressWarnings("unchecked")
@RequestMapping(value = "/sessions", method = RequestMethod.GET)
public ResponseEntity<JSONObject> listSessions() {
Collection<Session> sessions = this.sessionManager.getSessionObjects();
JSONObject json = new JSONObject();
JSONArray jsonArray = new JSONArray();
sessions.forEach(s -> {
jsonArray.add(s.toJSON());
});
json.put("count", sessions.size());
json.put("items", jsonArray);
return new ResponseEntity<>(json, HttpStatus.OK);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@RequestMapping(value = "/tokens", method = RequestMethod.POST) @RequestMapping(value = "/tokens", method = RequestMethod.POST)
public ResponseEntity<JSONObject> newToken(@RequestBody Map<?, ?> params) { public ResponseEntity<JSONObject> newToken(@RequestBody Map<?, ?> params) {

View File

@ -54,7 +54,8 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
@Autowired @Autowired
RpcNotificationService notificationService; RpcNotificationService notificationService;
private ConcurrentMap<String, Boolean> webSocketTransportError = new ConcurrentHashMap<>(); private ConcurrentMap<String, Boolean> webSocketEOFTransportError = new ConcurrentHashMap<>();
// private ConcurrentMap<String, Boolean> webSocketBrokenPipeTransportError = new ConcurrentHashMap<>();
@Override @Override
public void handleRequest(Transaction transaction, Request<JsonObject> request) throws Exception { public void handleRequest(Transaction transaction, Request<JsonObject> request) throws Exception {
@ -301,23 +302,37 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
public void afterConnectionClosed(Session rpcSession, String status) throws Exception { public void afterConnectionClosed(Session rpcSession, String status) throws Exception {
log.info("After connection closed for WebSocket session: {} - Status: {}", rpcSession.getSessionId(), status); log.info("After connection closed for WebSocket session: {} - Status: {}", rpcSession.getSessionId(), status);
String rpcSessionId = rpcSession.getSessionId();
String message = "";
if ("Close for not receive ping from client".equals(status)) { if ("Close for not receive ping from client".equals(status)) {
RpcConnection rpc = this.notificationService.closeRpcSession(rpcSession.getSessionId()); message = "Evicting participant with private id {} because of a network disconnection";
} else if (status == null) { // && this.webSocketBrokenPipeTransportError.remove(rpcSessionId) != null)) {
try {
Participant p = sessionManager.getParticipant(rpcSession.getSessionId());
if (p != null) {
message = "Evicting participant with private id {} because its websocket unexpectedly closed in the client side";
}
} catch (OpenViduException exception) {
}
}
if (!message.isEmpty()) {
RpcConnection rpc = this.notificationService.closeRpcSession(rpcSessionId);
if (rpc != null && rpc.getSessionId() != null) { if (rpc != null && rpc.getSessionId() != null) {
io.openvidu.server.core.Session session = this.sessionManager.getSession(rpc.getSessionId()); io.openvidu.server.core.Session session = this.sessionManager.getSession(rpc.getSessionId());
if (session != null && session.getParticipantByPrivateId(rpc.getParticipantPrivateId()) != null) { if (session != null && session.getParticipantByPrivateId(rpc.getParticipantPrivateId()) != null) {
log.info(message, rpc.getParticipantPrivateId());
leaveRoomAfterConnClosed(rpc.getParticipantPrivateId(), "networkDisconnect"); leaveRoomAfterConnClosed(rpc.getParticipantPrivateId(), "networkDisconnect");
} }
} }
} }
String rpcSessionId = rpcSession.getSessionId(); if (this.webSocketEOFTransportError.remove(rpcSessionId) != null) {
if (this.webSocketTransportError.get(rpcSessionId) != null) {
log.warn( log.warn(
"Evicting participant with private id {} because a transport error took place and its web socket connection is now closed", "Evicting participant with private id {} because a transport error took place and its web socket connection is now closed",
rpcSession.getSessionId()); rpcSession.getSessionId());
this.leaveRoomAfterConnClosed(rpcSessionId, "networkDisconnect"); this.leaveRoomAfterConnClosed(rpcSessionId, "networkDisconnect");
this.webSocketTransportError.remove(rpcSessionId);
} }
} }
@ -325,10 +340,15 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
public void handleTransportError(Session rpcSession, Throwable exception) throws Exception { public void handleTransportError(Session rpcSession, Throwable exception) throws Exception {
log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(),
exception); exception);
if ("IOException".equals(exception.getClass().getSimpleName())
&& "Broken pipe".equals(exception.getCause().getMessage())) {
log.warn("Parcipant with private id {} unexpectedly closed the websocket", rpcSession.getSessionId());
// this.webSocketBrokenPipeTransportError.put(rpcSession.getSessionId(), true);
}
if ("EOFException".equals(exception.getClass().getSimpleName())) { if ("EOFException".equals(exception.getClass().getSimpleName())) {
// Store WebSocket connection interrupted exception for this web socket to // Store WebSocket connection interrupted exception for this web socket to
// automatically evict the participant on "afterConnectionClosed" event // automatically evict the participant on "afterConnectionClosed" event
this.webSocketTransportError.put(rpcSession.getSessionId(), true); this.webSocketEOFTransportError.put(rpcSession.getSessionId(), true);
} }
} }