1、本模块是建立tcp socket 连接进程模块,主要牵涉启动服务端socket服务,然后等待客户socket连接,然后针对每个进来的客户端进行进程分裂。
2、本模块牵涉到两个主要的模块emqx_tcp_protocol 协议 和 emqx_tcp_frame 数据帧
?2、提供统计不同的客户端socket的消息进出
3、查询客户socket的相关信息
4、客户端消息主题跨节点投递
5、提供禁止客户端连接,客户端限速等接口服务
-module(emqx_tcp_connection).
-export([logger_header/0]).
-behaviour(gen_statem).
-include("/emqx_tcp/include/emqx_tcp.hrl").
%% tcp连接报文
-record(tcp_packet_conn,
{client_id, keepalive, username, password, version}).
%% 连接回复报文
-record(tcp_packet_connack, {code, msg}).
%% tcp数据报文{长度,数据内容}
-record(tcp_packet_datatrans, {length, data}).
-record(tcp_packet_ping, {}).
-record(tcp_packet_pong, {}).
%% tcp连接断开报文
-record(tcp_packet_disconn, {}).
-include("/emqx/include/emqx.hrl").
%% 订阅数据结构{主题,订阅id,订阅配置}
-record(subscription, {topic, subid, subopts}).
%% 消息定义
-record(message,{
id :: binary(), %% 消息id
qos = 0,%% 消息质量
from :: atom() | binary(),%% 消息来自
flags = #{} :: emqx_types:flags(),
headers = #{} :: emqx_types:headers(),%% 消息头
topic :: emqx_types:topic(),%% 消息主题
payload :: emqx_types:payload(),%% 消息负载
timestamp :: integer()}).%% 消息时间戳
%% 消息投递{发送者进程id,消息内容}
-record(delivery,
{sender :: pid(), message :: #message{}}).
%% 消息路由{主题,目标节点}
-record(route,
{topic :: binary(),
dest :: node() | {binary(), node()}}).
%% 树节点ID
-type trie_node_id() :: binary() | atom().
%% 数据节点结构
-record(trie_node,{
node_id :: trie_node_id(), %% id
edge_count = 0 :: non_neg_integer(),%% 叶子数量
topic :: binary() | undefined, %% 主题
flags :: [atom()] | undefined}).
%% 叶子结构
-record(trie_edge,{
node_id :: trie_node_id(), %% 节点ID
word :: binary() | atom()}).%% 关键字
%% 树结构
-record(trie,{
edge :: #trie_edge{}, %% 叶子
node_id :: trie_node_id()}).%% 节点id
%% 插件定义
-record(plugin,{
name :: atom(), %% 插件名称
dir :: string() | undefined,%% 插件目录
descr :: string(),%% 插件描述
vendor :: string() | undefined,%%
active = false :: boolean(),%% 是否激活
info = #{} :: map(),%% 结构信息
type :: atom()}).%% 类型
%% 模块接收命令
-record(command,{
name :: atom(),%% 命令
action :: atom(),%% 动作
args = [] :: list(),%% 参数
opts = [] :: list(),%% 选项
usage :: string(),
descr :: string()}).%% 描述
%% 模块禁止
-record(banned,{
who ::
{clientid, binary()} |{username, binary()} |
{ip_address, inet:ip_address()}, %% 通过clientid,ip,username来禁止连接
by :: binary(), %% 被谁禁止
reason :: binary(),%% 禁止原因
at :: integer(),%% 禁止时间点
until :: integer()}).%% 禁止多久
-export([start_link/3]).
-export([info/1, stats/1]).
-export([kick/1]).
-export([idle/3, connected/3]).
-export([init/1,
callback_mode/0,
code_change/4,
terminate/3]).
%% 进程状态
-record(state,{
transport,
socket,
peername,
sockname,
sockstate,
active_n,
pstate,
parse_state,
keepalive,
rate_limit,
limit_timer,
enable_stats,
stats_timer,
idle_timeout}).
%% 模块启动方法
start_link(Transport, Socket, Options) ->
{ok,
%% 分裂不同的连接服务进程,然后回调本模块的init方法
proc_lib:spawn_link(emqx_tcp_connection,
init,[{Transport, Socket, Options}])}.
%% 通过进程ID获取进程信息
-spec info(pid() | #state{}) -> map().
info(CPid) when is_pid(CPid) -> call(CPid, info);
info(State = #state{pstate = PState}) ->
%% 调用 emqx_tcp_protocol的info函数获取通道信息
ChanInfo = emqx_tcp_protocol:info(PState),
%% 通过不同的原子信息去获取socket 信息
SockInfo = maps:from_list(
info([socktype,peername,sockname,sockstate,active_n],State)),
ChanInfo#{sockinfo => SockInfo}.
info(Keys, State) when is_list(Keys) ->[{Key, info(Key, State)} || Key <- Keys];
info(socktype,#state{transport = Transport, socket = Socket}) ->
Transport:type(Socket);
info(peername, #state{peername = Peername}) -> Peername;
info(sockname, #state{sockname = Sockname}) -> Sockname;
%% socket 状态
info(sockstate, #state{sockstate = SockSt}) -> SockSt;
%% 一次接收多少个数据包
info(active_n, #state{active_n = ActiveN}) -> ActiveN;
%% 限速度
info(limiter, #state{rate_limit = RateLimit}) -> rate_limit_info(RateLimit).
由于篇幅原因,接下来持续分析emqx_tcp_connection模块的代码。