KMS events stored for each MediaEndpoint

pull/88/merge
pabloFuente 2018-06-19 14:28:04 +02:00
parent 2c90f65535
commit 65cdea70f6
9 changed files with 645 additions and 564 deletions

View File

@ -27,11 +27,6 @@ public class Participant {
private String serverMetadata = ""; // Metadata provided on server side
private Token token; // Token associated to this participant
protected boolean audioActive = true;
protected boolean videoActive = true;
protected String typeOfVideo; // CAMERA, SCREEN
protected int frameRate;
protected boolean streaming = false;
protected volatile boolean closed;
@ -98,38 +93,6 @@ public class Participant {
this.streaming = streaming;
}
public boolean isAudioActive() {
return audioActive;
}
public void setAudioActive(boolean active) {
this.audioActive = active;
}
public boolean isVideoActive() {
return videoActive;
}
public void setVideoActive(boolean active) {
this.videoActive = active;
}
public String getTypeOfVideo() {
return this.typeOfVideo;
}
public void setTypeOfVideo(String typeOfVideo) {
this.typeOfVideo = typeOfVideo;
}
public int getFrameRate() {
return this.frameRate;
}
public void setFrameRate(int frameRate) {
this.frameRate = frameRate;
}
public String getPublisherStremId() {
return null;
}

View File

@ -39,6 +39,7 @@ import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.InfoHandler;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.rpc.RpcNotificationService;
@ -94,17 +95,20 @@ public class SessionEventsHandler {
existingParticipant.getFullMetadata());
if (existingParticipant.isStreaming()) {
KurentoParticipant kParticipant = (KurentoParticipant) existingParticipant;
JsonObject stream = new JsonObject();
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMID_PARAM,
existingParticipant.getPublisherStremId());
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMAUDIOACTIVE_PARAM,
existingParticipant.isAudioActive());
kParticipant.getPublisherMediaOptions().audioActive);
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMVIDEOACTIVE_PARAM,
existingParticipant.isVideoActive());
kParticipant.getPublisherMediaOptions().videoActive);
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMTYPEOFVIDEO_PARAM,
existingParticipant.getTypeOfVideo());
kParticipant.getPublisherMediaOptions().typeOfVideo);
stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMFRAMERATE_PARAM,
existingParticipant.getFrameRate());
kParticipant.getPublisherMediaOptions().frameRate);
JsonArray streamsArray = new JsonArray();
streamsArray.add(stream);

View File

@ -47,6 +47,7 @@ import io.openvidu.server.config.InfoHandler;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.MutedMediaType;
import io.openvidu.server.kurento.endpoint.KmsEvent;
import io.openvidu.server.kurento.endpoint.MediaEndpoint;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SdpType;
@ -70,13 +71,13 @@ public class KurentoParticipant extends Participant {
private final ConcurrentMap<String, Filter> filters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SubscriberEndpoint> subscribers = new ConcurrentHashMap<String, SubscriberEndpoint>();
public KurentoParticipant(Participant participant, KurentoSession kurentoSession, MediaPipeline pipeline, InfoHandler infoHandler, CallDetailRecord CDR) {
public KurentoParticipant(Participant participant, KurentoSession kurentoSession, MediaPipeline pipeline,
InfoHandler infoHandler, CallDetailRecord CDR) {
super(participant.getParticipantPrivateId(), participant.getParticipantPublicId(), participant.getToken(),
participant.getClientMetadata());
this.session = kurentoSession;
this.pipeline = pipeline;
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(),
pipeline);
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), pipeline);
for (Participant other : session.getParticipants()) {
if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())) {
@ -93,10 +94,11 @@ public class KurentoParticipant extends Participant {
if (getPublisher().getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint");
}
publisher.setMediaOptions(mediaOptions);
String publisherStreamId = this.getParticipantPublicId() + "_" +
(mediaOptions.videoActive ? mediaOptions.typeOfVideo : "MICRO") + "_" +
RandomStringUtils.random(5, true, false).toUpperCase();
String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.videoActive ? mediaOptions.typeOfVideo : "MICRO") + "_"
+ RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.getEndpoint().addTag("name", publisherStreamId);
addEndpointListeners(this.publisher);
@ -174,6 +176,10 @@ public class KurentoParticipant extends Participant {
return this.publisher;
}
public MediaOptions getPublisherMediaOptions() {
return this.publisher.getMediaOptions();
}
public KurentoSession getSession() {
return session;
}
@ -230,18 +236,16 @@ public class KurentoParticipant extends Participant {
log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(),
this.session.getSessionId());
releasePublisherEndpoint(reason);
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(),
pipeline);
log.info(
"PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)",
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), pipeline);
log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)",
this.getParticipantPublicId());
}
public String receiveMediaFrom(Participant sender, String sdpOffer) {
final String senderName = sender.getParticipantPublicId();
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), senderName,
this.session.getSessionId());
log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
log.trace("PARTICIPANT {}: SdpOffer for {} is {}", this.getParticipantPublicId(), senderName, sdpOffer);
if (senderName.equals(this.getParticipantPublicId())) {
@ -300,8 +304,8 @@ public class KurentoParticipant extends Participant {
try {
String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer);
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), senderName,
this.session.getSessionId());
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
CDR.recordNewSubscriber(this, this.session.getSessionId(), sender.getParticipantPublicId());
@ -330,8 +334,8 @@ public class KurentoParticipant extends Participant {
+ "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName);
} else {
releaseSubscriberEndpoint(senderName, subscriberEndpoint, reason);
log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(), senderName,
this.session.getSessionId());
log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId());
}
}
@ -416,17 +420,23 @@ public class KurentoParticipant extends Participant {
* id of another user
* @return the endpoint instance
*/
public SubscriberEndpoint getNewOrExistingSubscriber(String remotePublicId) {
SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, remotePublicId, pipeline);
SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(remotePublicId, sendingEndpoint);
public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) {
KurentoParticipant kSender = (KurentoParticipant) this.session.getParticipantByPublicId(senderPublicId);
SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, pipeline);
SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, sendingEndpoint);
if (existingSendingEndpoint != null) {
sendingEndpoint = existingSendingEndpoint;
log.trace("PARTICIPANT {}: Already exists a subscriber endpoint to user {}", this.getParticipantPublicId(),
remotePublicId);
senderPublicId);
} else {
log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(),
remotePublicId);
senderPublicId);
}
sendingEndpoint.setMediaOptions(kSender.getPublisherMediaOptions());
return sendingEndpoint;
}
@ -522,120 +532,153 @@ public class KurentoParticipant extends Participant {
* System.out.println(msg); this.infoHandler.sendInfo(msg); });
*/
/*endpoint.getWebEndpoint().addErrorListener((event) -> {
String msg = " Error (PUBLISHER) -> " + "ERRORCODE: " + event.getErrorCode()
+ " | DESCRIPTION: " + event.getDescription() + " | TIMESTAMP: " + System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
});
/*
* endpoint.getWebEndpoint().addErrorListener((event) -> { String msg =
* " Error (PUBLISHER) -> " + "ERRORCODE: " +
* event.getErrorCode() + " | DESCRIPTION: " + event.getDescription() +
* " | TIMESTAMP: " + System.currentTimeMillis(); log.debug(msg);
* this.infoHandler.sendInfo(msg); });
*
* endpoint.getWebEndpoint().addMediaFlowInStateChangeListener((event) -> {
* String msg1 = " Media flow in state change (" +
* endpoint.getEndpoint().getTag("name") + ") -> " + "STATE: " +
* event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: " +
* event.getPadName() + " | MEDIATYPE: " + event.getMediaType() +
* " | TIMESTAMP: " + System.currentTimeMillis();
*
* endpoint.flowInMedia.put(event.getSource().getName() + "/" +
* event.getMediaType(), event.getSource());
*
* String msg2;
*
* if (endpoint.flowInMedia.values().size() != 2) { msg2 =
* " THERE ARE LESS FLOW IN MEDIA'S THAN EXPECTED IN " +
* endpoint.getEndpoint().getTag("name") + " (" +
* endpoint.flowInMedia.values().size() + ")"; } else { msg2 =
* " NUMBER OF FLOW IN MEDIA'S IS NOW CORRECT IN " +
* endpoint.getEndpoint().getTag("name") + " (" +
* endpoint.flowInMedia.values().size() + ")"; }
*
* log.debug(msg1); log.debug(msg2); this.infoHandler.sendInfo(msg1);
* this.infoHandler.sendInfo(msg2); });
*
* endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener((event) -> {
* String msg1 = " Media flow out state change (" +
* endpoint.getEndpoint().getTag("name") + ") -> " + "STATE: " +
* event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: " +
* event.getPadName() + " | MEDIATYPE: " + event.getMediaType() +
* " | TIMESTAMP: " + System.currentTimeMillis();
*
* endpoint.flowOutMedia.put(event.getSource().getName() + "/" +
* event.getMediaType(), event.getSource());
*
* String msg2;
*
* if (endpoint.flowOutMedia.values().size() != 2) { msg2 =
* " THERE ARE LESS FLOW OUT MEDIA'S THAN EXPECTED IN " +
* endpoint.getEndpoint().getTag("name") + " (" +
* endpoint.flowOutMedia.values().size() + ")"; } else { msg2 =
* " NUMBER OF FLOW OUT MEDIA'S IS NOW CORRECT IN " +
* endpoint.getEndpoint().getTag("name") + " (" +
* endpoint.flowOutMedia.values().size() + ")"; }
*
* log.debug(msg1); log.debug(msg2); this.infoHandler.sendInfo(msg1);
* this.infoHandler.sendInfo(msg2); });
*
* endpoint.getWebEndpoint().addMediaSessionStartedListener((event) -> { String
* msg = " Media session started (" +
* endpoint.getEndpoint().getTag("name") + ") | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addMediaSessionTerminatedListener((event) -> {
* String msg = " Media session terminated (" +
* endpoint.getEndpoint().getTag("name") + ") | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addMediaStateChangedListener((event) -> { String
* msg = " Media state changed (" +
* endpoint.getEndpoint().getTag("name") + ") from " + event.getOldState() +
* " to " + event.getNewState(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addConnectionStateChangedListener((event) -> {
* String msg = " Connection state changed (" +
* endpoint.getEndpoint().getTag("name") + ") from " + event.getOldState() +
* " to " + event.getNewState() + " | TIMESTAMP: " + System.currentTimeMillis();
* log.debug(msg); this.infoHandler.sendInfo(msg); });
*
* endpoint.getWebEndpoint().addIceCandidateFoundListener((event) -> { String
* msg = " ICE CANDIDATE FOUND (" +
* endpoint.getEndpoint().getTag("name") + "): CANDIDATE: " +
* event.getCandidate().getCandidate() + " | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addIceComponentStateChangeListener((event) -> {
* String msg = " ICE COMPONENT STATE CHANGE (" +
* endpoint.getEndpoint().getTag("name") + "): for component " +
* event.getComponentId() + " - STATE: " + event.getState() + " | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addIceGatheringDoneListener((event) -> { String msg
* = " ICE GATHERING DONE! (" +
* endpoint.getEndpoint().getTag("name") + ")" + " | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*/
endpoint.getWebEndpoint().addMediaFlowInStateChangeListener((event) -> {
String msg1 = " Media flow in state change (" + endpoint.getEndpoint().getTag("name")
+ ") -> " + "STATE: " + event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: "
+ event.getPadName() + " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: "
+ System.currentTimeMillis();
endpoint.getWebEndpoint().addMediaFlowInStateChangeListener(event -> {
String msg1 = "Media flow in state change (" + endpoint.getEndpoint().getTag("name") + ") -> " + "STATE: "
+ event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: " + event.getPadName()
+ " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: " + System.currentTimeMillis();
endpoint.flowInMedia.put(event.getSource().getName() + "/" + event.getMediaType(), event.getSource());
String msg2;
if (endpoint.flowInMedia.values().size() != 2) {
msg2 = " THERE ARE LESS FLOW IN MEDIA'S THAN EXPECTED IN "
+ endpoint.getEndpoint().getTag("name") + " (" + endpoint.flowInMedia.values().size() + ")";
} else {
msg2 = " NUMBER OF FLOW IN MEDIA'S IS NOW CORRECT IN "
+ endpoint.getEndpoint().getTag("name") + " (" + endpoint.flowInMedia.values().size() + ")";
endpoint.flowInMedia.put(event.getSource().getName(), event.getMediaType());
if (endpoint.getMediaOptions().audioActive && endpoint.getMediaOptions().videoActive
&& endpoint.flowInMedia.values().size() == 2) {
endpoint.kmsEvents.add(new KmsEvent(event));
} else if (endpoint.flowInMedia.values().size() == 1) {
endpoint.kmsEvents.add(new KmsEvent(event));
}
log.debug(msg1);
log.debug(msg2);
log.info(msg1);
this.infoHandler.sendInfo(msg1);
this.infoHandler.sendInfo(msg2);
});
endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener((event) -> {
String msg1 = " Media flow out state change (" + endpoint.getEndpoint().getTag("name")
+ ") -> " + "STATE: " + event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: "
+ event.getPadName() + " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: "
+ System.currentTimeMillis();
endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener(event -> {
String msg1 = "Media flow out state change (" + endpoint.getEndpoint().getTag("name") + ") -> " + "STATE: "
+ event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: " + event.getPadName()
+ " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: " + System.currentTimeMillis();
endpoint.flowOutMedia.put(event.getSource().getName() + "/" + event.getMediaType(), event.getSource());
String msg2;
if (endpoint.flowOutMedia.values().size() != 2) {
msg2 = " THERE ARE LESS FLOW OUT MEDIA'S THAN EXPECTED IN "
+ endpoint.getEndpoint().getTag("name") + " (" + endpoint.flowOutMedia.values().size() + ")";
} else {
msg2 = " NUMBER OF FLOW OUT MEDIA'S IS NOW CORRECT IN "
+ endpoint.getEndpoint().getTag("name") + " (" + endpoint.flowOutMedia.values().size() + ")";
endpoint.flowOutMedia.put(event.getSource().getName(), event.getMediaType());
if (endpoint.getMediaOptions().audioActive && endpoint.getMediaOptions().videoActive
&& endpoint.flowOutMedia.values().size() == 2) {
endpoint.kmsEvents.add(new KmsEvent(event));
} else if (endpoint.flowOutMedia.values().size() == 1) {
endpoint.kmsEvents.add(new KmsEvent(event));
}
log.debug(msg1);
log.debug(msg2);
log.info(msg1);
this.infoHandler.sendInfo(msg1);
this.infoHandler.sendInfo(msg2);
});
endpoint.getWebEndpoint().addMediaSessionStartedListener((event) -> {
String msg = " Media session started (" + endpoint.getEndpoint().getTag("name")
+ ") | TIMESTAMP: " + System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
endpoint.getWebEndpoint().addIceGatheringDoneListener(event -> {
endpoint.kmsEvents.add(new KmsEvent(event));
});
endpoint.getWebEndpoint().addMediaSessionTerminatedListener((event) -> {
String msg = " Media session terminated (" + endpoint.getEndpoint().getTag("name")
+ ") | TIMESTAMP: " + System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
endpoint.getWebEndpoint().addConnectionStateChangedListener(event -> {
endpoint.kmsEvents.add(new KmsEvent(event));
});
endpoint.getWebEndpoint().addMediaStateChangedListener((event) -> {
String msg = " Media state changed (" + endpoint.getEndpoint().getTag("name") + ") from "
+ event.getOldState() + " to " + event.getNewState();
log.debug(msg);
this.infoHandler.sendInfo(msg);
});
endpoint.getWebEndpoint().addConnectionStateChangedListener((event) -> {
String msg = " Connection state changed (" + endpoint.getEndpoint().getTag("name")
+ ") from " + event.getOldState() + " to " + event.getNewState() + " | TIMESTAMP: "
+ System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
});
endpoint.getWebEndpoint().addIceCandidateFoundListener((event) -> {
String msg = " ICE CANDIDATE FOUND (" + endpoint.getEndpoint().getTag("name")
+ "): CANDIDATE: " + event.getCandidate().getCandidate() + " | TIMESTAMP: "
+ System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
});
endpoint.getWebEndpoint().addIceComponentStateChangeListener((event) -> {
String msg = " ICE COMPONENT STATE CHANGE (" + endpoint.getEndpoint().getTag("name")
+ "): for component " + event.getComponentId() + " - STATE: " + event.getState() + " | TIMESTAMP: "
+ System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
});
endpoint.getWebEndpoint().addIceGatheringDoneListener((event) -> {
String msg = " ICE GATHERING DONE! (" + endpoint.getEndpoint().getTag("name") + ")"
+ " | TIMESTAMP: " + System.currentTimeMillis();
log.debug(msg);
this.infoHandler.sendInfo(msg);
});*/
endpoint.getWebEndpoint().addNewCandidatePairSelectedListener((event) -> {
endpoint.getWebEndpoint().addNewCandidatePairSelectedListener(event -> {
endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate();
endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate();
String msg = "ICE CANDIDATE SELECTED (" + endpoint.getEndpoint().getTag("name")
+ "): LOCAL CANDIDATE: " + endpoint.selectedLocalIceCandidate +
" | REMOTE CANDIDATE: " + endpoint.selectedRemoteIceCandidate +
" | TIMESTAMP: " + System.currentTimeMillis();
endpoint.kmsEvents.add(new KmsEvent(event));
String msg = "ICE CANDIDATE SELECTED (" + endpoint.getEndpoint().getTag("name") + "): LOCAL CANDIDATE: "
+ endpoint.selectedLocalIceCandidate + " | REMOTE CANDIDATE: " + endpoint.selectedRemoteIceCandidate
+ " | TIMESTAMP: " + System.currentTimeMillis();
log.warn(msg);
this.infoHandler.sendInfo(msg);
});

View File

@ -268,11 +268,6 @@ public class KurentoSessionManager extends SessionManager {
session.newPublisher(participant);
kurentoParticipant.setAudioActive(kurentoOptions.audioActive);
kurentoParticipant.setVideoActive(kurentoOptions.videoActive);
kurentoParticipant.setTypeOfVideo(kurentoOptions.typeOfVideo);
kurentoParticipant.setFrameRate(kurentoOptions.frameRate);
participants = kurentoParticipant.getSession().getParticipants();
if (sdpAnswer != null) {

View File

@ -0,0 +1,14 @@
package io.openvidu.server.kurento.endpoint;
import org.kurento.client.MediaEvent;
public class KmsEvent {
long timestamp;
MediaEvent event;
public KmsEvent(MediaEvent event) {
this.event = event;
this.timestamp = System.currentTimeMillis();
}
}

View File

@ -19,9 +19,12 @@ package io.openvidu.server.kurento.endpoint;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent;
@ -29,8 +32,8 @@ import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.ListenerSubscription;
import org.kurento.client.MediaElement;
import org.kurento.client.MediaObject;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaType;
import org.kurento.client.OnIceCandidateEvent;
import org.kurento.client.RtpEndpoint;
import org.kurento.client.SdpEndpoint;
@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.MutedMediaType;
import io.openvidu.server.kurento.core.KurentoParticipant;
@ -70,11 +74,13 @@ public abstract class MediaEndpoint {
private MutedMediaType muteType;
public Map<String, MediaObject> flowInMedia = new ConcurrentHashMap<>();
public Map<String, MediaObject> flowOutMedia = new ConcurrentHashMap<>();
private MediaOptions mediaOptions;
public Map<String, MediaType> flowInMedia = new ConcurrentHashMap<>();
public Map<String, MediaType> flowOutMedia = new ConcurrentHashMap<>();
public String selectedLocalIceCandidate;
public String selectedRemoteIceCandidate;
public Queue<KmsEvent> kmsEvents = new ConcurrentLinkedQueue<>();
/**
* Constructor to set the owner, the endpoint's name and the media pipeline.
@ -98,6 +104,14 @@ public abstract class MediaEndpoint {
this.setMediaPipeline(pipeline);
}
public MediaOptions getMediaOptions() {
return mediaOptions;
}
public void setMediaOptions(MediaOptions mediaOptions) {
this.mediaOptions = mediaOptions;
}
public boolean isWeb() {
return web;
}
@ -495,6 +509,16 @@ public abstract class MediaEndpoint {
json.put("webrtcTagName", this.getEndpoint().getTag("name"));
json.put("localCandidate", this.selectedLocalIceCandidate);
json.put("remoteCandidate", this.selectedRemoteIceCandidate);
JSONArray jsonArray = new JSONArray();
for (KmsEvent event : this.kmsEvents) {
JSONObject jsonKmsEvent = new JSONObject();
jsonKmsEvent.put(event.event.getType(), event.timestamp);
jsonArray.add(jsonKmsEvent);
}
json.put("events", jsonArray);
return json;
}
}

View File

@ -53,11 +53,9 @@ public class PublisherEndpoint extends MediaEndpoint {
private LinkedList<String> elementIds = new LinkedList<String>();
private boolean connected = false;
private Map<String, ListenerSubscription> elementsErrorSubscriptions =
new HashMap<String, ListenerSubscription>();
private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>();
public PublisherEndpoint(boolean web, KurentoParticipant owner,
String endpointName, MediaPipeline pipeline) {
public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline) {
super(web, owner, endpointName, pipeline, log);
}
@ -78,8 +76,8 @@ public class PublisherEndpoint extends MediaEndpoint {
}
/**
* @return all media elements created for this publisher, except for the main element (
* {@link WebRtcEndpoint})
* @return all media elements created for this publisher, except for the main
* element ( {@link WebRtcEndpoint})
*/
public synchronized Collection<MediaElement> getMediaElements() {
if (passThru != null) {
@ -89,19 +87,25 @@ public class PublisherEndpoint extends MediaEndpoint {
}
/**
* Initializes this media endpoint for publishing media and processes the SDP offer or answer. If
* the internal endpoint is an {@link WebRtcEndpoint}, it first registers an event listener for
* the ICE candidates and instructs the endpoint to start gathering the candidates. If required,
* it connects to itself (after applying the intermediate media elements and the
* Initializes this media endpoint for publishing media and processes the SDP
* offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it
* first registers an event listener for the ICE candidates and instructs the
* endpoint to start gathering the candidates. If required, it connects to
* itself (after applying the intermediate media elements and the
* {@link PassThrough}) to allow loopback of the media stream.
*
* @param sdpType indicates the type of the sdpString (offer or answer)
* @param sdpString offer or answer from the remote peer
* @param doLoopback loopback flag
* @param loopbackAlternativeSrc alternative loopback source
* @param loopbackConnectionType how to connect the loopback source
* @return the SDP response (the answer if processing an offer SDP, otherwise is the updated offer
* generated previously by this endpoint)
* @param sdpType
* indicates the type of the sdpString (offer or answer)
* @param sdpString
* offer or answer from the remote peer
* @param doLoopback
* loopback flag
* @param loopbackAlternativeSrc
* alternative loopback source
* @param loopbackConnectionType
* how to connect the loopback source
* @return the SDP response (the answer if processing an offer SDP, otherwise is
* the updated offer generated previously by this endpoint)
*/
public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback,
MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
@ -157,29 +161,36 @@ public class PublisherEndpoint extends MediaEndpoint {
}
/**
* Changes the media passing through a chain of media elements by applying the specified
* element/shaper. The element is plugged into the stream only if the chain has been initialized
* (a.k.a. media streaming has started), otherwise it is left ready for when the connections
* between elements will materialize and the streaming begins.
* Changes the media passing through a chain of media elements by applying the
* specified element/shaper. The element is plugged into the stream only if the
* chain has been initialized (a.k.a. media streaming has started), otherwise it
* is left ready for when the connections between elements will materialize and
* the streaming begins.
*
* @param shaper {@link MediaElement} that will be linked to the end of the chain (e.g. a filter)
* @param shaper
* {@link MediaElement} that will be linked to the end of the chain
* (e.g. a filter)
* @return the element's id
* @throws OpenViduException if thrown, the media element was not added
* @throws OpenViduException
* if thrown, the media element was not added
*/
public String apply(MediaElement shaper) throws OpenViduException {
return apply(shaper, null);
}
/**
* Same as {@link #apply(MediaElement)}, can specify the media type that will be streamed through
* the shaper element.
* Same as {@link #apply(MediaElement)}, can specify the media type that will be
* streamed through the shaper element.
*
* @param shaper {@link MediaElement} that will be linked to the end of the chain (e.g. a filter)
* @param type indicates which type of media will be connected to the shaper
* ({@link MediaType}), if
* null then the connection is mixed
* @param shaper
* {@link MediaElement} that will be linked to the end of the chain
* (e.g. a filter)
* @param type
* indicates which type of media will be connected to the shaper
* ({@link MediaType}), if null then the connection is mixed
* @return the element's id
* @throws OpenViduException if thrown, the media element was not added
* @throws OpenViduException
* if thrown, the media element was not added
*/
public synchronized String apply(MediaElement shaper, MediaType type) throws OpenViduException {
String id = shaper.getId();
@ -210,18 +221,20 @@ public class PublisherEndpoint extends MediaEndpoint {
}
/**
* Removes the media element object found from the media chain structure. The object is released.
* If the chain is connected, both adjacent remaining elements will be interconnected.
* Removes the media element object found from the media chain structure. The
* object is released. If the chain is connected, both adjacent remaining
* elements will be interconnected.
*
* @param shaper {@link MediaElement} that will be removed from the chain
* @throws OpenViduException if thrown, the media element was not removed
* @param shaper
* {@link MediaElement} that will be removed from the chain
* @throws OpenViduException
* if thrown, the media element was not removed
*/
public synchronized void revert(MediaElement shaper) throws OpenViduException {
revert (shaper, true);
revert(shaper, true);
}
public synchronized void revert(MediaElement shaper, boolean releaseElement) throws
OpenViduException {
public synchronized void revert(MediaElement shaper, boolean releaseElement) throws OpenViduException {
final String elementId = shaper.getId();
if (!elements.containsKey(elementId)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
@ -328,8 +341,7 @@ public class PublisherEndpoint extends MediaEndpoint {
return elementIds.get(idx - 1);
}
private void connectAltLoopbackSrc(MediaElement loopbackAlternativeSrc,
MediaType loopbackConnectionType) {
private void connectAltLoopbackSrc(MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
if (!connected) {
innerConnect();
}
@ -367,38 +379,38 @@ public class PublisherEndpoint extends MediaEndpoint {
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to connect media elements (source {} -> sink {})",
getEndpointName(), source.getId(), sink.getId(), cause);
log.warn("EP {}: Failed to connect media elements (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId(), cause);
}
});
}
/**
* Same as {@link #internalSinkConnect(MediaElement, MediaElement)}, but can specify the type of
* the media that will be streamed.
* Same as {@link #internalSinkConnect(MediaElement, MediaElement)}, but can
* specify the type of the media that will be streamed.
*
* @param source
* @param sink
* @param type if null, {@link #internalSinkConnect(MediaElement, MediaElement)} will be used
* instead
* @param type
* if null, {@link #internalSinkConnect(MediaElement, MediaElement)}
* will be used instead
* @see #internalSinkConnect(MediaElement, MediaElement)
*/
private void internalSinkConnect(final MediaElement source, final MediaElement sink,
final MediaType type) {
private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) {
if (type == null) {
internalSinkConnect(source, sink);
} else {
source.connect(sink, type, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: {} media elements have been connected (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId());
log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", getEndpointName(),
type, source.getId(), sink.getId());
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId(), cause);
log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})", getEndpointName(),
type, source.getId(), sink.getId(), cause);
}
});
}
@ -408,30 +420,30 @@ public class PublisherEndpoint extends MediaEndpoint {
source.disconnect(sink, new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("EP {}: Elements have been disconnected (source {} -> sink {})",
getEndpointName(), source.getId(), sink.getId());
log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId());
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})",
getEndpointName(), source.getId(), sink.getId(), cause);
log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(),
source.getId(), sink.getId(), cause);
}
});
}
/**
* Same as {@link #internalSinkDisconnect(MediaElement, MediaElement)}, but can specify the type
* of the media that will be disconnected.
* Same as {@link #internalSinkDisconnect(MediaElement, MediaElement)}, but can
* specify the type of the media that will be disconnected.
*
* @param source
* @param sink
* @param type if null, {@link #internalSinkConnect(MediaElement, MediaElement)} will be used
* instead
* @param type
* if null, {@link #internalSinkConnect(MediaElement, MediaElement)}
* will be used instead
* @see #internalSinkConnect(MediaElement, MediaElement)
*/
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink,
final MediaType type) {
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type) {
if (type == null) {
internalSinkDisconnect(source, sink);
} else {
@ -444,8 +456,8 @@ public class PublisherEndpoint extends MediaEndpoint {
@Override
public void onError(Throwable cause) throws Exception {
log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})",
getEndpointName(), type, source.getId(), sink.getId(), cause);
log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", getEndpointName(),
type, source.getId(), sink.getId(), cause);
}
});
}

View File

@ -179,8 +179,10 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
private startSession() {
for (const user of this.users) {
this.getToken().then(token => {
const startTimeForUser = Date.now();
const OV = new OpenVidu();
if (this.turnConf === 'freeice') {
@ -206,9 +208,9 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
.find(s => s.connectionId === session.connection.connectionId).subs
.find(s => s.streamManager.stream.connection.connectionId === subscriber.stream.connection.connectionId);
if (!!error) {
subAux.state['errorConnecting'] = Date.now();
subAux.state['errorConnecting'] = (Date.now() - startTimeForUser);
} else {
subAux.state['connected'] = Date.now();
subAux.state['connected'] = (Date.now() - startTimeForUser);
}
});
@ -217,16 +219,18 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
this.subscribers.push({
connectionId: session.connection.connectionId,
subs: [{
startTime: startTimeForUser,
connectionId: session.connection.connectionId,
streamManager: subscriber,
state: { 'connecting': Date.now() }
state: { 'connecting': (Date.now() - startTimeForUser) }
}]
});
} else {
sub.subs.push({
startTime: startTimeForUser,
connectionId: session.connection.connectionId,
streamManager: subscriber,
state: { 'connecting': Date.now() }
state: { 'connecting': (Date.now() - startTimeForUser) }
});
}
@ -234,7 +238,7 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
this.subscribers
.find(s => s.connectionId === session.connection.connectionId).subs
.find(s => s.streamManager.stream.connection.connectionId === subscriber.stream.connection.connectionId)
.state['playing'] = Date.now();
.state['playing'] = (Date.now() - startTimeForUser);
});
});
}
@ -245,19 +249,20 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
const publisher = OV.initPublisher(undefined, this.publisherProperties);
const publisherWrapper = {
startTime: startTimeForUser,
connectionId: session.connection.connectionId,
streamManager: publisher,
state: { 'connecting': Date.now() }
state: { 'connecting': (Date.now() - startTimeForUser) }
};
publisher.on('streamCreated', () => {
publisherWrapper.state['connected'] = Date.now();
publisherWrapper.state['connected'] = (Date.now() - startTimeForUser);
});
publisher.on('streamPlaying', () => {
publisherWrapper.state['playing'] = Date.now();
publisherWrapper.state['playing'] = (Date.now() - startTimeForUser);
});
session.publish(publisher).catch(() => {
publisherWrapper.state['errorConnecting'] = Date.now();
publisherWrapper.state['errorConnecting'] = (Date.now() - startTimeForUser);
});
this.publishers.push(publisherWrapper);
@ -308,6 +313,7 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
if (event.streamManager.remote) {
newReport = {
connectionId: event.connectionId,
startTime: event.startTime,
streamId: event.streamManager.stream.streamId,
state: event.state,
candidatePairSelectedByBrowser: {
@ -318,8 +324,10 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
localCandidate: {},
remoteCandidate: {}
},
iceCandidatesSentByBrowser: event.streamManager.stream.getLocalIceCandidateList(),
iceCandidatesReceivedByBrowser: event.streamManager.stream.getRemoteIceCandidateList()
iceCandidatesSentByBrowser:
event.streamManager.stream.getLocalIceCandidateList().map((c: RTCIceCandidate) => c.candidate),
iceCandidatesReceivedByBrowser:
event.streamManager.stream.getRemoteIceCandidateList().map((c: RTCIceCandidate) => c.candidate),
};
this.report.streamsIn.count++;
@ -327,6 +335,7 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
} else {
newReport = {
connectionId: event.connectionId,
startTime: event.startTime,
streamId: event.streamManager.stream.streamId,
state: event.state,
candidatePairSelectedByBrowser: {
@ -337,8 +346,10 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
localCandidate: {},
remoteCandidate: {}
},
iceCandidatesSentByBrowser: event.streamManager.stream.getLocalIceCandidateList(),
iceCandidatesReceivedByBrowser: event.streamManager.stream.getRemoteIceCandidateList()
iceCandidatesSentByBrowser:
event.streamManager.stream.getLocalIceCandidateList().map((c: RTCIceCandidate) => c.candidate),
iceCandidatesReceivedByBrowser:
event.streamManager.stream.getRemoteIceCandidateList().map((c: RTCIceCandidate) => c.candidate)
};
this.report.streamsOut.count++;
@ -370,6 +381,12 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
localCandidate: this.parseRemoteCandidatePair(streamOutRemoteInfo.localCandidate),
remoteCandidate: this.parseRemoteCandidatePair(streamOutRemoteInfo.remoteCandidate)
};
report.serverEvents = streamOutRemoteInfo.events;
for (const ev of report.serverEvents) {
for (const key of Object.keys(ev)) {
ev[key] = Number(ev[key]) - report.startTime;
}
}
});
this.report.streamsIn.items.forEach(report => {
@ -383,6 +400,12 @@ export class TestScenariosComponent implements OnInit, OnDestroy {
localCandidate: this.parseRemoteCandidatePair(streamInRemoteInfo.localCandidate),
remoteCandidate: this.parseRemoteCandidatePair(streamInRemoteInfo.remoteCandidate)
};
report.serverEvents = streamInRemoteInfo.events;
for (const ev of report.serverEvents) {
for (const key of Object.keys(ev)) {
ev[key] = Number(ev[key]) - report.startTime;
}
}
});
this.stringifyAllReports = JSON.stringify(this.report, null, '\t');

View File

@ -31,8 +31,9 @@ export class TableVideoComponent implements AfterViewInit, DoCheck {
ngAfterViewInit() {
this.playingTimeout = setTimeout(() => {
if (!this.state['playing']) {
this.state['timeoutPlaying'] = Date.now();
this.state['timeoutPlaying'] = Date.now() - this.streamManager.startTime;
this.readyForReport.emit({
startTime: this.streamManager.startTime,
connectionId: this.streamManager.connectionId,
state: this.state,
streamManager: this.streamManager.streamManager
@ -48,6 +49,7 @@ export class TableVideoComponent implements AfterViewInit, DoCheck {
if (this.success() || this.fail()) {
clearTimeout(this.playingTimeout);
this.readyForReport.emit({
startTime: this.streamManager.startTime,
connectionId: this.streamManager.connectionId,
state: this.state,
streamManager: this.streamManager.streamManager
@ -78,6 +80,7 @@ export class TableVideoComponent implements AfterViewInit, DoCheck {
}
export interface StreamManagerWrapper {
startTime: number;
connectionId: string;
streamManager: StreamManager;
state: any;