Skip to content

Commit

Permalink
Maybe charge for late packets (#995)
Browse files Browse the repository at this point in the history
* Maybe charge for late packets

* Add test for charge late packets
  • Loading branch information
macpie authored Aug 16, 2023
1 parent 94744bc commit 5c5f6c0
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .env-template
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ ROUTER_METRICS_PORT=3000
# The integer represents the first 7-bits of a DevAddr. NOT the Network ID.
# Change at your own discretion, this may lead to instability in assigning DevAddrs to devices.
ROUTER_DEVADDR_PREFIX=72

# Charge an org for a packet received late or replayed (default: false).
ROUTER_CHARGE_LATE_PACKETS=false
3 changes: 2 additions & 1 deletion config/sys.config.src
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
{devaddr_allocate_resolution, "${ROUTER_DEVADDR_ALLOCATE_RESOLUTION}"},
{metrics_port, "${ROUTER_METRICS_PORT}"},
{denylist_keys, ["1SbEYKju337P6aYsRd9DT2k4qgK5ZK62kXbSvnJgqeaxK3hqQrYURZjL"]},
{denylist_url, "https://api.github.com/repos/helium/denylist/releases/latest"}
{denylist_url, "https://api.github.com/repos/helium/denylist/releases/latest"},
{charge_late_packets, "${ROUTER_CHARGE_LATE_PACKETS}"}
]},
{grpcbox, [
{client, #{
Expand Down
3 changes: 2 additions & 1 deletion config/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
skf_enabled => "true",
route_id => "test-route-id"
}},
{config_service_max_timeout_attempt, 5}
{config_service_max_timeout_attempt, 5},
{charge_late_packets, false}
]},
{blockchain, [
{port, 3615},
Expand Down
2 changes: 1 addition & 1 deletion src/apis/router_console_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ get_unfunded_org_ids() ->
org_manual_update_router_dc(OrgID, Balance) ->
{Endpoint, Token} = token_lookup(),
Url = <<Endpoint/binary, "/api/router/organizations/manual_update_router_dc">>,
lager:debug("get ~p", [Url]),
lager:debug("post ~p", [Url]),
Opts = [
with_body,
{pool, ?POOL},
Expand Down
40 changes: 35 additions & 5 deletions src/device/router_device_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -836,11 +836,12 @@ handle_cast(
{error, Reason} ->
lager:debug("packet not validated: ~p", [Reason]),
case Reason of
late_packet ->
{late_packet, LateFcnt} ->
ok = charge_and_update_org(Packet0, Device1),
ok = router_utils:event_uplink_dropped_late_packet(
PacketTime,
HoldTime,
PacketFCnt,
LateFcnt,
Device1,
PubKeyBin
);
Expand Down Expand Up @@ -1574,7 +1575,7 @@ validate_frame(
PacketFCnt,
LastSeenFCnt
]),
{error, late_packet};
{error, {late_packet, PacketFCnt}};
undefined when
FrameAck == 1 andalso PacketFCnt == DownlinkHandledAtFCnt andalso
Window < ?RX_MAX_WINDOW
Expand All @@ -1583,7 +1584,7 @@ validate_frame(
"we got a late confirmed up packet for ~p: DownlinkHandledAt: ~p within window ~p",
[PacketFCnt, DownlinkHandledAtFCnt, Window]
),
{error, late_packet};
{error, {late_packet, PacketFCnt}};
undefined when
FrameAck == 1 andalso PacketFCnt == DownlinkHandledAtFCnt andalso
Window >= ?RX_MAX_WINDOW
Expand All @@ -1606,7 +1607,7 @@ validate_frame(
"we got a replay packet [verified: ~p] [device: ~p]",
[VerifiedFCnt, DeviceFCnt]
),
{error, late_packet};
{error, {late_packet, VerifiedFCnt}};
undefined ->
lager:debug("we got a fresh packet [fcnt: ~p]", [PacketFCnt]),
validate_frame_(
Expand Down Expand Up @@ -1795,6 +1796,35 @@ validate_frame_(PacketFCnt, Packet, PubKeyBin, HotspotRegion, Device0, OfferCach
maybe_charge(Device, PayloadSize, _PubKeyBin, _PHash, _OfferCache) ->
router_console_dc_tracker:charge(Device, PayloadSize).

-spec charge_and_update_org(
Packet :: blockchain_helium_packet_v1:packet(), Device :: router_device:device()
) -> ok.
charge_and_update_org(Packet, Device) ->
case router_utils:get_env_bool(charge_late_packets, false) of
false ->
lager:debug("not charging for late packets");
true ->
<<_MType:3, _MHDRRFU:3, _Major:2, _DevAddr:4/binary, _ADR:1, _ADRACKReq:1, _ACK:1,
_RFU:1, FOptsLen:4, _FCnt:16, _FOpts:FOptsLen/binary,
PayloadAndMIC/binary>> = blockchain_helium_packet_v1:payload(Packet),
{_FPort, FRMPayload} = lorawan_utils:extract_frame_port_payload(PayloadAndMIC),
PayloadSize = erlang:byte_size(FRMPayload),
case router_console_dc_tracker:charge(Device, PayloadSize) of
{error, _Reason} ->
lager:warning("failed to charge ~p", [_Reason]);
{ok, Balance, _Nonce} ->
OrgID = maps:get(organization_id, router_device:metadata(Device), undefined),
case router_console_api:org_manual_update_router_dc(OrgID, Balance) of
{error, _Reason} ->
lager:warning("failed to update org(~w) to ~w: ~p", [
OrgID, Balance, _Reason
]);
ok ->
lager:debug("org ~s updated to ~w", [OrgID, Balance])
end
end
end.

%%%-------------------------------------------------------------------
%% @doc
%% Check device's message queue to potentially wait or send reply
Expand Down
13 changes: 13 additions & 0 deletions test/console_callback.erl
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,19 @@ handle('GET', [<<"api">>, <<"router">>, <<"organizations">>, <<"zero_dc">>], _Re
[{unfunded_org_ids, IDs}] -> IDs
end,
{200, [], jsx:encode(#{<<"data">> => OrgIDs})};
handle(
'POST', [<<"api">>, <<"router">>, <<"organizations">>, <<"manual_update_router_dc">>], Req, Args
) ->
Pid = maps:get(forward, Args),
Body = elli_request:body(Req),
try jsx:decode(Body, [return_maps]) of
Map ->
Pid ! {manual_update_router_dc, Map},
{204, [], <<>>}
catch
_:_ ->
{400, [], <<"bad_body">>}
end;
%% POST to channel
handle('POST', [<<"channel">>], Req, Args) ->
Pid = maps:get(forward, Args),
Expand Down
111 changes: 110 additions & 1 deletion test/router_device_worker_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
]).

-export([
charge_for_late_packets/1,
device_update_test/1,
unjoined_device_update_test/1,
stopped_unjoined_device_update_test/1,
Expand Down Expand Up @@ -64,6 +65,7 @@ groups() ->

all_tests() ->
[
charge_for_late_packets,
device_worker_stop_children_test,
device_update_test,
unjoined_device_update_test,
Expand Down Expand Up @@ -104,6 +106,114 @@ end_per_testcase(TestCase, Config) ->
%%--------------------------------------------------------------------
%% TEST CASES
%%--------------------------------------------------------------------
charge_for_late_packets(Config) ->
#{
stream := Stream,
pubkey_bin := PubKeyBin1,
hotspot_name := HotspotName1
} = test_utils:join_device(Config),

application:set_env(router, charge_late_packets, true),

%% Waiting for reply from router to hotspot
test_utils:wait_state_channel_message(1250),

%% Check that device is in cache
{ok, DB, CF} = router_db:get_devices(),
WorkerId = router_devices_sup:id(?CONSOLE_DEVICE_ID),
{ok, Device} = router_device:get_by_id(DB, CF, WorkerId),

SendPacketFun = fun(PubKeyBin, Fcnt) ->
Stream !
{send,
test_utils:frame_packet(
?UNCONFIRMED_UP,
PubKeyBin,
router_device:nwk_s_key(Device),
router_device:app_s_key(Device),
Fcnt
)}
end,

{StartingBalance, StartingNonce} = router_console_dc_tracker:current_balance(?CONSOLE_ORG_ID),

%% Simulate multiple hotspots sending data
SendPacketFun(PubKeyBin1, 500),
test_utils:wait_until(fun() ->
test_utils:get_device_last_seen_fcnt(?CONSOLE_DEVICE_ID) == 500
end),

{ok, _} = test_utils:wait_for_console_event_sub(
<<"uplink_unconfirmed">>,
#{
<<"id">> => fun erlang:is_binary/1,
<<"category">> => <<"uplink">>,
<<"sub_category">> => <<"uplink_unconfirmed">>,
<<"description">> => fun erlang:is_binary/1,
<<"reported_at">> => fun erlang:is_integer/1,
<<"device_id">> => ?CONSOLE_DEVICE_ID,
<<"data">> => #{
<<"dc">> => #{<<"balance">> => 98, <<"nonce">> => 1, <<"used">> => 1},
<<"fcnt">> => fun erlang:is_integer/1,
<<"payload_size">> => fun erlang:is_integer/1,
<<"payload">> => fun erlang:is_binary/1,
<<"raw_packet">> => fun erlang:is_binary/1,
<<"port">> => fun erlang:is_integer/1,
<<"devaddr">> => fun erlang:is_binary/1,
<<"hotspot">> => #{
<<"id">> => erlang:list_to_binary(libp2p_crypto:bin_to_b58(PubKeyBin1)),
<<"name">> => erlang:list_to_binary(HotspotName1),
<<"rssi">> => 0.0,
<<"snr">> => 0.0,
<<"spreading">> => <<"SF8BW125">>,
<<"frequency">> => fun erlang:is_float/1,
<<"channel">> => fun erlang:is_number/1,
<<"lat">> => fun erlang:is_float/1,
<<"long">> => fun erlang:is_float/1
},
<<"mac">> => [],
<<"hold_time">> => fun erlang:is_integer/1
}
}
),

%% Make sure DC is not being charged for the late packet.
{Balance1, Nonce1} = router_console_dc_tracker:current_balance(?CONSOLE_ORG_ID),
?assertEqual(Balance1, StartingBalance - 1),
?assertEqual(Nonce1, StartingNonce),

SendPacketFun(PubKeyBin1, 100),

test_utils:wait_for_console_event_sub(<<"uplink_dropped_late">>, #{
<<"id">> => fun erlang:is_binary/1,
<<"category">> => <<"uplink_dropped">>,
<<"sub_category">> => <<"uplink_dropped_late">>,
<<"description">> => <<"Late packet">>,
<<"reported_at">> => fun erlang:is_integer/1,
<<"device_id">> => ?CONSOLE_DEVICE_ID,
<<"data">> => #{
<<"fcnt">> => 100,
<<"hotspot">> => #{
<<"id">> => erlang:list_to_binary(libp2p_crypto:bin_to_b58(PubKeyBin1)),
<<"name">> => erlang:list_to_binary(blockchain_utils:addr2name(PubKeyBin1))
},
<<"hold_time">> => fun erlang:is_integer/1
}
}),

{Balance2, Nonce2} = router_console_dc_tracker:current_balance(?CONSOLE_ORG_ID),
?assertEqual(Balance2, Balance1 - 1),
?assertEqual(Nonce2, Nonce1),

OrgID = maps:get(organization_id, router_device:metadata(Device), undefined),
test_utils:wait_manual_update_router_dc(#{
<<"organization_id">> => OrgID,
<<"amount">> => Balance2
}),

application:set_env(router, charge_late_packets, false),
ok.

device_worker_late_packet_double_charge_test(Config) ->
#{
stream := Stream,
Expand Down Expand Up @@ -1421,7 +1531,6 @@ replay_uplink_far_in_the_past_test(Config) ->
<<"sub_category">> => <<"uplink_dropped_late">>
}),


ok.

offer_cache_test(Config) ->
Expand Down
20 changes: 20 additions & 0 deletions test/test_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
wait_state_channel_message/3,
wait_state_channel_message/8,
wait_organizations_burned/1,
wait_manual_update_router_dc/1,
wait_state_channel_packet/1,
join_payload/2,
join_packet/3, join_packet/4,
Expand Down Expand Up @@ -925,6 +926,25 @@ wait_organizations_burned(Expected) ->
ct:fail("wait_organizations_burned failed")
end.

wait_manual_update_router_dc(Expected) ->
try
receive
{manual_update_router_dc, Got} ->
case match_map(Expected, Got) of
true ->
ok;
{false, Reason} ->
ct:pal("FAILED got: ~n~p~n expected: ~n~p", [Got, Expected]),
ct:fail("wait_manual_update_router_dc failed ~p", [Reason])
end
after 1250 -> ct:fail("wait_manual_update_router_dc timeout")
end
catch
_Class:_Reason:_Stacktrace ->
ct:pal("wait_manual_update_router_dc stacktrace ~n~p", [{_Reason, _Stacktrace}]),
ct:fail("wait_manual_update_router_dc failed")
end.

join_packet(PubKeyBin, AppKey, DevNonce) ->
join_packet(PubKeyBin, AppKey, DevNonce, #{}).

Expand Down

0 comments on commit 5c5f6c0

Please sign in to comment.