diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index aa7854c8..ef9c4fef 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -538,42 +538,41 @@ public abstract class SessionManager { if (session == null) { throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found"); } - if (session.isClosed()) { - this.cleanCollections(sessionId); - throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "Session '" + sessionId + "' already closed"); - } - Set participants = getParticipants(sessionId); - boolean sessionClosedByLastParticipant = false; - - for (Participant p : participants) { - try { - sessionClosedByLastParticipant = this.evictParticipant(p, null, null, reason); - } catch (OpenViduException e) { - log.warn("Error evicting participant '{}' from session '{}'", p.getParticipantPublicId(), sessionId, e); - } - } - - if (!sessionClosedByLastParticipant) { - // This code should only be executed when there were no participants connected - // to the session. That is: if the session was in the automatic recording stop - // timeout with INDIVIDUAL recording (no docker participant connected) - try { - if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { - try { - if (session.isClosed()) { - return; - } - this.closeSessionAndEmptyCollections(session, reason, true); - } finally { - session.closingLock.writeLock().unlock(); + try { + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (session.isClosed()) { + this.cleanCollections(sessionId); + throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, + "Session '" + sessionId + "' already closed"); } - } else { - log.error("Timeout waiting for Session {} closing lock to be available", sessionId); + + boolean sessionClosedByLastParticipant = false; + Set participants = getParticipants(sessionId); + for (Participant p : participants) { + try { + sessionClosedByLastParticipant = this.evictParticipant(p, null, null, reason); + } catch (OpenViduException e) { + log.warn("Error evicting participant '{}' from session '{}'", p.getParticipantPublicId(), + sessionId, e); + } + } + if (!sessionClosedByLastParticipant) { + // This code should only be executed when there were no participants connected + // to the session. That is: if the session was in the automatic recording stop + // timeout with INDIVIDUAL recording (no docker participant connected) + this.closeSessionAndEmptyCollections(session, reason, true); + } + + } finally { + session.closingLock.writeLock().unlock(); } - } catch (InterruptedException e) { - log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId); + } else { + log.error("Timeout waiting for Session {} closing lock to be available", sessionId); } + } catch (InterruptedException e) { + log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId); } } @@ -669,15 +668,15 @@ public abstract class SessionManager { } public void closeAllSessionsAndRecordingsOfKms(Kms kms, EndReason reason) { - // Close all active sessions - kms.getKurentoSessions().forEach(kSession -> { - this.closeSession(kSession.getSessionId(), reason); - }); // Close all non active sessions configured with this Media Node this.closeNonActiveSessions(sessionNotActive -> { return (sessionNotActive.getSessionProperties().mediaNode() != null && kms.getId().equals(sessionNotActive.getSessionProperties().mediaNode())); }); + // Close all active sessions + kms.getKurentoSessions().forEach(kSession -> { + this.closeSession(kSession.getSessionId(), reason); + }); // Stop all external recordings kms.getActiveRecordings().forEach(recordingIdSessionId -> { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 54abea9e..d19ee880 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -186,6 +186,13 @@ public abstract class KmsManager { public void disconnected() { final Kms kms = kmss.get(kmsId); + // TODO: take a look at this +// if (kms.getTimeOfKurentoClientDisconnection() > 0) { +// log.warn("Event disconnected of KurentoClient {} is already being processed by other thread", +// kms.getKurentoClient().toString()); +// return; +// } + kms.setKurentoClientConnected(false); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); @@ -225,7 +232,11 @@ public abstract class KmsManager { log.warn("Removing Media Node {} after crash", kms.getId()); String environmentId = removeMediaNodeUponCrash(kms.getId()); - // 2. Close all sessions and recordings with reason "nodeCrashed" + // 2. Send nodeCrashed webhook event + sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, + affectedSessionIds, affectedRecordingIds); + + // 3. Close all sessions and recordings with reason "nodeCrashed" log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) .collect(Collectors.joining(",", "[", "]"))); @@ -237,10 +248,6 @@ public abstract class KmsManager { RemoteOperationUtils.revertToRunRemoteOperations(); } - // 3. Send nodeCrashed webhook event - sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, - affectedSessionIds, affectedRecordingIds); - if (infiniteRetry()) { disconnected(); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index 461c389f..45635a94 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -25,9 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; -import io.openvidu.java.client.*; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,18 @@ import com.google.gson.JsonParser; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; +import io.openvidu.java.client.ConnectionProperties; +import io.openvidu.java.client.ConnectionType; +import io.openvidu.java.client.IceServerProperties; +import io.openvidu.java.client.KurentoOptions; +import io.openvidu.java.client.MediaMode; +import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.Recording.OutputMode; +import io.openvidu.java.client.RecordingLayout; +import io.openvidu.java.client.RecordingMode; +import io.openvidu.java.client.RecordingProperties; +import io.openvidu.java.client.SessionProperties; +import io.openvidu.java.client.VideoCodec; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.IdentifierPrefixes; @@ -101,26 +112,66 @@ public class SessionRestController { } String sessionId; - if (sessionProperties.customSessionId() != null && !sessionProperties.customSessionId().isEmpty()) { - if (sessionManager.getSessionWithNotActive(sessionProperties.customSessionId()) != null) { - log.warn("Session {} is already created", sessionProperties.customSessionId()); - return new ResponseEntity<>(HttpStatus.CONFLICT); + Lock sessionLock = null; + + try { + if (sessionProperties.customSessionId() != null && !sessionProperties.customSessionId().isEmpty()) { + // Session has custom session id + sessionId = sessionProperties.customSessionId(); + Session session = sessionManager.getSessionWithNotActive(sessionProperties.customSessionId()); + if (session != null) { + // The session appears to already exist + if (session.closingLock.readLock().tryLock()) { + // The session indeed exists and is not being closed + try { + log.warn("Session {} is already created", sessionProperties.customSessionId()); + return new ResponseEntity<>(HttpStatus.CONFLICT); + } finally { + session.closingLock.readLock().unlock(); + } + } else { + // The session exists but is being closed + log.warn("Session {} is in the process of closing while calling POST {}/sessions", sessionId, + RequestMappings.API); + try { + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + if (sessionManager + .getSessionWithNotActive(sessionProperties.customSessionId()) != null) { + // Other thread took the lock before and rebuilt the closing session + session.closingLock.writeLock().unlock(); + return new ResponseEntity<>(HttpStatus.CONFLICT); + } else { + // This thread will rebuild the closing session + sessionLock = session.closingLock.writeLock(); + } + } else { + log.error("Timeout waiting for Session {} closing lock to be available", sessionId); + } + } catch (InterruptedException e) { + log.error("InterruptedException while waiting for Session {} closing lock to be available", + sessionId); + } + + } + } + } else { + sessionId = IdentifierPrefixes.SESSION_ID + RandomStringUtils.randomAlphabetic(1).toUpperCase() + + RandomStringUtils.randomAlphanumeric(9); } - sessionId = sessionProperties.customSessionId(); - } else { - sessionId = IdentifierPrefixes.SESSION_ID + RandomStringUtils.randomAlphabetic(1).toUpperCase() - + RandomStringUtils.randomAlphanumeric(9); - } - Session sessionNotActive = sessionManager.storeSessionNotActive(sessionId, sessionProperties); - - if (sessionNotActive == null) { - return new ResponseEntity<>(HttpStatus.CONFLICT); - } else { - log.info("New session {} created {}", sessionId, this.sessionManager.getSessionsWithNotActive().stream() - .map(Session::getSessionId).collect(Collectors.toList()).toString()); - return new ResponseEntity<>(sessionNotActive.toJson(false, false).toString(), - RestUtils.getResponseHeaders(), HttpStatus.OK); + Session sessionNotActive = sessionManager.storeSessionNotActive(sessionId, sessionProperties); + if (sessionNotActive == null) { + return new ResponseEntity<>(HttpStatus.CONFLICT); + } else { + log.info("New session {} created {}", sessionId, this.sessionManager.getSessionsWithNotActive().stream() + .map(Session::getSessionId).collect(Collectors.toList()).toString()); + return new ResponseEntity<>(sessionNotActive.toJson(false, false).toString(), + RestUtils.getResponseHeaders(), HttpStatus.OK); + } + } finally { + if (sessionLock != null) { + sessionLock.unlock(); + } } } @@ -133,8 +184,17 @@ public class SessionRestController { Session session = this.sessionManager.getSession(sessionId); if (session != null) { - JsonObject response = session.toJson(pendingConnections, webRtcStats); - return new ResponseEntity<>(response.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK); + try { + JsonObject response = session.toJson(pendingConnections, webRtcStats); + return new ResponseEntity<>(response.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK); + } catch (OpenViduException e) { + if (e.getCodeValue() == Code.ROOM_CLOSED_ERROR_CODE.getValue()) { + log.warn("Session closed while calling GET {}/sessions/{}", RequestMappings.API, sessionId); + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } else { + throw e; + } + } } else { Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); if (sessionNotActive != null) { @@ -157,10 +217,16 @@ public class SessionRestController { JsonObject json = new JsonObject(); JsonArray jsonArray = new JsonArray(); sessions.forEach(session -> { - JsonObject sessionJson = session.toJson(pendingConnections, webRtcStats); - jsonArray.add(sessionJson); + try { + JsonObject sessionJson = session.toJson(pendingConnections, webRtcStats); + jsonArray.add(sessionJson); + } catch (OpenViduException e) { + if (e.getCodeValue() != Code.ROOM_CLOSED_ERROR_CODE.getValue()) { + throw e; + } + } }); - json.addProperty("numberOfElements", sessions.size()); + json.addProperty("numberOfElements", jsonArray.size()); json.add("content", jsonArray); return new ResponseEntity<>(json.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK); } @@ -655,6 +721,9 @@ public class SessionRestController { Token token = sessionManager.newToken(session, connectionProperties.getRole(), connectionProperties.getData(), connectionProperties.record(), connectionProperties.getKurentoOptions(), connectionProperties.getCustomIceServers()); + + log.info("Generated token {}", token.getToken()); + return new ResponseEntity<>(token.toJsonAsParticipant().toString(), RestUtils.getResponseHeaders(), HttpStatus.OK); } catch (Exception e) { @@ -923,7 +992,8 @@ public class SessionRestController { IceServerProperties.Builder iceServerPropertiesBuilder = new IceServerProperties.Builder(); iceServerPropertiesBuilder.url(customIceServerJson.get("url").getAsString()); if (customIceServerJson.has("staticAuthSecret")) { - iceServerPropertiesBuilder.staticAuthSecret(customIceServerJson.get("staticAuthSecret").getAsString()); + iceServerPropertiesBuilder + .staticAuthSecret(customIceServerJson.get("staticAuthSecret").getAsString()); } if (customIceServerJson.has("username")) { iceServerPropertiesBuilder.username(customIceServerJson.get("username").getAsString()); @@ -937,9 +1007,10 @@ public class SessionRestController { } catch (Exception e) { throw new Exception("Type error in some parameter of 'customIceServers': " + e.getMessage()); } - } else if(!openviduConfig.getWebrtcIceServersBuilders().isEmpty()){ + } else if (!openviduConfig.getWebrtcIceServersBuilders().isEmpty()) { // If not defined in connection, check if defined in openvidu config - for (IceServerProperties.Builder iceServerPropertiesBuilder: openviduConfig.getWebrtcIceServersBuilders()) { + for (IceServerProperties.Builder iceServerPropertiesBuilder : openviduConfig + .getWebrtcIceServersBuilders()) { IceServerProperties.Builder configIceBuilder = iceServerPropertiesBuilder.clone(); builder.addCustomIceServer(configIceBuilder.build()); } @@ -970,8 +1041,6 @@ public class SessionRestController { .onlyPlayWithSubscribers(onlyPlayWithSubscribers).networkCache(networkCache).build(); } - - return builder; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index a2e68396..dd31df78 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -322,7 +322,7 @@ public class RpcHandler extends DefaultJsonRpcHandler { } else { log.error("ERROR: token not valid"); throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, - "Unable to join session " + sessionId + ". Token " + token + "is not valid"); + "Unable to join session " + sessionId + ". Token " + token + " is not valid"); } }