Skip to content

Commit

Permalink
Support pluggable JSON libraries
Browse files Browse the repository at this point in the history
It's handy to be able to drop in a faster JSON library, such as Jsonrs, for use
cases with very large requests and responses, as JSON encoding and decoding time
is greatly reduced.
  • Loading branch information
tomtaylor committed Apr 22, 2024
1 parent 16c0623 commit 5968aef
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 161 deletions.
3 changes: 3 additions & 0 deletions lib/snap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ defmodule Snap do
`[:my_app, :snap]`)
* `index_namespace` - see `Snap.Cluster.Namespace` for details (defaults to
`nil`)
* `json_library` - the library used for encoding/decoding JSON (defaults to
`Jason`. You may wish to switch this to [`Jsonrs`](https://hex.pm/packages/jsonrs)
for better performance encoding and decoding large requests and responses)
## Telemetry
Expand Down
148 changes: 84 additions & 64 deletions lib/snap/bulk/action.ex
Original file line number Diff line number Diff line change
@@ -1,123 +1,143 @@
defmodule Snap.Bulk.Action do
@moduledoc false
@callback to_action_json(struct()) :: map()
@callback to_document_json(struct()) :: map() | nil
end

defmodule Snap.Bulk.Action.Create do
@moduledoc """
Represents a create step in a `Snap.Bulk` operation
"""
@behaviour Snap.Bulk.Action

@enforce_keys [:doc]
defstruct [:_index, :_id, :require_alias, :doc]
defstruct [:index, :id, :require_alias, :doc]

@type t :: %__MODULE__{
_index: String.t() | nil,
_id: String.t() | nil,
index: String.t() | nil,
id: String.t() | nil,
require_alias: boolean() | nil,
doc: map()
}

@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> Enum.into(%{})
|> then(fn values -> %{"create" => values} end)
end

@doc false
def to_document_json(%__MODULE__{doc: doc}) do
doc
end
end

defmodule Snap.Bulk.Action.Delete do
@moduledoc """
Represents a delete step in a `Snap.Bulk` operation
"""
@enforce_keys [:_id]
defstruct [:_index, :_id, :require_alias]
@behaviour Snap.Bulk.Action

@enforce_keys [:id]
defstruct [:index, :id, :require_alias]

@type t :: %__MODULE__{
_index: String.t() | nil,
_id: String.t(),
index: String.t() | nil,
id: String.t(),
require_alias: boolean() | nil
}

@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> Enum.into(%{})
|> then(fn values -> %{"delete" => values} end)
end

@doc false
def to_document_json(_), do: nil
end

defmodule Snap.Bulk.Action.Index do
@moduledoc """
Represents an index step in a `Snap.Bulk` operation
"""
@behaviour Snap.Bulk.Action

@enforce_keys [:doc]
defstruct [:_index, :_id, :require_alias, :doc]
defstruct [:index, :id, :require_alias, :doc]

@type t :: %__MODULE__{
_index: String.t() | nil,
_id: String.t() | nil,
index: String.t() | nil,
id: String.t() | nil,
require_alias: boolean() | nil,
doc: map()
}

@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> Enum.into(%{})
|> then(fn values -> %{"index" => values} end)
end

@doc false
def to_document_json(%__MODULE__{doc: doc}) do
doc
end
end

defmodule Snap.Bulk.Action.Update do
@moduledoc """
Represents an update step in a `Snap.Bulk` operation
"""
@behaviour Snap.Bulk.Action

@enforce_keys [:doc]
defstruct [
:_id,
:_index,
:_source,
:id,
:index,
:require_alias,
:doc,
:doc_as_upsert,
:require_alias,
:script,
:upsert
:script
]

@type t :: %__MODULE__{
_id: String.t() | nil,
_index: String.t() | nil,
_source: boolean() | nil,
id: String.t() | nil,
index: String.t() | nil,
require_alias: boolean() | nil,
doc: map(),
doc_as_upsert: boolean() | nil,
require_alias: boolean() | nil,
script: map() | nil,
upsert: map() | nil
script: map() | nil
}
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Create do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Create{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"create" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
end
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Delete do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Delete{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"delete" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
end
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Update do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Update{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]
@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"update" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
|> Enum.into(%{})
|> then(fn values -> %{"update" => values} end)
end
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Index do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Index{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]
@doc false
def to_document_json(%__MODULE__{doc: doc, doc_as_upsert: doc_as_upsert, script: script}) do
values = %{doc: doc, doc_as_upsert: doc_as_upsert, script: script}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"index" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
|> Enum.into(%{})
end
end
53 changes: 16 additions & 37 deletions lib/snap/bulk/actions.ex
Original file line number Diff line number Diff line change
@@ -1,56 +1,35 @@
defmodule Snap.Bulk.Actions do
@moduledoc false

alias Snap.Bulk.Action.{Create, Index, Update, Delete}

@doc """
Encodes a list of bulk action structs into line separated JSON for feeding to the
/_bulk endpoint.
"""
def encode(actions) do
encode_actions([], actions)
def encode(actions, json_library \\ Jason) do
encode_actions([], actions, json_library)
end

defp encode_actions(iolist, []) do
defp encode_actions(iolist, [], _json_library) do
iolist
end

defp encode_actions(iolist, [head | tail]) do
updated_iolist = [iolist, encode_action(head)]
encode_actions(updated_iolist, tail)
end

defp encode_action(%type{} = action) when type in [Create, Index] do
doc = action.doc

doc_json =
doc
|> Jason.encode!()

action_json = encode_action_command(action)

[action_json, "\n", doc_json, "\n"]
defp encode_actions(iolist, [head | tail], json_library) do
updated_iolist = [iolist, encode_action(head, json_library)]
encode_actions(updated_iolist, tail, json_library)
end

defp encode_action(%Delete{} = action) do
action_json = encode_action_command(action)

[action_json, "\n"]
end

defp encode_action(%Update{} = action) do
doc = action.doc

doc_json =
%{doc: doc}
|> Jason.encode!()

action_json = encode_action_command(action)
defp encode_action(%type{} = action, json_library) do
action_json = type.to_action_json(action)
doc_json = type.to_document_json(action)

[action_json, "\n", doc_json, "\n"]
if doc_json do
[encode_json(action_json, json_library), "\n", encode_json(doc_json, json_library), "\n"]
else
[encode_json(action_json, json_library), "\n"]
end
end

defp encode_action_command(action) do
Jason.encode!(action)
defp encode_json(json, json_library) do
json_library.encode!(json)
end
end
9 changes: 5 additions & 4 deletions lib/snap/bulk/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ defmodule Snap.Bulk do
```
actions = [
%Snap.Bulk.Action.Create{_id: 1, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{_id: 2, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{_id: 3, doc: %{foo: "bar"}}
%Snap.Bulk.Action.Create{id: 1, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{id: 2, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{id: 3, doc: %{foo: "bar"}}
]
actions
Expand Down Expand Up @@ -111,7 +111,8 @@ defmodule Snap.Bulk do
end

defp process_chunk(actions, cluster, index, params, request_opts, error_count, _max_errors) do
body = Actions.encode(actions)
json_library = cluster.json_library()
body = Actions.encode(actions, json_library)

headers = [{"content-type", "application/x-ndjson"}]

Expand Down
10 changes: 9 additions & 1 deletion lib/snap/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ defmodule Snap.Cluster do
Supervisor.config(__MODULE__)
end

@doc """
Returns the JSON library configured for the Cluster.
"""
def json_library() do
Keyword.get(config(), :json_library, Jason)
end

@doc """
Returns the otp_app that the Cluster was defined with.
"""
Expand Down Expand Up @@ -124,7 +131,8 @@ defmodule Snap.Cluster do
index_namespace: String.t() | nil,
telemetry_prefix: list(atom()),
http_client_adapter:
Snap.HTTPClient.t() | {Snap.HTTPClient.t(), adapter_config :: Keyword.t()}
Snap.HTTPClient.t() | {Snap.HTTPClient.t(), adapter_config :: Keyword.t()},
json_library: module()
]

@doc """
Expand Down
19 changes: 10 additions & 9 deletions lib/snap/multi/multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ defmodule Snap.Multi do
{:ok, Snap.Multi.Response.t()} | {:error, Snap.Cluster.error()}
def run(%__MODULE__{} = multi, cluster, index_or_alias, params \\ [], headers \\ [], opts \\ []) do
ids = build_ids(multi.searches)
body = encode(multi)
json_library = cluster.json_library()
body = encode(multi, json_library)
headers = headers ++ [{"content-type", "application/x-ndjson"}]
namespaced_index = Namespace.add_namespace_to_index(index_or_alias, cluster)

Expand All @@ -76,23 +77,23 @@ defmodule Snap.Multi do
end
end

defp encode(%__MODULE__{} = multi) do
defp encode(%__MODULE__{} = multi, json_library) do
multi.searches
|> Enum.flat_map(&encode_search/1)
|> Enum.flat_map(&encode_search(&1, json_library))
end

defp encode_search(%Search{headers: headers, body: body}) do
[encode_headers(headers), "\n", encode_body(body), "\n"]
defp encode_search(%Search{headers: headers, body: body}, json_library) do
[encode_headers(headers, json_library), "\n", encode_body(body, json_library), "\n"]
end

defp encode_headers(headers) do
defp encode_headers(headers, json_library) do
headers
|> Enum.into(%{})
|> Jason.encode!(pretty: false)
|> json_library.encode!(pretty: false)
end

defp encode_body(body) do
Jason.encode!(body, pretty: false)
defp encode_body(body, json_library) do
json_library.encode!(body, pretty: false)
end

defp build_ids(searches) do
Expand Down
Loading

0 comments on commit 5968aef

Please sign in to comment.