diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 17ec8d9f..6d44281d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -27,7 +27,6 @@ public abstract class SessionManager { protected ConcurrentMap sessions = new ConcurrentHashMap<>(); protected ConcurrentMap> sessionidTokenTokenobj = new ConcurrentHashMap<>(); protected ConcurrentMap> sessionidParticipantpublicidParticipant = new ConcurrentHashMap<>(); - protected ConcurrentMap insecureUsers = new ConcurrentHashMap<>(); private volatile boolean closed = false; @@ -152,7 +151,7 @@ public abstract class SessionManager { this.sessionidTokenTokenobj.put(sessionId, new ConcurrentHashMap<>()); this.sessionidParticipantpublicidParticipant.put(sessionId, new ConcurrentHashMap<>()); - showMap(); + showTokens(); return sessionId; } @@ -162,7 +161,7 @@ public abstract class SessionManager { if (isMetadataFormatCorrect(serverMetadata)) { String token = new BigInteger(130, new SecureRandom()).toString(32); this.sessionidTokenTokenobj.get(sessionId).put(token, new Token(token, role, serverMetadata)); - showMap(); + showTokens(); return token; } else { throw new OpenViduException(Code.GENERIC_ERROR_CODE, @@ -184,14 +183,7 @@ public abstract class SessionManager { } else { this.sessionidParticipantpublicidParticipant.putIfAbsent(sessionId, new ConcurrentHashMap<>()); this.sessionidTokenTokenobj.putIfAbsent(sessionId, new ConcurrentHashMap<>()); - this.sessionidTokenTokenobj.get(sessionId).putIfAbsent(token, new Token(token)); - /* - * this.sessionidParticipantpublicidParticipant.get(sessionId).putIfAbsent( - * token, new Participant()); - * this.sessionidTokenTokenobj.get(sessionId).putIfAbsent(token, new - * Token(token)); - */ return true; } } @@ -268,10 +260,16 @@ public abstract class SessionManager { } } - protected void showMap() { - System.out.println("------------------------------"); - System.out.println(this.sessionidTokenTokenobj.toString()); - System.out.println("------------------------------"); + public void showTokens() { + log.info(": {}", this.sessionidTokenTokenobj.toString()); + } + + public void showInsecureParticipants() { + log.info(": {}", this.insecureUsers.toString()); + } + + public void showAllParticipants() { + log.info(": {}", this.sessionidParticipantpublicidParticipant.toString()); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 8e7c4132..997b3aa3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -167,7 +167,7 @@ public class KurentoSession implements Session { closePipeline(); - log.debug("Room {} closed", this.sessionId); + log.debug("Session {} closed", this.sessionId); if (destroyKurentoClient) { kurentoClient.destroy(); @@ -204,7 +204,7 @@ public class KurentoSession implements Session { participants.remove(participant.getParticipantPrivateId()); - log.debug("SESSION {}: Cancel receiving media from user '{}' for other users", this.sessionId, participant.getParticipantPublicId()); + log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, participant.getParticipantPublicId()); for (KurentoParticipant other : participants.values()) { other.cancelReceivingMedia(participant.getParticipantPublicId()); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 2e3683b8..7ebfdfb3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -108,7 +108,7 @@ public class KurentoSessionManager extends SessionManager { } } - showMap(); + showTokens(); Set remainingParticipants = null; try { @@ -125,7 +125,7 @@ public class KurentoSessionManager extends SessionManager { sessionidParticipantpublicidParticipant.remove(sessionId); sessionidTokenTokenobj.remove(sessionId); - showMap(); + showTokens(); log.warn("Session '{}' removed and closed", sessionId); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index dbf8902c..98501484 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -2,6 +2,8 @@ package io.openvidu.server.rpc; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.kurento.jsonrpc.DefaultJsonRpcHandler; import org.kurento.jsonrpc.Session; @@ -35,6 +37,8 @@ public class RpcHandler extends DefaultJsonRpcHandler { @Autowired RpcNotificationService notificationService; + private ConcurrentMap webSocketTransportError = new ConcurrentHashMap<>(); + @Override public void handleRequest(Transaction transaction, Request request) throws Exception { @@ -50,9 +54,6 @@ public class RpcHandler extends DefaultJsonRpcHandler { RpcConnection rpcConnection = notificationService.addTransaction(transaction, request); - // ParticipantRequest participantRequest = new ParticipantRequest(rpcSessionId, - // Integer.toString(request.getId())); - transaction.startAsync(); switch (request.getMethod()) { @@ -111,12 +112,12 @@ public class RpcHandler extends DefaultJsonRpcHandler { sessionManager.joinRoom(participant, sessionId, request.getId()); } else { - System.out.println("Error: metadata format is incorrect"); + log.error("ERROR: Metadata format is incorrect"); throw new OpenViduException(Code.USER_METADATA_FORMAT_INVALID_ERROR_CODE, "Unable to join room. The metadata received has an invalid format"); } } else { - System.out.println("Error: sessionId or token not valid"); + log.error("ERROR: sessionId or token not valid"); throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, "Unable to join room. The user is not authorized"); } @@ -210,7 +211,7 @@ public class RpcHandler extends DefaultJsonRpcHandler { sessionManager.sendMessage(participant, message, request.getId()); } - + private void unpublishVideo(RpcConnection rpcConnection, Request request) { String participantPrivateId = rpcConnection.getParticipantPrivateId(); @@ -238,18 +239,31 @@ public class RpcHandler extends DefaultJsonRpcHandler { @Override public void afterConnectionClosed(Session rpcSession, String status) throws Exception { log.info("Connection closed for WebSocket session: {} - Status: {}", rpcSession.getSessionId(), status); + this.notificationService.showRpcConnections(); + String rpcSessionId = rpcSession.getSessionId(); + if (this.webSocketTransportError.get(rpcSessionId) != null) { + log.warn( + "Evicting participant with private id {} because a transport error took place and its web socket connection is now closed", + rpcSession.getSessionId()); + this.leaveRoomAfterConnClosed(rpcSessionId); + this.webSocketTransportError.remove(rpcSessionId); + } } @Override public void handleTransportError(Session rpcSession, Throwable exception) throws Exception { log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), - exception.getMessage()); + exception); + if ("EOFException".equals(exception.getClass().getSimpleName())) { + // Store WebSocket connection interrupted exception for this web socket to + // automatically evict the participant on "afterConnectionClosed" event + this.webSocketTransportError.put(rpcSession.getSessionId(), true); + } } @Override public void handleUncaughtException(Session rpcSession, Exception exception) { - log.error("Uncaught exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), - exception.getMessage()); + log.error("Uncaught exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), exception); } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java index c52ee6c1..e45fd661 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java @@ -15,7 +15,7 @@ import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; public class RpcNotificationService { - + private static final Logger log = LoggerFactory.getLogger(RpcNotificationService.class); private static ConcurrentMap rpcConnections = new ConcurrentHashMap<>(); @@ -38,7 +38,8 @@ public class RpcNotificationService { public void sendResponse(String participantPrivateId, Integer transactionId, Object result) { Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); if (t == null) { - log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", transactionId, participantPrivateId, result); + log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", + transactionId, participantPrivateId, result); return; } try { @@ -48,10 +49,12 @@ public class RpcNotificationService { } } - public void sendErrorResponse(String participantPrivateId, Integer transactionId, Object data, OpenViduException error) { + public void sendErrorResponse(String participantPrivateId, Integer transactionId, Object data, + OpenViduException error) { Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); if (t == null) { - log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", transactionId, participantPrivateId, data); + log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", + transactionId, participantPrivateId, data); return; } try { @@ -65,7 +68,8 @@ public class RpcNotificationService { public void sendNotification(final String participantPrivateId, final String method, final Object params) { RpcConnection rpcSession = rpcConnections.get(participantPrivateId); if (rpcSession == null || rpcSession.getSession() == null) { - log.error("No rpc session found for private id {}, unable to send notification {}: {}", participantPrivateId, method, params); + log.error("No rpc session found for private id {}, unable to send notification {}: {}", + participantPrivateId, method, params); return; } Session s = rpcSession.getSession(); @@ -73,7 +77,8 @@ public class RpcNotificationService { try { s.sendNotification(method, params); } catch (Exception e) { - log.error("Exception sending notification '{}': {} to participant with private id {}", method, params, participantPrivateId, e); + log.error("Exception sending notification '{}': {} to participant with private id {}", method, params, + participantPrivateId, e); } } @@ -92,7 +97,7 @@ public class RpcNotificationService { } rpcConnections.remove(participantPrivateId); } - + private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) { RpcConnection rpcSession = rpcConnections.get(participantPrivateId); if (rpcSession == null) { @@ -105,4 +110,8 @@ public class RpcNotificationService { return t; } + public void showRpcConnections() { + log.info(": {}", RpcNotificationService.rpcConnections.toString()); + } + }