openvidu-server: delay websocket closeup by the server-side

pull/621/head
pabloFuente 2021-04-22 17:38:20 +02:00
parent e6808f987c
commit 63b312227f
5 changed files with 49 additions and 33 deletions

View File

@ -179,7 +179,7 @@ public class SessionEventsHandler {
} }
public void onParticipantLeft(Participant participant, String sessionId, Set<Participant> remainingParticipants, public void onParticipantLeft(Participant participant, String sessionId, Set<Participant> remainingParticipants,
Integer transactionId, OpenViduException error, EndReason reason) { Integer transactionId, OpenViduException error, EndReason reason, boolean scheduleWebsocketClose) {
if (error != null) { if (error != null) {
rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error);
return; return;
@ -205,6 +205,13 @@ public class SessionEventsHandler {
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject());
} }
if (scheduleWebsocketClose) {
// Schedule the close up of this WebSocket connection. This is only as an extra
// guarantee: the client-side should always close it after receiving the
// response to "leaveRoom" method
this.rpcNotificationService.scheduleCloseRpcSession(participant.getParticipantPrivateId(), 10000);
}
if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) {
CDR.recordParticipantLeft(participant, sessionId, reason); CDR.recordParticipantLeft(participant, sessionId, reason);
} }
@ -483,6 +490,11 @@ public class SessionEventsHandler {
ProtocolElements.PARTICIPANTEVICTED_METHOD, params); ProtocolElements.PARTICIPANTEVICTED_METHOD, params);
} }
} }
// Schedule the close up of this WebSocket connection. This is only as an extra
// guarantee: the client-side should always close it after receiving the
// participantEvicted notification
this.rpcNotificationService.scheduleCloseRpcSession(evictedParticipant.getParticipantPrivateId(), 10000);
} }
public void sendRecordingStartedNotification(Session session, Recording recording) { public void sendRecordingStartedNotification(Session session, Recording recording) {
@ -624,14 +636,6 @@ public class SessionEventsHandler {
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject());
} }
public void closeRpcSession(String participantPrivateId) {
this.rpcNotificationService.closeRpcSession(participantPrivateId);
}
public void storeRecordingToSendClientEvent(Recording recording) {
recordingsToSendClientEvents.put(recording.getSessionId(), recording);
}
/** /**
* This handler must be called before cleaning any sessions or recordings hosted * This handler must be called before cleaning any sessions or recordings hosted
* by the crashed Media Node * by the crashed Media Node
@ -643,6 +647,10 @@ public class SessionEventsHandler {
public void onMasterNodeCrashed() { public void onMasterNodeCrashed() {
} }
public void storeRecordingToSendClientEvent(Recording recording) {
recordingsToSendClientEvents.put(recording.getSessionId(), recording);
}
protected Set<Participant> filterParticipantsByRole(OpenViduRole[] roles, Set<Participant> participants) { protected Set<Participant> filterParticipantsByRole(OpenViduRole[] roles, Set<Participant> participants) {
return participants.stream().filter(part -> { return participants.stream().filter(part -> {
if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(part.getParticipantPublicId())) { if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(part.getParticipantPublicId())) {

View File

@ -106,7 +106,7 @@ public abstract class SessionManager {
public abstract void joinRoom(Participant participant, String sessionId, Integer transactionId); public abstract void joinRoom(Participant participant, String sessionId, Integer transactionId);
public abstract boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, public abstract boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason,
boolean closeWebSocket); boolean scheduleWebocketClose);
public abstract void publishVideo(Participant participant, MediaOptions mediaOptions, Integer transactionId); public abstract void publishVideo(Participant participant, MediaOptions mediaOptions, Integer transactionId);

View File

@ -192,7 +192,8 @@ public class KurentoSessionManager extends SessionManager {
} }
@Override @Override
public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, boolean closeWebSocket) { public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason,
boolean scheduleWebsocketClose) {
log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}", log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}",
participant.getParticipantPublicId(), participant.getSessionId(), participant.getParticipantPublicId(), participant.getSessionId(),
reason != null ? reason.name() : "NULL"); reason != null ? reason.name() : "NULL");
@ -252,7 +253,7 @@ public class KurentoSessionManager extends SessionManager {
remainingParticipants = Collections.emptySet(); remainingParticipants = Collections.emptySet();
} }
sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId, sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId,
null, reason); null, reason, scheduleWebsocketClose);
if (!EndReason.sessionClosedByServer.equals(reason)) { if (!EndReason.sessionClosedByServer.equals(reason)) {
// If session is closed by a call to "DELETE /api/sessions" do NOT stop the // If session is closed by a call to "DELETE /api/sessions" do NOT stop the
@ -316,11 +317,6 @@ public class KurentoSessionManager extends SessionManager {
} }
} }
// Finally close websocket session if required
if (closeWebSocket) {
sessionEventsHandler.closeRpcSession(participant.getParticipantPrivateId());
}
return sessionClosedByLastParticipant; return sessionClosedByLastParticipant;
} finally { } finally {
@ -789,21 +785,21 @@ public class KurentoSessionManager extends SessionManager {
boolean sessionClosedByLastParticipant = false; boolean sessionClosedByLastParticipant = false;
if (evictedParticipant != null) { if (evictedParticipant != null) {
KurentoParticipant kParticipant = (KurentoParticipant) evictedParticipant; KurentoParticipant kParticipant = (KurentoParticipant) evictedParticipant;
Set<Participant> participants = kParticipant.getSession().getParticipants(); Set<Participant> participants = kParticipant.getSession().getParticipants();
sessionClosedByLastParticipant = this.leaveRoom(kParticipant, null, reason, false); sessionClosedByLastParticipant = this.leaveRoom(kParticipant, null, reason, false);
this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, participants, transactionId, sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, participants, transactionId, null,
null, reason); reason);
sessionEventsHandler.closeRpcSession(evictedParticipant.getParticipantPrivateId());
} else { } else if (moderator != null && transactionId != null) {
if (moderator != null && transactionId != null) {
this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant,
new HashSet<>(Arrays.asList(moderator)), transactionId, new HashSet<>(Arrays.asList(moderator)), transactionId,
new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE,
"Connection not found when calling 'forceDisconnect'"), "Connection not found when calling 'forceDisconnect'"),
null); null);
} }
}
return sessionClosedByLastParticipant; return sessionClosedByLastParticipant;
} }

View File

@ -767,7 +767,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
} }
if (!message.isEmpty()) { if (!message.isEmpty()) {
RpcConnection rpc = this.notificationService.closeRpcSession(rpcSessionId); RpcConnection rpc = this.notificationService.immediatelyCloseRpcSession(rpcSessionId);
if (rpc != null && rpc.getSessionId() != null) { if (rpc != null && rpc.getSessionId() != null) {
io.openvidu.server.core.Session session = this.sessionManager.getSession(rpc.getSessionId()); io.openvidu.server.core.Session session = this.sessionManager.getSession(rpc.getSessionId());
if (session != null && session.getParticipantByPrivateId(rpc.getParticipantPrivateId()) != null) { if (session != null && session.getParticipantByPrivateId(rpc.getParticipantPrivateId()) != null) {

View File

@ -20,6 +20,9 @@ package io.openvidu.server.rpc;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.kurento.jsonrpc.Session; import org.kurento.jsonrpc.Session;
import org.kurento.jsonrpc.Transaction; import org.kurento.jsonrpc.Transaction;
@ -38,6 +41,9 @@ public class RpcNotificationService {
private ConcurrentMap<String, RpcConnection> rpcConnections = new ConcurrentHashMap<>(); private ConcurrentMap<String, RpcConnection> rpcConnections = new ConcurrentHashMap<>();
private ScheduledExecutorService closeWsScheduler = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
public RpcConnection newRpcConnection(Transaction t, Request<JsonObject> request) { public RpcConnection newRpcConnection(Transaction t, Request<JsonObject> request) {
String participantPrivateId = t.getSession().getSessionId(); String participantPrivateId = t.getSession().getSessionId();
RpcConnection connection = new RpcConnection(t.getSession()); RpcConnection connection = new RpcConnection(t.getSession());
@ -109,26 +115,32 @@ public class RpcNotificationService {
} }
} }
public RpcConnection closeRpcSession(String participantPrivateId) { public RpcConnection immediatelyCloseRpcSession(String participantPrivateId) {
RpcConnection rpcSession = rpcConnections.remove(participantPrivateId); RpcConnection rpcSession = rpcConnections.remove(participantPrivateId);
if (rpcSession == null || rpcSession.getSession() == null) { if (rpcSession == null || rpcSession.getSession() == null) {
if (!isIpcamParticipant(participantPrivateId)) { if (!isIpcamParticipant(participantPrivateId)) {
log.error("No session found for private id {}, unable to cleanup", participantPrivateId); log.error("No rpc session found for private id {}, unable to cleanup", participantPrivateId);
} }
return null; return null;
} }
Session s = rpcSession.getSession(); Session s = rpcSession.getSession();
try { try {
s.close(); s.close();
log.info("Closed session for participant with private id {}", participantPrivateId); log.info("Closed rpc session for participant with private id {}", participantPrivateId);
this.showRpcConnections(); this.showRpcConnections();
return rpcSession; return rpcSession;
} catch (IOException e) { } catch (IOException e) {
log.error("Error closing session for participant with private id {}", participantPrivateId, e); log.error("Error closing rpc session for participant with private id {}: {}", participantPrivateId,
e.getMessage());
} }
return null; return null;
} }
public void scheduleCloseRpcSession(String participantPrivateId, int timeoutMs) {
closeWsScheduler.schedule(() -> immediatelyCloseRpcSession(participantPrivateId), timeoutMs,
TimeUnit.MILLISECONDS);
}
private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) { private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) {
RpcConnection rpcSession = rpcConnections.get(participantPrivateId); RpcConnection rpcSession = rpcConnections.get(participantPrivateId);
if (rpcSession == null) { if (rpcSession == null) {