quic client的数据发送流程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010643777/article/details/84844065

 客户端将数据发送到缓冲区。

// Entry point used to send body bytes on this stream: forwards |data| and
// |fin| unchanged to WriteOrBufferData and passes |ack_listener| on with
// std::move.
void QuicSpdyStream::WriteOrBufferBody(
    QuicStringPiece data,
    bool fin,
    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
  WriteOrBufferData(data, fin, std::move(ack_listener));
}

// Copies |data| into send_buffer_ and, if nothing was buffered before this
// call, immediately tries to flush through WriteBufferedData().
//
// data:         bytes to append to the stream; may be empty when only a FIN is
//               being written.
// fin:          true if this write ends the stream; recorded in fin_buffered_.
// ack_listener: forwarded to OnDataBuffered for the newly buffered range.
//
// If appending |data| would push the stream offset past kMaxStreamLength, the
// connection is closed with QUIC_STREAM_LENGTH_OVERFLOW and nothing is
// buffered.
void QuicStream::WriteOrBufferData(
    QuicStringPiece data,
    bool fin,
    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
  // Record the FIN up front so that the flush check at the bottom also fires
  // for a FIN-only write (empty |data|).
  fin_buffered_ = fin;

  // Snapshot taken BEFORE buffering: it tells us whether an earlier write is
  // still pending, in which case this data simply queues up behind it.
  const bool had_buffered_data = HasBufferedData();

  if (data.length() > 0) {
    // Wrap |data| in an iovec so it can be handed to the send buffer.
    struct iovec iov(MakeIovec(data));
    QuicStreamOffset offset = send_buffer_.stream_offset();
    if (kMaxStreamLength - offset < data.length()) {
      QUIC_BUG << "Write too many data via stream " << id_;
      CloseConnectionWithDetails(
          QUIC_STREAM_LENGTH_OVERFLOW,
          QuicStrCat("Write too many data via stream ", id_));
      return;
    }
    // Copy the bytes referenced by |iov| into send_buffer_.
    send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
    OnDataBuffered(offset, data.length(), ack_listener);
  }

  if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
    // Write data if there is no buffered data before.
    WriteBufferedData();
  }
}
    
// Copies |data_length| bytes, starting |iov_offset| bytes into the |iov_count|
// buffers at |iov|, into the send buffer as one or more slices of at most
// max_data_slice_size bytes each. Each slice is appended via SaveMemSlice().
// A |data_length| of 0 stores nothing.
void QuicStreamSendBuffer::SaveStreamData(const struct iovec* iov,
                                          int iov_count,
                                          size_t iov_offset,
                                          QuicByteCount data_length) {
  // Keep slicing until every byte has been stored; a single pass would silently
  // drop everything beyond the first max_data_slice_size bytes.
  while (data_length > 0) {
    const size_t slice_len = std::min(data_length, max_data_slice_size);
    QuicMemSlice slice(allocator_, slice_len);
    QuicUtils::CopyToBuffer(iov, iov_count, iov_offset, slice_len,
                            const_cast<char*>(slice.data()));
    SaveMemSlice(std::move(slice));
    // Advance past the bytes just copied.
    data_length -= slice_len;
    iov_offset += slice_len;
  }
}
// Appends |slice| to buffered_slices_, tagged with the stream offset at which
// it starts, and advances stream_offset_ by the slice's length so that the next
// slice is recorded at the following offset.
void QuicStreamSendBuffer::SaveMemSlice(QuicMemSlice slice) {
  // Read the length before |slice| is moved from.
  const size_t length = slice.length();
  buffered_slices_.emplace_back(std::move(slice), stream_offset_);
  stream_offset_ += length;
}

 缓冲区的数据怎么被取走,并发送到网络中。这次倒着看代码。

// Excerpt: writes bytes held in the send buffer into |writer|.
// NOTE(review): slice_it, slice_offset and copy_length are not declared in
// this excerpt and no value is returned on the success path; the surrounding
// code appears to have been elided by the article.
bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset,
                                           QuicByteCount data_length,
                                           QuicDataWriter* writer) {
// The buffered data is written into the writer.
    if (!writer->WriteBytes(slice_it->slice.data() + slice_offset,
                            copy_length)) {
      QUIC_BUG << "Writer fails to write.";
      return false;
    }
// Open question: once the data has been written into the writer, why is the
// corresponding slice not removed from buffered_slices?
// If it is not removed here, when is it removed?
// NOTE(review): presumably the slices are kept so the bytes can be sent again
// by RetransmitStreamData and are released once acknowledged -- confirm.
}
// Writes |data_length| bytes of this stream, starting at |offset|, into
// |writer| by delegating to send_buffer_. Returns whatever
// send_buffer_.WriteStreamData returns. |data_length| is DCHECKed to be
// non-zero.
bool QuicStream::WriteStreamData(QuicStreamOffset offset,
                                 QuicByteCount data_length,
                                 QuicDataWriter* writer) {
  DCHECK_LT(0u, data_length);
  QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
                << offset << " length " << data_length;
  return send_buffer_.WriteStreamData(offset, data_length, writer);
}

// Looks up stream |id| and asks it to write |data_length| bytes starting at
// |offset| into |writer|.
//
// Returns:
//   WRITE_SUCCESS  - the stream wrote the requested bytes.
//   STREAM_MISSING - GetStream(id) returned nullptr.
//   WRITE_FAILED   - the stream exists but its WriteStreamData returned false.
WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
                                                   QuicStreamOffset offset,
                                                   QuicByteCount data_length,
                                                   QuicDataWriter* writer) {
  QuicStream* stream = GetStream(id);
  if (stream == nullptr) {
    // Guard against dereferencing a null stream pointer.
    QUIC_BUG << "Stream " << id << " does not exist when trying to write data.";
    return STREAM_MISSING;
  }
  if (stream->WriteStreamData(offset, data_length, writer)) {
    return WRITE_SUCCESS;
  }
  // Every path must return a value; falling off the end of a non-void function
  // is undefined behavior.
  return WRITE_FAILED;
}
//↑ WriteStreamData is called by AppendStreamFrame
// Excerpt: asks data_producer_ to write the payload described by |frame|
// (stream id, offset, data length) into |writer|, and returns false if the
// producer reports anything other than WRITE_SUCCESS.
// NOTE(review): the success-path return and any use of
// |no_stream_frame_length| are not shown in this excerpt.
bool QuicFramer::AppendStreamFrame(const QuicStreamFrame& frame,
                                   bool no_stream_frame_length,
                                   QuicDataWriter* writer) {
    if (data_producer_->WriteStreamData(frame.stream_id, frame.offset,
                                        frame.data_length,
                                        writer) != WRITE_SUCCESS) {
      QUIC_BUG << "Writing frame data failed.";
      return false;
    }
}
//↑
// Excerpt: for a STREAM_FRAME, appends the frame through AppendStreamFrame and
// returns 0 if that fails.
// NOTE(review): frame, last_frame_in_packet and writer are not declared in
// this excerpt, and the success-path return value is not shown; the loop over
// |frames| and the remaining frame types appear to have been elided.
size_t QuicFramer::BuildDataPacket(const QuicPacketHeader& header,
                                   const QuicFrames& frames,
                                   char* buffer,
                                   size_t packet_length){
switch (frame.type){
      case STREAM_FRAME:
        if (!AppendStreamFrame(frame.stream_frame, last_frame_in_packet,
                               &writer)) {
          QUIC_BUG << "AppendStreamFrame failed";
          return 0;
        }
        break;
}
}
//↑
// Excerpt: builds a data packet from queued_frames_ into |encrypted_buffer|
// by calling framer_->BuildDataPacket.
// NOTE(review): |header| is not declared here and |length| is not used
// afterwards in this excerpt; the rest of the function appears to be elided.
void QuicPacketCreator::SerializePacket(char* encrypted_buffer,
                                        size_t encrypted_buffer_len){
  size_t length = framer_->BuildDataPacket(header, queued_frames_,
                                           encrypted_buffer, packet_size_);
                                        }
  //↑                                
// Excerpt: serializes the pending packet into serialized_packet_buffer and
// then reports it through OnSerializedPacket().
// NOTE(review): serialized_packet_buffer is not declared in this excerpt.
void QuicPacketCreator::Flush(){
SerializePacket(serialized_packet_buffer, kMaxPacketSize);
OnSerializedPacket();// the data is ready; invoke the callback
}
//↓
// Hands the just-serialized packet to delegate_.
void QuicPacketCreator::OnSerializedPacket()
{
  // Move packet_ into a local first so that ClearPacket() can reset the
  // creator's state before the delegate is called with the local copy.
  SerializedPacket packet(std::move(packet_));
  ClearPacket();
  delegate_->OnSerializedPacket(&packet);
}
// The delegate_ in the code above is the QuicConnection.
// Excerpt: passes the serialized packet on to SendOrQueuePacket.
void QuicConnection::OnSerializedPacket(SerializedPacket* serialized_packet){
SendOrQueuePacket(serialized_packet);
}
// Excerpt: WritePacket(packet) is only attempted when queued_packets_ is
// empty (the || short-circuits otherwise).
// NOTE(review): the body of the if is empty in this excerpt; the queueing
// branch is shown in the later excerpt of the same function.
void QuicConnection::SendOrQueuePacket(SerializedPacket* packet) {
    if (!queued_packets_.empty() || !WritePacket(packet)) {
    }
}
// Excerpt: hands the packet's encrypted buffer to writer_->WritePacket along
// with the local host address, the peer address and per_packet_options_.
// NOTE(review): encrypted_length is not declared in this excerpt, |result| is
// not used afterwards and no bool is returned; the rest appears to be elided.
bool QuicConnection::WritePacket(SerializedPacket* packet){
  WriteResult result = writer_->WritePacket(
      packet->encrypted_buffer, encrypted_length, self_address().host(),
      peer_address(), per_packet_options_);
}
// Stores |buffer|/|buf_len| via SetPacket and then writes the stored packet
// through WritePacketToSocketImpl(), returning its result.
// |self_address| and |peer_address| are not used in this body, and the
// options parameter is unnamed. DCHECKs that the writer is not write-blocked.
quic::WriteResult QuicChromiumPacketWriter::WritePacket(
    const char* buffer,
    size_t buf_len,
    const quic::QuicIpAddress& self_address,
    const quic::QuicSocketAddress& peer_address,
    quic::PerPacketOptions* /*options*/) {
  DCHECK(!IsWriteBlocked());
  SetPacket(buffer, buf_len);
  return WritePacketToSocketImpl();
}
// Sends the packet out to the network; this is Google's wrapped socket logic.
// Excerpt: writes packet_ to socket_ with write_callback_ and
// kTrafficAnnotation.
// NOTE(review): |rv| is not used afterwards and no quic::WriteResult is
// returned in this excerpt; the result handling appears to be elided.
quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocketImpl() {
  int rv = socket_->Write(packet_.get(), packet_->size(), write_callback_,
                          kTrafficAnnotation);
}

 解决问题:Flush 被哪个类调用?

// The overall entry point for sending out the buffered data; it is worth
// checking which functions call it.
// Excerpt: passes stream_bytes_written() as the offset to WritevDataInner.
// NOTE(review): write_length and fin are not declared in this excerpt, and
// consumed_data is not used afterwards; the surrounding code appears elided.
void QuicStream::WriteBufferedData() {
  QuicConsumedData consumed_data =
      WritevDataInner(write_length, stream_bytes_written(), fin);
}
// Maps |fin| to a StreamSendingState and forwards the write to the session.
//   fin == false                                -> NO_FIN
//   fin == true                                 -> FIN
//   fin == true && add_random_padding_after_fin_ -> FIN_AND_PADDING
// Returns whatever session()->WritevData returns.
QuicConsumedData QuicStream::WritevDataInner(size_t write_length,
                                             QuicStreamOffset offset,
                                             bool fin) {
  StreamSendingState sending_state = NO_FIN;
  if (fin) {
    sending_state = add_random_padding_after_fin_ ? FIN_AND_PADDING : FIN;
  }
  return session()->WritevData(this, id(), write_length, offset,
                               sending_state);
}
// Excerpt: forwards the write request for stream |id| to
// connection_->SendStreamData.
// NOTE(review): |stream| is not used and |data| is not returned in this
// excerpt; the rest of the function appears to be elided.
QuicConsumedData QuicSession::WritevData(QuicStream* stream,
                                         QuicStreamId id,
                                         size_t write_length,
                                         QuicStreamOffset offset,
                                         StreamSendingState state)
                                         {
  QuicConsumedData data =
      connection_->SendStreamData(id, write_length, offset, state);
                                         }
// Excerpt: hands the request straight to packet_generator_.ConsumeData and
// returns its result.
QuicConsumedData QuicConnection::SendStreamData(QuicStreamId id,
                                                size_t write_length,
                                                QuicStreamOffset offset,
                                                StreamSendingState state){
return packet_generator_.ConsumeData(id, write_length, offset, state);
                                                }
// Excerpt: asks packet_creator_ to consume stream data into a frame, returns
// QuicConsumedData(0, false) if that fails, and then flushes the creator.
// This is where QuicPacketCreator::Flush() is called from.
// NOTE(review): total_bytes_consumed, fin, has_handshake and frame are not
// declared in this excerpt, and no value is returned after Flush(); the
// surrounding code appears to be elided.
QuicConsumedData QuicPacketGenerator::ConsumeData(QuicStreamId id,
                                                  size_t write_length,
                                                  QuicStreamOffset offset,
                                                  StreamSendingState state){
    if (!packet_creator_.ConsumeData(id, write_length, total_bytes_consumed,
                                     offset + total_bytes_consumed, fin,
                                     has_handshake, &frame)) {
      // The creator is always flushed if there's not enough room for a new
      // stream frame before ConsumeData, so ConsumeData should always succeed.
      QUIC_BUG << "Failed to ConsumeData, stream:" << id;
      return QuicConsumedData(0, false);
    }
packet_creator_.Flush();
 }
 // Excerpt: fills |frame| via CreateStreamFrame and queues it with AddFrame,
 // returning false if AddFrame rejects it.
 // NOTE(review): the success-path return and any use of |needs_full_padding|
 // are not shown in this excerpt.
 bool QuicPacketCreator::ConsumeData(QuicStreamId id,
                                    size_t write_length,
                                    size_t iov_offset,
                                    QuicStreamOffset offset,
                                    bool fin,
                                    bool needs_full_padding,
                                    QuicFrame* frame){
  CreateStreamFrame(id, write_length, iov_offset, offset, fin, frame);
  if (!AddFrame(*frame, /*save_retransmittable_frames=*/true)) {
    // Fails if we try to write unencrypted stream data.
    return false;
  }
}

 其中,WriteBufferedData 在 WriteOrBufferData 中被调用,至此基本又回到了代码分析的起点。
 丢失数据包重传:

// Excerpt: calls MarkPacketHandled for every entry in packets_acked_, then
// runs PostProcessAfterMarkingPacketHandled once for the whole ack frame.
// NOTE(review): info and prior_bytes_in_flight are not declared in this
// excerpt and no bool is returned; the surrounding code appears to be elided.
bool QuicSentPacketManager::OnAckFrameEnd(QuicTime ack_receive_time){
for (AckedPacket& acked_packet : packets_acked_){
    MarkPacketHandled(acked_packet.packet_number, info,
                      last_ack_frame_.ack_delay_time);
}
  PostProcessAfterMarkingPacketHandled(last_ack_frame_, ack_receive_time,
                                       rtt_updated_, prior_bytes_in_flight);
}
// Excerpt: triggers loss detection with the time the ack was received.
// The other three parameters are not used in the portion shown here.
void QuicSentPacketManager::PostProcessAfterMarkingPacketHandled(
    const QuicAckFrame& ack_frame,
    QuicTime ack_receive_time,
    bool rtt_updated,
    QuicByteCount prior_bytes_in_flight) {
        InvokeLossDetection(ack_receive_time);
    }
// Excerpt: marks a packet for retransmission with type LOSS_RETRANSMISSION.
// NOTE(review): |packet| is not declared in this excerpt; the code that
// selects the lost packets appears to be elided.
void QuicSentPacketManager::InvokeLossDetection(QuicTime time){
 MarkForRetransmission(packet.packet_number, LOSS_RETRANSMISSION);
}
//MarkForRetransmission is called for different retransmission type
// Excerpt: fetches the mutable transmission info recorded for |packet_number|
// in unacked_packets_ and passes it to HandleRetransmission together with
// |transmission_type|.
void QuicSentPacketManager::MarkForRetransmission(
    QuicPacketNumber packet_number,
    TransmissionType transmission_type){
  QuicTransmissionInfo* transmission_info =
      unacked_packets_.GetMutableTransmissionInfo(packet_number);
HandleRetransmission(transmission_type, transmission_info);
    }
// Excerpt: asks unacked_packets_ to retransmit the frames described by
// |*transmission_info| using |transmission_type|.
void QuicSentPacketManager::HandleRetransmission(
    TransmissionType transmission_type,
    QuicTransmissionInfo* transmission_info){
  unacked_packets_.RetransmitFrames(*transmission_info, transmission_type);
    }
// Passes the retransmittable frames stored in |info| to session_notifier_
// for retransmission with the given |type|. DCHECKs that
// session_decides_what_to_write_ is set.
void QuicUnackedPacketMap::RetransmitFrames(const QuicTransmissionInfo& info,
                                            TransmissionType type) {
  DCHECK(session_decides_what_to_write_);
  session_notifier_->RetransmitFrames(info.retransmittable_frames, type);
}
// Excerpt: for a stream frame, asks the owning stream to retransmit the
// frame's byte range (offset, data_length, fin) and stops if that fails.
// NOTE(review): stream and frame are not declared in this excerpt and the
// break has no visible enclosing loop; the iteration over |frames| appears to
// be elided.
void QuicSession::RetransmitFrames(const QuicFrames& frames,
                                   TransmissionType type){
    if (stream != nullptr &&
        !stream->RetransmitStreamData(frame.stream_frame.offset,
                                      frame.stream_frame.data_length,
                                      frame.stream_frame.fin)) {
      break;
    }
                                   }
// Excerpt: retransmission goes through the same session()->WritevData path as
// a first transmission, with FIN sent only when can_bundle_fin is true.
// NOTE(review): consumed, retransmission_length, retransmission_offset and
// can_bundle_fin are not declared in this excerpt and no bool is returned;
// the surrounding code appears to be elided.
bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
                                      QuicByteCount data_length,
                                      bool fin) {
    consumed = session()->WritevData(this, id_, retransmission_length,
                                     retransmission_offset,
                                     can_bundle_fin ? FIN : NO_FIN);
}

 quic协议packet_number与offset信息的记录。

// Tries to send |packet| immediately. If earlier packets are still queued, or
// the write does not succeed, a copy of the packet is appended to
// queued_packets_ instead.
void QuicConnection::SendOrQueuePacket(SerializedPacket* packet){
  // WritePacket is only attempted when nothing is already queued.
  const bool sent_now = queued_packets_.empty() && WritePacket(packet);
  if (sent_now) {
    return;
  }

  // Take ownership of the underlying encrypted packet.
  // Per the article author's note, encrypted_buffer originally points to memory
  // on the function stack, so it is replaced by a copy before queueing.
  packet->encrypted_buffer = CopyBuffer(*packet);
  queued_packets_.push_back(*packet);
  // Cleared only after the copy above has been pushed onto the queue.
  packet->retransmittable_frames.clear();
}
// Excerpt: reports the sent packet to sent_packet_manager_.OnPacketSent,
// passing the packet's original packet number, the send time, its
// transmission type and whether it is retransmittable.
// NOTE(review): packet_send_time is not declared in this excerpt,
// reset_retransmission_alarm is not used afterwards and no bool is returned;
// the surrounding code appears to be elided.
bool QuicConnection::WritePacket(SerializedPacket* packet){
  bool reset_retransmission_alarm = sent_packet_manager_.OnPacketSent(
      packet, packet->original_packet_number, packet_send_time,
      packet->transmission_type, IsRetransmittable(*packet));
}
// Excerpt: reads the packet number from |serialized_packet| and registers the
// packet with unacked_packets_.AddSentPacket.
// NOTE(review): in_flight is not declared in this excerpt, packet_number is
// not used afterwards and no bool is returned; the surrounding code appears to
// be elided.
bool QuicSentPacketManager::OnPacketSent(
    SerializedPacket* serialized_packet,
    QuicPacketNumber original_packet_number,
    QuicTime sent_time,
    TransmissionType transmission_type,
    HasRetransmittableData has_retransmittable_data){
    QuicPacketNumber packet_number = serialized_packet->packet_number;
  unacked_packets_.AddSentPacket(serialized_packet, original_packet_number,
                                 transmission_type, sent_time, in_flight);
    }
// Excerpt: builds a QuicTransmissionInfo for the sent packet from its
// encryption level, packet-number length, transmission type, send time, byte
// count, crypto-handshake flag and padding byte count.
// NOTE(review): bytes_sent and has_crypto_handshake are not declared in this
// excerpt, and |info| is not stored anywhere in the portion shown; the
// surrounding code appears to be elided.
void QuicUnackedPacketMap::AddSentPacket(SerializedPacket* packet,
                                         QuicPacketNumber old_packet_number,
                                         TransmissionType transmission_type,
                                         QuicTime sent_time,
                                         bool set_in_flight){
  QuicTransmissionInfo info(
      packet->encryption_level, packet->packet_number_length, transmission_type,
      sent_time, bytes_sent, has_crypto_handshake, packet->num_padding_bytes);
                                         }

猜你喜欢

转载自blog.csdn.net/u010643777/article/details/84844065