ngx_rtmp_live_module 模块

ngx_rtmp_live_module 模块整体描述

live模块是负责音视频流的分发作用,主要将接收来自publisher推流上来的音视频数据分发给每个player播放连接。本文章主要集中分析以下三点

1)流的组织结构
2)流数据的分发
3)流资源的回收

模块数据结构分析

直播服务器是支持多路不同的流名同时推流,而且每路流是可以支持很多个观众同时并发的,那么这些多个推流拉流行为,在服务器内部是怎样组织起来的呢。

流的组织结构主要涉及到以下三个结构体 ngx_rtmp_live_stream_t 、 ngx_rtmp_live_ctx_t以及ngx_rtmp_live_app_conf_t

/* 主要保存了所有流的结构信息,包过推流和拉流,流名,当前流的带宽信息等 */
struct ngx_rtmp_live_stream_s {
    u_char                              name[NGX_RTMP_MAX_NAME];  当前流的流名
    ngx_rtmp_live_stream_t             *next;                     其他不同流名的流就是通过当前链表指针结构组织起来的
    ngx_rtmp_live_ctx_t                *ctx;                      当前连接的ctx信息
    ngx_rtmp_live_ctx_t                *publish_ctx;              当前流名的推流的ctx信息
    ngx_rtmp_bandwidth_t                bw_in;                    流入带宽信息
    ngx_rtmp_bandwidth_t                bw_in_audio;              流入音频带宽信息
    ngx_rtmp_bandwidth_t                bw_in_video;              流入视频带宽信息
    ngx_rtmp_bandwidth_t                bw_out;                   出口带宽信息
    ngx_msec_t                          epoch;                    当前流创建时间
    unsigned                            active:1;                 是否是活跃状态,通常是推流有数据的时候
    unsigned                            publishing:1;             当前是否有推流,有可能是先通过play创建的stream结构,此时推流还没有
};

重点是在ngx_rtmp_live_stream_s结构体将不同流名的流通过next链表指针组织起来的

/* 每一个推流连接或者播放连接都有一个live的上下文ctx,下面这个结构体保存的就是当前每个客户端的live_ctx信息 */
struct ngx_rtmp_live_ctx_s {
    ngx_rtmp_session_t                 *session;        //当前连接的session
    ngx_rtmp_live_stream_t             *stream;         //当前流的流结构,上面有这个结构体的介绍
    ngx_rtmp_live_ctx_t                *next;           //当前流还有其他的播放连接,通过该指针串联的
    ngx_uint_t                          ndropped;       //统计丢包的个数
    ngx_rtmp_live_chunk_stream_t        cs[2];          //音频和视频的时间戳、csid以及丢包统计等
    ngx_uint_t                          meta_version;   //meta 版本信息
    ngx_event_t                         idle_evt;       //推流没数据断开定时器
    unsigned                            active:1;       //正在推流状态
    unsigned                            publishing:1;   //是否是推流连接的ctx
    unsigned                            silent:1;
    unsigned                            paused:1;       //是否暂停
};

ngx_rtmp_live_ctx_s结构体主要是描述每个连接服务器的客户端再本模块的ctx上下文信息,同一路流的信息是通过next指针串联起来的

typedef struct {
    ngx_int_t                           nbuckets;       /* hash 槽个数 */
    ngx_rtmp_live_stream_t            **streams;        /* hash id对应的流 */
    ······
    ngx_rtmp_live_stream_t             *free_streams;   /* 循环利用stream结构 */
} ngx_rtmp_live_app_conf_t;

上面是大致的所有流数据组织形式,所有的流先通过流名 hash的方式将流放到不同的槽当中,然后将相同槽的流的用链表的形式串联起来。然后同一路流的连接通过ctx的next链表组织起来。

以下是大致流的组织结构图:

在这里插入图片描述

源码分析

源码介绍分析主要分析,流的组织过程,以及流的数据分发,以及资源回收情况。

流的组织过程:新流名和新连接的加入与删除过程
流的数据分发:将推流数据分发给观看连接
流资源回收:播放和推流连接断开是怎样回收

流的组织过程

流的组织过程主要有publish和play触发,相关函数主要有:

ngx_rtmp_live_play
ngx_rtmp_live_publish
ngx_rtmp_live_join
ngx_rtmp_live_get_streams

publish和play的过程其实差不多,这里只分析下publish推理过程,收到rtmp publish消息会进入ngx_rtmp_live_publish函数下面函数是依次调用顺序
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf == NULL || !lacf->live) {
        goto next;
    }

    /* join stream as publisher */

    ngx_rtmp_live_join(s, v->name, 1);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL || !ctx->publishing) {
        goto next;
    }

    ctx->silent = v->silent;

    if (!ctx->silent) {
        ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                            "status", "Start publishing");
    }

next:
    return next_publish(s, v);
}

ngx_rtmp_live_join函数被ngx_rtmp_live_play和ngx_rtmp_live_publish 调用,主要就是将当前ctx放到ctx->stream->ctx->next头插法的形式放到链表当中

static void
ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, unsigned publisher)
{
    ngx_rtmp_live_ctx_t            *ctx;
    ngx_rtmp_live_stream_t        **stream;
    ngx_rtmp_live_app_conf_t       *lacf;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
    if (lacf == NULL) {
        return;
    }
    /* 上下文以及当前流名stream 是否存在,避免重复添加 */
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx && ctx->stream) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: already joined");
        return;
    }
    /* 给ctx上下文分配内存 */
    if (ctx == NULL) {
        ctx = ngx_palloc(s->connection->pool, sizeof(ngx_rtmp_live_ctx_t));
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_live_module);
    }

    ngx_memzero(ctx, sizeof(*ctx));

    ctx->session = s;

    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: join '%s'", name);

    /* stream 结构是否存在*/
    stream = ngx_rtmp_live_get_stream(s, name, publisher || lacf->idle_streams);

    /* 如果此时没有推流并且不等待立即断开播放端的连接,idle_streams表示等不等推流到来 */
    if (stream == NULL ||
        !(publisher || (*stream)->publishing || lacf->idle_streams))
    {
        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                      "live: stream not found");

        ngx_rtmp_send_status(s, "NetStream.Play.StreamNotFound", "error",
                             "No such stream");

        ngx_rtmp_finalize_session(s);

        return;
    }
    /* 如果是推流连接,给当前流publishing赋值,说明正在推流 */
    if (publisher) {
        if ((*stream)->publishing) {
            ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                          "live: already publishing");

            ngx_rtmp_send_status(s, "NetStream.Publish.BadName", "error",
                                 "Already publishing");

            return;
        }

        (*stream)->publishing = 1;
    }

    /* 在ctx当中保存当前流stream信息,供方便查找其他的连接 */
    ctx->stream = *stream;
    ctx->publishing = publisher;
    ctx->next = (*stream)->ctx; /* 头插法将当前连接放到链表头,同路流用ctx->stream->ctx->next组织 */

    (*stream)->ctx = ctx;

    if (lacf->buflen) {
        s->out_buffer = 1;
    }

    ctx->cs[0].csid = NGX_RTMP_CSID_VIDEO;
    ctx->cs[1].csid = NGX_RTMP_CSID_AUDIO;

    if (!ctx->publishing && ctx->stream->active) {
        ngx_rtmp_live_start(s);
    }
}

ngx_rtmp_live_get_stream就是创建对应流名的stream结构。

  • 1)根据流名hash先找对应的槽是否存在,同一个槽下面有多个stream。
  • 2)再判断对应流名的stream是否存在,找到立即返回。
  • 3)如果没找到,重新申请分配并初始化stream结构
static ngx_rtmp_live_stream_t **
ngx_rtmp_live_get_stream(ngx_rtmp_session_t *s, u_char *name, int create)
{
    ngx_rtmp_live_app_conf_t   *lacf;
    ngx_rtmp_live_stream_t    **stream;
    size_t                      len;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
    if (lacf == NULL) {
        return NULL;
    }

    len = ngx_strlen(name);
    /* 根据流名hash到对应的槽,如果找到,再去stream->next链表当中匹配name是否一致,如果找到立即返回 */
    stream = &lacf->streams[ngx_hash_key(name, len) % lacf->nbuckets];

    for (; *stream; stream = &(*stream)->next) {
        if (ngx_strcmp(name, (*stream)->name) == 0) {
            return stream;
        }
    }

    /* create 为0 表示拉流,拉流情况没找到直接返回null */
    if (!create) {
        return NULL;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "live: create stream '%s'", name);

    if (lacf->free_streams) {
        *stream = lacf->free_streams;
        lacf->free_streams = lacf->free_streams->next;
    } else {
        *stream = ngx_palloc(lacf->pool, sizeof(ngx_rtmp_live_stream_t));
    }
    ngx_memzero(*stream, sizeof(ngx_rtmp_live_stream_t));
    ngx_memcpy((*stream)->name, name,
            ngx_min(sizeof((*stream)->name) - 1, len));
    (*stream)->epoch = ngx_current_msec;

    return stream;
}

play的过程也是类似,整体过程简单讲,就是先创建stream,然后再将当前ctx放到stream->ctx->next链表当中

流的分发

音视频流的数据分发主要是在 ngx_rtmp_live_av 函数当中,函数是在rtmp 模块 postconfiguration阶段注册的。rtmp有各种命令消息和数据消息,NGX_RTMP_MSG_AUDIO和

NGX_RTMP_MSG_VIDEO主要是音视频消息,在接受到来自推流端的数据之后,会根据不同消息类型,以及各模块注册的消息处理回调handler进入各模块的消息处理函数当中,本函数就是

live模块的音视频消息处理过程。核心逻辑就是将收到的每帧音视频数据实时转发给不同player连接,下面看看是怎么将数据转发的:

 static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                 ngx_chain_t *in)
{
    ·······

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
    if (lacf == NULL) {
        return NGX_ERROR;
    }
    /* 没打开live 配置或者收到数据为null, 说明不会处理数据分发,*/
    if (!lacf->live || in == NULL  || in->buf == NULL) {
        return NGX_OK;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL || ctx->stream == NULL) {
        return NGX_OK;
    }
    /* 没有推流,不进行数据分发。有可能play先连接,publish后连接的过程  */
    if (ctx->publishing == 0) {
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: %s from non-publisher", type_s);
        return NGX_OK;
    }

    if (!ctx->stream->active) {
        ngx_rtmp_live_start(s);
    }
    /* 添加推流一直没有数据来,加定时器超时断开机制 */
    if (ctx->idle_evt.timer_set) {
        ngx_add_timer(&ctx->idle_evt, lacf->idle_timeout);
    }

    s->current_time = h->timestamp;

    peers = 0;
    apkt = NULL;
    aapkt = NULL;
    acopkt = NULL;
    header = NULL;
    coheader = NULL;
    meta = NULL;
    meta_version = 0;
    mandatory = 0;

    prio = (h->type == NGX_RTMP_MSG_VIDEO ?
            ngx_rtmp_get_video_frame_type(in) : 0);

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    /* 这里其实可以看到音频和视频是分开处理的,也可以通过开启interleave配置放到一起处理 */
    csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);

    cs  = &ctx->cs[csidx];

    ngx_memzero(&ch, sizeof(ch));

    ch.timestamp = h->timestamp;
    ch.msid = NGX_RTMP_MSID;
    ch.csid = cs->csid;
    ch.type = h->type;

    lh = ch;
    /* 当前流处于活跃状态,更新上次同类型的帧时间戳,有可能是音频也可能是视频 */
    if (cs->active) {
        lh.timestamp = cs->timestamp;
    }

    clh = lh;
    clh.type = (h->type == NGX_RTMP_MSG_AUDIO ? NGX_RTMP_MSG_VIDEO :
                                                NGX_RTMP_MSG_AUDIO);

    cs->active = 1;
    cs->timestamp = ch.timestamp;

    delta = ch.timestamp - lh.timestamp;

/*  当前帧和上一同类型消息帧时间戳间隔过大处理
    if (delta >> 31) {
        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: clipping non-monotonical timestamp %uD->%uD",
                       lh.timestamp, ch.timestamp);
        delta = 0;
        ch.timestamp = lh.timestamp;
    }
*/
    /* 给当前帧新分配内存 */
    rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

    /* 将当前帧内容拷贝到rpkt当中,并且计算相对时间戳信息等  */
    ngx_rtmp_prepare_message(s, &ch, &lh, rpkt);

    codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    /* 以下括号当中主要是准备送 meta 信息,以及音视频头数据 */
    if (codec_ctx) {

        if (h->type == NGX_RTMP_MSG_AUDIO) {
            header = codec_ctx->aac_header;

            if (lacf->interleave) {
                coheader = codec_ctx->avc_header;
            }

            if (codec_ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC &&
                ngx_rtmp_is_codec_header(in))
            {
                prio = 0;
                mandatory = 1;
            }

        } else {
            header = codec_ctx->avc_header;

            if (lacf->interleave) {
                coheader = codec_ctx->aac_header;
            }

            if (codec_ctx->video_codec_id == NGX_RTMP_VIDEO_H264 &&
                ngx_rtmp_is_codec_header(in))
            {
                prio = 0;
                mandatory = 1;
            }
        }

        if (codec_ctx->meta) {
            meta = codec_ctx->meta;
            meta_version = codec_ctx->meta_version;
        }
    }

    /* broadcast to all subscribers  整个for循环当中是将数据推流的每帧数据转发给所有play连接 
     *  分发过程:从stream的ctx链表当中循环遍历获取每个连接的ctx信息,然后进行转发
     */
    for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
        if (pctx == ctx || pctx->paused) {
            continue;
        }

        ss = pctx->session;
        cs = &pctx->cs[csidx];

        /* send metadata */

        if (meta && meta_version != pctx->meta_version) {
            ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                           "live: meta");

            if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
                pctx->meta_version = meta_version;
            }
        }

        /* sync stream  丢包超过一定事件的时候,重新发送音视频头,sync默认100ms */

        if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) {
            ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                           "live: sync %s dropped=%uD", type_s, cs->dropped);

            cs->active = 0;
            cs->dropped = 0;
        }

        /* absolute packet */

        if (!cs->active) {

            if (mandatory) {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: skipping header");
                continue;
            }
            /* 如果开启了,wait_video 开关,等视频数据来了再开始发送 有可能音频数据先到 */
            if (lacf->wait_video && h->type == NGX_RTMP_MSG_AUDIO &&
                !pctx->cs[0].active)
            {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: waiting for video");
                continue;
            }
            /* 如果开启了wait_key 开关,等待关键帧来了,才开始发送音视频消息并且先发送关键帧 */
            if (lacf->wait_key && prio != NGX_RTMP_VIDEO_KEY_FRAME &&
               (lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO))
            {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: skip non-key");
                continue;
            }

            dummy_audio = 0;
            if (lacf->wait_video && h->type == NGX_RTMP_MSG_VIDEO &&
                !pctx->cs[1].active)
            {
                dummy_audio = 1;
                /* 制造一个假的音频包 */
                if (aapkt == NULL) {
                    aapkt = ngx_rtmp_alloc_shared_buf(cscf);
                    ngx_rtmp_prepare_message(s, &clh, NULL, aapkt);
                }
            }

            /* coheader 是处理音频和视频放在同一个chunk通道里面发送的情况 */
            if (header || coheader) {

                /* send absolute codec header */

                ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: abs %s header timestamp=%uD",
                               type_s, lh.timestamp);
                /* 发送音视频头 */
                if (header) {
                    if (apkt == NULL) {
                        apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
                        ngx_rtmp_prepare_message(s, &lh, NULL, apkt);
                    }

                    rc = ngx_rtmp_send_message(ss, apkt, 0);
                    if (rc != NGX_OK) {
                        continue;
                    }
                }

                if (coheader) {
                    if (acopkt == NULL) {
                        acopkt = ngx_rtmp_append_shared_bufs(cscf, NULL, coheader);
                        ngx_rtmp_prepare_message(s, &clh, NULL, acopkt);
                    }

                    rc = ngx_rtmp_send_message(ss, acopkt, 0);
                    if (rc != NGX_OK) {
                        continue;
                    }
                /* 这里不知道是不是为了处理特定场景时间戳信息 目前暂时不太理解 */
                } else if (dummy_audio) {
                    ngx_rtmp_send_message(ss, aapkt, 0);
                }

                cs->timestamp = lh.timestamp;
                cs->active = 1;
                ss->current_time = cs->timestamp;

            } else {

                /* send absolute packet  音视频头为空的情况下,继续发送当前音视频帧 */

                ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: abs %s packet timestamp=%uD",
                               type_s, ch.timestamp);

                if (apkt == NULL) {
                    apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
                    ngx_rtmp_prepare_message(s, &ch, NULL, apkt);
                }

                rc = ngx_rtmp_send_message(ss, apkt, prio);
                if (rc != NGX_OK) {
                    continue;
                }

                cs->timestamp = ch.timestamp;
                cs->active = 1;
                ss->current_time = cs->timestamp;

                ++peers;

                if (dummy_audio) {
                    ngx_rtmp_send_message(ss, aapkt, 0);
                }

                continue;
            }
        }

        /* 发送当前音视频帧,这个函数返回NGX_OK 和NGX_AGAIN
         * NGX_AGAIN 表示服务器主动当前帧丢弃,NGX_OK表示发送成功
         */
        if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
            ++pctx->ndropped;
            /* 统计丢包的时长间隔 */
            cs->dropped += delta;

            if (mandatory) {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: mandatory packet failed");
                ngx_rtmp_finalize_session(ss);
            }

            continue;
        }
        /* 更新当前音视频帧的上一次时间戳信息, */
        cs->timestamp += delta;
        ++peers;
        ss->current_time = cs->timestamp;
    }

    /* 回收rpkt/apkt//aapkt到内存池当中 */
    if (rpkt) {
        ngx_rtmp_free_shared_chain(cscf, rpkt);
    }

    if (apkt) {
        ngx_rtmp_free_shared_chain(cscf, apkt);
    }

    if (aapkt) {
        ngx_rtmp_free_shared_chain(cscf, aapkt);
    }

    if (acopkt) {
        ngx_rtmp_free_shared_chain(cscf, acopkt);
    }

    /* 更新输入带宽值 */
    ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
    /* 更新输出带宽值,peers是播放连接个数 */
    ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);

    /* 分别按音频和视频计算带宽值 */
    ngx_rtmp_update_bandwidth(h->type == NGX_RTMP_MSG_AUDIO ?
                              &ctx->stream->bw_in_audio :
                              &ctx->stream->bw_in_video,
                              h->mlen);

    return NGX_OK;
}

看完以上函数之后,发现其实逻辑也很简单,只是live模块的相关配置比较多,里面各种if…else判断条件增加了复杂度。

再总结一下整体分发过程:

  • 1)先未当前帧分配内存,按rtmp协议包格式封装好当前帧
  • 2)for循环通过ctx->stream->ctx遍历找到每个播放链接
  • 3)然后先发送meta信息,音视频头数据
  • 4)发送当前音视频帧信息
  • 5)回收为当前帧分配的内存(当然这个回收其实是放到一个内存池里面)
    过程当中也有一些其他情况的处理,例如按先发送视频帧,关键帧,丢帧超过一定时长重新发送音视频头等

流数据资源回收

流数据资源回收主要是在讲当前ctx从stream移除,将当前流的stream从链表当中移除

 static ngx_int_t
ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
{
    ngx_rtmp_session_t             *ss;
    ngx_rtmp_live_ctx_t            *ctx, **cctx, *pctx;
    ngx_rtmp_live_stream_t        **stream;
    ngx_rtmp_live_app_conf_t       *lacf;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
    if (lacf == NULL) {
        goto next;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL) {
        goto next;
    }
    /* 以上主要避免重新释放的过程 */
    if (ctx->stream == NULL) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: not joined");
        goto next;
    }

    /* 删除推流的过程,先给stream结构关闭操作,给publishing,publish_ctx变量赋null操作,说明没有推流 */
    if (ctx->stream->publishing && ctx->publishing) {
        ctx->stream->publishing = 0;
        ctx->stream->publish_ctx = NULL;
    }

    /* 将当前ctx 从ctx->stream->ctx链表中移除 */
    for (cctx = &ctx->stream->ctx; *cctx; cctx = &(*cctx)->next) {
        if (*cctx == ctx) {
            *cctx = ctx->next;
            break;
        }
    }

    /* 如果当前是推理,给对端发送rtmp stop消息 */
    if (ctx->publishing || ctx->stream->active) {
        ngx_rtmp_live_stop(s);
    }
    /* 以下主要处理不开启idle_stream的情况下,close的是推流关闭所有拉流的情况 */
    if (ctx->publishing) {
        ngx_rtmp_send_status(s, "NetStream.Unpublish.Success",
                             "status", "Stop publishing");
        if (!lacf->idle_streams) {
            for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
                if (pctx->publishing == 0) {
                    ss = pctx->session;
                    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: no publisher");
                    ngx_rtmp_finalize_session(ss);
                }
            }
        }
    }
    /* 给当前ctx的stream指针变量赋空 */
    if (ctx->stream->ctx || ctx->stream->publish_ctx) {
        ctx->stream = NULL;
        goto next;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: delete empty stream '%s'",
                   ctx->stream->name);

    stream = ngx_rtmp_live_get_stream(s, ctx->stream->name, 0);
    if (stream == NULL) {
        goto next;
    }
    /* 将当前流的stream节点从链表当中移除,放到free链表中循环利用  */
    *stream = (*stream)->next;

    ctx->stream->next = lacf->free_streams;
    lacf->free_streams = ctx->stream;
    ctx->stream = NULL;

    if (!ctx->silent && !ctx->publishing && !lacf->play_restart) {
        ngx_rtmp_send_status(s, "NetStream.Play.Stop", "status", "Stop live");
    }

next:
    return next_close_stream(s, v);
}

以上就是整篇文章的流组织,流数据分发,资源回收的整体分析。

另外注意play_restart这个配置,默认不开启,如果开启了表示在当前服务器会透传start和stop信息给下游或者客户端。

如果播放器收到rtmp stop消息后,有的播放器会断开播放连接,有的播放器先收到stop,再收到start会重新发起rtmp请求。看不同播放器处理。

本模块其他函数ngx_rtmp_live_start ngx_rtmp_live_stop ngx_rtmp_live_stream_begin ngx_rtmp_live_stream_eof这类是rtmp命令消息处理,比较简单不做分析

模块发现的问题:

拉流不断重新推流,无法拉到数据,需要进行纠正。当前这个纠正思路是在ngx_rtmp_live_av函数中,重推之后时间戳从0开始了。

主要修复逻辑计算音频和视频的时间间隔,音频间隔根据采样率计算,视频根据帧率计算

 /* fix republish timestamp  */
 codec_ctx = ngx_rtmp_get_module_ctx(s,ngx_rtmp_codec_module);
 if (codec_ctx && ((int32_t)h->timestamp < 0 ||h->timestamp < cs->timestamp)) {

     if (cs->timestamp && h->timestamp == 0) {
        ch.timestamp = cs->timestamp;
     }
     else if (h->type == NGX_RTMP_MSG_VIDEO) {
         if (codec_ctx->frame_rate > 0) {
             /* 计算视频间隔 */
             ch.timestamp = cs->timestamp + 1000/codec_ctx->frame_rate;
         } else {
             ch.timestamp = cs->timestamp;
         }
         /* 当前视频帧时间比上次音频时间还小,纠正 */
         if (ch.timestamp < ctx->cs[1].timestamp) {
             ch.timestamp = ctx->cs[1].timestamp;
         }
     }
     else if (h->type == NGX_RTMP_MSG_AUDIO){

         if (codec_ctx->sample_rate &&codec_ctx->sample_size) {
             /* 计算音频间隔 */
             ch.timestamp = cs->timestamp + 1024 * 1000/codec_ctx->sample_rate;
         } else {
             ch.timestamp = cs->timestamp;
         }
          /* 当前音频帧时间比上次视频帧时间还小,纠正 */
         if (ch.timestamp < ctx->cs[0].timestamp) {
             ch.timestamp = ctx->cs[0].timestamp;
         }
     }
 }

猜你喜欢

转载自blog.csdn.net/wu5215080/article/details/90755763