-module(emqx_backend_mongo_cli).
-behaviour(ecpool_worker).
-include("../include/emqx.hrl").
-include("../include/mongo_protocol.hrl").
-export([client_connected/2,
subscribe_lookup/2,
client_disconnected/2,
message_fetch/3,
lookup_retain/2,
acked_delete/2,
message_publish/3,
message_store/3,
message_retain/3,
delete_retain/2,
message_acked/2]).
-export([mongo_insert/3,
mongo_query/3,
mongo_insert_update/4,
mongo_delete/3,
record_to_msg/1,
maps_to_binary/1]).
-export([connect/1]).
-vsn("4.2.1").
client_connected(Pool, Msg) ->
Selector = [<<"clientid">>],
Docs = [<<"state">>, 1, <<"node">>, node(), <<"online_at">>, erlang:system_time(millisecond), <<"offline_at">>, null],
mongo_insert_update(Pool, <<"mqtt_client">>, feed_var(Selector, Msg), Docs),
ok.
subscribe_lookup(Pool, Msg) ->
Selector = [<<"clientid">>],
case mongo_query(Pool,
<<"mqtt_sub">>,
feed_var(Selector, Msg))
of
{ok, []} -> [];
{ok, Rows} ->
[{maps:get(<<"topic">>, Map),
#{qos => trunc(maps:get(<<"qos">>, Map, 0))}}
|| Map <- Rows];
{error, Error} ->
logger:error("Lookup subscription error: ~p", [Error]),
[]
end.
client_disconnected(Pool, Msg) ->
Selector = [<<"clientid">>],
Docs = [<<"state">>,
0,
<<"offline_at">>,
erlang:system_time(millisecond)],
mongo_insert_update(Pool,
<<"mqtt_client">>,
feed_var(Selector, Msg),
Docs),
ok.
message_fetch(Pool, Msg, Opts) ->
Topic = proplists:get_value(topic, Msg),
ClientId = proplists:get_value(clientid, Msg),
Selector0 = [<<"clientid">>, <<"topic">>],
case mongo_query_for_field(Pool,
<<"mqtt_acked">>,
feed_var(Selector0, Msg),
<<"mongo_id">>)
of
undefined ->
Selector1 = #{<<"$query">> => {<<"topic">>, Topic},
<<"$orderby">> => #{<<"id">> => -1}},
Mid = case ecpool:with_client(Pool,
fun (C) ->
mongo_api:find(C,
<<"mqtt_msg">>,
Selector1,
#{},
0,
1)
end)
of
[] -> 0;
{ok, RPid} when is_pid(RPid) ->
[Result] = mc_cursor:next_batch(RPid),
mc_cursor:close(RPid),
maps:get(<<"id">>, Result);
_ -> 0
end,
mongo_insert(Pool,
<<"mqtt_acked">>,
[<<"clientid">>,
ClientId,
<<"topic">>,
Topic,
<<"mongo_id">>,
Mid]),
[];
AckId ->
Selector2 = case proplists:get_value(time_range, Opts)
of
undefined ->
[<<"id">>,
{<<"$gt">>, AckId},
<<"topic">>,
Topic];
TimeRange ->
Time = (erlang:system_time(seconds) - TimeRange)
* 1000,
[<<"id">>,
{<<"$gt">>, AckId},
<<"topic">>,
Topic,
<<"arrived">>,
{<<"$gte">>, Time}]
end,
Selector3 = #{<<"$query">> => list_to_tuple(Selector2),
<<"$orderby">> => #{<<"id">> => -1}},
{Fun, Function} = case
proplists:get_value(max_returned_count, Opts)
of
undefined ->
{fun (C) ->
mongo_api:find(C,
<<"mqtt_msg">>,
Selector3,
#{})
end,
rest};
MaxReturnedCount ->
{fun (C) ->
mongo_api:find(C,
<<"mqtt_msg">>,
Selector3,
#{},
0,
MaxReturnedCount)
end,
next_batch}
end,
case ecpool:with_client(Pool, Fun) of
[] -> [];
{error, Error} ->
logger:error("Lookup message error: ~p", [Error]),
[];
{ok, Cursor} when is_pid(Cursor) ->
Rows = mc_cursor:Function(Cursor),
mc_cursor:close(Cursor),
[begin
M =
record_to_msg(maps:to_list(maps:update(<<"payload">>,
maps_to_binary(maps:get(<<"payload">>,
Row)),
Row))),
M#message{id = from_b62(M#message.id)}
end
|| Row <- lists:reverse(Rows)]
end
end.
maps_to_binary(Payload) ->
case Payload of
Payload when is_binary(Payload) -> Payload;
_ ->
try emqx_json:encode(Payload) catch
C:E ->
logger:error("Maps to Binary error: ~p ~p .~n", [C, E])
end
end.
lookup_retain(Pool, Msg0) ->
Topic = proplists:get_value(topic, Msg0),
Selector = [<<"topic">>],
case mongo_query(Pool,
<<"mqtt_retain">>,
feed_var(Selector, [{topic, Topic}]))
of
{ok, []} -> [];
{ok, [Row | _]} ->
Msg = record_to_msg(maps:to_list(Row)),
[Msg#message{flags = #{retain => true},
id = from_b62(Msg#message.id)}];
{error, Error} ->
logger:error("Lookup retain error: ~p", [Error]),
[]
end.
acked_delete(Pool, Msg) ->
Selector = [<<"clientid">>, <<"topic">>],
mongo_delete(Pool,
<<"mqtt_acked">>,
feed_var(Selector, Msg)).
message_publish(Pool, #message{payload = Payload} = Msg,
PayloadFormat) ->
FormatedPayload = payload_format_transfer(PayloadFormat,
Payload),
Id = gen_auto_increment(Pool, <<"mqtt_msg">>),
Selector = [<<"topic">>,
<<"msgid">>,
<<"sender">>,
<<"qos">>,
<<"retain">>,
<<"payload">>,
<<"timestamp">>],
case mongo_insert(Pool,
<<"mqtt_msg">>,
feed_var(Selector,
Msg#message{payload = FormatedPayload})
++ [<<"id">>, Id])
of
true -> Msg;
false -> Msg
end.
message_store(Pool, #message{payload = Payload} = Msg,
PayloadFormat) ->
FormatedPayload = payload_format_transfer(PayloadFormat,
Payload),
Selector = [<<"topic">>,
<<"msgid">>,
<<"sender">>,
<<"qos">>,
<<"retain">>,
<<"payload">>,
<<"timestamp">>],
case mongo_insert(Pool,
<<"mqtt_msg">>,
feed_var(Selector,
Msg#message{payload = FormatedPayload}))
of
true -> Msg;
false -> Msg
end.
message_retain(Pool, #message{payload = Payload} = Msg,
PayloadFormat) ->
FormatedPayload = payload_format_transfer(PayloadFormat,
Payload),
Selector = [<<"topic">>],
Docs = [<<"msgid">>,
<<"sender">>,
<<"qos">>,
<<"payload">>,
<<"timestamp">>],
case mongo_insert_update(Pool,
<<"mqtt_retain">>,
feed_var(Selector,
Msg#message{payload = FormatedPayload}),
feed_var(Docs, Msg))
of
true -> Msg;
false -> Msg
end.
payload_format_transfer(PayloadFormat, Payload) ->
case PayloadFormat of
mongo_json ->
try emqx_json:decode(Payload, [return_maps]) catch
_:_ -> Payload
end;
_plain_text -> Payload
end.
delete_retain(Pool, Msg) ->
Selector = [<<"topic">>],
mongo_delete(Pool,
<<"mqtt_retain">>,
feed_var(Selector, Msg)),
Msg.
message_acked(Pool, Msg) ->
Selector = [<<"clientid">>, <<"topic">>],
Docs = [<<"clientid">>, <<"topic">>, <<"mongo_id">>],
mongo_insert_update(Pool,
<<"mqtt_acked">>,
feed_var(Selector, Msg),
feed_var(Docs, Msg)),
ok.
feed_var(Params, Msg) -> feed_var(Params, Msg, []).
feed_var([], _Msg, Acc) -> lists:reverse(Acc);
feed_var([<<"topic">> | Params],
Msg = #message{topic = Topic}, Acc) ->
feed_var(Params, Msg, [Topic, <<"topic">> | Acc]);
feed_var([<<"topic">> | Params], Vals, Acc)
when is_list(Vals) ->
feed_var(Params,
Vals,
[proplists:get_value(topic, Vals, null), <<"topic">>
| Acc]);
feed_var([<<"msgid">> | Params],
Msg = #message{id = MsgId}, Acc) ->
feed_var(Params,
Msg,
[to_b62(MsgId), <<"msgid">> | Acc]);
feed_var([<<"msgid">> | Params], Vals, Acc)
when is_list(Vals) ->
feed_var(Params,
Vals,
[to_b62(proplists:get_value(msgid, Vals, null)),
<<"msgid">>
| Acc]);
feed_var([<<"mongo_id">> | Params], Vals, Acc)
when is_list(Vals) ->
feed_var(Params,
Vals,
[proplists:get_value(mongo_id, Vals, null),
<<"mongo_id">>
| Acc]);
feed_var([<<"sender">> | Params],
Msg = #message{from = From}, Acc) ->
ClientId = to_binary(From),
feed_var(Params, Msg, [ClientId, <<"sender">> | Acc]);
feed_var([<<"sender">> | Params], Vals, Acc)
when is_list(Vals) ->
ClientId = to_binary(proplists:get_value(clientid,
Vals,
null)),
feed_var(Params, Vals, [ClientId, <<"sender">> | Acc]);
feed_var([<<"clientid">> | Params],
Msg = #message{from = From}, Acc) ->
ClientId = to_binary(From),
feed_var(Params, Msg, [ClientId, <<"clientid">> | Acc]);
feed_var([<<"clientid">> | Params], Vals, Acc)
when is_list(Vals) ->
ClientId = to_binary(proplists:get_value(clientid,
Vals,
null)),
feed_var(Params,
Vals,
[ClientId, <<"clientid">> | Acc]);
feed_var([<<"qos">> | Params],
Msg = #message{qos = Qos}, Acc) ->
feed_var(Params, Msg, [Qos, <<"qos">> | Acc]);
feed_var([<<"qos">> | Params], Vals, Acc)
when is_list(Vals) ->
feed_var(Params,
Vals,
[proplists:get_value(qos, Vals, null), <<"qos">>
| Acc]);
feed_var([<<"retain">> | Params],
Msg = #message{flags = #{retain := Retain}}, Acc) ->
feed_var(Params, Msg, [i(Retain), <<"retain">> | Acc]);
feed_var([<<"payload">> | Params],
Msg = #message{payload = Payload}, Acc) ->
feed_var(Params, Msg, [Payload, <<"payload">> | Acc]);
feed_var([<<"timestamp">> | Params],
Msg = #message{timestamp = Ts}, Acc) ->
feed_var(Params, Msg, [Ts, <<"arrived">> | Acc]);
feed_var([_ | Params], Msg, Acc) ->
feed_var(Params, Msg, [null | Acc]).
i(true) -> 1;
i(false) -> 0.
b(0) -> false;
b(1) -> true.
a2b(A) -> erlang:atom_to_binary(A, utf8).
to_binary(From) when is_atom(From) -> a2b(From);
to_binary(From) when is_binary(From) -> From.
record_to_msg(Record) ->
record_to_msg(Record, #message{headers = #{}}).
record_to_msg([], Msg) -> Msg;
record_to_msg([{<<"id">>, Id} | Record], Msg) ->
record_to_msg(Record,
emqx_message:set_header(mongo_id, Id, Msg));
record_to_msg([{<<"msgid">>, MsgId} | Record], Msg) ->
record_to_msg(Record, Msg#message{id = MsgId});
record_to_msg([{<<"topic">>, Topic} | Record], Msg) ->
record_to_msg(Record, Msg#message{topic = Topic});
record_to_msg([{<<"sender">>, Sender} | Record], Msg) ->
record_to_msg(Record, Msg#message{from = Sender});
record_to_msg([{<<"qos">>, Qos} | Record], Msg) ->
record_to_msg(Record, Msg#message{qos = Qos});
record_to_msg([{<<"retain">>, R} | Record], Msg) ->
record_to_msg(Record,
Msg#message{flags = #{retain => b(R)}});
record_to_msg([{<<"payload">>, Payload} | Record],
Msg) ->
record_to_msg(Record, Msg#message{payload = Payload});
record_to_msg([{<<"arrived">>, Arrived} | Record],
Msg) ->
record_to_msg(Record, Msg#message{timestamp = Arrived});
record_to_msg([_ | Record], Msg) ->
record_to_msg(Record, Msg).
to_b62(MsgId) -> emqx_guid:to_base62(MsgId).
from_b62(MsgId) -> emqx_guid:from_base62(MsgId).
connect(Opts) ->
Type = proplists:get_value(type, Opts, single),
Hosts = proplists:get_value(hosts, Opts, []),
Options = proplists:get_value(options, Opts, []),
WorkerOptions = proplists:get_value(worker_options,
Opts,
[]),
mongo_api:connect(Type, Hosts, Options, WorkerOptions).
mongo_insert(Pool, Collection, Selector0) ->
logger:debug("mongo_insert Collection:~p, Selector:~p",
[Collection, Selector0]),
Selector = list_to_tuple(Selector0),
case ecpool:with_client(Pool,
fun (C) -> mongo_api:insert(C, Collection, Selector)
end)
of
{
{true, Map}, _} ->
case maps:get(<<"n">>, Map) > 0 of
true -> true;
false ->
logger:error("Failed to store message: ~p", [Map]),
false
end;
{
{false, Map}, _} ->
logger:error("Failed to store message: ~p", [Map]),
false
end.
mongo_delete(Pool, Collection, Selector0) ->
logger:debug("mongo_delete Collection:~p, Selector:~p",
[Collection, Selector0]),
Selector = list_to_tuple(Selector0),
case ecpool:with_client(Pool,
fun (C) -> mongo_api:delete(C, Collection, Selector)
end)
of
{true, Map} ->
case maps:get(<<"n">>, Map) > 0 of
true -> true;
false ->
logger:error("Failed to delete: ~p", [Map]),
false
end;
{false, Map} ->
logger:error("Failed to delete: ~p", [Map]),
false
end.
mongo_query(Pool, Collection, Selector0) ->
logger:debug("mongo_query Collection:~p, Selector:~p",
[Collection, Selector0]),
Selector = list_to_tuple(Selector0),
case ecpool:with_client(Pool,
fun (C) ->
mongo_api:find(C, Collection, Selector, #{})
end)
of
{ok, Cursor} when is_pid(Cursor) ->
Result = case mc_cursor:rest(Cursor) of
error -> {error, mc_curosr_reset_failed};
Result1 -> {ok, Result1}
end,
mc_cursor:close(Cursor),
logger:debug("query result is: ~p", [Result]),
Result;
[] -> {ok, []};
_Err -> {error, _Err}
end.
mongo_query_for_field(Pool, Collection, Selector0,
Field) ->
logger:debug("mongo_query Collection:~p, Selector:~p",
[Collection, Selector0]),
Selector = list_to_tuple(Selector0),
Result = ecpool:with_client(Pool,
fun (C) ->
mongo_api:find_one(C,
Collection,
Selector,
#{})
end),
logger:debug("query result is: ~p", [Result]),
case Result of
undefined -> undefined;
#{} -> maps:get(Field, Result)
end.
gen_auto_increment(Pool, Table) ->
Docs = {<<"$inc">>, {<<"id">>, 1}},
Selector = {<<"name">>, Table},
case ecpool:with_client(Pool,
fun (C) ->
mongo_api:update(C,
<<"mongo_id">>,
Selector,
Docs,
#{upsert => true,
multi => true})
end)
of
{true, Map} ->
case maps:get(<<"n">>, Map) > 0 of
true ->
Map1 = ecpool:with_client(Pool,
fun (C) ->
mongo_api:find_one(C,
<<"mongo_id">>,
Selector,
#{})
end),
maps:get(<<"id">>, Map1);
false ->
logger:error("Failed to gen_auto_increment: ~p", [Map]),
error
end;
Error ->
logger:error("Table: ~p gen_auto_increment fail, error:~p",
[Table, Error]),
error
end.
mongo_insert_update(Pool, Collection, Selector0,
Docs0) ->
Docs = {<<"$set">>, list_to_tuple(Docs0)},
Selector = list_to_tuple(Selector0),
logger:debug("Mongo_insert_update Collection:~p, Selector:~"
"p, Docs:~p",
[Collection, Selector, Docs]),
case ecpool:with_client(Pool,
fun (C) ->
mongo_api:update(C,
Collection,
Selector,
Docs,
#{upsert => true,
multi => true})
end)
of
{true, Map} ->
case maps:get(<<"n">>, Map) > 0 of
true -> true;
false ->
logger:error("Failed to insert update~p: ~p",
[Collection, Map]),
false
end;
{false, Map} ->
logger:error("Failed to ~p: ~p", [Collection, Map]),
false
end.
EMQX消息存储MongoDB源码分析(3)
猜你喜欢
转载自blog.csdn.net/qq513036862/article/details/112962388
今日推荐
周排行