接下来看一下librtmp最重要之一的函数 RTMP_SendPacket
首先必须有一份rtmp文档在手配合源码阅读,点击打开链接
顾名思义,即发送一个packet,这是rtmp协议逻辑上数据交互的基本单元(实际物理上还要分成chunk发送)。
先看一下 RTMPPacket 的定义
typedef struct RTMPPacket
{
uint8_t m_headerType;
uint8_t m_packetType;
uint8_t m_hasAbsTimestamp; /* timestamp absolute or relative? */
int m_nChannel;
uint32_t m_nTimeStamp; /* timestamp */
int32_t m_nInfoField2; /* last 4 bytes in a long header */
uint32_t m_nBodySize;
uint32_t m_nBytesRead;
RTMPChunk *m_chunk;
char *m_body;
} RTMPPacket;
重点关注m_body字段
int
RTMPPacket_Alloc(RTMPPacket *p, int nSize)
{
char *ptr = calloc(1, nSize + RTMP_MAX_HEADER_SIZE);
if (!ptr)
return FALSE;
p->m_body = ptr + RTMP_MAX_HEADER_SIZE;
p->m_nBytesRead = 0;
return TRUE;
}
可以看到,实际上分配的空间是nSize + RTMP_MAX_HEADER_SIZE,m_body指向的是所请求空间起始位置加上RTMP_MAX_HEADER_SIZE,这个常量被define为18
接下来正式看下RTMP_SendPacket
int
RTMP_SendPacket( RTMP *r, RTMPPacket *packet, int queue )
{
const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel];
uint32_t last = 0;
int nSize;
int hSize, cSize;
char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;
uint32_t t;
char *buffer, *tbuf = NULL, *toff = NULL;
int nChunkSize;
int tlen;
prevpacket指当前channel上一个包,rtmp保存上一个包信息的目的便是发送下一个包的时候,有很多重复的包头内容可以不发送,从而节省网络带宽
if ( prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE )
{
/* compress a bit by using the prev packet's attributes */
if ( prevPacket->m_nBodySize == packet->m_nBodySize
&& prevPacket->m_packetType == packet->m_packetType
&& packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM )
packet->m_headerType = RTMP_PACKET_SIZE_SMALL;
if ( prevPacket->m_nTimeStamp == packet->m_nTimeStamp
&& packet->m_headerType == RTMP_PACKET_SIZE_SMALL )
packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;
last = prevPacket->m_nTimeStamp;
}
此处决定当前packet的headerType,具体可参考规范。大致意思是通过复用上一个包头信息,尽可能的节约发送字节数。
headerType的具体宏定义如下:
#define RTMP_PACKET_SIZE_LARGE 0
#define RTMP_PACKET_SIZE_MEDIUM 1
#define RTMP_PACKET_SIZE_SMALL 2
#define RTMP_PACKET_SIZE_MINIMUM 3
if ( packet->m_headerType > 3 ) /* sanity */
{
RTMP_Log( RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.",
(unsigned char) packet->m_headerType );
return(FALSE);
}
nSize = packetSize[packet->m_headerType];
hSize = nSize; cSize = 0;
t = packet->m_nTimeStamp - last;
此处,nSize根据headerType被赋值为不同的值。
static const int packetSize[] = { 12, 8, 4, 1 };
时间戳计算为与上一个packet时间戳的相对时间,其实也是为了节省字节数
if ( packet->m_body )
{
header = packet->m_body - nSize;
hend = packet->m_body;
}else {
header = hbuf + 6;
hend = hbuf + sizeof(hbuf);
}
只看m_body有值的情况,此时的header指向了包头的开始位置,而hend指向了包头的结束地址,也即是包体的开始位置
if ( packet->m_nChannel > 319 )
cSize = 2;
else if ( packet->m_nChannel > 63 )
cSize = 1;
if ( cSize )
{
header -= cSize;
hSize += cSize;
}
if ( nSize > 1 && t >= 0xffffff )
{
header -= 4;
hSize += 4;
}
hptr = header;
c = packet->m_headerType << 6;
switch ( cSize )
{
case 0:
c |= packet->m_nChannel;
break;
case 1:
break;
case 2:
c |= 1;
break;
}
*hptr++ = c;
if ( cSize )
{
int tmp = packet->m_nChannel - 64;
*hptr++ = tmp & 0xff;
if ( cSize == 2 )
*hptr++ = tmp >> 8;
}
if ( nSize > 1 )
{
hptr = AMF_EncodeInt24( hptr, hend, t > 0xffffff ? 0xffffff : t );
}
if ( nSize > 4 )
{
hptr = AMF_EncodeInt24( hptr, hend, packet->m_nBodySize );
*hptr++ = packet->m_packetType;
}
if ( nSize > 8 )
hptr += EncodeInt32LE( hptr, packet->m_nInfoField2 );
if ( nSize > 1 && t >= 0xffffff )
hptr = AMF_EncodeInt32( hptr, hend, t );
此处是按照规范对包头进行编码,对照规范描述皆可一一对应。
nSize = packet->m_nBodySize;
buffer = packet->m_body;
nChunkSize = r->m_outChunkSize;
RTMP_Log( RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket,
nSize );
/* send all chunks in one HTTP request */
if ( r->Link.protocol & RTMP_FEATURE_HTTP )
{
int chunks = (nSize + nChunkSize - 1) / nChunkSize;
if ( chunks > 1 )
{
tlen = chunks * (cSize + 1) + nSize + hSize;
tbuf = malloc( tlen );
if ( !tbuf )
return(FALSE);
toff = tbuf;
}
}
buffer指向包体,要开始正式发送了。http发送是不想分析的,不过此处大意便是,将当前packet的所有chunk合成一块进行发送。
while ( nSize + hSize )
{
int wrote;
if ( nSize < nChunkSize )
nChunkSize = nSize;
RTMP_LogHexString( RTMP_LOGDEBUG2, (uint8_t *) header, hSize );
RTMP_LogHexString( RTMP_LOGDEBUG2, (uint8_t *) buffer, nChunkSize );
if ( tbuf )
{
memcpy( toff, header, nChunkSize + hSize );
toff += nChunkSize + hSize;
} else{
wrote = WriteN( r, header, nChunkSize + hSize );
if ( !wrote )
return(FALSE);
WrtieN出现了!header是包头的地址,先发送包头长度hSize和chunk长度nChunkSize的数据。
}
nSize -= nChunkSize;
buffer += nChunkSize;
hSize = 0;
发送完毕一个chunk后,对剩余发送内容字节数进行更改,并且置hSize,即包头长度为0。为什么要重置hSize,那是因为对余下内容进行发送的时候,chunk的包头与第一个chunk的包头有可能不一样了(有些信息就不用发送了)。
if ( nSize > 0 )
{
header = buffer - 1;
hSize = 1;
if ( cSize )
{
header -= cSize;
hSize += cSize;
}
*header = (0xc0 | c);
if ( cSize )
{
int tmp = packet->m_nChannel - 64;
header[1] = tmp & 0xff;
if ( cSize == 2 )
header[2] = tmp >> 8;
}
所以在此重新计算hSize的长度,并且重新填写包头。重点关注
*header = (0xc0 | c);
此处0xc0向右移位6位便是0x11,即3,就是RTMP_PACKET_SIZE_MINIMUM。也就是说同一packet下,第二个chunk,不需要发送额外的消息头了,只需要发送channel_id即可
}
}
if ( tbuf )
{
int wrote = WriteN( r, tbuf, toff - tbuf );
free( tbuf );
tbuf = NULL;
if ( !wrote )
return(FALSE);
}
http发送相关,忽略。
/* we invoked a remote method */
if ( packet->m_packetType == 0x14 )
{
AVal method;
char *ptr;
ptr = packet->m_body + 1;
AMF_DecodeString( ptr, &method );
RTMP_Log( RTMP_LOGDEBUG, "Invoking %s", method.av_val );
/* keep it in call queue till result arrives */
if ( queue )
{
int txn;
ptr += 3 + method.av_len;
txn = (int) AMF_DecodeNumber( ptr );
AV_queue( &r->m_methodCalls, &r->m_numCalls, &method, txn );
}
}
packetType为0x14代表着命令消息,是需要获得返回值的,所以此处先把它记录下来,等RTMP_ReadPacket读到其回应的时候再进行处理。
if ( !r->m_vecChannelsOut[packet->m_nChannel] )
r->m_vecChannelsOut[packet->m_nChannel] = malloc( sizeof(RTMPPacket) );
memcpy( r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket) );
所谓一代传一代,当前包成了下个包的上个包(有点绕),那么必须将其元信息保存下来了。
return(TRUE);
}
至此分析完毕了,宏观上总结下:
发送一个packet,会将其根据chunksize分解为若干个chunk进行发送。对于第一个chunk的头,可以尽可能复用上一个packet的头;而第二个chunk开始,chunk头无需发送额外的消息头了(因为第一个chunk已经发送了)。
有了RTMP_SendPacket,上层函数便无需关心发包的底层细节操作了,只要构造好一个packet,塞给它,它便会如实的进行发送。
PS:床上等你的这个编辑器真是xnmbyy,很多大括号都被吞了