概述
KCP是一种网络传输协议(ARQ,自动重传请求),可以视它为TCP的代替品,但是它运行于用户空间,它不管底层的发送与接收,只是个纯算法实现可靠传输,它的特点是牺牲带宽来降低延迟。因为TCP协议的大公无私,经常牺牲自己速度来减少网络拥塞,它是从大局上考虑的。而KCP是自私的,它只顾自己的传输效率,从不管整个网络的拥塞情况。
举个例子,TCP检测到丢包的时候,首先想到的是网络拥塞了,要放慢自己的速度别让网络更糟,而KCP想到的赶紧重传别耽误事。
为了提高udp可靠性,在udp协议上封装一层可靠性传输机制(类似tcp的ACK机制、重传机制、序号机制、重排机制、窗口机制),就做到了兼具tcp的安全性(流量控制和拥塞控制等)和udp的实时性,并且具备一定的灵活性(超时重传、ack等),其中一个代表是kcp协议。
KCP的特点是可靠传输(累积确认、超时重传、选择确认)、流量控制(滑动窗口)、拥塞控制(慢开始、拥塞避免、快重传、快恢复)、面向连接。KCP对这些参数基本都可配,也没有建立或关闭连接的过程。
当前走读的版本,为 KCP v1.7
基本数据结构
IKCPSEG
KICPSEG 存储发送和接收的数据段状态。SEGMENT 是一种切片结构,为内存状态,仅有部分字段会编码到传输协议中,而ikcp_encode_seg 定义了传输协议头部结构。
struct IKCPSEG {
struct IQUEUEHEAD node; // 队列节点,IKCPSEG 作为一个队列元素,此结构指向了队列后前后元素
IUINT32 conv; // 会话编号
IUINT32 cmd; // 指令类型
IUINT32 frg; // 分片号 (fragment),发送数据大于MSS时将被分片,0为最后一个分片,意味着数据可以被recv,如果是流模式,所有分片号都为0
IUINT32 wnd; // 窗口大小
IUINT32 ts; // 时间戳
IUINT32 sn; // 序号 (sequence number)
IUINT32 una; // 未确认的序号 (unacknowledged)
IUINT32 len; // 数据长度
IUINT32 resendts; // 重传时间 (resend timestamp)
IUINT32 rto; // 重传的超时时间 (retransmission timeout)
IUINT32 fastack; // 快速确认计数 (fast acknowledge)
IUINT32 xmit; // 发送次数 (transmit)
char data[1]; // 数据内容,用于索引结构体尾部的数据,额外分配的内存扩展了运行时的 data 字段数组的实际长度
};
IKCPCB
该结构存储了KCP控制对象的所有数据、上下文状态、以及回调函数。
struct IKCPCB {
/**
* conv: 会话编号,标识这个KCP会话
* mtu: 最⼤传输单元,默认1400
* mss: 最大报文长度,默认1400-24=1376
* state: 此会话是否有效, 0:有效,~0:无效
*/
IUINT32 conv, mtu, mss, state;
/**
* snd_una: 发送的未确认数据段序号
* snd_nxt: 发送的下一个数据段序号
* rcv_nxt: 期望接收到的下一个数据段的序号
*/
IUINT32 snd_una, snd_nxt, rcv_nxt;
/**
* ts_recent: (弃用)
* ts_lastack: (弃用)
* ssthresh: 慢启动阈值
*/
IUINT32 ts_recent, ts_lastack, ssthresh;
/**
* rx_rttval: 平滑网络抖动时间
* rx_srtt: 平滑往返时间
* rx_rto: 重传超时时间
* rx_minrto: 最小重传超时时间
*/
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
/**
* snd_wnd: 发送窗口大小,默认32,可通过ikcp_wndsize()修改
* rcv_wnd: 接收窗口大小,默认128,
* rmt_wnd: 远端窗口大小,默认128,由接收端发来的切片中KCP头字段 wnd 来决定
* cwnd: 拥塞窗口大小,每次通过ikcp_input()收到数据时会按照算法来增长
* probe: 窗口探测标记位,在flush时发送特殊的探测包
*/
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
/**
* current: 当前时间 (ms)
* interval: 内部flush刷新间隔
* ts_flush: 期望的下一次 update/flush 时间
* xmit: 全局重传次数计数
*/
IUINT32 current, interval, ts_flush, xmit;
/**
* nrcv_buf: 接收缓冲区长度
* nsnd_buf: 发送缓冲区长度
* nrcv_que: 接收队列长度
* nsnd_que: 发送队列长度
*/
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
/**
* nodelay: 是否启用nodelay模式?影响的是超时重传RTO,未开启时RTO以rto的倍数增长,否则以rx_rto的1.5倍增长
* updated: 是否调⽤过update函数??
*/
IUINT32 nodelay, updated;
/**
* ts_probe: 窗口探测标记位
* probe_wait: 零窗口探测等待时间,默认7000 (7秒)
*/
IUINT32 ts_probe, probe_wait;
/**
* dead_link: 死链接条件,默认为20,单个数据段重传次数到达此值时 kcp->state 会被设置为 UINT_MAX
* incr: 以字节长度为单位表示的拥塞窗口
*/
IUINT32 dead_link, incr;
/**
* snd_queue: 发送队列,cnt:kcp.nsnd_que
* rcv_queue: 接收队列,cnt:kcp.nsnd_buf
* snd_buf: 发送缓存区,cnt:kcp.nrcv_que
* rcv_buf: 接收缓存区,cnt:kcp.nrcv_buf
*/
struct IQUEUEHEAD snd_queue;
struct IQUEUEHEAD rcv_queue;
struct IQUEUEHEAD snd_buf;
struct IQUEUEHEAD rcv_buf;
/**
* acklist: 待发送的ACK列表,包含了序号和时间戳对(pair)的数组元素
* ackcount: acklist中ACK的数量,每个ACK在acklist中存储ts,sn两个数据共 2*sizeof(IINT32) 长度
* ackblock: 标识acklist最大可容纳的ack数量,为2的倍数
*/
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;
/**
* user: 用户数据指针,传入到回调函数中
* buffer: 临时缓冲区,用在ikcp_output()回调上
* fastresend: 是否启用快速重传?? 0:不开启, >0:开启(触发快速重传的重复ACK个数)
* fastlimit: 快速重传最大次数限制?? 默认为5
*/
void *user;
char *buffer;
int fastresend;
int fastlimit;
/**
* nocwnd: 控流模式,0:关闭,1:不关闭
* stream: 流模式,0:包模式,1:流模式
*/
int nocwnd, stream;
/**
* logmask:日志类型
*/
int logmask;
// 数据包输出的回调
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
// 日志输出的回调
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};
KcpHeader
KCP头部结构为ikcp_encode_seg
,一共24个字节。
/**
* KCP头一共24个字节
* 1 2 3 4
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | conv(32bit) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | cmd(8bit) | frg(8bit) | wnd(16bit) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | ts(32bit) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | sn(32bit) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | una(32bit) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | len(32bit) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
static char *ikcp_encode_seg(char *ptr, const IKCPSEG *seg) {
ptr = ikcp_encode32u(ptr, seg->conv);
ptr = ikcp_encode8u(ptr, (IUINT8)seg->cmd);
ptr = ikcp_encode8u(ptr, (IUINT8)seg->frg);
ptr = ikcp_encode16u(ptr, (IUINT16)seg->wnd);
ptr = ikcp_encode32u(ptr, seg->ts);
ptr = ikcp_encode32u(ptr, seg->sn);
ptr = ikcp_encode32u(ptr, seg->una);
ptr = ikcp_encode32u(ptr, seg->len);
return ptr;
}
IQUEUEHEAD
该结构用于发送与接受队列和缓冲区的队列,包括:snd_queue
、rcv_queue
、snd_buf
、rcv_buf
,这是一个简单的双向链表。
从代码中可以看出,IQUEUEHEAD
可以是一个链表,也可以是一个链表中的元素。
struct IQUEUEHEAD {
struct IQUEUEHEAD *next, *prev;
};
当IQUEUEHEAD
为链表对象(queue)时,成员prev
表示链表头部元素,next
表示链表末尾元素;当IQUEUEHEAD
为链表中的元素(node)时,prev
为node的后向元素,next为node的前向元素;
队列与窗口
窗口大小
窗口(wnd)用于流量控制,用在标记snd_queue
、rcv_queue
中的一段数据。类似于TCP滑动窗口,KCP的窗口位置也一样会不断移动,从低序号向高序号移动,在这里也可以称为“滑动窗口”(sliding window),其大小会随着算法变化而变化。
对发送端来说,有三种窗口:1)拥塞窗口(cwnd)、2)发送窗口(snd_wnd)、3)远端窗口(rmt_wnd)
/**
* 发送窗口大小,默认32,可通过ikcp_wndsize()修改
*/
kcp->snd_wnd = IKCP_WND_SND;
/**
* 远端窗口大小,默认128,由接收端发来的切片中KCP头字段 wnd 来决定
*/
kcp->rmt_wnd = IKCP_WND_RCV;
/**
* 拥塞窗口大小,每次通过ikcp_input()收到数据时会按照算法来增长
* cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
*/
kcp->cwnd = 0;
对接收端来说,只有一种窗口:接收窗口(rcv_wnd)
/**
* 接收窗口大小,默认128,可通过ikcp_wndsize()修改
*/
kcp->rcv_wnd = IKCP_WND_RCV;
拥塞窗口与控制
拥塞窗口不同于滑动窗口,“拥塞”是对发送端而言的,“滑动”是对接收端而言的。
需要注意的是:当开启拥塞控制时,KCP的拥塞机制与TCP类似,但在具体算法上是有区别的,如果需要排查弱网情况的优化时就是一个关注点,从当前算法来看,KCP拥塞控制的启动速度要比TCP更慢一些。
由下面代码可知,ikcp_flush
内部实现了发送端的拥塞控制逻辑。若在刷出数据 ikcp_flush 时检测到遇到丢包和快速重传则按照算法重新计算。
void ikcp_flush(ikcpcb *kcp) {
// ...
// 如果关闭拥塞控制,那么cwnd固定取kcp->snd_wnd与kcp->rmt_wnd的最小值
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
// 拥塞控制
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
// 数据进入发送缓存 ...
}
// ...
// 当快重传发生时,触发快恢复机制
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2; // 新的ssthresh为未确认序号跨度的一半
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN; // ssthresh是有底线的=2
kcp->cwnd = kcp->ssthresh + resent; // 新的拥塞窗口为新的ssthresh+快速重传ACK次数,至少是>ssthresh
kcp->incr = kcp->cwnd * kcp->mss; // 拥塞窗口字节长为 cwnd*MSS (拥塞避免)
}
// 当超时重传发生时
if (lost) {
kcp->ssthresh = cwnd / 2; // 新的ssthresh为拥塞窗口的一半
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1; // 拥塞窗口强制归一
kcp->incr = kcp->mss; // 拥塞窗口字节长为 1*MSS (开启慢启动)
}
// ...
}
拥塞窗口是通过计算的值,在每次通过 ikcp_input()
收到数据时会按照算法来增长:
int ikcp_input(ikcpcb *kcp, const char *data, long size) {
// 其他处理 ...
/**
* 拥塞控制参数设置:
* - 慢启动阶段
* - 拥塞避免阶段
*/
if (_itimediff(kcp->snd_una, prev_una) > 0) {
if (kcp->cwnd < kcp->rmt_wnd) {
IUINT32 mss = kcp->mss;
if (kcp->cwnd < kcp->ssthresh) { // 慢启动阶段
kcp->cwnd++; // +1
kcp->incr += mss; // 拥塞窗口字节长增加一个MSS(相对激进)
} else { // 拥塞避免阶段
if (kcp->incr < mss)
kcp->incr = mss;
kcp->incr += (mss * mss) / kcp->incr + (mss / 16); // 拥塞窗口字节长缓慢增长(相对保守)
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
#if 1
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0) ? mss : 1);
#else
kcp->cwnd++;
#endif
}
}
// 如果拥塞窗口过大并超过了远端接收窗口,那么就停止增长
if (kcp->cwnd > kcp->rmt_wnd) {
kcp->cwnd = kcp->rmt_wnd;
kcp->incr = kcp->rmt_wnd * mss;
}
}
}
return 0;
}
拥塞窗口通过 kcp.cwnd
和 kcp.incr
的值来记录。由于 kcp.cwnd
记录的单位为包,需要额外的 kcp.incr
来记录以字节长度为单位表示的拥塞窗口。
1)拥塞窗口增长:
在确认数据包的过程中,每次 kcp.snd_buf
队列头部数据发生确认时,且拥塞窗口小于记录的远端窗口 kcp.rmt_wnd
时,进行拥塞窗口增长。
KCP慢启动是以固定值增长
,而到了拥塞阶段以更低斜率的固定值增长,直到触摸天花板。在该模式下,发送端会合并发送确认包。每次调用 ikcp_input()
函数时增长拥塞窗口的流程仅会执行一次,所以合并接收多个确认包并不会有多次增长拥塞窗口的效果。
- 慢启动阶段
若拥塞窗口小于慢启动阈值 kcp.ssthresh
时,处于慢启动
阶段,此时拥塞窗口增长相对激进,拥塞窗口增长一个单位
kcp->incr += mss; // incr
kcp->cwnd++; // cwnd
- 拥塞避免阶段
若拥塞窗口大于等于慢启动阈值时,处于拥塞避免
阶段,拥塞窗口增长相对保守。若 kcp.incr
每次增加 mss/16
时,需要 16 个有效 UNA 确认后才增长一个单位拥塞窗口
kcp->incr += (mss * mss) / kcp->incr + (mss / 16); // incr
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0) ? mss : 1); // cwnd
案例图:
2)拥塞窗口减小:
当 ikcp_flush()
函数检测到跨越重传或超时丢包时,进行拥塞窗口减小。
- 快速重传
发生跨越重传时,慢启动阈值 kcp.ssthresh
设置为未确认序号跨度的一半,拥塞窗口大小为慢启动阈值加上快速重传的配置值 kcp.resend
kcp->ssthresh = (kcp->snd_nxt - kcp->snd_una) / 2; // ssthresh
kcp->cwnd = kcp->ssthresh + resent; // cwnd
kcp->incr = kcp->cwnd * kcp->mss; // incr
- 超时重传
检测到丢包超时时,慢启动阈值设置成当前拥塞窗口的一半,拥塞窗口设置为 1
kcp->ssthresh = cwnd / 2; // ssthresh
kcp->cwnd = 1; // cwnd
kcp->incr = kcp->mss; // incr
数据发送与接收流程
发送流程:ikcp_send()
、ikcp_update()
、ikcp_output()
首次数据发送
假设数据首次发送,写入长度为 4096 的数据,ikcp_send()
对其进行切片,切成3个长度为 1376-1376-1344 的包,每个包的分片 frg 分别为 2-1-0。其中,mss 的初始化为kcp->mss = kcp->mtu - 24
,这24就是KCP头的固定长度,算下来最大kcp->mss = 1376
。
由此可见,此时的发送队列以及发送缓存的情况如下:
snd_queue |
|||
seg |
sn=?? |
frg=2 |
len=1376 |
seg |
sn=?? |
frg=1 |
len=1376 |
seg |
sn=?? |
frg=0 |
len=1344 |
snd_buf |
|||
- |
- |
- |
- |
struct IKCPCB {
struct IQUEUEHEAD snd_queue; // 发送队列
}
// iqueue_init(&kcp->snd_queue);
int ikcp_send(ikcpcb *kcp, const char *buffer, int len) {
IKCPSEG *seg;
int count, i;
// ....
// 判断是否要切包? 以MSS为分界线,MSS=MTU-24
if (len <= (int)kcp->mss) count = 1;
else count = (int)((len + kcp->mss - 1) / kcp->mss);
// ...
// 数据切片,哪怕是1个字节也占一个切片
for (i = 0; i < count; i++) {
// 1~1376
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
// ...
if (buffer && len > 0) memcpy(seg->data, buffer, size);
seg->len = size;
// 切片号倒序
seg->frg = (kcp->stream == 0) ? (count - i - 1) : 0;
// 元素初始化
iqueue_init(&seg->node);
// 添加到发送队列的链表尾
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
// ...
}
return 0;
}
刷新队列的时钟间隔
调用接口ikcp_update
时,需要按照一个节奏来使用。会设定一个初始预期,比如:0-100-200-300-400,我们一看便知这是一个以100为间隔的数列,理想情况下,数列中每一个时间戳“滴答”的时候,update就会被执行。
但这毕竟只是理想,实际情况中很难避免因各种原因导致的偏差,在KCP中,可以容忍迟到,但绝对无法容忍早到,一旦发生早到,就会什么都不干等到下一次“滴答”时再一起处理。
void ikcp_update(ikcpcb *kcp, IUINT32 current) {
IINT32 slap;
kcp->current = current; // 当前时间戳,单位毫秒
// 初始化同步期望值
if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}
/**
* 理想中每一次update都能按照一个固定的间隔"滴答滴答"地去推进,比如:0-100-200-300-400...以此看做"期望值"
* 但实际情况肯定不是理想的那么美好,中间必然存在误差,所以每次调用时,都会去统计一下当前的时间戳和原本期望的某时间戳偏离了多少差值??
* 这个差值就是slap,它可能大于0,也可能小于0
*/
slap = (IINT32)_itimediff(kcp->current, kcp->ts_flush);
// 如果偏离值太大就重新初始化同步期望值
if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}
/**
* 如果偏离值小于0,表示早于计划,就等待下次再执行;如果偏离值大于0,偏移该期望值
* 这么做的目的,就是期望 ikcp_update() 执行的间隔能大于某个阈值
*
* 例如我们以一个固定的时钟间隔推进,每次间隔跨度为5个点,例如:
* 0 1 2 3 4 5
* |-----|-----|-----|-----|-----| (期望时间戳)
* ^ ^ ^ ^ ^ (实际调用update的时间戳)
*
* 但如果跨越了1个间隔,那就是异常,需要重新演化时钟,例如:
* 0 1 2 3 4 5
* |-----|-----|-----|-----|-----| (期望时间戳)
* ^ ^ ^ ^ (实际中出现了1个或以上间隔的跨越)
*
* 于是,就要矫正这个时钟
* 0 1 2 3 4 5
* |-----|-----|-----|-----|-----| (期望时间戳)
* ^ ^ ^ ^ (实际中出现了1个或以上间隔的跨越)
* 3 4 5 6
* |-----|-----|-----| (期望时间戳矫正)
* ^ ^ ^
*/
if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp); // 处理队列中的数据
}
}
写入发送缓存区
其中的ikcp_flush()
内部会做什么呢?从中我们发现:
- 有以下几种数据会被发送:1)确认应答(IKCP_CMD_WASK);2)窗口探测(IKCP_CMD_WASK);3)窗口应答(IKCP_CMD_WINS);4)正常数据(IKCP_CMD_PUSH);
- 确认回复(ACK)并非即刻的,而受到 ikcp_update() 调用频率的影响(这里就会产生一定的延迟);如果想要立刻回复,只能靠自己改代码;
- 正常数据将在此从发送队列(snd_queue)被转移到发送缓存(snd_buf),但具体转移量会受到拥塞窗口的影响,哪怕发送队列里还有很多数据等待发送;
- 拥塞控制策略,包括ssthresh、cwnd、incr的更新调整;
此时,发送队列与发送缓存的情况如下(首次写入时,拥塞窗口cwnd==1)
snd_queue |
|||
- |
- |
- |
- |
seg |
sn=?? |
frg=1 |
len=1376 |
seg |
sn=?? |
frg=0 |
len=1344 |
snd_buf |
|||
seg |
sn=0 |
frg=2 |
len=1376 |
void ikcp_flush(ikcpcb *kcp) {
IUINT32 current = kcp->current;
char *buffer = kcp->buffer;
char *ptr = buffer;
// ...
seg.cmd = IKCP_CMD_ACK;
// ...
/**
* IKCP_CMD_ACK
* 先刷新ACK列表的数据:kcp->acklist
* 套上KCP头后写入buffer,若撑满一个MTU大小就直接回调上去
*/
count = (int)kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
/**
* IKCP_CMD_WASK
* 如果远端窗口为0,那就需要刷新1条窗口探测消息
* 套上KCP头后写入buffer,若撑满一个MTU大小就直接回调上去
*/
if (kcp->rmt_wnd == 0) {
// ...
kcp->probe |= IKCP_ASK_SEND;
} else {
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}
if (kcp->probe & IKCP_ASK_SEND) {
seg.cmd = IKCP_CMD_WASK;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
/**
* IKCP_CMD_WINS
* 如果有窗口应答需要,那就再刷新1条窗口应答消息
* 套上KCP头后写入buffer,若撑满一个MTU大小就直接回调上去
*/
if (kcp->probe & IKCP_ASK_TELL) {
seg.cmd = IKCP_CMD_WINS;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->probe = 0;
// 更新拥塞窗口大小
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
// ...
/**
* 将snd_queue的数据逐一追加到snd_buf
* 发送缓存内的数据包括: 1)已发送但尚未收到确认的; 2)尚未发送的;
* 其中,snd_una为滑动窗头, snd_nxt为滑动窗尾,受到滑动窗口大小影响,首包cwnd==1
*
* |<----- cwnd ---->|
* |<--------------- snd_wnd -------------->|
* |<--- rmt_wnd --->|
* -----|--1--|--2--|--3--|--4--|--(...)--|--32--|----- (snd_buf)
* ^ ^
* (snd_una) (snd_nxt)
*/
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { // 拥塞控制
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue))
break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node); // 从发送队列中移除
iqueue_add_tail(&newseg->node, &kcp->snd_buf); // 再添加到发送缓存队尾
// ...
newseg->cmd = IKCP_CMD_PUSH; // 正常数据包
// ...
}
// 计算丢包重传
resent = (kcp->fastresend > 0) ? (IUINT32)kcp->fastresend : 0xffffffff;
rtomin = (kcp->nodelay == 0) ? (kcp->rx_rto >> 3) : 0;
/**
* IKCP_CMD_PUSH
* 刷新发送缓存中的数据包
* 套上KCP头后写入buffer,若撑满一个MTU大小就直接回调上去
*/
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
needsend = 1;
// 首次发送 ...
} else if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
// 超时重传 ...
} else if (segment->fastack >= resent) {
needsend = 1;
// 快速重传 ...
}
if (needsend) {
// 套上KCP头后写入buffer,若撑满一个MTU大小就直接回调上去
// ...
}
}
// 刷新buffer中剩余的数据并回调
size = (int)(ptr - buffer);
if (size > 0) {
ikcp_output(kcp, buffer, size);
}
// 拥塞控制 ...
}
这里要注意这么一句代码:
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { ... }
意思就是每次将数据从发送队列迁移到发送缓存的时候,具体可发送多少量是受到cwnd控制的。首次发送时只有一个包,随后遵循慢启动
、拥塞避免
、快速恢复
的算法机制。
ickp_flush()
调用的过程中,可能会发生多次ikcp_output()
回调,把snd_buf
中不同类型的消息一次又一次回调上去发送,优先级按以下顺序排列:
IKCP_CMD_ACK、IKCP_CMD_WASK、IKCP_CMD_WINS、IKCP_CMD_PUSH
数据接收和处理
通过调用ikcp_input()
实现对数据的接收处理,输入数据包后,会进行解析包头以及合法性检查。
- UNA确认:
收到的任何包都会先尝试进行 UNA 确认,通过ACK包的 seg.una
值确认并移除了所有 kcp.snd_buf
队列中 seg.sn
值小于 una
值的包
- ACK确认:
先通过ACK消息计算RTT、SRTT、RTTVAL、RTO等数据,然后该ACK对应的包若在发送缓冲中,将之移除
- 普通数据:
当前包的序列号在接收窗口范围内时,先将该包的 sn
,ts
放入 kcp->acklist
,等待 ikcp_flush()
发送ACK确认,然后解析数据包,将包存入 kcp->rcv_queue
int ikcp_input(ikcpcb *kcp, const char *data, long size) {
// ...
while (1) {
// ...
if (size < (int)IKCP_OVERHEAD)
break;
// 解析KCP头共24个字节
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv)
return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
size -= IKCP_OVERHEAD;
if ((long)size < (long)len || (int)len < 0)
return -2;
// 只有四种类型数据
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS)
return -3;
// 根据消息头中的wnd字段获取远端接收窗口大小
kcp->rmt_wnd = wnd;
// 解析消息头中的una字段获取未确认的序列号,所有包都会先经过una确认
ikcp_parse_una(kcp, una);
// move kcp->snd_una
ikcp_shrink_buf(kcp);
// ...
if (cmd == IKCP_CMD_ACK) {
// 计算RTT、SRTT、RTTVAL、RTO
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, (IINT32)_itimediff(kcp->current, ts));
}
// 该ACK对应的包若在发送缓冲中,将之移除
ikcp_parse_ack(kcp, sn);
ikcp_shrink_buf(kcp);
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
} else if (cmd == IKCP_CMD_PUSH) {
/**
* 当前包的序列号在接收窗口范围内
* 先将该包的sn,ts放入kcp->acklist,等待ikcp_flush()发送ACK确认
* 然后解析数据包,将包存入kcp->rcv_queue
*/
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
// kcp->acklist
ikcp_ack_push(kcp, sn, ts);
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, (int)len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
// kcp->rcv_buf -> kcp->rcv_queue
ikcp_parse_data(kcp, seg);
}
}
} else if (cmd == IKCP_CMD_WASK) {
// ...
} else if (cmd == IKCP_CMD_WINS) {
// ...
}
}
}
接收缓存rcv_buf
内存储的,是暂时无法处理的数据包,通过 ikcp_input
传入的所有数据包均会优先到达此队列,同时会按照原始到达顺序记录信息到 kcp->acklist
。
只有两种情况下,数据会滞留在此队列(rcv_buf)中:
- 数据包的序号发生了丢包或乱序:
当先收到的包符合可用包的条件,直接移动到 kcp->rev_queue
;
当收到的包不是期望包 seg.sn != kcp.rcv_nxt
,这会导致此包依然滞留在 kcp->rcv_buf
中;
当重新收到之前“丢失”的包,然后会重新将滞留的“连贯”包列移动到 kcp->rcv_queue
;
- 接收窗口已满:
当 kcp->rcv_queue
接收队列长度达到了接收窗口大小 kcp->rcv_wnd
时,结束迁移;
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg) {
struct IQUEUEHEAD *p, *prev;
IUINT32 sn = newseg->sn;
int repeat = 0;
// 当前sn必须要在接收队列序列内
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 || _itimediff(sn, kcp->rcv_nxt) < 0) {
ikcp_segment_delete(kcp, newseg);
return;
}
/**
* 先将seg加入到kcp->rcv_buf
* 如果重复就不再存入,为了解决冗余包以及多次重传的问题
*/
for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
prev = p->prev;
if (seg->sn == sn) {
repeat = 1;
break;
}
if (_itimediff(sn, seg->sn) > 0) {
break;
}
}
if (repeat == 0) {
iqueue_init(&newseg->node);
iqueue_add(&newseg->node, p);
kcp->nrcv_buf++;
} else {
ikcp_segment_delete(kcp, newseg);
}
/**
* 将kcp->rcv_buf中适合的包存入kcp->rcv_queue
* 有两种情况,数据会滞留在此队列rcv_buf中:
* 1、数据包的序号发生了丢包或乱序;
* 2、接收窗口已满;
*/
while (!iqueue_is_empty(&kcp->rcv_buf)) {
IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
}
切片组包后取到上层
int ikcp_recv(ikcpcb *kcp, char *buffer, int len) {
struct IQUEUEHEAD *p;
int ispeek = (len < 0) ? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);
if (iqueue_is_empty(&kcp->rcv_queue)) // 接收队列为空返回错误
return -1;
if (len < 0)
len = -len; // 长度取绝对值
peeksize = ikcp_peeksize(kcp); // 接收队列中可读取的切片数量
if (peeksize < 0)
return -2;
if (peeksize > len)
return -3;
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;
// 切片组包并写入到buffer
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue;) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;
if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}
len += (IINT32)seg->len;
fragment = (IINT32)seg->frg;
if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}
if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}
if (fragment == 0)
break;
}
assert(len == peeksize);
// 将更多的数据从rcv_buf中读取到rcv_queue
// 等待下一次组包时合并
while (!iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
// 将当前窗口大小信息抛给发送端
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
}
return len;
}
整体流程图
细节机制
RTO
RTO是发送包的超时重传时间,当这个时间到了还未收到ACK时,就触发重传机制。这个值来源于平滑RTT与一个间隔值的和,其中,间隔值与抖动有关,由此可见,当抖动值越大,RTO也越大。
ikcpcb *ikcp_create(IUINT32 conv, void *user) {
// ...
kcp->rx_rto = (IINT32)IKCP_RTO_DEF; // 200ms
kcp->rx_minrto = (IINT32)IKCP_RTO_MIN; // 100ms
// ...
}
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc) {
if (nodelay >= 0) {
kcp->nodelay = nodelay;
if (nodelay) {
kcp->rx_minrto = (IINT32)IKCP_RTO_NDL; // 30ms
} else {
kcp->rx_minrto = (IINT32)IKCP_RTO_MIN; // 100ms
}
}
// ...
}
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt) {
IINT32 rto = 0;
// calculate kcp->rx_srtt,kcp->rx_rttval
// ...
// rto = 平滑往返时间 + _max_(发送间隔, 4*平滑抖动值))
rto = (IINT32)(kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval));
// rto = _min_(_max_(kcp->rx_minrto, rto), 60000) => 边界处理
kcp->rx_rto = (IINT32)(_ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX));
}
RTTVAL
平滑抖动时间,每一次收到ACK时都会重新更新一次,为了起到平滑效果,当次的值只按四分之一的分量来平滑历史抖动。
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt) {
IINT32 rto = 0;
if (kcp->rx_srtt == 0) { // ACK首次确认
// ...
kcp->rx_rttval = rtt / 2;
} else {
long delta = rtt - kcp->rx_srtt; // 当次往返时间与平滑RTT的差
if (delta < 0)
delta = -delta;
// 以四分之一的分量来平滑抖动值
kcp->rx_rttval = (IINT32)((3 * kcp->rx_rttval + delta) / 4);
// ...
}
// ...
}
SRTT
即平滑的往返时间,是每一次send-ack之后得到的RTT经过平滑算法后得到的结果值。比起抖动值的四分之一分量,RTT更只有八分之一的分量参与平滑运算。
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt) {
IINT32 rto = 0;
if (kcp->rx_srtt == 0) { // ACK首次确认
kcp->rx_srtt = rtt;
// ...
} else {
// ...
// 以八分之一的分量来平滑RTT
kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8;
if (kcp->rx_srtt < 1)
kcp->rx_srtt = 1;
}
// ...
}