diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index 29e9d22a..656f422d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -96,7 +96,7 @@ public class SessionEventsHandler { JsonObject stream = new JsonObject(); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMID_PARAM, - existingParticipant.getPublisherStremId()); + existingParticipant.getPublisherStreamId()); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMHASAUDIO_PARAM, kParticipant.getPublisherMediaOptions().hasAudio); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMHASVIDEO_PARAM, @@ -216,11 +216,15 @@ public class SessionEventsHandler { public void onUnpublishMedia(Participant participant, Set participants, Integer transactionId, OpenViduException error, String reason) { - if (error != null) { - rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); - return; + boolean force = reason.contains("force") || transactionId == null; + if (!force) { + if (error != null) { + rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, + error); + return; + } + rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } - rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.PARTICIPANTUNPUBLISHED_NAME_PARAM, participant.getParticipantPublicId()); @@ -228,7 +232,12 @@ public class SessionEventsHandler { for (Participant p : participants) { if (p.getParticipantPrivateId().equals(participant.getParticipantPrivateId())) { - continue; + if (force) { + rpcNotificationService.sendNotification(p.getParticipantPrivateId(), + ProtocolElements.PARTICIPANTUNPUBLISHED_METHOD, params); + } else { + continue; + } } else { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTUNPUBLISHED_METHOD, params); @@ -351,7 +360,8 @@ public class SessionEventsHandler { public void onParticipantEvicted(Participant participant, String reason) { JsonObject params = new JsonObject(); - params.addProperty(ProtocolElements.PARTICIPANTEVICTED_CONNECTIONID_PARAM, participant.getParticipantPublicId()); + params.addProperty(ProtocolElements.PARTICIPANTEVICTED_CONNECTIONID_PARAM, + participant.getParticipantPublicId()); params.addProperty(ProtocolElements.PARTICIPANTEVICTED_REASON_PARAM, reason); rpcNotificationService.sendNotification(participant.getParticipantPrivateId(), ProtocolElements.PARTICIPANTEVICTED_METHOD, params); diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index ab36a20a..fa90596b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -94,6 +94,8 @@ public abstract class SessionManager { public abstract void onIceCandidate(Participant participant, String endpointName, String candidate, int sdpMLineIndex, String sdpMid, Integer transactionId); + public abstract boolean unpublishStream(Session session, String streamId, String reason); + /** * Application-originated request to remove a participant from a session.
* Side effects: The session event handler should notify the diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 1194b72c..57066d98 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -107,6 +107,9 @@ public class KurentoParticipant extends Participant { + RandomStringUtils.random(5, true, false).toUpperCase(); this.publisher.getEndpoint().addTag("name", publisherStreamId); addEndpointListeners(this.publisher); + + // Remove streamId from publisher's map + this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId()); CDR.recordNewPublisher(this, this.session.getSessionId(), mediaOptions); @@ -300,7 +303,7 @@ public class KurentoParticipant extends Participant { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); } - String subscriberStreamId = this.getParticipantPublicId() + "_" + kSender.getPublisherStremId(); + String subscriberStreamId = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId(); subscriber.getEndpoint().addTag("name", subscriberStreamId); @@ -468,6 +471,10 @@ public class KurentoParticipant extends Participant { private void releasePublisherEndpoint(String reason) { if (publisher != null && publisher.getEndpoint() != null) { + + // Store streamId from publisher's map + this.session.publishedStreamIds.remove(this.getPublisherStreamId()); + publisher.unregisterErrorListeners(); for (MediaElement el : publisher.getMediaElements()) { releaseElement(getParticipantPublicId(), el); @@ -695,7 +702,7 @@ public class KurentoParticipant extends Participant { } @Override - public String getPublisherStremId() { + public String getPublisherStreamId() { return this.publisher.getEndpoint().getTag("name"); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 4d88122a..96203336 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.stream.Collectors; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -45,8 +44,6 @@ import io.openvidu.java.client.SessionProperties; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; -import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.endpoint.SubscriberEndpoint; /** * @author Pablo Fuente (pablofuenteperez@gmail.com) @@ -77,6 +74,8 @@ public class KurentoSession implements Session { private boolean destroyKurentoClient; private CallDetailRecord CDR; + + public final ConcurrentHashMap publishedStreamIds = new ConcurrentHashMap<>(); public KurentoSession(String sessionId, SessionProperties sessionProperties, KurentoClient kurentoClient, KurentoSessionEventsHandler kurentoSessionHandler, boolean destroyKurentoClient, CallDetailRecord CDR) { @@ -199,11 +198,6 @@ public class KurentoSession implements Session { return null; } - public Set getAllSubscribersForPublisher(PublisherEndpoint publisher) { - return this.participants.values().stream().flatMap(kp -> kp.getConnectedSubscribedEndpoints(publisher).stream()) - .collect(Collectors.toSet()); - } - @Override public boolean close(String reason) { if (!closed) { @@ -392,4 +386,8 @@ public class KurentoSession implements Session { return json; } + public String getParticipantPrivateIdFromStreamId(String streamId) { + return this.publishedStreamIds.get(streamId); + } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 8956910d..e3dc1b80 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -261,7 +261,7 @@ public class KurentoSessionManager extends SessionManager { participants = kurentoParticipant.getSession().getParticipants(); if (sdpAnswer != null) { - sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStremId(), session.getSessionId(), + sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), session.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, null); } } @@ -373,7 +373,7 @@ public class KurentoSessionManager extends SessionManager { public void streamPropertyChanged(Participant participant, Integer transactionId, String streamId, String property, JsonElement newValue, String reason) { KurentoParticipant kParticipant = (KurentoParticipant) participant; - streamId = kParticipant.getPublisherStremId(); + streamId = kParticipant.getPublisherStreamId(); MediaOptions streamProperties = kParticipant.getPublisherMediaOptions(); Boolean hasAudio = streamProperties.hasAudio(); @@ -509,4 +509,20 @@ public class KurentoSessionManager extends SessionManager { typeOfVideo, frameRate, videoDimensions, doLoopback); } + @Override + public boolean unpublishStream(Session session, String streamId, String reason) { + String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); + if (participantPrivateId != null) { + Participant participant = this.getParticipant(participantPrivateId); + if (participant != null) { + this.unpublishVideo(participant, null, reason); + return true; + } else { + return false; + } + } else { + return false; + } + } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index caf0c7ff..4c626a40 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -43,6 +43,7 @@ import io.openvidu.java.client.RecordingMode; import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.SessionProperties; import io.openvidu.server.config.OpenviduConfig; +import io.openvidu.server.core.Participant; import io.openvidu.server.core.ParticipantRole; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionManager; @@ -177,6 +178,38 @@ public class SessionRestController { } } + @RequestMapping(value = "/sessions/{sessionId}/connection/{connectionId}", method = RequestMethod.DELETE) + public ResponseEntity disconnectParticipant(@PathVariable("sessionId") String sessionId, + @PathVariable("connectionId") String participantPublicId) { + Session session = this.sessionManager.getSession(sessionId); + if (session != null) { + Participant participant = session.getParticipantByPublicId(participantPublicId); + if (participant != null) { + this.sessionManager.evictParticipant(participant.getParticipantPrivateId(), "forceDisconnectByServer"); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + } else { + return new ResponseEntity<>(HttpStatus.BAD_REQUEST); + } + } + + @RequestMapping(value = "/sessions/{sessionId}/stream/{streamId}", method = RequestMethod.DELETE) + public ResponseEntity unpublishStream(@PathVariable("sessionId") String sessionId, + @PathVariable("streamId") String streamId) { + Session session = this.sessionManager.getSession(sessionId); + if (session != null) { + if (this.sessionManager.unpublishStream(session, streamId, "forceUnpublishByServer")) { + return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + } else { + return new ResponseEntity<>(HttpStatus.BAD_REQUEST); + } + } + @SuppressWarnings("unchecked") @RequestMapping(value = "/tokens", method = RequestMethod.POST) public ResponseEntity newToken(@RequestBody Map params) {