WebRTC研究:audio 如何发送需要重传的数据

当 WebRTC 收到一个 RTP 包重传请求(一个请求中可能包含多个要重传的 seq)时:

  • 首先,判断是否有足够的带宽来发送需要重传的包,否的话,则直接拒绝本次重传。判断依据:最近 1s 内且最近 60次 (即默认以最近 1s 为参考对象,若最近 1s 内有 60 或以上次数的重传,则以最近 60 次为参考对象)发送的总位数 是否小于 目标比特率(target_bitrate)* 时间长度;

  • 遍历需要重传的 seq,并根据 seq 检索对应的数据包在缓存 stored_packets_ 中的存储位置;

  • 判断包能否被重传,判断依据:若对应数据包是第一次被请求重传,则始终同意,否则只有 当前时间 距离 上一次重传该数据包的时间 超过 min_elapsed_time_ms = avg_rtt(平均 rrt)+ 5 ms,才允许被再次重传;

  • 一个重传请求中可能包含多个要重传的 seq,而每重传完一个 seq 对应的数据包,会累加针对本次重传请求所发送的字节总数 bytes_re_sent 并判断是否超过本次请求最多可重传的字节数,如果是的话,则直接忽视本次请求中剩下的待重传 seq,判断依据:bytes_re_sent > (static_cast<size_t>(target_bitrate / 1000) * avg_rtt) >> 3,即以 target_bitrate (每秒传送的 bit 数)为标准在 avg_rtt 时间内最多能重传的字节数。

  • 当前重传请求处理完毕后,会 将本次重传的总字节数当前时间 分别存入集合 nack_byte_count_ 和 nack_byte_count_times_,供下一次响应重传请求在第一步中使用。

一、判断是否有足够的带宽来发送需要重传的包:

bool RTPSender::ProcessNACKBitRate(uint32_t now) 
{
  uint32_t num = 0;
  size_t byte_count = 0;
  const uint32_t kAvgIntervalMs = 1000;

  /*
  获取目标比特率,即每秒传送的比特(bit)数
  */
  uint32_t target_bitrate = GetTargetBitrate();

  rtc::CritScope lock(&send_critsect_);

  if (target_bitrate == 0) 
  {
    return true; //没有比特率的限制
  }

  /*
  计算最近 1s 内重传的总字节数

  NACK_BYTECOUNT_SIZE = 60

  nack_byte_count_times_:存储最近 NACK_BYTECOUNT_SIZE 次重传的时间,时间越近的排在越前面
  nack_byte_count_:存储最近 NACK_BYTECOUNT_SIZE 次重传的字节数,时间越近的排在越前面,
  若某次重传N个包,则这里的字节数指的是N个包的总字节数
  */
  for (num = 0; num < NACK_BYTECOUNT_SIZE; ++num) 
  {
    if ((now - nack_byte_count_times_[num]) > kAvgIntervalMs) 
	{
      // Don't use data older than 1sec.
      break;
    } 
	else 
	{
      byte_count += nack_byte_count_[num];
    }
  }

  // 时间间隔默认为 1000ms
  uint32_t time_interval = kAvgIntervalMs;

  /*
  若最近 1000ms 内,收到了 NACK_BYTECOUNT_SIZE 及以上次重传请求
  则时间间隔以 当前时间 到 最近倒数第 NACK_BYTECOUNT_SIZE 次重传的时间为准
  */
  if (num == NACK_BYTECOUNT_SIZE) 
  {
    if (nack_byte_count_times_[num - 1] <= now) 
	{
      time_interval = now - nack_byte_count_times_[num - 1];
    }
  }

  /*
  计算最近一段时间内,重传的位数是否超过限制
  */
  return (byte_count * 8) < (target_bitrate / 1000 * time_interval);
}

二、根据 seq 检索对应的数据包在缓存 stored_packets_ 中的存储位置

bool RTPPacketHistory::FindSeqNum(uint16_t sequence_number, int32_t* index) const 
{
  /*
  首先根据 prev_index_ 来连续获取 seq
  这里根据 prev_index > 0,处理了 stored_packets_ 扩容问题

  prev_index_ 表示下一个 rtp 包存入 stored_packets_ 的索引
  stored_packets_ 默认长度为 600,当往其中存入 rtp 包时,若发现 prev_index_ 位置上有包并且包还没有发送,
  则以当前容量的1.5倍进行扩容
  stored_packets_ 最大可扩容到 9600
  */

  /* 找到上一次存入 stored_packets_ 的节点 temp_sequence_number */
  uint16_t temp_sequence_number = 0;
  if (prev_index_ > 0) 
  {
    *index = prev_index_ - 1;
    temp_sequence_number = stored_packets_[*index].sequence_number;
  } 
  else 
  {
    *index = stored_packets_.size() - 1;
    temp_sequence_number = stored_packets_[*index].sequence_number;  // wrap
  }

  /*
  首先按连续方式处理
  prev_index_ - 1:上一个节点存入 stored_packets_ 的索引位置
  */
  int32_t idx = (prev_index_ - 1) - (temp_sequence_number - sequence_number);
  if (idx >= 0 && idx < static_cast<int>(stored_packets_.size())) 
  {
    *index = idx;
    temp_sequence_number = stored_packets_[*index].sequence_number;
  }

  if (temp_sequence_number != sequence_number) 
  {
	  /* stored_packets_ 扩容可能导致节点的不连续,此处一次遍历所有节点 */
    for (uint16_t m = 0; m < stored_packets_.size(); m++) 
	{
      if (stored_packets_[m].sequence_number == sequence_number) 
	  {
        *index = m;
        temp_sequence_number = stored_packets_[*index].sequence_number;
        break;
      }
    }
  }

  if (temp_sequence_number == sequence_number) 
  {
    // We found a match.
    return true;
  }
  return false;
}

三、ReSendPacket() 中会调用 GetPacketAndSetSendTime() 检索数据包,并判断包能否被重传:

bool RTPPacketHistory::GetPacketAndSetSendTime(uint16_t sequence_number,
                                               int64_t min_elapsed_time_ms,
                                               bool retransmit,
                                               uint8_t* packet,
                                               size_t* packet_length,
                                               int64_t* stored_time_ms) 
{
  rtc::CritScope cs(&critsect_);
  RTC_CHECK_GE(*packet_length, static_cast<size_t>(IP_PACKET_SIZE));
  if (!store_)
    return false;

  /* 从缓存 stored_packets_ 中找到包的索引位置 */
  int32_t index = 0;
  bool found = FindSeqNum(sequence_number, &index);
  if (!found) 
  {
    LOG(LS_WARNING) << "No match for getting seqNum " << sequence_number;
    return false;
  }

  size_t length = stored_packets_[index].length;
  assert(length <= IP_PACKET_SIZE);
  if (length == 0) 
  {
    LOG(LS_WARNING) << "No match for getting seqNum " << sequence_number
                    << ", len " << length;
    return false;
  }

  /*
  计算距离该包上一次重传的时间
  min_elapsed_time_ms:最小再次重传时间,= avg_rtt(平均 rrt)+ 5 ms
  如果是第一次请求重传该包,则会始终重传,后续只有超过时间间隔后,才会再次重传
  */
  int64_t now = clock_->TimeInMilliseconds();
  if (min_elapsed_time_ms > 0 && retransmit &&
      stored_packets_[index].has_been_retransmitted &&
      ((now - stored_packets_[index].send_time) < min_elapsed_time_ms)) 
  {
    return false;
  }

  if (retransmit) 
  {
	/* 包不能被重传 */
    if (stored_packets_[index].storage_type == kDontRetransmit) 
	{
      return false;
    }

	/* 将包标记为已重传 */
    stored_packets_[index].has_been_retransmitted = true;
  }

  /* 记录重传包本次发送的时间 */
  stored_packets_[index].send_time = clock_->TimeInMilliseconds();

  /* 拷贝包内容、长度、时间 */
  GetPacket(index, packet, packet_length, stored_time_ms);
  return true;
}

四、每重传完一个数据包后,会累加针对本次重传请求发送的字节总数并判断是否超过本次请求最多可重传的字节数:

void RTPSender::OnReceivedNACK(const std::list<uint16_t>& nack_sequence_numbers, int64_t avg_rtt) 
{
  TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),
               "RTPSender::OnReceivedNACK", "num_seqnum",
               nack_sequence_numbers.size(), "avg_rtt", avg_rtt);
  const int64_t now = clock_->TimeInMilliseconds();
  uint32_t bytes_re_sent = 0;
  uint32_t target_bitrate = GetTargetBitrate();

  // 判断是否有足够的带宽来发送 NACK
  if (!ProcessNACKBitRate(now)) 
  {
    LOG(LS_INFO) << "NACK bitrate reached. Skip sending NACK response. Target "
                 << target_bitrate;
    return;
  }

  for (std::list<uint16_t>::const_iterator it = nack_sequence_numbers.begin();
      it != nack_sequence_numbers.end(); ++it) 
  {
    const int32_t bytes_sent = ReSendPacket(*it, 5 + avg_rtt);
   
	if (bytes_sent > 0) 
	{
      bytes_re_sent += bytes_sent; //累加包的长度
    } 
	else if (bytes_sent == 0) 
	{
      // The packet has previously been resent.
      // Try resending next packet in the list.
      continue;
    } 
	else 
	{
	  /* 若发送出错,则剩下的 seq 不重传了 */
      LOG(LS_WARNING) << "Failed resending RTP packet " << *it
                      << ", Discard rest of packets";
      break;
    }

	// 每发送一个包之后,估算后续带宽:Delay bandwidth
    if (target_bitrate != 0 && avg_rtt) 
	{
      // kbits/s * ms = bits => bits/8 = bytes
	  // 计算每次最多能重传的字节数:rrt 时间内可发送的字节数
      size_t target_bytes = (static_cast<size_t>(target_bitrate / 1000) * avg_rtt) >> 3;

      if (bytes_re_sent > target_bytes) 
	  {
        break;  // 终止重传,直接忽视剩下的 seq
      }
    }
  }

  // 更新本次发送的字节数
  if (bytes_re_sent > 0) 
  {
    UpdateNACKBitRate(bytes_re_sent, now);
  }
}

五、更新本次重传的字节数和重传时间:

void RTPSender::UpdateNACKBitRate(uint32_t bytes, int64_t now) 
{
  rtc::CritScope lock(&send_critsect_);
  if (bytes == 0)
    return;

  /* 累计发送的字节数与次数 */
  nack_bitrate_.Update(bytes);
 
  /* 将 0 ~ NACK_BYTECOUNT_SIZE - 2 区间的节点,往后挪一个位置 */
  for (int i = NACK_BYTECOUNT_SIZE - 2; i >= 0; i--) 
  {
    nack_byte_count_[i + 1] = nack_byte_count_[i];
    nack_byte_count_times_[i + 1] = nack_byte_count_times_[i];
  }

  /* 将本次重传时间与字节总数 放到数组第一个位置 */
  nack_byte_count_[0] = bytes;
  nack_byte_count_times_[0] = now;
}
发布了112 篇原创文章 · 获赞 22 · 访问量 6万+

猜你喜欢

转载自blog.csdn.net/u010601662/article/details/105193389