Skip to content

Commit

Permalink
feat: support for extended format for symbols
Browse files Browse the repository at this point in the history
  • Loading branch information
borgoat committed Apr 13, 2023
1 parent 17d2142 commit f83ebc0
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 10 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Support for extended format for symbols.

### Changed

- `subscribe` and `unsubscribe` now pass symbols as is (as string or array of objects).
SubscriptionsManager can still handle a list of strings in Elixir,
and turn it into a comma-separated string for Twelve Data.

## [0.2.1] - 2023-03-16

## Fixed
### Fixed

- Removed some unneeded logs.

Expand Down
17 changes: 13 additions & 4 deletions lib/ex_twelve_data/real_time_prices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ defmodule ExTwelveData.RealTimePrices do
require Logger

alias ExTwelveData.RealTimePrices.Handler
alias ExTwelveData.Symbol

@typedoc """
Symbols passed to subscribe/unsubscribe.
It can either be an array of objects (extended format),
or a comma-delimited string with multiple symbols.
"""
@type symbols_list :: String.t() | [Symbol.t()]

@type options :: [option]

Expand Down Expand Up @@ -61,13 +70,13 @@ defmodule ExTwelveData.RealTimePrices do
Subsequent calls will append new symbols to the list.
See `unsubscribe/2` and `reset/1` to remove
"""
@spec subscribe(pid, [String.t()]) :: {:error, any} | {:ok}
@spec subscribe(pid, symbols_list()) :: {:error, any} | {:ok}
def subscribe(client, symbols) do
msg =
Jason.encode!(%{
action: "subscribe",
params: %{
symbols: Enum.join(symbols, ",")
symbols: symbols
}
})

Expand All @@ -80,13 +89,13 @@ defmodule ExTwelveData.RealTimePrices do
Twelve Data will stop sending updates.
"""
@spec unsubscribe(pid, [String.t()]) :: {:error, any} | {:ok}
@spec unsubscribe(pid, symbols_list()) :: {:error, any} | {:ok}
def unsubscribe(client, symbols) do
msg =
Jason.encode!(%{
action: "unsubscribe",
params: %{
symbols: Enum.join(symbols, ",")
symbols: symbols
}
})

Expand Down
29 changes: 24 additions & 5 deletions lib/ex_twelve_data/real_time_prices/subscriptions_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ defmodule ExTwelveData.RealTimePrices.SubscriptionsManager do
GenServer.option()
| {:pid, pid()}
| {:provider, SubscriptionsManager.Provider}
| {:max_subscriptions, integer}
| {:max_subscriptions, integer()}
| {:symbols_extended, boolean()}

# 1 event / 600ms -> 100 events per minute
@clock_period_ms 600
Expand All @@ -39,11 +40,18 @@ defmodule ExTwelveData.RealTimePrices.SubscriptionsManager do
pid = Keyword.fetch!(opts, :pid)
provider = Keyword.fetch!(opts, :provider)
max_subscriptions = Keyword.fetch!(opts, :max_subscriptions)
symbols_extended = Keyword.get(opts, :symbols_extended, false)

schedule_next_message()

{:ok,
%{tracked: MapSet.new(), pid: pid, provider: provider, max_subscriptions: max_subscriptions}}
%{
tracked: MapSet.new(),
pid: pid,
provider: provider,
max_subscriptions: max_subscriptions,
symbols_extended: symbols_extended
}}
end

def handle_call(_msg, _from, state) do
Expand All @@ -65,7 +73,8 @@ defmodule ExTwelveData.RealTimePrices.SubscriptionsManager do
tracked: current,
pid: pid,
provider: provider,
max_subscriptions: max_subscriptions
max_subscriptions: max_subscriptions,
symbols_extended: symbols_extended
} = state

new = provider.get_symbols()
Expand All @@ -76,18 +85,28 @@ defmodule ExTwelveData.RealTimePrices.SubscriptionsManager do
state

{:add, to_add, new_tracked} ->
RealTimePrices.subscribe(pid, MapSet.to_list(to_add))
symbols = to_add |> MapSet.to_list() |> join_symbols_if_needed(symbols_extended)
RealTimePrices.subscribe(pid, symbols)
%{state | tracked: new_tracked}

{:remove, to_remove, new_tracked} ->
RealTimePrices.unsubscribe(pid, MapSet.to_list(to_remove))
symbols = to_remove |> MapSet.to_list() |> join_symbols_if_needed(symbols_extended)
RealTimePrices.unsubscribe(pid, symbols)
%{state | tracked: new_tracked}
end

schedule_next_message()
{:noreply, new_state}
end

defp join_symbols_if_needed(symbols_set, extended) do
if extended do
symbols_set
else
Enum.join(symbols_set, ",")
end
end

defp schedule_next_message do
Process.send_after(self(), :clock, @clock_period_ms)
end
Expand Down
24 changes: 24 additions & 0 deletions lib/ex_twelve_data/symbol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule ExTwelveData.Symbol do
@derive Jason.Encoder
defstruct [
:exchange,
:mic_code,
:symbol,
:type
]

@typedoc """
Supports: Stock, Index, ETF, REIT
"""
@type instrument_types :: String.t()

@typedoc """
Extended format for Twelve Data symbols.
"""
@type t :: %__MODULE__{
exchange: String.t() | nil,
mic_code: String.t() | nil,
symbol: String.t(),
type: instrument_types
}
end

0 comments on commit f83ebc0

Please sign in to comment.