// WebRtcConnection source walkthrough: an explanation of what each function does.
// Asynchronous entry point for adding a remote candidate: wraps
// addRemoteCandidateSync in a task handed to asyncTask and returns the future
// that asyncTask produces. The arguments are copied into the task.
boost::future<void> WebRtcConnection::addRemoteCandidate(std::string mid, int mLineIndex, std::string sdp) {
  auto add_candidate = [mid, mLineIndex, sdp] (std::shared_ptr<WebRtcConnection> connection) {
    connection->addRemoteCandidateSync(mid, mLineIndex, sdp);
  };
  return asyncTask(add_candidate);
}
// 加入远端candidate实现,向remote_sdp_加入candidate信息,类似前面processRemoteSdp
bool WebRtcConnection::addRemoteCandidateSync(std::string mid, int mLineIndex, std::string sdp) {
// TODO(pedro) Check type of transport.
ELOG_DEBUG("%s message: Adding remote Candidate, candidate: %s, mid: %s, sdpMLine: %d",
toLog(), sdp.c_str(), mid.c_str(), mLineIndex);
if (video_transport_ == nullptr && audio_transport_ == nullptr) {
ELOG_WARN("%s message: addRemoteCandidate on NULL transport", toLog());
return false;
}
MediaType theType;
std::string theMid;
// TODO(pedro) check if this works with video+audio and no bundle
if (mLineIndex == -1) {
ELOG_DEBUG("%s message: All candidates received", toLog());
if (video_transport_) {
video_transport_->getIceConnection()->setReceivedLastCandidate(true);
} else if (audio_transport_) {
audio_transport_->getIceConnection()->setReceivedLastCandidate(true);
}
return true;
}
if ((!mid.compare("video")) || (mLineIndex == remote_sdp_->videoSdpMLine)) {
theType = VIDEO_TYPE;
theMid = "video";
} else {
theType = AUDIO_TYPE;
theMid = "audio";
}
SdpInfo tempSdp(rtp_mappings_);
std::string username = remote_sdp_->getUsername(theType);
std::string password = remote_sdp_->getPassword(theType);
tempSdp.setCredentials(username, password, OTHER);
bool res = false;
if (tempSdp.initWithSdp(sdp, theMid)) {
if (theType == VIDEO_TYPE || bundle_) {
res = video_transport_->setRemoteCandidates(tempSdp.getCandidateInfos(), bundle_);
} else if (theType == AUDIO_TYPE) {
res = audio_transport_->setRemoteCandidates(tempSdp.getCandidateInfos(), bundle_);
} else {
ELOG_ERROR("%s message: add remote candidate with no Media (video or audio), candidate: %s",
toLog(), sdp.c_str() );
}
}
for (uint8_t it = 0; it < tempSdp.getCandidateInfos().size(); it++) {
remote_sdp_->addCandidate(tempSdp.getCandidateInfos()[it]);
}
return res;
}
// Returns the local SDP as a string.
// While the connection is not in CONN_READY, the video transport (and, without
// bundle, the audio transport) first processes local_sdp_. The profile is then
// copied from remote_sdp_ before local_sdp_ is serialized.
std::string WebRtcConnection::getLocalSdp() {
  ELOG_DEBUG("%s message: Getting Local Sdp", toLog());

  const bool video_needs_sdp = video_transport_ != nullptr && getCurrentState() != CONN_READY;
  if (video_needs_sdp) {
    video_transport_->processLocalSdp(local_sdp_.get());
  }

  const bool audio_needs_sdp =
      !bundle_ && audio_transport_ != nullptr && getCurrentState() != CONN_READY;
  if (audio_needs_sdp) {
    audio_transport_->processLocalSdp(local_sdp_.get());
  }

  local_sdp_->profile = remote_sdp_->profile;
  return local_sdp_->getSdp();
}
// Builds the JSON object string describing a candidate.
// The object has the keys "candidate", "sdpMLineIndex" and "sdpMid", emitted in
// the key order of std::map; every value is written as a quoted string and is
// not escaped. The m-line index comes from local_sdp_: the video index when
// `mid` equals "video", the audio index otherwise.
std::string WebRtcConnection::getJSONCandidate(const std::string& mid, const std::string& sdp) {
  const bool is_video = (mid.compare("video") == 0);

  std::map<std::string, std::string> fields;
  fields["sdpMid"] = mid;
  fields["candidate"] = sdp;
  fields["sdpMLineIndex"] =
      std::to_string(is_video ? local_sdp_->videoSdpMLine : local_sdp_->audioSdpMLine);

  std::ostringstream json;
  json << "{";
  // Empty before the first field, a comma before every later one.
  const char* separator = "";
  for (std::map<std::string, std::string>::const_iterator field = fields.begin();
       field != fields.end(); ++field) {
    json << separator << "\"" << field->first << "\":\"" << field->second << "\"";
    separator = ",";
  }
  json << "}";
  return json.str();
}
// Called with a newly discovered candidate. The candidate is added to
// local_sdp_; when trickle is enabled the listener is notified with
// CONN_CANDIDATE and the candidate's JSON form:
//   - without bundle: once, using the reporting transport's name as the mid;
//   - with bundle: once per media kind ("audio", "video") present in remote_sdp_.
void WebRtcConnection::onCandidate(const CandidateInfo& cand, Transport *transport) {
  std::string sdp = local_sdp_->addCandidate(cand);
  ELOG_DEBUG("%s message: Discovered New Candidate, candidate: %s", toLog(), sdp.c_str());

  if (!trickle_enabled_) {
    return;
  }

  if (!bundle_) {
    std::string candidate_json = this->getJSONCandidate(transport->transport_name, sdp);
    maybeNotifyWebRtcConnectionEvent(CONN_CANDIDATE, candidate_json);
    return;
  }

  if (remote_sdp_->hasAudio) {
    std::string audio_json = this->getJSONCandidate("audio", sdp);
    maybeNotifyWebRtcConnectionEvent(CONN_CANDIDATE, audio_json);
  }
  if (remote_sdp_->hasVideo) {
    std::string video_json = this->getJSONCandidate("video", sdp);
    maybeNotifyWebRtcConnectionEvent(CONN_CANDIDATE, video_json);
  }
}
// Handles a REMB block received from a transport.
// For every feed SSRC listed in the block, collects the media streams for which
// isSinkSSRC() is true, then hands the REMB bitrate, the block's SSRC, the
// collected streams and the transport to distributor_.
void WebRtcConnection::onREMBFromTransport(RtcpHeader *chead, Transport *transport) {
  std::vector<std::shared_ptr<MediaStream>> target_streams;

  for (uint8_t i = 0; i < chead->getREMBNumSSRC(); i++) {
    uint32_t feed_ssrc = chead->getREMBFeedSSRC(i);
    forEachMediaStream([feed_ssrc, &target_streams] (const std::shared_ptr<MediaStream> &stream) {
      if (!stream->isSinkSSRC(feed_ssrc)) {
        return;
      }
      target_streams.push_back(stream);
    });
  }

  distributor_->distribute(chead->getREMBBitRate(), chead->getSSRC(), target_streams, transport);
}
// rtcp的数据回调,并发送给media_streams_
void WebRtcConnection::onRtcpFromTransport(std::shared_ptr<DataPacket> packet, Transport *transport) {
RtpUtils::forEachRtcpBlock(packet, [this, packet, transport](RtcpHeader *chead) {
uint32_t ssrc = chead->isFeedback() ? chead->getSourceSSRC() : chead->getSSRC();
if (chead->isREMB()) {
onREMBFromTransport(chead, transport);
return;
}
std::shared_ptr<DataPacket> rtcp = std::make_shared<DataPacket>(*packet);
rtcp->length = (ntohs(chead->length) + 1) * 4;
std::memcpy(rtcp->data, chead, rtcp->length);
forEachMediaStream([rtcp, transport, ssrc] (const std::shared_ptr<MediaStream> &media_stream) {
if (media_stream->isSourceSSRC(ssrc) || media_stream->isSinkSSRC(ssrc)) {
media_stream->onTransportData(rtcp, transport);
}
});
});
}
// rtp的数据回调,并发送给media_streams_
void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Transport *transport) {
if (getCurrentState() != CONN_READY) {
return;
}
char* buf = packet->data;
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
if (chead->isRtcp()) {
onRtcpFromTransport(packet, transport);
return;
} else {
RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
uint32_t ssrc = head->getSSRC();
forEachMediaStream([packet, transport, ssrc] (const std::shared_ptr<MediaStream> &media_stream) {
if (media_stream->isSourceSSRC(ssrc) || media_stream->isSinkSSRC(ssrc)) {
media_stream->onTransportData(packet, transport);
}
});
}
}
// Notifies the registered event listener, if there is one, of `event` with
// `message`. event_listener_mutex_ is held for the duration of the call.
void WebRtcConnection::maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message) {
  boost::mutex::scoped_lock lock(event_listener_mutex_);
  if (conn_event_listener_) {
    conn_event_listener_->notifyEvent(event, message);
  }
}
// Schedules `f` on worker_ and returns a future that is fulfilled after the
// scheduled task has run. The task holds only a weak reference to this
// connection: if the connection is already gone when the task runs, `f` is
// skipped and the future is still fulfilled.
boost::future<void> WebRtcConnection::asyncTask(
    std::function<void(std::shared_ptr<WebRtcConnection>)> f) {
  std::weak_ptr<WebRtcConnection> weak_self = shared_from_this();
  auto completion = std::make_shared<boost::promise<void>>();

  worker_->task([weak_self, f, completion] {
    if (auto self = weak_self.lock()) {
      f(self);
    }
    completion->set_value();
  });

  return completion->get_future();
}
// Recomputes the connection-level state after `transport` reports `state`, and
// notifies the event listener when the connection-level state changes.
//
// state:     the new state reported by a transport.
// transport: the transport reporting it (used for logging only).
//
// Rules, as implemented below:
//   - Nothing happens when both transports are null or the connection has
//     already failed.
//   - With bundle, a single transport reaching STARTED / READY moves the
//     connection to CONN_STARTED / CONN_READY.
//   - Without bundle, the connection only advances once every media kind
//     announced in the SDP has a transport in the reported state.
//   - CONN_GATHERED is only reported when trickle is disabled; its message is
//     the local SDP.
//   - TRANSPORT_FAILED moves the connection to CONN_FAILED, stops sending and
//     reports the remote SDP as the message.
// update_state_mutex_ is held for the whole call, including the notification.
void WebRtcConnection::updateState(TransportState state, Transport * transport) {
  boost::mutex::scoped_lock lock(update_state_mutex_);
  WebRTCEvent temp = global_state_;
  std::string msg = "";
  ELOG_DEBUG("%s transportName: %s, new_state: %d", toLog(), transport->transport_name.c_str(), state);
  if (video_transport_.get() == nullptr && audio_transport_.get() == nullptr) {
    ELOG_ERROR("%s message: Updating NULL transport, state: %d", toLog(), state);
    return;
  }
  if (global_state_ == CONN_FAILED) {
    // if current state is failed -> noop
    return;
  }

  // True when `candidate` exists and is currently in the state being reported.
  const auto has_reached_state = [state] (Transport *candidate) {
    return candidate != nullptr && candidate->getTransportState() == state;
  };
  // True when every announced media kind has a transport in the reported state.
  const auto all_channels_reached_state = [this, &has_reached_state] (bool has_audio, bool has_video) {
    return (!has_audio || has_reached_state(audio_transport_.get())) &&
           (!has_video || has_reached_state(video_transport_.get()));
  };

  switch (state) {
    case TRANSPORT_STARTED:
      if (bundle_ || all_channels_reached_state(remote_sdp_->hasAudio, remote_sdp_->hasVideo)) {
        temp = CONN_STARTED;
      }
      break;
    case TRANSPORT_GATHERED: {
      bool gathering_done = false;
      if (bundle_) {
        if (!remote_sdp_->getCandidateInfos().empty()) {
          // Passing now new candidates that could not be passed before.
          // The original also had an audio branch guarded by `!bundle_`; it sat
          // inside this `bundle_` block and could never run, so it is dropped.
          if (remote_sdp_->hasVideo && video_transport_.get() != nullptr) {
            video_transport_->setRemoteCandidates(remote_sdp_->getCandidateInfos(), bundle_);
          }
        }
        gathering_done = true;
      } else {
        gathering_done = all_channels_reached_state(local_sdp_->hasAudio, local_sdp_->hasVideo);
      }
      // With trickle enabled, CONN_GATHERED is never reported from here.
      if (gathering_done && !trickle_enabled_) {
        temp = CONN_GATHERED;
        msg = this->getLocalSdp();
      }
      break;
    }
    case TRANSPORT_READY:
      // WebRTCConnection will be ready only when all channels are ready.
      if (bundle_ || all_channels_reached_state(remote_sdp_->hasAudio, remote_sdp_->hasVideo)) {
        temp = CONN_READY;
        trackTransportInfo();
        forEachMediaStreamAsyncNoPromise([] (const std::shared_ptr<MediaStream> &media_stream) {
          media_stream->sendPLIToFeedback();
        });
      }
      break;
    case TRANSPORT_FAILED:
      temp = CONN_FAILED;
      sending_ = false;
      msg = remote_sdp_->getSdp();
      ELOG_ERROR("%s message: Transport Failed, transportType: %s", toLog(), transport->transport_name.c_str() );
      cond_.notify_one();
      break;
    default:
      ELOG_DEBUG("%s message: Doing nothing on state, state %d", toLog(), state);
      break;
  }

  if (audio_transport_.get() != nullptr && video_transport_.get() != nullptr) {
    // Argument order follows the labels in the format string: video first,
    // then audio (the two were previously passed the other way round).
    ELOG_DEBUG("%s message: %s, transportName: %s, videoTransportState: %d"
               ", audioTransportState: %d, calculatedState: %d, globalState: %d",
               toLog(),
               "Update Transport State",
               transport->transport_name.c_str(),
               static_cast<int>(video_transport_->getTransportState()),
               static_cast<int>(audio_transport_->getTransportState()),
               static_cast<int>(temp),
               static_cast<int>(global_state_));
  }

  if (global_state_ == temp) {
    return;
  }
  global_state_ = temp;
  ELOG_INFO("%s newGlobalState: %d", toLog(), temp);
  maybeNotifyWebRtcConnectionEvent(global_state_, msg);
}
// media_stream设置audio_info和video_info的candidate信息
void WebRtcConnection::trackTransportInfo() {
CandidatePair candidate_pair;
std::string audio_info;
std::string video_info;
if (video_enabled_ && video_transport_) {
candidate_pair = video_transport_->getIceConnection()->getSelectedPair();
video_info = candidate_pair.clientHostType;
}
if (audio_enabled_ && audio_transport_) {
candidate_pair = audio_transport_->getIceConnection()->getSelectedPair();
audio_info = candidate_pair.clientHostType;
}
asyncTask([audio_info, video_info] (std::shared_ptr<WebRtcConnection> connection) {
connection->forEachMediaStreamAsyncNoPromise(
[audio_info, video_info] (const std::shared_ptr<MediaStream> &media_stream) {
media_stream->setTransportInfo(audio_info, video_info);
});
});
}
// Sets the connection metadata by forwarding `metadata` to setLogContext.
void WebRtcConnection::setMetadata(std::map<std::string, std::string> metadata) {
setLogContext(metadata);
}
// Registers the listener that receives connection events. The pointer is stored
// as given, under event_listener_mutex_; passing nullptr clears the listener.
void WebRtcConnection::setWebRtcConnectionEventListener(WebRtcConnectionEventListener* listener) {
  boost::mutex::scoped_lock lock(event_listener_mutex_);
  conn_event_listener_ = listener;
}
// Returns the connection-level state stored in global_state_.
// NOTE(review): this read takes no lock, while updateState assigns
// global_state_ under update_state_mutex_. updateState also reaches this
// function through getLocalSdp() while holding that mutex, so taking the same
// mutex here would need care — confirm whether cross-thread readers exist.
WebRTCEvent WebRtcConnection::getCurrentState() {
return global_state_;
}
// Asynchronous write: schedules syncWrite(packet) through asyncTask. The
// returned future is not kept, so callers get no completion signal.
void WebRtcConnection::write(std::shared_ptr<DataPacket> packet) {
  auto send_packet = [packet] (std::shared_ptr<WebRtcConnection> connection) {
    connection->syncWrite(packet);
  };
  asyncTask(send_packet);
}
// 向audio或video transport中写入包数据
void WebRtcConnection::syncWrite(std::shared_ptr<DataPacket> packet) {
if (!sending_) {
return;
}
Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get();
if (transport == nullptr) {
return;
}
this->extension_processor_.processRtpExtensions(packet);
transport->write(packet->data, packet->length);
}
// Only for testing purposes: installs `transport` as video_transport_ and
// switches the connection to bundle mode.
void WebRtcConnection::setTransport(std::shared_ptr<Transport> transport) {
  bundle_ = true;
  video_transport_ = std::move(transport);
}