%% Turn a delivery into the matching outbound packet and pass it to send/2.
%%
%% Handled deliveries:
%%   {message, #message{}} -> datatrans packet carrying the payload and
%%                            its size in bytes
%%   {connack, Code, Msg}  -> connack packet
%%   pong                  -> pong packet
%% Any other delivery is not sent; the caller gets
%% {error, {not_supported_delivery, Delivery}} instead.
deliver({message, #message{payload = Body}}, PState) ->
    Packet = #tcp_packet_datatrans{data = Body, length = byte_size(Body)},
    send(Packet, PState);
deliver({connack, Code, Msg}, PState) ->
    Packet = #tcp_packet_connack{code = Code, msg = Msg},
    send(Packet, PState);
deliver(pong, PState) ->
    Packet = #tcp_packet_pong{},
    send(Packet, PState);
deliver(Other, _PState) ->
    {error, {not_supported_delivery, Other}}.
%% Push Packet through the send fun stored in the protocol state.
%%
%% The send fun is called with the packet and #{version => Ver}. Outcomes:
%%   ok              -> the packet is traced; the state is returned as is
%%   {ok, Data}      -> the packet is traced, maybe_gc_and_check_oom/2 is
%%                      run with iolist_size(Data), and the send stats are
%%                      incremented for the packet's type
%%   {error, Reason} -> handed back to the caller unchanged
send(Packet, #pstate{proto_ver = Ver, sendfun = SendFun} = PState) ->
    Opts = #{version => Ver},
    case SendFun(Packet, Opts) of
        ok ->
            trace(send, Packet),
            {ok, PState};
        {ok, Data} ->
            trace(send, Packet),
            PState1 = maybe_gc_and_check_oom(iolist_size(Data), PState),
            %% Map the record tag (first tuple element) to the short type
            %% name used by the stats counters.
            Type =
                case tuple_to_list(Packet) of
                    [tcp_packet_conn | _] -> conn;
                    [tcp_packet_connack | _] -> connack;
                    [tcp_packet_datatrans | _] -> datatrans;
                    [tcp_packet_ping | _] -> ping;
                    [tcp_packet_pong | _] -> pong;
                    [tcp_packet_disconn | _] -> disconn
                end,
            {ok, inc_stats(send, Type, PState1)};
        {error, _Reason} = Failure ->
            Failure
    end.
%% Write a debug-level log entry for a received (recv) or sent (send) packet.
%%
%% The log text comes from the report_cb fun, which prefixes "RECV ~s" or
%% "SEND ~s" with the logger header and formats the packet through
%% emqx_tcp_frame:format/1. The mfa/line metadata are fixed literals.
%% NOTE(review): '$logger_header'() is presumably provided by a parse
%% transform or generated code elsewhere in the module -- confirm.
trace(recv, Packet) ->
    ReportCb =
        fun (_) ->
            {'$logger_header'() ++ "RECV ~s", [emqx_tcp_frame:format(Packet)]}
        end,
    Meta = #{report_cb => ReportCb,
             mfa => {emqx_tcp_protocol, trace, 2},
             line => 292},
    logger:log(debug, #{}, Meta);
trace(send, Packet) ->
    ReportCb =
        fun (_) ->
            {'$logger_header'() ++ "SEND ~s", [emqx_tcp_frame:format(Packet)]}
        end,
    Meta = #{report_cb => ReportCb,
             mfa => {emqx_tcp_protocol, trace, 2},
             line => 294},
    logger:log(debug, #{}, Meta).
%% Feed Oct (a byte count) into the GC state and, when emqx_gc reports
%% true, run the OOM check.
%%
%% If gc_state is undefined the protocol state is returned untouched.
%% Otherwise emqx_gc:run(1, Oct, GCSt) is called. When the first element of
%% its result is true, the emqx_tcp application's force_shutdown_policy
%% (default undefined) is passed through emqx_oom:init/1 and
%% emqx_oom:check/1; any check result other than ok is sent to self() as a
%% message. The GC state returned by emqx_gc:run/3 is always stored back
%% into the returned pstate.
maybe_gc_and_check_oom(_Oct, #pstate{gc_state = undefined} = PState) ->
    PState;
maybe_gc_and_check_oom(Oct, #pstate{gc_state = GCSt0} = PState) ->
    {DidGC, GCSt1} = emqx_gc:run(1, Oct, GCSt0),
    case DidGC of
        true ->
            Policy = application:get_env(emqx_tcp, force_shutdown_policy, undefined),
            case emqx_oom:check(emqx_oom:init(Policy)) of
                ok -> ok;
                Shutdown -> self() ! Shutdown
            end;
        _ ->
            %% Anything other than true means there is nothing to check.
            ok
    end,
    PState#pstate{gc_state = GCSt1}.
-compile({inline, [{run_hooks, 2}, {run_hooks, 3}]}).
%% Increment the metric that shares the hook point's name, then run the
%% hook point with the given arguments.
%% Fails with badmatch if emqx_metrics:inc/1 returns anything but ok.
%% Returns the result of emqx_hooks:run/2.
run_hooks(HookPoint, HookArgs) ->
    ok = emqx_metrics:inc(HookPoint),
    emqx_hooks:run(HookPoint, HookArgs).
%% Increment the metric that shares the hook point's name, then fold the
%% hook point over the initial accumulator.
%% Fails with badmatch if emqx_metrics:inc/1 returns anything but ok.
%% Returns the result of emqx_hooks:run_fold/3.
run_hooks(HookPoint, HookArgs, Acc0) ->
    ok = emqx_metrics:inc(HookPoint),
    emqx_hooks:run_fold(HookPoint, HookArgs, Acc0).
%% Increment the per-direction packet counters held in the protocol state.
%% recv updates the recv_stats field, send updates the send_stats field;
%% the counting itself is done by inc_stats/2.
inc_stats(recv, Type, #pstate{recv_stats = Counters} = PState) ->
    Updated = inc_stats(Type, Counters),
    PState#pstate{recv_stats = Updated};
inc_stats(send, Type, #pstate{send_stats = Counters} = PState) ->
    Updated = inc_stats(Type, Counters),
    PState#pstate{send_stats = Updated}.
%% Increment the counters in a stats map that has pkt and msg keys.
%% pkt is incremented for every packet type; msg is incremented only when
%% the type is datatrans. A map lacking either key matches no clause.
inc_stats(datatrans, #{pkt := Pkts, msg := Msgs} = Stats) ->
    Stats#{pkt := Pkts + 1, msg := Msgs + 1};
inc_stats(_Type, #{pkt := Pkts, msg := _} = Stats) ->
    Stats#{pkt := Pkts + 1}.
%% Coerce an atom or a binary to a binary.
%% Binaries are returned unchanged; atoms are converted with UTF-8
%% encoding. Any other term matches no clause.
to_binary(Bin) when is_binary(Bin) ->
    Bin;
to_binary(Atom) when is_atom(Atom) ->
    erlang:atom_to_binary(Atom, utf8).
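%% EMQX private TCP protocol server development -- the emqx_tcp_protocol module (3)
%% You may also like
%% Reposted from blog.csdn.net/qq513036862/article/details/110315928
%% Today's recommendations
%% Weekly ranking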