Why do I spare time on reading go quic code? I prepare to collecting some data based this code for my paper. Since forged data can not verify a good idea. Hence, since yesterday, I began learning go language and wrote some go codes. Here, I will analysis how quic client send data out.
//example/client/main.go
roundTripper := &h2quic.RoundTripper{
QuicConfig: &quic.Config{Versions: versions},
}
hclient := &http.Client{
Transport: roundTripper,
}
rsp, err := hclient.Get(addr)
//go/src/net/http/cient.go
func (c *Client) Get(url string) (resp *Response, err error) {
req, err := NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return c.Do(req)
}
func (c *Client) Do(req *Request) (*Response, error) {
if resp, didTimeout, err = c.send(req, deadline); err != nil {
}
}
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
resp, didTimeout, err = send(req, c.transport(), deadline)
}
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
resp, err = rt.RoundTrip(req)
}
这里的rt就是h2quic.RoundTripper.
// RoundTrip does a round trip.
func (r *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return r.RoundTripOpt(req, RoundTripOpt{})
}
func (r *RoundTripper) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
cl, err := r.getClient(hostname, opt.OnlyCachedConn)
return cl.RoundTrip(req)
}
getClien创建client对象。
//quic-go/h2quic/client.go
func (c *client) RoundTrip(req *http.Request) (*http.Response, error) {
dataStream, err := c.session.OpenStreamSync()
if hasBody {
go func() {
resc <- c.writeRequestBody(dataStream, req.Body)
}()
}
}
func (c *client) writeRequestBody(dataStream quic.Stream, body io.ReadCloser) (err error) {
//将数据copy到dataStream,也就是说这个dataStream实现了io中的Write(p []byte)接口io.Copy中会调用Write函数
_, err = io.Copy(dataStream, body)
}
这里就不过多废话了,直接来到send_stream.go中实现的Write函数。
//quic-go/send_stream.go
func (s *sendStream) Write(p []byte) (int, error) {
s.dataForWriting = make([]byte, len(p))
copy(s.dataForWriting, p)
s.sender.onHasStreamData(s.streamID)
}
目前数据已被拷贝到dataForWriting缓冲区中。onHasStreamData是个接口函数,session中实现了这个接口。
//quic-go/session.go
func (s *session) onHasStreamData(id protocol.StreamID) {
s.streamFramer.AddActiveStream(id)
s.scheduleSending()
}
func (s *session) scheduleSending() {
select {
case s.sendingScheduled <- struct{}{}:
default:
}
}
s.sendingScheduled是个channel。信号的接收处理逻辑在哪?
func (s *session) run() error {
case <-s.sendingScheduled:
if err := s.sendPackets(); err != nil {
s.closeLocal(err)
}
}
func (s *session) sendPackets() error {
sentPacket, err := s.sendPacket()
}
func (s *session) sendPacket() (bool, error) {
packet, err := s.packer.PackPacket()
//放入历史队列中,为了重传
s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())
if err := s.sendPackedPacket(packet); err != nil {
return false, err
}
}
func (s *session) sendPackedPacket(packet *packedPacket) error {
defer putPacketBuffer(&packet.raw)
s.logPacket(packet)
//将数据发送出去
return s.conn.Write(packet.raw)
}
//quic-go/packet_packer.go
func (p *packetPacker) PackPacket() (*packedPacket, error) {
payloadFrames, err := p.composeNextPacket(maxSize, p.canSendData(encLevel))
}
func (p *packetPacker) composeNextPacket(
maxFrameSize protocol.ByteCount,
canSendStreamFrames bool,
) ([]wire.Frame, error) {
fs := p.streams.PopStreamFrames(maxFrameSize - payloadLength)
}
func (f *streamFramer) PopStreamFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
frame, hasMoreData := str.popStreamFrame(maxTotalLen - currentLen)
}
数据是在send_stream中的dataForWriting的slice中存储的,需要追踪下这个函数的逻辑str.popStreamFrame,怎么到stream中拿取数据?
//quic-go/stream_framer.go?line=64
func (f *streamFramer) PopStreamFrames(maxTotalLen protocol.ByteCount) []*wire.StreamFrame {
//一种stream的轮训调度的机制
id := f.streamQueue[0]
f.streamQueue = f.streamQueue[1:]
str, err := f.streamGetter.GetOrOpenSendStream(id)
frame, hasMoreData := str.popStreamFrame(maxTotalLen - currentLen)
}
//popStreamFrame这个函数是个接口,直接找到其实现
//quic-go/send_stream.go?line=135
func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
if completed {
s.sender.onStreamCompleted(s.streamID)
}
return frame, hasMoreData
}
func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
//从dataForWriting获取数据
frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen)
}
关于重传。s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket())在数据包发送出去前,会将数据放入历史队列。
func (h *sentPacketHandler) SentPacket(packet *Packet) {
if isRetransmittable := h.sentPacketImpl(packet); isRetransmittable {
h.packetHistory.SentPacket(packet)
h.updateLossDetectionAlarm()
}
}
func (h *sentPacketHistory) SentPacket(p *Packet) {
h.sentPacketImpl(p)
}
func (h *sentPacketHistory) sentPacketImpl(p *Packet) *PacketElement {
el := h.packetList.PushBack(*p)
}
那么必有一个逻辑,需要判断已传输的数据包是否丢失。
//quic-go/internal/ackhandler/sent_packet_handler.go
func (h *sentPacketHandler) detectLostPackets(now time.Time, priorInFlight protocol.ByteCount) error {
h.lossTime = time.Time{}
h.packetHistory.Iterate(func(packet *Packet) (bool, error) {
if packet.PacketNumber > h.largestAcked {
return false, nil
}
for _, p := range lostPackets {
// the bytes in flight need to be reduced no matter if this packet will be retransmitted
if p.includedInBytesInFlight {
h.bytesInFlight -= p.Length
h.congestion.OnPacketLost(p.PacketNumber, p.Length, priorInFlight)
}
if p.canBeRetransmitted {
// queue the packet for retransmission, and report the loss to the congestion controller
if err := h.queuePacketForRetransmission(p); err != nil {
return err
}
}
h.packetHistory.Remove(p.PacketNumber)
}
}
问题来了,detectLostPackets何时被调用。当然是有ack到来的时候或者定时器超时,才能判断数据包是否丢失。
// on ack frame
func (s *session) run() error {
case p := <-s.receivedPackets:
err := s.handlePacketImpl(p)
}
func (s *session) handlePacketImpl(p *receivedPacket) error {
return s.handleFrames(packet.frames, packet.encryptionLevel)
}
func (s *session) handleFrames(fs []wire.Frame, encLevel protocol.EncryptionLevel) error {
case *wire.AckFrame:
err = s.handleAckFrame(frame, encLevel)
}
func (s *session) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel) error {
if err := s.sentPacketHandler.ReceivedAck(frame, s.lastRcvdPacketNumber, encLevel, s.lastNetworkActivityTime); err != nil {
return err
}
s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
return nil
}
func (h *sentPacketHandler) ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, rcvTime time.Time) error {
if err := h.detectLostPackets(rcvTime, priorInFlight); err != nil {
return err
}
}
//time out
func (h *sentPacketHandler) OnAlarm() error {
err = h.detectLostPackets(now, h.bytesInFlight)
}
func (s *session) maybeSendRetransmission() (bool, error) {
var retransmitPacket *ackhandler.Packet
for {
retransmitPacket = s.sentPacketHandler.DequeuePacketForRetransmission()
if retransmitPacket == nil {
return false, nil
}
packets, err := s.packer.PackRetransmission(retransmitPacket)
ackhandlerPackets := make([]*ackhandler.Packet, len(packets))
for i, packet := range packets {
ackhandlerPackets[i] = packet.ToAckHandlerPacket()
} s.sentPacketHandler.SentPacketsAsRetransmission(ackhandlerPackets, retransmitPacket.PacketNumber)
for _, packet := range packets {
if err := s.sendPackedPacket(packet); err != nil {
return false, err
}
}
}
s.sentPacketHandler.SentPacketsAsRetransmission这个函数会见重传的数据包再次放入历史队列。因为数据包序号已经发生了变化,quic协议中每次数据包重传都使用新的序列号。因此,在发送重传的数据包时候,也就会发送Stop_Waiting_Fream,告诉对端不用等待以前的序列号了。
//quic-go/internal/ackhandler/sent_packet_handler.go
func (h *sentPacketHandler) SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber) {
h.packetHistory.SentPacketsAsRetransmission(p, retransmissionOf)
}
func (h *sentPacketHistory) SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber) {
if !ok {
for _, packet := range packets {
h.sentPacketImpl(packet)
}
return
}
}