版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010643777/article/details/84844065
客户端将数据写入流的发送缓冲区(send buffer)。
// Entry point for writing body bytes on a SPDY stream.
//
// Forwards `data` and `fin` unchanged to WriteOrBufferData and moves
// `ack_listener` into that call.
void QuicSpdyStream::WriteOrBufferBody(
    QuicStringPiece data,
    bool fin,
    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
  this->WriteOrBufferData(data, fin, std::move(ack_listener));
}
// Appends `data` to this stream's send buffer and, if nothing was pending
// before the call, immediately tries to flush via WriteBufferedData().
//
// data:         payload to buffer; may be empty when only a FIN is being sent.
// fin:          when true, fin_buffered_ is set so the flush check sees it.
// ack_listener: passed to OnDataBuffered together with the buffered range.
//
// If the write would push the stream past kMaxStreamLength, the connection is
// closed with QUIC_STREAM_LENGTH_OVERFLOW and nothing is buffered.
void QuicStream::WriteOrBufferData(
    QuicStringPiece data,
    bool fin,
    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
  // Record the FIN so a FIN-only write can still trigger the flush below.
  // A FIN recorded by an earlier call is never cleared here.
  if (fin) {
    fin_buffered_ = true;
  }

  // Snapshot taken BEFORE appending: if data was already pending, a write is
  // already outstanding and this call must only append, not flush.
  const bool had_buffered_data = HasBufferedData();

  if (data.length() > 0) {
    // Wrap `data` in a single iovec so it can be handed to the send buffer.
    struct iovec iov(MakeIovec(data));
    const QuicStreamOffset offset = send_buffer_.stream_offset();
    if (kMaxStreamLength - offset < data.length()) {
      QUIC_BUG << "Write too many data via stream " << id_;
      CloseConnectionWithDetails(
          QUIC_STREAM_LENGTH_OVERFLOW,
          QuicStrCat("Write too many data via stream ", id_));
      return;
    }
    // Copy the bytes described by the iovec into send_buffer_.
    send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
    OnDataBuffered(offset, data.length(), ack_listener);
  }

  if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
    // Write data if there is no buffered data before.
    WriteBufferedData();
  }
}
// Copies `data_length` bytes, starting `iov_offset` bytes into the `iov`
// array (of `iov_count` entries), into the send buffer.
//
// The data is stored as a sequence of slices of at most max_data_slice_size
// bytes each; every slice is allocated from allocator_, filled through
// QuicUtils::CopyToBuffer and handed to SaveMemSlice. Looping until
// data_length reaches zero ensures that writes larger than one slice are
// stored in full rather than truncated to the first slice.
//
// NOTE(review): assumes max_data_slice_size > 0; a zero value would make the
// loop spin forever — confirm where that value is defined.
void QuicStreamSendBuffer::SaveStreamData(const struct iovec* iov,
                                          int iov_count,
                                          size_t iov_offset,
                                          QuicByteCount data_length) {
  while (data_length > 0) {
    const size_t slice_len = std::min(data_length, max_data_slice_size);
    QuicMemSlice slice(allocator_, slice_len);
    QuicUtils::CopyToBuffer(iov, iov_count, iov_offset, slice_len,
                            const_cast<char*>(slice.data()));
    SaveMemSlice(std::move(slice));
    // Advance past the bytes just copied.
    data_length -= slice_len;
    iov_offset += slice_len;
  }
}
// Appends `slice` to buffered_slices_, tagged with the current stream_offset_,
// then advances stream_offset_ by the slice's length so the next slice (and
// the next stream_offset() read by the caller) starts where this one ends.
void QuicStreamSendBuffer::SaveMemSlice(QuicMemSlice slice) {
  // Read the length before the slice is moved from.
  const size_t length = slice.length();
  buffered_slices_.emplace_back(std::move(slice), stream_offset_);
  stream_offset_ += length;
}
缓冲区中的数据是怎么被取走并发送到网络中的?这次倒着看代码:从最底层的取数据函数开始,沿调用链逐层向上追溯。
// Copies buffered stream bytes into `writer`.
//
// Takes the stream `offset`, the `data_length` to write and the destination
// `writer`; returns false when writer->WriteBytes() fails.
//
// NOTE(review): slice_it, slice_offset and copy_length are not declared in
// this excerpt and no value is returned on the success path; the complete
// function presumably locates the slice for `offset` and iterates — confirm
// against the full source.
bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset,
QuicByteCount data_length,
QuicDataWriter* writer) {
// The buffered data is written into the writer here.
if (!writer->WriteBytes(slice_it->slice.data() + slice_offset,
copy_length)) {
QUIC_BUG << "Writer fails to write.";
return false;
}
// Open question: once the data has been written into the writer, why is the
// corresponding block in buffered_slices not removed? If not here, when?
// NOTE(review): presumably the slice is kept so that RetransmitStreamData can
// re-send the same offset range later — confirm where slices are released.
}
// Writes `data_length` bytes of this stream, starting at `offset`, into
// `writer` by delegating to send_buffer_. Returns whatever the send buffer
// reports. `data_length` is DCHECKed to be non-zero.
bool QuicStream::WriteStreamData(QuicStreamOffset offset,
                                 QuicByteCount data_length,
                                 QuicDataWriter* writer) {
  DCHECK_LT(0u, data_length);
  QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
                << offset << " length " << data_length;
  const bool written =
      send_buffer_.WriteStreamData(offset, data_length, writer);
  return written;
}
// Looks up stream `id` and asks it to write `data_length` bytes starting at
// `offset` into `writer`.
//
// Returns:
//   WRITE_SUCCESS  - the stream wrote the requested bytes.
//   STREAM_MISSING - GetStream(id) returned nullptr.
//   WRITE_FAILED   - the stream exists but its WriteStreamData returned false.
WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
                                                   QuicStreamOffset offset,
                                                   QuicByteCount data_length,
                                                   QuicDataWriter* writer) {
  QuicStream* stream = GetStream(id);
  if (stream == nullptr) {
    // Guard against dereferencing a missing stream.
    QUIC_BUG << "Stream " << id << " does not exist when trying to write data.";
    return STREAM_MISSING;
  }
  if (stream->WriteStreamData(offset, data_length, writer)) {
    return WRITE_SUCCESS;
  }
  // Every path of a non-void function must return a value.
  return WRITE_FAILED;
}
// Up the call chain: WriteStreamData is invoked from AppendStreamFrame.
//
// Asks data_producer_ to write the payload described by `frame` (stream id,
// offset, data_length) into `writer`.
//
// Returns false if the producer reports anything other than WRITE_SUCCESS,
// true otherwise. `no_stream_frame_length` is not used by the portion of the
// function shown here.
bool QuicFramer::AppendStreamFrame(const QuicStreamFrame& frame,
                                   bool no_stream_frame_length,
                                   QuicDataWriter* writer) {
  if (data_producer_->WriteStreamData(frame.stream_id, frame.offset,
                                      frame.data_length,
                                      writer) != WRITE_SUCCESS) {
    QUIC_BUG << "Writing frame data failed.";
    return false;
  }
  // Explicit success result; flowing off the end of a bool function is
  // undefined behavior.
  return true;
}
//↑
// Excerpt of the packet builder: for a STREAM_FRAME it delegates to
// AppendStreamFrame and returns 0 if that call fails.
//
// NOTE(review): frame, last_frame_in_packet and writer are not declared in
// this excerpt (presumably a loop over `frames` and a writer over `buffer`),
// and the value returned on success is not shown — confirm against the full
// source.
size_t QuicFramer::BuildDataPacket(const QuicPacketHeader& header,
const QuicFrames& frames,
char* buffer,
size_t packet_length){
switch (frame.type){
case STREAM_FRAME:
// Serialize the stream frame's payload; a failure aborts the whole packet.
if (!AppendStreamFrame(frame.stream_frame, last_frame_in_packet,
&writer)) {
QUIC_BUG << "AppendStreamFrame failed";
return 0;
}
break;
}
}
//↑
// Builds a data packet from queued_frames_ into `encrypted_buffer` by calling
// framer_->BuildDataPacket.
//
// NOTE(review): `header` is not declared in this excerpt, and neither
// `length` nor `encrypted_buffer_len` is used afterwards here — the remaining
// steps are elided; confirm against the full source.
void QuicPacketCreator::SerializePacket(char* encrypted_buffer,
size_t encrypted_buffer_len){
size_t length = framer_->BuildDataPacket(header, queued_frames_,
encrypted_buffer, packet_size_);
}
//↑
// Serializes the pending packet and then notifies via OnSerializedPacket().
//
// NOTE(review): serialized_packet_buffer is not declared in this excerpt;
// per the comment at the second SendOrQueuePacket copy it is presumably a
// stack buffer — confirm.
void QuicPacketCreator::Flush(){
SerializePacket(serialized_packet_buffer, kMaxPacketSize);
OnSerializedPacket();// The packet data is ready; run the callback.
}
//↓
void QuicPacketCreator::OnSerializedPacket()
{
SerializedPacket packet(std::move(packet_));
ClearPacket();
delegate_->OnSerializedPacket(&packet);
}
// The delegate_ used by QuicPacketCreator::OnSerializedPacket is the
// QuicConnection, so that callback lands here.
//
// Passes `serialized_packet` straight on to SendOrQueuePacket.
void QuicConnection::OnSerializedPacket(SerializedPacket* serialized_packet) {
  this->SendOrQueuePacket(serialized_packet);
}
// Attempts to send `packet` immediately: WritePacket(packet) is only called
// when queued_packets_ is empty (short-circuit of the || below).
void QuicConnection::SendOrQueuePacket(SerializedPacket* packet) {
if (!queued_packets_.empty() || !WritePacket(packet)) {
// NOTE(review): the branch taken when packets are already queued or the write
// fails is left empty in this excerpt.
}
}
// Hands the packet's encrypted bytes to writer_->WritePacket together with
// the local host address, the peer address and per_packet_options_.
//
// NOTE(review): encrypted_length is not declared in this excerpt, `result`
// is not inspected and no bool is returned — the rest of the function is
// elided; confirm against the full source.
bool QuicConnection::WritePacket(SerializedPacket* packet){
WriteResult result = writer_->WritePacket(
packet->encrypted_buffer, encrypted_length, self_address().host(),
peer_address(), per_packet_options_);
}
// Stores the `buf_len` bytes at `buffer` via SetPacket() and writes them to
// the socket through WritePacketToSocketImpl(), returning its result.
//
// DCHECKs that the writer is not write-blocked. `self_address`,
// `peer_address` and the options pointer are not used by this body.
quic::WriteResult QuicChromiumPacketWriter::WritePacket(
    const char* buffer,
    size_t buf_len,
    const quic::QuicIpAddress& self_address,
    const quic::QuicSocketAddress& peer_address,
    quic::PerPacketOptions* /*options*/) {
  DCHECK(!IsWriteBlocked());
  SetPacket(buffer, buf_len);
  quic::WriteResult write_result = WritePacketToSocketImpl();
  return write_result;
}
// Sends the data to the network; the socket logic is wrapped by Google's own
// socket abstraction.
//
// Writes packet_ (packet_->size() bytes) to socket_, passing write_callback_
// and kTrafficAnnotation.
//
// NOTE(review): the conversion of `rv` into the returned quic::WriteResult is
// not shown in this excerpt — confirm against the full source.
quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocketImpl() {
int rv = socket_->Write(packet_.get(), packet_->size(), write_callback_,
kTrafficAnnotation);
}
接下来要解决的问题:Flush 是被谁(哪个类的哪个函数)调用的?
// The general entry point for sending out what is in the buffer; it is worth
// checking which functions call it.
//
// Delegates to WritevDataInner, starting at stream_bytes_written().
//
// NOTE(review): write_length and fin are not declared in this excerpt, and
// consumed_data is not used afterwards here — the surrounding logic is
// elided; confirm against the full source.
void QuicStream::WriteBufferedData() {
QuicConsumedData consumed_data =
WritevDataInner(write_length, stream_bytes_written(), fin);
}
// Translates `fin` into a StreamSendingState and forwards the write request
// (`write_length` bytes at `offset`) to the session.
//
// State selection:
//   fin == false                                -> NO_FIN
//   fin == true, no random padding after fin    -> FIN
//   fin == true, add_random_padding_after_fin_  -> FIN_AND_PADDING
QuicConsumedData QuicStream::WritevDataInner(size_t write_length,
                                             QuicStreamOffset offset,
                                             bool fin) {
  const StreamSendingState sending_state =
      !fin ? NO_FIN
           : (add_random_padding_after_fin_ ? FIN_AND_PADDING : FIN);
  return session()->WritevData(this, id(), write_length, offset,
                               sending_state);
}
// Forwards a stream write to the connection.
//
// Sends `write_length` bytes of stream `id` starting at `offset`, with the
// given sending `state`, through connection_->SendStreamData and returns the
// QuicConsumedData it reports. `stream` is not used by the portion of the
// function shown here.
QuicConsumedData QuicSession::WritevData(QuicStream* stream,
                                         QuicStreamId id,
                                         size_t write_length,
                                         QuicStreamOffset offset,
                                         StreamSendingState state) {
  QuicConsumedData data =
      connection_->SendStreamData(id, write_length, offset, state);
  // Hand the result back to the caller; without this the function would flow
  // off the end of a non-void function (undefined behavior).
  return data;
}
// Passes the stream write (`write_length` bytes of stream `id` at `offset`,
// with sending `state`) to packet_generator_.ConsumeData and returns its
// result unchanged.
QuicConsumedData QuicConnection::SendStreamData(QuicStreamId id,
                                                size_t write_length,
                                                QuicStreamOffset offset,
                                                StreamSendingState state) {
  QuicConsumedData consumed =
      packet_generator_.ConsumeData(id, write_length, offset, state);
  return consumed;
}
// Asks packet_creator_ to consume stream data into a frame and then flushes
// the creator. Returns QuicConsumedData(0, false) if the creator fails to
// consume.
//
// NOTE(review): total_bytes_consumed, fin, has_handshake and frame are not
// declared in this excerpt, and the value returned after Flush() is not
// shown — the surrounding logic is elided; confirm against the full source.
QuicConsumedData QuicPacketGenerator::ConsumeData(QuicStreamId id,
size_t write_length,
QuicStreamOffset offset,
StreamSendingState state){
if (!packet_creator_.ConsumeData(id, write_length, total_bytes_consumed,
offset + total_bytes_consumed, fin,
has_handshake, &frame)) {
// The creator is always flushed if there's not enough room for a new
// stream frame before ConsumeData, so ConsumeData should always succeed.
QUIC_BUG << "Failed to ConsumeData, stream:" << id;
return QuicConsumedData(0, false);
}
// This is the call that reaches QuicPacketCreator::Flush.
packet_creator_.Flush();
}
// Creates a stream frame for stream `id` and adds it to the packet being
// built.
//
// id, write_length, iov_offset, offset, fin: forwarded to CreateStreamFrame.
// frame: out-parameter that receives the created frame.
// needs_full_padding: not used by the portion of the function shown here.
//
// Returns false if AddFrame rejects the frame, true otherwise.
bool QuicPacketCreator::ConsumeData(QuicStreamId id,
                                    size_t write_length,
                                    size_t iov_offset,
                                    QuicStreamOffset offset,
                                    bool fin,
                                    bool needs_full_padding,
                                    QuicFrame* frame) {
  CreateStreamFrame(id, write_length, iov_offset, offset, fin, frame);
  if (!AddFrame(*frame, /*save_retransmittable_frames=*/true)) {
    // Fails if we try to write unencrypted stream data.
    return false;
  }
  // Explicit success result; flowing off the end of a bool function is
  // undefined behavior.
  return true;
}
其中:WriteBufferedData在WriteOrBufferData中被调用过,基本又回到代码分析的起点。
丢失数据包重传:
// Runs at the end of ACK frame processing: marks every packet in
// packets_acked_ as handled, then calls
// PostProcessAfterMarkingPacketHandled.
//
// NOTE(review): `info` and `prior_bytes_in_flight` are not declared in this
// excerpt and no bool is returned — parts of the function are elided; confirm
// against the full source.
bool QuicSentPacketManager::OnAckFrameEnd(QuicTime ack_receive_time){
for (AckedPacket& acked_packet : packets_acked_){
MarkPacketHandled(acked_packet.packet_number, info,
last_ack_frame_.ack_delay_time);
}
PostProcessAfterMarkingPacketHandled(last_ack_frame_, ack_receive_time,
rtt_updated_, prior_bytes_in_flight);
}
void QuicSentPacketManager::PostProcessAfterMarkingPacketHandled(
const QuicAckFrame& ack_frame,
QuicTime ack_receive_time,
bool rtt_updated,
QuicByteCount prior_bytes_in_flight) {
InvokeLossDetection(ack_receive_time);
}
// Marks a packet for retransmission with type LOSS_RETRANSMISSION.
//
// NOTE(review): `packet` is not declared in this excerpt and `time` is unused
// here — presumably the full function iterates over detected lost packets;
// confirm against the full source.
void QuicSentPacketManager::InvokeLossDetection(QuicTime time){
MarkForRetransmission(packet.packet_number, LOSS_RETRANSMISSION);
}
// MarkForRetransmission is called for the different retransmission types.
//
// Looks up the mutable transmission info of `packet_number` in
// unacked_packets_ and passes it, together with `transmission_type`, to
// HandleRetransmission.
void QuicSentPacketManager::MarkForRetransmission(
    QuicPacketNumber packet_number,
    TransmissionType transmission_type) {
  HandleRetransmission(
      transmission_type,
      unacked_packets_.GetMutableTransmissionInfo(packet_number));
}
// Asks unacked_packets_ to retransmit the frames recorded in
// `*transmission_info`, tagging them with `transmission_type`.
void QuicSentPacketManager::HandleRetransmission(
    TransmissionType transmission_type,
    QuicTransmissionInfo* transmission_info) {
  const QuicTransmissionInfo& info = *transmission_info;
  unacked_packets_.RetransmitFrames(info, transmission_type);
}
// Forwards the retransmittable frames stored in `info` to session_notifier_
// with transmission type `type`.
//
// DCHECKs session_decides_what_to_write_.
void QuicUnackedPacketMap::RetransmitFrames(const QuicTransmissionInfo& info,
                                            TransmissionType type) {
  DCHECK(session_decides_what_to_write_);
  const auto& frames_to_resend = info.retransmittable_frames;
  session_notifier_->RetransmitFrames(frames_to_resend, type);
}
// Retransmits stream data for a frame by calling RetransmitStreamData on the
// owning stream with the frame's offset, data_length and fin.
//
// NOTE(review): `frame` and `stream` are not declared in this excerpt, and
// the `break` has no visible enclosing loop — presumably the full function
// iterates over `frames` and looks up each stream; confirm against the full
// source.
void QuicSession::RetransmitFrames(const QuicFrames& frames,
TransmissionType type){
// Stop as soon as an existing stream fails to retransmit its data.
if (stream != nullptr &&
!stream->RetransmitStreamData(frame.stream_frame.offset,
frame.stream_frame.data_length,
frame.stream_frame.fin)) {
break;
}
}
// Re-sends stream data through session()->WritevData — the same session
// entry point that WritevDataInner uses.
//
// NOTE(review): consumed, retransmission_length, retransmission_offset and
// can_bundle_fin are not declared in this excerpt and no bool is returned —
// the computation of the range to retransmit is elided; confirm against the
// full source.
bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
QuicByteCount data_length,
bool fin) {
consumed = session()->WritevData(this, id_, retransmission_length,
retransmission_offset,
can_bundle_fin ? FIN : NO_FIN);
}
QUIC 协议中 packet_number 与 offset 信息是如何被记录的:
// Sends `packet` immediately when nothing is queued and the write succeeds;
// otherwise copies the packet's buffer and appends the packet to
// queued_packets_.
void QuicConnection::SendOrQueuePacket(SerializedPacket* packet) {
  // Fast path: WritePacket is only attempted when the queue is empty.
  if (queued_packets_.empty() && WritePacket(packet)) {
    return;
  }

  // Take ownership of the underlying encrypted packet.
  // Per the original note, encrypted_buffer previously pointed at memory on a
  // function's stack, hence the copy before queueing.
  packet->encrypted_buffer = CopyBuffer(*packet);
  queued_packets_.push_back(*packet);
  packet->retransmittable_frames.clear();
}
// Excerpt showing the bookkeeping side of WritePacket: the packet is reported
// to sent_packet_manager_.OnPacketSent along with its original packet
// number, send time, transmission type and retransmittability.
//
// NOTE(review): packet_send_time is not declared in this excerpt,
// reset_retransmission_alarm is not used afterwards here and no bool is
// returned — the rest of the function is elided; confirm against the full
// source.
bool QuicConnection::WritePacket(SerializedPacket* packet){
bool reset_retransmission_alarm = sent_packet_manager_.OnPacketSent(
packet, packet->original_packet_number, packet_send_time,
packet->transmission_type, IsRetransmittable(*packet));
}
// Records a sent packet: reads its packet number and registers it with
// unacked_packets_.AddSentPacket.
//
// NOTE(review): in_flight is not declared in this excerpt, the local
// packet_number and the has_retransmittable_data parameter are unused here,
// and no bool is returned — the rest of the function is elided; confirm
// against the full source.
bool QuicSentPacketManager::OnPacketSent(
SerializedPacket* serialized_packet,
QuicPacketNumber original_packet_number,
QuicTime sent_time,
TransmissionType transmission_type,
HasRetransmittableData has_retransmittable_data){
QuicPacketNumber packet_number = serialized_packet->packet_number;
unacked_packets_.AddSentPacket(serialized_packet, original_packet_number,
transmission_type, sent_time, in_flight);
}
// Builds a QuicTransmissionInfo for the sent packet from its encryption
// level, packet number length, transmission type, send time, byte count,
// crypto-handshake flag and padding byte count.
//
// NOTE(review): bytes_sent and has_crypto_handshake are not declared in this
// excerpt, and what happens to `info` afterwards (presumably stored in the
// map) is not shown; old_packet_number and set_in_flight are unused here —
// confirm against the full source.
void QuicUnackedPacketMap::AddSentPacket(SerializedPacket* packet,
QuicPacketNumber old_packet_number,
TransmissionType transmission_type,
QuicTime sent_time,
bool set_in_flight){
QuicTransmissionInfo info(
packet->encryption_level, packet->packet_number_length, transmission_type,
sent_time, bytes_sent, has_crypto_handshake, packet->num_padding_bytes);
}