ngx_rtmp_control_module 模块解读

1、模块功能描述

本模块主要提供了外部接口控制rtmp流状态的接口,通过这些接口进行一些断流,录制,流名重定向等以下接口:

  • record 录制功能
  • drop 断流功能,涉黄涉暴力的话可以通过这个方式断开服务端的推流
  • redirect 对推拉流进行重定向功能

1.1 配置示例

http {
    server {
        listen       88;
        server_name  localhost;

        location /control {
            rtmp_control all; #可以配置all/record/redirect/drop
        }
    }
}

rtmp {
    server {
    listen 1935;

    application test {
        live on;
        recorder rec1 {
           record all manual;
           record_suffix all.flv;
           record_path /tmp/rec;
           record_unique on;
       }
   }
}

1.2 请求格式:

1.3 配置解析

开启本模块的控制功能,需要在http模块当中配置rtmp_control,在解析配置的时候配置回调handler(ngx_rtmp_control)主要注册 NGX_HTTP_CONTENT_PHASE阶段的handler

(ngx_rtmp_control_handler),该handler用来接受来自control一类的控制请求。

static char *
ngx_rtmp_control(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_core_loc_conf_t  *clcf;

    /* 注册接受请求的handler,该handler是处于NGX_HTTP_CONTENT_PHASE阶段 */
    clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
    clcf->handler = ngx_rtmp_control_handler;

    /* 解析配置rtmp_control 参数对应的值,all/record/drop/redirect */
    return ngx_conf_set_bitmask_slot(cf, cmd, conf);
}

2 源码剖析

下面主要分析接受来自外部control一类的http请求逻辑,以及相关数据结构剖析

2.1 主要数据结构分析

ngx_rtmp_control_masks是配置项名字和整型参数的转换,类似于key-value值,这是key是字符串,value是数字。

#define NGX_RTMP_CONTROL_ALL        0xff
#define NGX_RTMP_CONTROL_RECORD     0x01
#define NGX_RTMP_CONTROL_DROP       0x02
#define NGX_RTMP_CONTROL_REDIRECT   0x04

static ngx_conf_bitmask_t           ngx_rtmp_control_masks[] = {
    { ngx_string("all"),            NGX_RTMP_CONTROL_ALL       },
    { ngx_string("record"),         NGX_RTMP_CONTROL_RECORD    },
    { ngx_string("drop"),           NGX_RTMP_CONTROL_DROP      },
    { ngx_string("redirect"),       NGX_RTMP_CONTROL_REDIRECT  },
    { ngx_null_string,              0                          }
};

typedef struct {
    ngx_uint_t                      control;
} ngx_rtmp_control_loc_conf_t;

在本模块ngx_rtmp_control_loc_conf_t配置当中可以看到其实保存的是ngx_uint_t值,这个就是ngx_conf_bitmask_t做了一次转换

2.2 control主要逻辑分析

ngx_rtmp_control_handler函数主要是接受来自control类的请求处理,核心逻辑是根据请求的uri解析出是哪类一类的control请求,再分别进入各自对应的handler处理各自请求

  • ngx_rtmp_control_redirect 处理重定向请求
  • ngx_rtmp_control_drop 处理推拉流断开请求
  • ngx_rtmp_control_record 处理当前流的录制开启和结束请求
static ngx_int_t
ngx_rtmp_control_handler(ngx_http_request_t *r)
{
    ······
    /* 是否开启control配置,没开启立即返回 */
    llcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_control_module);
    if (llcf->control == 0) {
        return NGX_DECLINED;
    }

    /* uri format: .../section/method?args */

    ngx_str_null(&section);
    ngx_str_null(&method);

    /* 主要解析是哪类请求,drop/record/redirect */
    for (n = r->uri.len; n; --n) {
        p = &r->uri.data[n - 1];

        if (*p != '/') {
            continue;
        }

        if (method.data) {
            section.data = p + 1;
            section.len  = method.data - section.data - 1;
            break;
        }

        method.data = p + 1;
        method.len  = r->uri.data + r->uri.len - method.data;
    }
    /* 当前模块上下文初始化 */
    ctx = ngx_pcalloc(r->pool, sizeof(ngx_rtmp_control_ctx_t));
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    ngx_http_set_ctx(r, ctx, ngx_rtmp_control_module);

    if (ngx_array_init(&ctx->sessions, r->pool, 1, sizeof(void *)) != NGX_OK) {
        return NGX_ERROR;
    }

    ctx->method = method;
    /* 注意下面这块宏定义逻辑,主要是匹配控制请求和对应handler的匹配过程,这个宏定义的方式这样写我觉得主要为了避免了重复逻辑的代码,代码更简洁干净 */
#define NGX_RTMP_CONTROL_SECTION(flag, secname)                             \
    // 将上面uri中解析出来的section和secname进行匹配,进入匹配的handler当中处理
    if (llcf->control & NGX_RTMP_CONTROL_##flag &&                          \
        section.len == sizeof(#secname) - 1 &&                              \
        ngx_strncmp(section.data, #secname, sizeof(#secname) - 1) == 0)     \
    {                                                                       \
        /* 进入各自的control请求的hanlder处理逻辑当中,method是各个请求对应的方法,例如drop/publisher,这个method就是publisher这个值, 表示断开推流 */
        return ngx_rtmp_control_##secname(r, &method);                      \
    }

    NGX_RTMP_CONTROL_SECTION(RECORD, record);
    NGX_RTMP_CONTROL_SECTION(DROP, drop);
    NGX_RTMP_CONTROL_SECTION(REDIRECT, redirect);

#undef NGX_RTMP_CONTROL_SECTION

    return NGX_DECLINED;
}

2.3 drop请求的逻辑

以下按drop断流请求来分析,record和redirect过程流程和这个类似,drop断流请求主要逻辑在ngx_rtmp_control_drop当中,ngx_rtmp_control_drop函数主要解析各类控制请求的请求方法client/publisher,和http请求的结果响应,真正的断流是在ngx_rtmp_control_drop_handler当中

static ngx_int_t
ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
{
    ······

    /* 匹配断开哪一类对象(publisher/subscriber/client) */
    if (ctx->method.len == sizeof("publisher") - 1 &&
        ngx_memcmp(ctx->method.data, "publisher", ctx->method.len) == 0)
    {
        ctx->filter = NGX_RTMP_CONTROL_FILTER_PUBLISHER;

    } else if (ctx->method.len == sizeof("subscriber") - 1 &&
               ngx_memcmp(ctx->method.data, "subscriber", ctx->method.len)
               == 0)
    {
        ctx->filter = NGX_RTMP_CONTROL_FILTER_SUBSCRIBER;

    } else if (method->len == sizeof("client") - 1 &&
               ngx_memcmp(ctx->method.data, "client", ctx->method.len) == 0)
    {
        ctx->filter = NGX_RTMP_CONTROL_FILTER_CLIENT;

    } else {
        msg = "Undefined filter";
        goto error;
    }
    /* 前面解析了要断开哪个对象保存在ctx->filter当中,处理断流的逻辑在回调handler(ngx_rtmp_control_drop_handler)中 */
    msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_drop_handler);
    if (msg != NGX_CONF_OK) {
        goto error;
    }

    /* 以下主要是处理http请求的结果响应过程 */

    len = NGX_INT_T_LEN;

    p = ngx_palloc(r->connection->pool, len);
    if (p == NULL) {
        return NGX_ERROR;
    }

    len = (size_t) (ngx_snprintf(p, len, "%ui", ctx->count) - p);

    r->headers_out.status = NGX_HTTP_OK;
    r->headers_out.content_length_n = len;

    b = ngx_calloc_buf(r->pool);
    if (b == NULL) {
        goto error;
    }

    b->start = b->pos = p;
    b->end = b->last = p + len;
    b->temporary = 1;
    b->last_buf = 1;

    ngx_memzero(&cl, sizeof(cl));
    cl.buf = b;

    ngx_http_send_header(r);

    return ngx_http_output_filter(r, &cl);

error:
    return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

接下来在看ngx_rtmp_control_walk这个函数,主要在三个地方调用ngx_rtmp_control_drop、ngx_rtmp_control_record、ngx_rtmp_control_redirect来看看这个函数主要干啥了

static const char *
ngx_rtmp_control_walk(ngx_http_request_t *r, ngx_rtmp_control_handler_t h)
{
    ngx_rtmp_core_main_conf_t  *cmcf = ngx_rtmp_core_main_conf;

    ngx_str_t                   srv;
    ngx_uint_t                  sn, n;
    const char                 *msg;
    ngx_rtmp_session_t        **s;
    ngx_rtmp_control_ctx_t     *ctx;
    ngx_rtmp_core_srv_conf_t  **pcscf;

    sn = 0;
    /* http://xnj.com:80/control/drop/client?app=live&name=test  解析url问号后面的参数,srv是配置server块的序号,1,2等,分别代表srv在main配置文件中数组下表值 ,这个没有填写*/
    if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
        sn = ngx_atoi(srv.data, srv.len);
    }
    /* 超过数组下表长度表示没找到, 默认从0开始找 */
    if (sn >= cmcf->servers.nelts) {
        return "Server index out of range";
    }
    /* 通过数组下标直接找到指定位置的srv配置 */
    pcscf  = cmcf->servers.elts;
    pcscf += sn;

    /* 这个函数主要根据哪些请求的参数值找到指定的session值,然后保存到 ctx->sessions数组当中 */
    msg = ngx_rtmp_control_walk_server(r, *pcscf);
    if (msg != NGX_CONF_OK) {
        return msg;
    }

    ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);

    /* 最终就是从ctx->sessions数组当中取出session再去执行相应的操作行为,drop/record-start-stop/redirect */
    s = ctx->sessions.elts;
    for (n = 0; n < ctx->sessions.nelts; n++) {
        /* h这个回调是在ngx_rtmp_control_walk调用这个函数传参进来的,会进入各个对应的操作handler里面 */
        msg = h(r, s[n]);
        if (msg != NGX_CONF_OK) {
            return msg;
        }
    }

    return NGX_CONF_OK;
}

请求示例: http://xnj.com:80/control/drop/client?app=live&name=test&clientid=1

下面再来整体分析一下ngx_rtmp_control_walk_server这个函数,主要看函数调用堆栈

  • ngx_rtmp_control_walk_app 根据app的参数值去srv配置cscf->applications数组当中找到app配置
  • ngx_rtmp_control_walk_stream 根据流名name参数值,去live-app配置当中流数组当中lacf->streams[n]找到对应的流
  • ngx_rtmp_control_walk_session 根据参数clientid或者addr找到对应请求的session,并且放入到ctx->session数组当中

最终找到session之后,再回过来,去执行传进来的函数指针参数回调

ngx_rtmp_control_drop_handler 主要是找到session后,执行ngx_rtmp_finalize_session关闭操作

static const char *
ngx_rtmp_control_drop_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
{
    ngx_rtmp_control_ctx_t  *ctx;

    ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);

    /* 找到指定session之后,直接关闭session */
    ngx_rtmp_finalize_session(s);

    ++ctx->count;

    return NGX_CONF_OK;
}

录制ngx_rtmp_control_record_handler 主要根据method参数是录制start还是录制end操作

static const char *
ngx_rtmp_control_record_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
{
    ngx_int_t                    rc;
    ngx_str_t                    rec;
    ngx_uint_t                   rn;
    ngx_rtmp_control_ctx_t      *ctx;
    ngx_rtmp_core_app_conf_t    *cacf;
    ngx_rtmp_record_app_conf_t  *racf;

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
    racf = cacf->app_conf[ngx_rtmp_record_module.ctx_index];
    /* 根据请求参数rec 取出真实的录制机名字 */
    if (ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec) != NGX_OK) {
        rec.len = 0;
    }
    /* 根据录制机名字去找录制对应的录制机id(rn),判断是否否存在 */
    rn = ngx_rtmp_record_find(racf, &rec);
    if (rn == NGX_CONF_UNSET_UINT) {
        return "Recorder not found";
    }

    ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
    /* 解析record命令对应请求方法是开启start还是结束end */
    if (ctx->method.len == sizeof("start") - 1 &&
        ngx_strncmp(ctx->method.data, "start", ctx->method.len) == 0)
    {
        /* 开启直接打开文件,ngx_rtmp_record_node_av函数当中会判断文件是否打开来决定写还是不写,文件打开才会写 */
        rc = ngx_rtmp_record_open(s, rn, &ctx->path);

    } else if (ctx->method.len == sizeof("stop") - 1 &&
               ngx_strncmp(ctx->method.data, "stop", ctx->method.len) == 0)
    {   
        /* 关闭文件之后, 文件描述符无效,录制结束*/
        rc = ngx_rtmp_record_close(s, rn, &ctx->path);

    } else {
        return "Undefined method";
    }

    if (rc == NGX_ERROR) {
        return "Recorder error";
    }

    return NGX_CONF_OK;
}

ngx_rtmp_control_redirect_handler主要更改流名操作,然后再重新发起对应的操作

static const char *
ngx_rtmp_control_redirect_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
{
    ngx_str_t                 name;
    ngx_rtmp_play_t           vplay;
    ngx_rtmp_publish_t        vpublish;
    ngx_rtmp_live_ctx_t      *lctx;
    ngx_rtmp_control_ctx_t   *ctx;
    ngx_rtmp_close_stream_t   vc;

    /* 根据newname参数找到对应新参数的流名 */
    if (ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name)
        != NGX_OK)
    {
        return "newname not specified";
    }

    if (name.len >= NGX_RTMP_MAX_NAME) {
        name.len = NGX_RTMP_MAX_NAME - 1;
    }

    ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
    ctx->count++;

    ngx_memzero(&vc, sizeof(ngx_rtmp_close_stream_t));

    /* close_stream should be synchronous */
    ngx_rtmp_close_stream(s, &vc);

    lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);

    /* 判断是推流还是拉流 */
    if (lctx && lctx->publishing) {
        /* publish */

        ngx_memzero(&vpublish, sizeof(ngx_rtmp_publish_t));

        ngx_memcpy(vpublish.name, name.data, name.len);

        ngx_rtmp_cmd_fill_args(vpublish.name, vpublish.args);
        /* 更改流名后,重新发起publish 操作,执行publish调用链 */
        if (ngx_rtmp_publish(s, &vpublish) != NGX_OK) {
            return "publish failed";
        }

    } else {
        /* play */

        ngx_memzero(&vplay, sizeof(ngx_rtmp_play_t));

        ngx_memcpy(vplay.name, name.data, name.len);

        ngx_rtmp_cmd_fill_args(vplay.name, vplay.args);
        /* 更改流名后,发起play操作执行play相关的调用链  */
        if (ngx_rtmp_play(s, &vplay) != NGX_OK) {
            return "play failed";
        }
    }

    return NGX_CONF_OK;
}

3 回顾总结

最终再回过头来总结一下:这个模块其实相对来说比较简单。整体流程如下

1)注册接受control请求的handler—ngx_rtmp_control_handler
2)收到来自外界的请求后,执行对应control命令请求,进入各自的handler过程
3)进入各自注册的ngx_rtmp_control_XXX_handler之前根据请求的srv-app-name等参数找到指定session放入数组。也就是ngx_rtmp_control_walk_XXX系列的函数
4)ngx_rtmp_control_walk函数在最终取到session再执行各自的handler执行各自的操作。

当然,这个模块还可以再扩展,可以自己再注册一些方法。例如:

  • query查询流的状态,判断流是否还存在之类的
  • log 修改日志级别等

猜你喜欢

转载自blog.csdn.net/wu5215080/article/details/90698783