Skip to content

Commit

Permalink
Realtime vehicles / alerts PubSub (#2265)
Browse files Browse the repository at this point in the history
* refactor: move lookup key to socket assign

* refactor: use PubSub for Realtime.Server

* refactor: remove unnecessary Map.merge in calls to Socket.assign

Co-authored-by: Kayla Firestack <[email protected]>

* refactor(tests): use factor rather than constant

---------

Co-authored-by: Kayla Firestack <[email protected]>
  • Loading branch information
lemald and firestack authored Oct 30, 2023
1 parent 1d752ba commit 3bf9201
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 198 deletions.
124 changes: 72 additions & 52 deletions lib/realtime/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ defmodule Realtime.Server do
@spec default_name() :: GenServer.name()
def default_name(), do: Realtime.Server

@spec pubsub_name() :: Phoenix.PubSub.t()
def pubsub_name(), do: Realtime.PubSub

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(start_link_opts) do
GenServer.start_link(__MODULE__, [], start_link_opts)
Expand All @@ -93,69 +96,92 @@ defmodule Realtime.Server do
```
Those `lookup_args` can be passed into `RealTime.Server.lookup(lookup_args)/1` to get the data.
"""
@spec subscribe_to_route(Route.id(), GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_route(Route.id(), GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_route(route_id, server \\ default_name()) do
subscribe(server, {:route_id, route_id})
subscription_key = {:route_id, route_id}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_all_shuttles(GenServer.server()) :: [Vehicle.t()]
@spec subscribe_to_all_shuttles(GenServer.server()) :: {subscription_key(), [Vehicle.t()]}
def subscribe_to_all_shuttles(server \\ default_name()) do
subscribe(server, :all_shuttles)
subscription_key = :all_shuttles
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_search(search_params(), GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_search(search_params(), GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_search(search_params, server \\ default_name()) do
subscribe(server, {:search, search_params})
subscription_key = {:search, search_params}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_limited_search(search_params(), GenServer.server()) ::
limited_search_result()
{subscription_key(), limited_search_result()}
def subscribe_to_limited_search(search_params, server \\ default_name()) do
subscribe(server, {:limited_search, search_params})
subscription_key = {:limited_search, search_params}
{subscription_key, subscribe(server, subscription_key)}
end

@spec update_limited_search_subscription(search_params(), GenServer.server()) ::
limited_search_result()
{subscription_key(), limited_search_result()}
def update_limited_search_subscription(search_params, server \\ default_name()) do
update_subscription(server, {:limited_search, search_params})
subscription_key = {:limited_search, search_params}
{subscription_key, update_subscription(server, subscription_key)}
end

@spec subscribe_to_vehicle(String.t(), GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_vehicle(String.t(), GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_vehicle(vehicle_id, server \\ default_name()) do
subscribe(
server,
{:vehicle, vehicle_id}
)
subscription_key = {:vehicle, vehicle_id}

{subscription_key,
subscribe(
server,
subscription_key
)}
end

@spec subscribe_to_vehicle_with_logged_out(String.t(), GenServer.server()) :: [
VehicleOrGhost.t()
]
@spec subscribe_to_vehicle_with_logged_out(String.t(), GenServer.server()) ::
{subscription_key(),
[
VehicleOrGhost.t()
]}
def subscribe_to_vehicle_with_logged_out(vehicle_id, server \\ default_name()) do
subscribe(
server,
{:vehicle_with_logged_out, vehicle_id}
)
subscription_key = {:vehicle_with_logged_out, vehicle_id}

{subscription_key,
subscribe(
server,
subscription_key
)}
end

@spec subscribe_to_run_ids([Run.id()], GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_run_ids([Run.id()], GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_run_ids(run_ids, server \\ default_name()) do
subscribe(server, {:run_ids, run_ids})
subscription_key = {:run_ids, run_ids}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_block_ids([Block.id()], GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_block_ids([Block.id()], GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_block_ids(block_ids, server \\ default_name()) do
subscribe(server, {:block_ids, block_ids})
subscription_key = {:block_ids, block_ids}
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_all_pull_backs(GenServer.server()) :: [VehicleOrGhost.t()]
@spec subscribe_to_all_pull_backs(GenServer.server()) ::
{subscription_key(), [VehicleOrGhost.t()]}
def subscribe_to_all_pull_backs(server \\ default_name()) do
subscribe(server, :all_pull_backs)
subscription_key = :all_pull_backs
{subscription_key, subscribe(server, subscription_key)}
end

@spec subscribe_to_alerts(Route.id(), GenServer.server()) :: [String.t()]
@spec subscribe_to_alerts(Route.id(), GenServer.server()) :: {subscription_key(), [String.t()]}
def subscribe_to_alerts(route_id, server \\ default_name()) do
subscribe(server, {:alerts, route_id})
subscription_key = {:alerts, route_id}
{subscription_key, subscribe(server, subscription_key)}
end

def peek_at_vehicles_by_run_ids(run_ids, server \\ default_name()) do
Expand Down Expand Up @@ -187,17 +213,20 @@ defmodule Realtime.Server do
@spec subscribe(GenServer.server(), {:run_ids, [Run.id()]}) :: [VehicleOrGhost.t()]
@spec subscribe(GenServer.server(), {:block_ids, [Block.id()]}) :: [VehicleOrGhost.t()]
@spec subscribe(GenServer.server(), {:alerts, Route.id()}) :: [String.t()]
defp subscribe(server, {:alerts, _route_id} = subscription_key) do
{pubsub, ets} = GenServer.call(server, :subscription_info)
Phoenix.PubSub.subscribe(pubsub, "realtime_alerts")
lookup({ets, subscription_key})
end

defp subscribe(server, subscription_key) do
{registry_key, ets} = GenServer.call(server, :subscription_info)
Registry.register(Realtime.Registry, registry_key, subscription_key)
{pubsub, ets} = GenServer.call(server, :subscription_info)
Phoenix.PubSub.subscribe(pubsub, "realtime_vehicles")
lookup({ets, subscription_key})
end

defp update_subscription(server, {:limited_search, _search_params} = subscription_key) do
{registry_key, ets} = GenServer.call(server, :subscription_info)
# Replace the old search subscription with the new one
Registry.unregister_match(Realtime.Registry, registry_key, {:limited_search, %{}})
Registry.register(Realtime.Registry, registry_key, subscription_key)
{_pubsub, ets} = GenServer.call(server, :subscription_info)

lookup({ets, subscription_key})
end
Expand Down Expand Up @@ -378,8 +407,7 @@ defmodule Realtime.Server do

@impl true
def handle_call(:subscription_info, _from, %__MODULE__{} = state) do
registry_key = self()
{:reply, {registry_key, state.ets}, state}
{:reply, {pubsub_name(), state.ets}, state}
end

def handle_call(:ets, _from, %__MODULE__{ets: ets} = state) do
Expand Down Expand Up @@ -518,21 +546,13 @@ defmodule Realtime.Server do

@spec broadcast(t(), :vehicles | :alerts) :: :ok
defp broadcast(state, data_type) do
registry_key = self()

Registry.dispatch(Realtime.Supervisor.registry_name(), registry_key, fn entries ->
Enum.each(entries, fn {pid, subscripition_key} ->
if (data_type == :alerts and match?({:alerts, _}, subscripition_key)) or
(data_type == :vehicles and !match?({:alerts, _}, subscripition_key)) do
send_data({pid, subscripition_key}, state)
end
end)
end)
end
topic =
case data_type do
:vehicles -> "realtime_vehicles"
:alerts -> "realtime_alerts"
end

@spec send_data({pid, subscription_key}, t) :: broadcast_message
defp send_data({pid, subscription_key}, state) do
send(pid, {:new_realtime_data, {state.ets, subscription_key}})
Phoenix.PubSub.broadcast(pubsub_name(), topic, {:new_realtime_data, state.ets})
end

@spec block_is_active?(VehicleOrGhost.t()) :: boolean
Expand Down
1 change: 1 addition & 0 deletions lib/realtime/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Realtime.Supervisor do
children = [
{Registry, keys: :duplicate, name: registry_name()},
{Realtime.BlockWaiverStore, name: Realtime.BlockWaiverStore.default_name()},
{Phoenix.PubSub, name: Realtime.Server.pubsub_name()},
{Realtime.Server, name: Realtime.Server.default_name()},
{Realtime.TrainVehiclesPubSub, name: Realtime.TrainVehiclesPubSub.default_name()},
{Realtime.DataStatusPubSub, name: Realtime.DataStatusPubSub.default_name()},
Expand Down
12 changes: 8 additions & 4 deletions lib/skate_web/channels/alerts_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ defmodule SkateWeb.AlertsChannel do

@impl SkateWeb.AuthenticatedChannel
def join_authenticated("alerts:route:" <> route_id, _message, socket) do
alerts = Duration.log_duration(Server, :subscribe_to_alerts, [route_id])
{:ok, %{data: alerts}, socket}
{lookup_key, alerts} = Duration.log_duration(Server, :subscribe_to_alerts, [route_id])

{:ok, %{data: alerts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do
data = Server.lookup(lookup_args)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = Map.get(socket.assigns, :lookup_key)

data = Server.lookup({ets, lookup_key})

:ok = push(socket, "alerts", %{data: data})
{:noreply, socket}
end
Expand Down
39 changes: 29 additions & 10 deletions lib/skate_web/channels/vehicle_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ defmodule SkateWeb.VehicleChannel do
alias Realtime.Server

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_params}, socket) do
vehicle_or_ghost = Realtime.Server.lookup(lookup_params)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = socket.assigns[:lookup_key]

vehicle_or_ghost = Realtime.Server.lookup({ets, lookup_key})

:ok = push(socket, "vehicle", %{data: List.first(vehicle_or_ghost)})

{:noreply, socket}
Expand All @@ -15,11 +18,17 @@ defmodule SkateWeb.VehicleChannel do
@impl SkateWeb.AuthenticatedChannel
def join_authenticated("vehicle:run_ids:" <> run_ids, _message, socket) do
run_ids = String.split(run_ids, ",")

vehicle_or_ghost = Realtime.Server.peek_at_vehicles_by_run_ids(run_ids) |> List.first()

if vehicle_or_ghost do
_ = Server.subscribe_to_vehicle(vehicle_or_ghost.id)
end
socket =
if vehicle_or_ghost do
{lookup_key, _vehicle_or_ghost} = Server.subscribe_to_vehicle(vehicle_or_ghost.id)

Phoenix.Socket.assign(socket, lookup_key: lookup_key)
else
socket
end

{:ok, %{data: vehicle_or_ghost}, socket}
end
Expand All @@ -36,13 +45,23 @@ defmodule SkateWeb.VehicleChannel do
Realtime.Server.peek_at_vehicle_by_id(vehicle_or_ghost_id) |> List.first()
end

if vehicle_or_ghost do
if user_in_test_group? do
_ = Server.subscribe_to_vehicle_with_logged_out(vehicle_or_ghost.id)
{lookup_key, _vehicle_or_ghost} =
if vehicle_or_ghost do
if user_in_test_group? do
Server.subscribe_to_vehicle_with_logged_out(vehicle_or_ghost.id)
else
Server.subscribe_to_vehicle(vehicle_or_ghost.id)
end
else
{nil, nil}
end

socket =
if is_nil(lookup_key) do
socket
else
_ = Server.subscribe_to_vehicle(vehicle_or_ghost.id)
Phoenix.Socket.assign(socket, lookup_key: lookup_key)
end
end

{:ok, %{data: vehicle_or_ghost}, socket}
end
Expand Down
54 changes: 34 additions & 20 deletions lib/skate_web/channels/vehicles_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,40 @@ defmodule SkateWeb.VehiclesChannel do

@impl SkateWeb.AuthenticatedChannel
def join_authenticated("vehicles:shuttle:all", _message, socket) do
shuttles = Duration.log_duration(Server, :subscribe_to_all_shuttles, [])
{:ok, %{data: shuttles}, socket}
{lookup_key, shuttles} = Duration.log_duration(Server, :subscribe_to_all_shuttles, [])

{:ok, %{data: shuttles}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

def join_authenticated("vehicles:pull_backs:all", _message, socket) do
pull_backs = Duration.log_duration(Server, :subscribe_to_all_pull_backs, [])
{:ok, %{data: pull_backs}, socket}
{lookup_key, pull_backs} = Duration.log_duration(Server, :subscribe_to_all_pull_backs, [])

{:ok, %{data: pull_backs}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

def join_authenticated("vehicles:route:" <> route_id, _message, socket) do
vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_route, [route_id])
{:ok, %{data: vehicles_and_ghosts}, socket}
{lookup_key, vehicles_and_ghosts} =
Duration.log_duration(Server, :subscribe_to_route, [route_id])

{:ok, %{data: vehicles_and_ghosts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

def join_authenticated("vehicles:run_ids:" <> run_ids, _message, socket) do
run_ids = String.split(run_ids, ",")
vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_run_ids, [run_ids])
{:ok, %{data: vehicles_and_ghosts}, socket}

{lookup_key, vehicles_and_ghosts} =
Duration.log_duration(Server, :subscribe_to_run_ids, [run_ids])

{:ok, %{data: vehicles_and_ghosts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

def join_authenticated("vehicles:block_ids:" <> block_ids, _message, socket) do
block_ids = String.split(block_ids, ",")
vehicles_and_ghosts = Duration.log_duration(Server, :subscribe_to_block_ids, [block_ids])
{:ok, %{data: vehicles_and_ghosts}, socket}

{lookup_key, vehicles_and_ghosts} =
Duration.log_duration(Server, :subscribe_to_block_ids, [block_ids])

{:ok, %{data: vehicles_and_ghosts}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

def join_authenticated(
Expand Down Expand Up @@ -76,26 +86,30 @@ defmodule SkateWeb.VehiclesChannel do
"User=#{username} searched for property=#{subscribe_args.property}, text=#{subscribe_args.text}"
end)

vehicles = Duration.log_duration(Server, :subscribe_to_search, [subscribe_args])
{lookup_key, vehicles} = Duration.log_duration(Server, :subscribe_to_search, [subscribe_args])

{:ok, %{data: vehicles}, socket}
{:ok, %{data: vehicles}, Phoenix.Socket.assign(socket, lookup_key: lookup_key)}
end

def join_authenticated(topic, _message, _socket) do
{:error, %{message: "no such topic \"#{topic}\""}}
end

@impl SkateWeb.AuthenticatedChannel
def handle_info_authenticated({:new_realtime_data, lookup_args}, socket) do
event_name = event_name(lookup_args)
data = Server.lookup(lookup_args)
def handle_info_authenticated({:new_realtime_data, ets}, socket) do
lookup_key = socket.assigns[:lookup_key]

data = Server.lookup({ets, lookup_key})

event_name = event_name(lookup_key)
:ok = push(socket, event_name, %{data: data})

{:noreply, socket}
end

@spec event_name(Server.lookup_key()) :: String.t()
defp event_name({_ets, :all_shuttles}), do: "shuttles"
defp event_name({_ets, :all_pull_backs}), do: "pull_backs"
defp event_name({_ets, {:search, _}}), do: "search"
defp event_name({_ets, _}), do: "vehicles"
@spec event_name(Server.subscription_key()) :: String.t()
defp event_name(:all_shuttles), do: "shuttles"
defp event_name(:all_pull_backs), do: "pull_backs"
defp event_name({:search, _}), do: "search"
defp event_name(_), do: "vehicles"
end
Loading

0 comments on commit 3bf9201

Please sign in to comment.