quic-go 客户端代码阅读记录

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010643777/article/details/81316489

 Why do I spend time reading the quic-go code? I plan to collect some data based on this code for my paper, since forged data cannot validate a good idea. So, since yesterday, I have been learning the Go language and writing some Go code. Here, I will analyze how the QUIC client sends data out.

//example/client/main.go
// Build an http.Client whose Transport is an h2quic.RoundTripper, then issue
// a GET request for addr through it. versions and addr are defined outside
// this excerpt.
    roundTripper := &h2quic.RoundTripper{
        QuicConfig: &quic.Config{Versions: versions},
    }
    hclient := &http.Client{
        Transport: roundTripper,
    }   
rsp, err := hclient.Get(addr)
//go/src/net/http/client.go
// Get builds a GET request for url with a nil body and passes it to c.Do.
// If NewRequest fails, it returns a nil response together with that error.
func (c *Client) Get(url string) (resp *Response, err error) {
    req, err := NewRequest("GET", url, nil)
    if err != nil {
        return nil, err
    }
    return c.Do(req)
}
// Do (excerpt; most of the body is elided) hands req to c.send and checks the
// returned error. resp, didTimeout, err and deadline are declared in the
// elided code.
func (c *Client) Do(req *Request) (*Response, error) {
if resp, didTimeout, err = c.send(req, deadline); err != nil {
}
}
// send (excerpt) forwards req to the package-level send function, passing the
// value returned by c.transport() as the RoundTripper.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    resp, didTimeout, err = send(req, c.transport(), deadline)
}
// send (excerpt) performs the request by calling rt.RoundTrip.
// NOTE(review): req is not the parameter (which is named ireq); it is
// presumably derived from ireq in code elided from this excerpt.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    resp, err = rt.RoundTrip(req)
}

 这里的rt就是h2quic.RoundTripper.

// RoundTrip does a round trip.
// It delegates to RoundTripOpt with a zero-value RoundTripOpt.
func (r *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    return r.RoundTripOpt(req, RoundTripOpt{})
}
// RoundTripOpt (excerpt) obtains a client for hostname via r.getClient,
// passing opt.OnlyCachedConn, and hands req to that client's RoundTrip.
// hostname is computed in code elided from this excerpt.
func (r *RoundTripper) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
cl, err := r.getClient(hostname, opt.OnlyCachedConn)
return cl.RoundTrip(req)
}

 getClient创建client对象。

//quic-go/h2quic/client.go
// RoundTrip (excerpt) opens a stream on the session with OpenStreamSync.
// When hasBody is set, a goroutine writes req.Body to that stream and sends
// the resulting error value on resc. hasBody and resc are defined in code
// elided from this excerpt.
func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
    dataStream, err := c.session.OpenStreamSync()
    if hasBody {
        go func() {
            resc <- c.writeRequestBody(dataStream, req.Body)
        }()
    }
}
// writeRequestBody (excerpt) streams body into dataStream with io.Copy and
// stores the copy error in the named result err.
func (c *client) writeRequestBody(dataStream quic.Stream, body io.ReadCloser) (err error) {
// Copy the data into dataStream; dataStream implements io's Write(p []byte) interface, which io.Copy calls.
_, err = io.Copy(dataStream, body)
}

 这里就不过多废话了,直接来到send_stream.go中实现的Write函数。

//quic-go/send_stream.go
// Write (excerpt) allocates s.dataForWriting with the same length as p,
// copies p into it, and then tells the sender that this stream has data.
func (s *sendStream) Write(p []byte) (int, error) {
    s.dataForWriting = make([]byte, len(p))
    copy(s.dataForWriting, p)
    s.sender.onHasStreamData(s.streamID)
}

 目前数据已被拷贝到dataForWriting缓冲区中。onHasStreamData是个接口函数,session中实现了这个接口。

//quic-go/session.go
// onHasStreamData registers stream id with the stream framer as active and
// then schedules sending.
func (s *session) onHasStreamData(id protocol.StreamID) {
    s.streamFramer.AddActiveStream(id)
    s.scheduleSending()
}
// scheduleSending signals s.sendingScheduled without blocking: if the send
// cannot proceed immediately, the default case makes the call a no-op.
func (s *session) scheduleSending() {
    select {
    case s.sendingScheduled <- struct{}{}:
    default:
    }
}

  s.sendingScheduled是个channel。信号的接收处理逻辑在哪?

// run (excerpt; the enclosing select statement is elided) shows the case
// that receives from s.sendingScheduled. It then calls s.sendPackets and
// closes the session locally if that returns an error.
func (s *session) run() error {
        case <-s.sendingScheduled:

        if err := s.sendPackets(); err != nil {
            s.closeLocal(err)
        }
}
// sendPackets (excerpt) calls s.sendPacket; the rest of the body is elided.
func (s *session) sendPackets() error {
sentPacket, err := s.sendPacket()
}
// sendPacket (excerpt) asks the packer for the next packet, records it with
// the sent-packet handler, and then sends it. It returns (false, err) if
// sending fails.
func (s *session) sendPacket() (bool, error) {
packet, err := s.packer.PackPacket()
// Put the packet into the history queue so that it can be retransmitted.
    s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
    if err := s.sendPackedPacket(packet); err != nil {
        return false, err
    }
}
// sendPackedPacket logs packet and writes packet.raw to s.conn, returning the
// write error. The deferred putPacketBuffer call receives &packet.raw when
// the function returns.
func (s *session) sendPackedPacket(packet *packedPacket) error {
    defer putPacketBuffer(&packet.raw)
    s.logPacket(packet)
    // Send the data out.
    return s.conn.Write(packet.raw)
}
//quic-go/packet_packer.go
// PackPacket (excerpt) obtains the payload frames for the next packet from
// p.composeNextPacket. maxSize and encLevel are defined in code elided from
// this excerpt.
func (p *packetPacker) PackPacket() (*packedPacket, error) {
    payloadFrames, err := p.composeNextPacket(maxSize, p.canSendData(encLevel))
}
// composeNextPacket (excerpt) pops stream frames from p.streams, limited to
// maxFrameSize minus payloadLength. payloadLength is computed in code elided
// from this excerpt.
func (p *packetPacker) composeNextPacket(
    maxFrameSize protocol.ByteCount,
    canSendStreamFrames bool,
) ([]wire.Frame, error) {
    fs := p.streams.PopStreamFrames(maxFrameSize - payloadLength)
}

// PopStreamFrames (excerpt) asks a stream str for a frame of at most
// maxTotalLen minus currentLen. str and currentLen come from code elided
// from this excerpt.
func (f *streamFramer) PopStreamFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
        frame, hasMoreData := str.popStreamFrame(maxTotalLen - currentLen)
}

 数据是在send_stream中的dataForWriting的slice中存储的,需要追踪下这个函数的逻辑str.popStreamFrame,怎么到stream中拿取数据?

//quic-go/stream_framer.go?line=64
// PopStreamFrames (excerpt) takes the stream ID at the head of f.streamQueue,
// removes it from the queue, looks up the send stream for that ID, and asks
// it for a frame. currentLen is defined in code elided from this excerpt.
func (f *streamFramer) PopStreamFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
// A round-robin scheduling mechanism across streams.
        id := f.streamQueue[0]
        f.streamQueue = f.streamQueue[1:]
        str, err := f.streamGetter.GetOrOpenSendStream(id)
        frame, hasMoreData := str.popStreamFrame(maxTotalLen - currentLen)
}
// popStreamFrame is an interface method; this is its implementation.
//quic-go/send_stream.go?line=135
// It delegates to popStreamFrameImpl and, when that reports the stream as
// completed, notifies the sender via onStreamCompleted. It returns the frame
// and whether the stream has more data to send.
func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
    completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
    if completed {
        s.sender.onStreamCompleted(s.streamID)
    }
    return frame, hasMoreData
}
// popStreamFrameImpl (excerpt) fills the frame's Data and FinBit from
// s.getDataForWriting. frame and maxDataLen are defined in code elided from
// this excerpt.
func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
// Fetch the data from dataForWriting.
frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen)
}

 关于重传。s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())在数据包发送出去前,会将数据放入历史队列。

// SentPacket passes packet to h.sentPacketImpl. Only when that reports the
// packet as retransmittable is it added to the packet history and the loss
// detection alarm updated.
func (h *sentPacketHandler) SentPacket(packet *Packet) {
    if isRetransmittable := h.sentPacketImpl(packet); isRetransmittable {
        h.packetHistory.SentPacket(packet)
        h.updateLossDetectionAlarm()
    }
}
// SentPacket records p in the history by delegating to sentPacketImpl; the
// value returned by sentPacketImpl is discarded.
func (h *sentPacketHistory) SentPacket(p *Packet) {
    h.sentPacketImpl(p)
}
// sentPacketImpl (excerpt) appends a copy of *p to the back of h.packetList;
// the rest of the body, including the return, is elided.
func (h *sentPacketHistory) sentPacketImpl(p *Packet) *PacketElement {
    el := h.packetList.PushBack(*p)
}

 那么必有一个逻辑,需要判断已传输的数据包是否丢失。

//quic-go/internal/ackhandler/sent_packet_handler.go
// detectLostPackets (excerpt; the body of the Iterate callback is partly
// elided, so the braces below do not balance) resets h.lossTime, walks the
// packet history, and then processes every packet in lostPackets.
// lostPackets is populated in code elided from this excerpt.
func (h *sentPacketHandler) detectLostPackets(now time.Time, priorInFlight protocol.ByteCount) error {
    h.lossTime = time.Time{}
        h.packetHistory.Iterate(func(packet *Packet) (bool, error) {
        // The callback returns false for a packet numbered above the largest acked one.
        if packet.PacketNumber > h.largestAcked {
            return false, nil
        }
    for _, p := range lostPackets {
        // the bytes in flight need to be reduced no matter if this packet will be retransmitted
        if p.includedInBytesInFlight {
            h.bytesInFlight -= p.Length
            h.congestion.OnPacketLost(p.PacketNumber, p.Length, priorInFlight)
        }
        if p.canBeRetransmitted {
            // queue the packet for retransmission, and report the loss to the congestion controller
            if err := h.queuePacketForRetransmission(p); err != nil {
                return err
            }
        }
        // The lost packet is removed from the history whether or not it was queued for retransmission.
        h.packetHistory.Remove(p.PacketNumber)
    }
}

 问题来了,detectLostPackets何时被调用。当然是有ack到来的时候或者定时器超时,才能判断数据包是否丢失。

// on ack frame
// run (excerpt; the enclosing select statement is elided) shows the case
// that receives a packet from s.receivedPackets and passes it to
// handlePacketImpl.
func (s *session) run() error {
        case p := <-s.receivedPackets:
            err := s.handlePacketImpl(p)
}
// handlePacketImpl (excerpt) passes the packet's frames and encryption level
// to handleFrames. packet is produced from p in code elided from this
// excerpt.
func (s *session) handlePacketImpl(p *receivedPacket) error {
    return s.handleFrames(packet.frames, packet.encryptionLevel)
}
// handleFrames (excerpt; the loop and type switch over fs are elided) shows
// the branch for *wire.AckFrame, which is dispatched to handleAckFrame.
func (s *session) handleFrames(fs []wire.Frame, encLevel protocol.EncryptionLevel) error {
        case *wire.AckFrame:
            err = s.handleAckFrame(frame, encLevel)
}
// handleAckFrame passes the ACK frame to the sent-packet handler together
// with s.lastRcvdPacketNumber, encLevel and s.lastNetworkActivityTime, and
// returns any error from that call. On success it calls IgnoreBelow on the
// received-packet handler with the value of
// GetLowestPacketNotConfirmedAcked.
func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {
    if err := s.sentPacketHandler.ReceivedAck(frame, s.lastRcvdPacketNumber, encLevel, s.lastNetworkActivityTime); err != nil {
        return err
    }
    s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
    return nil
}
// ReceivedAck (excerpt) runs loss detection with the ACK's receive time and
// returns any error from it. priorInFlight is computed in code elided from
// this excerpt.
func (h *sentPacketHandler) ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, rcvTime time.Time) error {
    if err := h.detectLostPackets(rcvTime, priorInFlight); err != nil {
        return err
    }
}
//time out
// OnAlarm (excerpt) runs loss detection with now and the current
// h.bytesInFlight. now and err are declared in code elided from this
// excerpt.
func (h *sentPacketHandler) OnAlarm() error {
err = h.detectLostPackets(now, h.bytesInFlight)
}
func (s *session) maybeSendRetransmission() (bool, error) {
    var retransmitPacket *ackhandler.Packet
    for {
        retransmitPacket = s.sentPacketHandler.DequeuePacketForRetransmission()
        if retransmitPacket == nil {
            return false, nil
        }
    packets, err := s.packer.PackRetransmission(retransmitPacket)
    ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
    for i, packet := range packets {
        ackhandlerPackets[i] = packet.ToAckHandlerPacket()
    }       s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber)
    for _, packet := range packets {
        if err := s.sendPackedPacket(packet); err != nil {
            return false, err
        }
    }
}

s.sentPacketHandler.SentPacketsAsRetransmission这个函数会将重传的数据包再次放入历史队列。因为数据包序号已经发生了变化,quic协议中每次数据包重传都使用新的序列号。因此,在发送重传的数据包时候,也就会发送Stop_Waiting_Frame,告诉对端不用等待以前的序列号了。

//quic-go/internal/ackhandler/sent_packet_handler.go
// SentPacketsAsRetransmission (excerpt) forwards to the packet history's
// method of the same name.
// NOTE(review): p is not a parameter; it is presumably built from packets in
// code elided from this excerpt.
func (h *sentPacketHandler) SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber) {
    h.packetHistory.SentPacketsAsRetransmission(p, retransmissionOf)
}
// SentPacketsAsRetransmission (excerpt) shows the !ok branch: each packet is
// added to the history through sentPacketImpl and the function returns.
// ok is set in code elided from this excerpt, as is the handling for the
// case where ok is true.
func (h *sentPacketHistory) SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber) {
    if !ok {
        for _, packet := range packets {
            h.sentPacketImpl(packet)
        }
        return
    }
}

猜你喜欢

转载自blog.csdn.net/u010643777/article/details/81316489