ngx_rtmp_live_module 模块整体描述
live模块是负责音视频流的分发作用,主要将接收来自publisher推流上来的音视频数据分发给每个player播放连接。本文章主要集中分析以下三点
1)流的组织结构
2)流数据的分发
3)流资源的回收
模块数据结构分析
直播服务器是支持多路不同的流名同时推流,而且每路流是可以支持很多个观众同时并发的,那么这些多个推流拉流行为,在服务器内部是怎样组织起来的呢。
流的组织结构主要涉及到以下三个结构体 ngx_rtmp_live_stream_t 、 ngx_rtmp_live_ctx_t以及ngx_rtmp_live_app_conf_t
/* 主要保存了所有流的结构信息,包过推流和拉流,流名,当前流的带宽信息等 */
struct ngx_rtmp_live_stream_s {
u_char name[NGX_RTMP_MAX_NAME]; 当前流的流名
ngx_rtmp_live_stream_t *next; 其他不同流名的流就是通过当前链表指针结构组织起来的
ngx_rtmp_live_ctx_t *ctx; 当前连接的ctx信息
ngx_rtmp_live_ctx_t *publish_ctx; 当前流名的推流的ctx信息
ngx_rtmp_bandwidth_t bw_in; 流入带宽信息
ngx_rtmp_bandwidth_t bw_in_audio; 流入音频带宽信息
ngx_rtmp_bandwidth_t bw_in_video; 流入视频带宽信息
ngx_rtmp_bandwidth_t bw_out; 出口带宽信息
ngx_msec_t epoch; 当前流创建时间
unsigned active:1; 是否是活跃状态,通常是推流有数据的时候
unsigned publishing:1; 当前是否有推流,有可能是先通过play创建的stream结构,此时推流还没有
};
重点是在ngx_rtmp_live_stream_s结构体将不同流名的流通过next链表指针组织起来的
/* 每一个推流连接或者播放连接都有一个live的上下文ctx,下面这个结构体保存的就是当前每个客户端的live_ctx信息 */
struct ngx_rtmp_live_ctx_s {
ngx_rtmp_session_t *session; //当前连接的session
ngx_rtmp_live_stream_t *stream; //当前流的流结构,上面有这个结构体的介绍
ngx_rtmp_live_ctx_t *next; //当前流还有其他的播放连接,通过该指针串联的
ngx_uint_t ndropped; //统计丢包的个数
ngx_rtmp_live_chunk_stream_t cs[2]; //音频和视频的时间戳、csid以及丢包统计等
ngx_uint_t meta_version; //meta 版本信息
ngx_event_t idle_evt; //推流没数据断开定时器
unsigned active:1; //正在推流状态
unsigned publishing:1; //是否是推流连接的ctx
unsigned silent:1;
unsigned paused:1; //是否暂停
};
ngx_rtmp_live_ctx_s结构体主要是描述每个连接服务器的客户端再本模块的ctx上下文信息,同一路流的信息是通过next指针串联起来的
typedef struct {
ngx_int_t nbuckets; /* hash 槽个数 */
ngx_rtmp_live_stream_t **streams; /* hash id对应的流 */
······
ngx_rtmp_live_stream_t *free_streams; /* 循环利用stream结构 */
} ngx_rtmp_live_app_conf_t;
上面是大致的所有流数据组织形式,所有的流先通过流名 hash的方式将流放到不同的槽当中,然后将相同槽的流的用链表的形式串联起来。然后同一路流的连接通过ctx的next链表组织起来。
以下是大致流的组织结构图:
源码分析
源码介绍分析主要分析,流的组织过程,以及流的数据分发,以及资源回收情况。
流的组织过程:新流名和新连接的加入与删除过程
流的数据分发:将推流数据分发给观看连接
流资源回收:播放和推流连接断开是怎样回收
流的组织过程
流的组织过程主要有publish和play触发,相关函数主要有:
ngx_rtmp_live_play
ngx_rtmp_live_publish
ngx_rtmp_live_join
ngx_rtmp_live_get_streams
publish和play的过程其实差不多,这里只分析下publish推理过程,收到rtmp publish消息会进入ngx_rtmp_live_publish函数下面函数是依次调用顺序
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL || !lacf->live) {
goto next;
}
/* join stream as publisher */
ngx_rtmp_live_join(s, v->name, 1);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || !ctx->publishing) {
goto next;
}
ctx->silent = v->silent;
if (!ctx->silent) {
ngx_rtmp_send_status(s, "NetStream.Publish.Start",
"status", "Start publishing");
}
next:
return next_publish(s, v);
}
ngx_rtmp_live_join函数被ngx_rtmp_live_play和ngx_rtmp_live_publish 调用,主要就是将当前ctx放到ctx->stream->ctx->next头插法的形式放到链表当中
static void
ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, unsigned publisher)
{
ngx_rtmp_live_ctx_t *ctx;
ngx_rtmp_live_stream_t **stream;
ngx_rtmp_live_app_conf_t *lacf;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
return;
}
/* 上下文以及当前流名stream 是否存在,避免重复添加 */
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx && ctx->stream) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: already joined");
return;
}
/* 给ctx上下文分配内存 */
if (ctx == NULL) {
ctx = ngx_palloc(s->connection->pool, sizeof(ngx_rtmp_live_ctx_t));
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_live_module);
}
ngx_memzero(ctx, sizeof(*ctx));
ctx->session = s;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: join '%s'", name);
/* stream 结构是否存在*/
stream = ngx_rtmp_live_get_stream(s, name, publisher || lacf->idle_streams);
/* 如果此时没有推流并且不等待立即断开播放端的连接,idle_streams表示等不等推流到来 */
if (stream == NULL ||
!(publisher || (*stream)->publishing || lacf->idle_streams))
{
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"live: stream not found");
ngx_rtmp_send_status(s, "NetStream.Play.StreamNotFound", "error",
"No such stream");
ngx_rtmp_finalize_session(s);
return;
}
/* 如果是推流连接,给当前流publishing赋值,说明正在推流 */
if (publisher) {
if ((*stream)->publishing) {
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"live: already publishing");
ngx_rtmp_send_status(s, "NetStream.Publish.BadName", "error",
"Already publishing");
return;
}
(*stream)->publishing = 1;
}
/* 在ctx当中保存当前流stream信息,供方便查找其他的连接 */
ctx->stream = *stream;
ctx->publishing = publisher;
ctx->next = (*stream)->ctx; /* 头插法将当前连接放到链表头,同路流用ctx->stream->ctx->next组织 */
(*stream)->ctx = ctx;
if (lacf->buflen) {
s->out_buffer = 1;
}
ctx->cs[0].csid = NGX_RTMP_CSID_VIDEO;
ctx->cs[1].csid = NGX_RTMP_CSID_AUDIO;
if (!ctx->publishing && ctx->stream->active) {
ngx_rtmp_live_start(s);
}
}
ngx_rtmp_live_get_stream就是创建对应流名的stream结构。
- 1)根据流名hash先找对应的槽是否存在,同一个槽下面有多个stream。
- 2)再判断对应流名的stream是否存在,找到立即返回。
- 3)如果没找到,重新申请分配并初始化stream结构
static ngx_rtmp_live_stream_t **
ngx_rtmp_live_get_stream(ngx_rtmp_session_t *s, u_char *name, int create)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_stream_t **stream;
size_t len;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
return NULL;
}
len = ngx_strlen(name);
/* 根据流名hash到对应的槽,如果找到,再去stream->next链表当中匹配name是否一致,如果找到立即返回 */
stream = &lacf->streams[ngx_hash_key(name, len) % lacf->nbuckets];
for (; *stream; stream = &(*stream)->next) {
if (ngx_strcmp(name, (*stream)->name) == 0) {
return stream;
}
}
/* create 为0 表示拉流,拉流情况没找到直接返回null */
if (!create) {
return NULL;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: create stream '%s'", name);
if (lacf->free_streams) {
*stream = lacf->free_streams;
lacf->free_streams = lacf->free_streams->next;
} else {
*stream = ngx_palloc(lacf->pool, sizeof(ngx_rtmp_live_stream_t));
}
ngx_memzero(*stream, sizeof(ngx_rtmp_live_stream_t));
ngx_memcpy((*stream)->name, name,
ngx_min(sizeof((*stream)->name) - 1, len));
(*stream)->epoch = ngx_current_msec;
return stream;
}
play的过程也是类似,整体过程简单讲,就是先创建stream,然后再将当前ctx放到stream->ctx->next链表当中
流的分发
音视频流的数据分发主要是在 ngx_rtmp_live_av 函数当中,函数是在rtmp 模块 postconfiguration阶段注册的。rtmp有各种命令消息和数据消息,NGX_RTMP_MSG_AUDIO和
NGX_RTMP_MSG_VIDEO主要是音视频消息,在接受到来自推流端的数据之后,会根据不同消息类型,以及各模块注册的消息处理回调handler进入各模块的消息处理函数当中,本函数就是
live模块的音视频消息处理过程。核心逻辑就是将收到的每帧音视频数据实时转发给不同player连接,下面看看是怎么将数据转发的:
static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
·······
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
return NGX_ERROR;
}
/* 没打开live 配置或者收到数据为null, 说明不会处理数据分发,*/
if (!lacf->live || in == NULL || in->buf == NULL) {
return NGX_OK;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || ctx->stream == NULL) {
return NGX_OK;
}
/* 没有推流,不进行数据分发。有可能play先连接,publish后连接的过程 */
if (ctx->publishing == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s from non-publisher", type_s);
return NGX_OK;
}
if (!ctx->stream->active) {
ngx_rtmp_live_start(s);
}
/* 添加推流一直没有数据来,加定时器超时断开机制 */
if (ctx->idle_evt.timer_set) {
ngx_add_timer(&ctx->idle_evt, lacf->idle_timeout);
}
s->current_time = h->timestamp;
peers = 0;
apkt = NULL;
aapkt = NULL;
acopkt = NULL;
header = NULL;
coheader = NULL;
meta = NULL;
meta_version = 0;
mandatory = 0;
prio = (h->type == NGX_RTMP_MSG_VIDEO ?
ngx_rtmp_get_video_frame_type(in) : 0);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
/* 这里其实可以看到音频和视频是分开处理的,也可以通过开启interleave配置放到一起处理 */
csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);
cs = &ctx->cs[csidx];
ngx_memzero(&ch, sizeof(ch));
ch.timestamp = h->timestamp;
ch.msid = NGX_RTMP_MSID;
ch.csid = cs->csid;
ch.type = h->type;
lh = ch;
/* 当前流处于活跃状态,更新上次同类型的帧时间戳,有可能是音频也可能是视频 */
if (cs->active) {
lh.timestamp = cs->timestamp;
}
clh = lh;
clh.type = (h->type == NGX_RTMP_MSG_AUDIO ? NGX_RTMP_MSG_VIDEO :
NGX_RTMP_MSG_AUDIO);
cs->active = 1;
cs->timestamp = ch.timestamp;
delta = ch.timestamp - lh.timestamp;
/* 当前帧和上一同类型消息帧时间戳间隔过大处理
if (delta >> 31) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: clipping non-monotonical timestamp %uD->%uD",
lh.timestamp, ch.timestamp);
delta = 0;
ch.timestamp = lh.timestamp;
}
*/
/* 给当前帧新分配内存 */
rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
/* 将当前帧内容拷贝到rpkt当中,并且计算相对时间戳信息等 */
ngx_rtmp_prepare_message(s, &ch, &lh, rpkt);
codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
/* 以下括号当中主要是准备送 meta 信息,以及音视频头数据 */
if (codec_ctx) {
if (h->type == NGX_RTMP_MSG_AUDIO) {
header = codec_ctx->aac_header;
if (lacf->interleave) {
coheader = codec_ctx->avc_header;
}
if (codec_ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC &&
ngx_rtmp_is_codec_header(in))
{
prio = 0;
mandatory = 1;
}
} else {
header = codec_ctx->avc_header;
if (lacf->interleave) {
coheader = codec_ctx->aac_header;
}
if (codec_ctx->video_codec_id == NGX_RTMP_VIDEO_H264 &&
ngx_rtmp_is_codec_header(in))
{
prio = 0;
mandatory = 1;
}
}
if (codec_ctx->meta) {
meta = codec_ctx->meta;
meta_version = codec_ctx->meta_version;
}
}
/* broadcast to all subscribers 整个for循环当中是将数据推流的每帧数据转发给所有play连接
* 分发过程:从stream的ctx链表当中循环遍历获取每个连接的ctx信息,然后进行转发
*/
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
if (pctx == ctx || pctx->paused) {
continue;
}
ss = pctx->session;
cs = &pctx->cs[csidx];
/* send metadata */
if (meta && meta_version != pctx->meta_version) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: meta");
if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
pctx->meta_version = meta_version;
}
}
/* sync stream 丢包超过一定事件的时候,重新发送音视频头,sync默认100ms */
if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sync %s dropped=%uD", type_s, cs->dropped);
cs->active = 0;
cs->dropped = 0;
}
/* absolute packet */
if (!cs->active) {
if (mandatory) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: skipping header");
continue;
}
/* 如果开启了,wait_video 开关,等视频数据来了再开始发送 有可能音频数据先到 */
if (lacf->wait_video && h->type == NGX_RTMP_MSG_AUDIO &&
!pctx->cs[0].active)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: waiting for video");
continue;
}
/* 如果开启了wait_key 开关,等待关键帧来了,才开始发送音视频消息并且先发送关键帧 */
if (lacf->wait_key && prio != NGX_RTMP_VIDEO_KEY_FRAME &&
(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: skip non-key");
continue;
}
dummy_audio = 0;
if (lacf->wait_video && h->type == NGX_RTMP_MSG_VIDEO &&
!pctx->cs[1].active)
{
dummy_audio = 1;
/* 制造一个假的音频包 */
if (aapkt == NULL) {
aapkt = ngx_rtmp_alloc_shared_buf(cscf);
ngx_rtmp_prepare_message(s, &clh, NULL, aapkt);
}
}
/* coheader 是处理音频和视频放在同一个chunk通道里面发送的情况 */
if (header || coheader) {
/* send absolute codec header */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s header timestamp=%uD",
type_s, lh.timestamp);
/* 发送音视频头 */
if (header) {
if (apkt == NULL) {
apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
ngx_rtmp_prepare_message(s, &lh, NULL, apkt);
}
rc = ngx_rtmp_send_message(ss, apkt, 0);
if (rc != NGX_OK) {
continue;
}
}
if (coheader) {
if (acopkt == NULL) {
acopkt = ngx_rtmp_append_shared_bufs(cscf, NULL, coheader);
ngx_rtmp_prepare_message(s, &clh, NULL, acopkt);
}
rc = ngx_rtmp_send_message(ss, acopkt, 0);
if (rc != NGX_OK) {
continue;
}
/* 这里不知道是不是为了处理特定场景时间戳信息 目前暂时不太理解 */
} else if (dummy_audio) {
ngx_rtmp_send_message(ss, aapkt, 0);
}
cs->timestamp = lh.timestamp;
cs->active = 1;
ss->current_time = cs->timestamp;
} else {
/* send absolute packet 音视频头为空的情况下,继续发送当前音视频帧 */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s packet timestamp=%uD",
type_s, ch.timestamp);
if (apkt == NULL) {
apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, NULL, apkt);
}
rc = ngx_rtmp_send_message(ss, apkt, prio);
if (rc != NGX_OK) {
continue;
}
cs->timestamp = ch.timestamp;
cs->active = 1;
ss->current_time = cs->timestamp;
++peers;
if (dummy_audio) {
ngx_rtmp_send_message(ss, aapkt, 0);
}
continue;
}
}
/* 发送当前音视频帧,这个函数返回NGX_OK 和NGX_AGAIN
* NGX_AGAIN 表示服务器主动当前帧丢弃,NGX_OK表示发送成功
*/
if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
++pctx->ndropped;
/* 统计丢包的时长间隔 */
cs->dropped += delta;
if (mandatory) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: mandatory packet failed");
ngx_rtmp_finalize_session(ss);
}
continue;
}
/* 更新当前音视频帧的上一次时间戳信息, */
cs->timestamp += delta;
++peers;
ss->current_time = cs->timestamp;
}
/* 回收rpkt/apkt//aapkt到内存池当中 */
if (rpkt) {
ngx_rtmp_free_shared_chain(cscf, rpkt);
}
if (apkt) {
ngx_rtmp_free_shared_chain(cscf, apkt);
}
if (aapkt) {
ngx_rtmp_free_shared_chain(cscf, aapkt);
}
if (acopkt) {
ngx_rtmp_free_shared_chain(cscf, acopkt);
}
/* 更新输入带宽值 */
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
/* 更新输出带宽值,peers是播放连接个数 */
ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);
/* 分别按音频和视频计算带宽值 */
ngx_rtmp_update_bandwidth(h->type == NGX_RTMP_MSG_AUDIO ?
&ctx->stream->bw_in_audio :
&ctx->stream->bw_in_video,
h->mlen);
return NGX_OK;
}
看完以上函数之后,发现其实逻辑也很简单,只是live模块的相关配置比较多,里面各种if…else判断条件增加了复杂度。
再总结一下整体分发过程:
- 1)先未当前帧分配内存,按rtmp协议包格式封装好当前帧
- 2)for循环通过ctx->stream->ctx遍历找到每个播放链接
- 3)然后先发送meta信息,音视频头数据
- 4)发送当前音视频帧信息
- 5)回收为当前帧分配的内存(当然这个回收其实是放到一个内存池里面)
过程当中也有一些其他情况的处理,例如按先发送视频帧,关键帧,丢帧超过一定时长重新发送音视频头等
流数据资源回收
流数据资源回收主要是在讲当前ctx从stream移除,将当前流的stream从链表当中移除
static ngx_int_t
ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
{
ngx_rtmp_session_t *ss;
ngx_rtmp_live_ctx_t *ctx, **cctx, *pctx;
ngx_rtmp_live_stream_t **stream;
ngx_rtmp_live_app_conf_t *lacf;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
goto next;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL) {
goto next;
}
/* 以上主要避免重新释放的过程 */
if (ctx->stream == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: not joined");
goto next;
}
/* 删除推流的过程,先给stream结构关闭操作,给publishing,publish_ctx变量赋null操作,说明没有推流 */
if (ctx->stream->publishing && ctx->publishing) {
ctx->stream->publishing = 0;
ctx->stream->publish_ctx = NULL;
}
/* 将当前ctx 从ctx->stream->ctx链表中移除 */
for (cctx = &ctx->stream->ctx; *cctx; cctx = &(*cctx)->next) {
if (*cctx == ctx) {
*cctx = ctx->next;
break;
}
}
/* 如果当前是推理,给对端发送rtmp stop消息 */
if (ctx->publishing || ctx->stream->active) {
ngx_rtmp_live_stop(s);
}
/* 以下主要处理不开启idle_stream的情况下,close的是推流关闭所有拉流的情况 */
if (ctx->publishing) {
ngx_rtmp_send_status(s, "NetStream.Unpublish.Success",
"status", "Stop publishing");
if (!lacf->idle_streams) {
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
if (pctx->publishing == 0) {
ss = pctx->session;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: no publisher");
ngx_rtmp_finalize_session(ss);
}
}
}
}
/* 给当前ctx的stream指针变量赋空 */
if (ctx->stream->ctx || ctx->stream->publish_ctx) {
ctx->stream = NULL;
goto next;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: delete empty stream '%s'",
ctx->stream->name);
stream = ngx_rtmp_live_get_stream(s, ctx->stream->name, 0);
if (stream == NULL) {
goto next;
}
/* 将当前流的stream节点从链表当中移除,放到free链表中循环利用 */
*stream = (*stream)->next;
ctx->stream->next = lacf->free_streams;
lacf->free_streams = ctx->stream;
ctx->stream = NULL;
if (!ctx->silent && !ctx->publishing && !lacf->play_restart) {
ngx_rtmp_send_status(s, "NetStream.Play.Stop", "status", "Stop live");
}
next:
return next_close_stream(s, v);
}
以上就是整篇文章的流组织,流数据分发,资源回收的整体分析。
另外注意play_restart这个配置,默认不开启,如果开启了表示在当前服务器会透传start和stop信息给下游或者客户端。
如果播放器收到rtmp stop消息后,有的播放器会断开播放连接,有的播放器先收到stop,再收到start会重新发起rtmp请求。看不同播放器处理。
本模块其他函数ngx_rtmp_live_start ngx_rtmp_live_stop ngx_rtmp_live_stream_begin ngx_rtmp_live_stream_eof这类是rtmp命令消息处理,比较简单不做分析
模块发现的问题:
拉流不断重新推流,无法拉到数据,需要进行纠正。当前这个纠正思路是在ngx_rtmp_live_av函数中,重推之后时间戳从0开始了。
主要修复逻辑计算音频和视频的时间间隔,音频间隔根据采样率计算,视频根据帧率计算
/* fix republish timestamp */
codec_ctx = ngx_rtmp_get_module_ctx(s,ngx_rtmp_codec_module);
if (codec_ctx && ((int32_t)h->timestamp < 0 ||h->timestamp < cs->timestamp)) {
if (cs->timestamp && h->timestamp == 0) {
ch.timestamp = cs->timestamp;
}
else if (h->type == NGX_RTMP_MSG_VIDEO) {
if (codec_ctx->frame_rate > 0) {
/* 计算视频间隔 */
ch.timestamp = cs->timestamp + 1000/codec_ctx->frame_rate;
} else {
ch.timestamp = cs->timestamp;
}
/* 当前视频帧时间比上次音频时间还小,纠正 */
if (ch.timestamp < ctx->cs[1].timestamp) {
ch.timestamp = ctx->cs[1].timestamp;
}
}
else if (h->type == NGX_RTMP_MSG_AUDIO){
if (codec_ctx->sample_rate &&codec_ctx->sample_size) {
/* 计算音频间隔 */
ch.timestamp = cs->timestamp + 1024 * 1000/codec_ctx->sample_rate;
} else {
ch.timestamp = cs->timestamp;
}
/* 当前音频帧时间比上次视频帧时间还小,纠正 */
if (ch.timestamp < ctx->cs[0].timestamp) {
ch.timestamp = ctx->cs[0].timestamp;
}
}
}