Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pluggable JSON libraries #101

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/snap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ defmodule Snap do
`[:my_app, :snap]`)
* `index_namespace` - see `Snap.Cluster.Namespace` for details (defaults to
`nil`)
* `json_library` - the library used for encoding/decoding JSON (defaults to
`Jason`. You may wish to switch this to [`Jsonrs`](https://hex.pm/packages/jsonrs)
for better performance encoding and decoding large requests and responses)

## Telemetry

Expand Down
148 changes: 84 additions & 64 deletions lib/snap/bulk/action.ex
Original file line number Diff line number Diff line change
@@ -1,123 +1,143 @@
defmodule Snap.Bulk.Action do
@moduledoc false
@callback to_action_json(struct()) :: map()
@callback to_document_json(struct()) :: map() | nil
end

defmodule Snap.Bulk.Action.Create do
@moduledoc """
Represents a create step in a `Snap.Bulk` operation
"""
@behaviour Snap.Bulk.Action

@enforce_keys [:doc]
defstruct [:_index, :_id, :require_alias, :doc]
defstruct [:index, :id, :require_alias, :doc]

@type t :: %__MODULE__{
_index: String.t() | nil,
_id: String.t() | nil,
index: String.t() | nil,
id: String.t() | nil,
require_alias: boolean() | nil,
doc: map()
}

@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> Enum.into(%{})
|> then(fn values -> %{"create" => values} end)
end

@doc false
def to_document_json(%__MODULE__{doc: doc}) do
doc
end
end

defmodule Snap.Bulk.Action.Delete do
@moduledoc """
Represents a delete step in a `Snap.Bulk` operation
"""
@enforce_keys [:_id]
defstruct [:_index, :_id, :require_alias]
@behaviour Snap.Bulk.Action

@enforce_keys [:id]
defstruct [:index, :id, :require_alias]

@type t :: %__MODULE__{
_index: String.t() | nil,
_id: String.t(),
index: String.t() | nil,
id: String.t(),
require_alias: boolean() | nil
}

@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> Enum.into(%{})
|> then(fn values -> %{"delete" => values} end)
end

@doc false
def to_document_json(_), do: nil
end

defmodule Snap.Bulk.Action.Index do
@moduledoc """
Represents an index step in a `Snap.Bulk` operation
"""
@behaviour Snap.Bulk.Action

@enforce_keys [:doc]
defstruct [:_index, :_id, :require_alias, :doc]
defstruct [:index, :id, :require_alias, :doc]

@type t :: %__MODULE__{
_index: String.t() | nil,
_id: String.t() | nil,
index: String.t() | nil,
id: String.t() | nil,
require_alias: boolean() | nil,
doc: map()
}

@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> Enum.into(%{})
|> then(fn values -> %{"index" => values} end)
end

@doc false
def to_document_json(%__MODULE__{doc: doc}) do
doc
end
end

defmodule Snap.Bulk.Action.Update do
@moduledoc """
Represents an update step in a `Snap.Bulk` operation
"""
@behaviour Snap.Bulk.Action

@enforce_keys [:doc]
defstruct [
:_id,
:_index,
:_source,
:id,
:index,
:require_alias,
:doc,
:doc_as_upsert,
:require_alias,
:script,
:upsert
:script
]

@type t :: %__MODULE__{
_id: String.t() | nil,
_index: String.t() | nil,
_source: boolean() | nil,
id: String.t() | nil,
index: String.t() | nil,
require_alias: boolean() | nil,
doc: map(),
doc_as_upsert: boolean() | nil,
require_alias: boolean() | nil,
script: map() | nil,
upsert: map() | nil
script: map() | nil
}
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Create do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Create{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"create" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
end
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Delete do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Delete{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"delete" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
end
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Update do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Update{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]
@doc false
def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do
values = %{_index: index, _id: id, require_alias: require_alias}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"update" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
|> Enum.into(%{})
|> then(fn values -> %{"update" => values} end)
end
end

defimpl Jason.Encoder, for: Snap.Bulk.Action.Index do
require Jason.Helpers

def encode(%Snap.Bulk.Action.Index{_index: index, _id: id, require_alias: require_alias}, opts) do
values = [_index: index, _id: id, require_alias: require_alias]
@doc false
def to_document_json(%__MODULE__{doc: doc, doc_as_upsert: doc_as_upsert, script: script}) do
values = %{doc: doc, doc_as_upsert: doc_as_upsert, script: script}

values
|> Enum.reject(&is_nil(elem(&1, 1)))
|> then(fn values -> %{"index" => Jason.OrderedObject.new(values)} end)
|> Jason.Encode.map(opts)
|> Enum.into(%{})
end
end
53 changes: 16 additions & 37 deletions lib/snap/bulk/actions.ex
Original file line number Diff line number Diff line change
@@ -1,56 +1,35 @@
defmodule Snap.Bulk.Actions do
@moduledoc false

alias Snap.Bulk.Action.{Create, Index, Update, Delete}

@doc """
Encodes a list of bulk action structs into line separated JSON for feeding to the
/_bulk endpoint.
"""
def encode(actions) do
encode_actions([], actions)
def encode(actions, json_library \\ Jason) do
encode_actions([], actions, json_library)
end

defp encode_actions(iolist, []) do
defp encode_actions(iolist, [], _json_library) do
iolist
end

defp encode_actions(iolist, [head | tail]) do
updated_iolist = [iolist, encode_action(head)]
encode_actions(updated_iolist, tail)
end

defp encode_action(%type{} = action) when type in [Create, Index] do
doc = action.doc

doc_json =
doc
|> Jason.encode!()

action_json = encode_action_command(action)

[action_json, "\n", doc_json, "\n"]
defp encode_actions(iolist, [head | tail], json_library) do
updated_iolist = [iolist, encode_action(head, json_library)]
encode_actions(updated_iolist, tail, json_library)
end

defp encode_action(%Delete{} = action) do
action_json = encode_action_command(action)

[action_json, "\n"]
end

defp encode_action(%Update{} = action) do
doc = action.doc

doc_json =
%{doc: doc}
|> Jason.encode!()

action_json = encode_action_command(action)
defp encode_action(%type{} = action, json_library) do
action_json = type.to_action_json(action)
doc_json = type.to_document_json(action)

[action_json, "\n", doc_json, "\n"]
if doc_json do
[encode_json(action_json, json_library), "\n", encode_json(doc_json, json_library), "\n"]
else
[encode_json(action_json, json_library), "\n"]
end
end

defp encode_action_command(action) do
Jason.encode!(action)
defp encode_json(json, json_library) do
json_library.encode!(json)
end
end
9 changes: 5 additions & 4 deletions lib/snap/bulk/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ defmodule Snap.Bulk do

```
actions = [
%Snap.Bulk.Action.Create{_id: 1, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{_id: 2, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{_id: 3, doc: %{foo: "bar"}}
%Snap.Bulk.Action.Create{id: 1, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{id: 2, doc: %{foo: "bar"}},
%Snap.Bulk.Action.Create{id: 3, doc: %{foo: "bar"}}
]

actions
Expand Down Expand Up @@ -111,7 +111,8 @@ defmodule Snap.Bulk do
end

defp process_chunk(actions, cluster, index, params, request_opts, error_count, _max_errors) do
body = Actions.encode(actions)
json_library = cluster.json_library()
body = Actions.encode(actions, json_library)

headers = [{"content-type", "application/x-ndjson"}]

Expand Down
10 changes: 9 additions & 1 deletion lib/snap/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ defmodule Snap.Cluster do
Supervisor.config(__MODULE__)
end

@doc """
Returns the JSON library configured for the Cluster.
"""
def json_library() do
Keyword.get(config(), :json_library, Jason)
end

@doc """
Returns the otp_app that the Cluster was defined with.
"""
Expand Down Expand Up @@ -124,7 +131,8 @@ defmodule Snap.Cluster do
index_namespace: String.t() | nil,
telemetry_prefix: list(atom()),
http_client_adapter:
Snap.HTTPClient.t() | {Snap.HTTPClient.t(), adapter_config :: Keyword.t()}
Snap.HTTPClient.t() | {Snap.HTTPClient.t(), adapter_config :: Keyword.t()},
json_library: module()
]

@doc """
Expand Down
19 changes: 10 additions & 9 deletions lib/snap/multi/multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ defmodule Snap.Multi do
{:ok, Snap.Multi.Response.t()} | {:error, Snap.Cluster.error()}
def run(%__MODULE__{} = multi, cluster, index_or_alias, params \\ [], headers \\ [], opts \\ []) do
ids = build_ids(multi.searches)
body = encode(multi)
json_library = cluster.json_library()
body = encode(multi, json_library)
headers = headers ++ [{"content-type", "application/x-ndjson"}]
namespaced_index = Namespace.add_namespace_to_index(index_or_alias, cluster)

Expand All @@ -76,23 +77,23 @@ defmodule Snap.Multi do
end
end

defp encode(%__MODULE__{} = multi) do
defp encode(%__MODULE__{} = multi, json_library) do
multi.searches
|> Enum.flat_map(&encode_search/1)
|> Enum.flat_map(&encode_search(&1, json_library))
end

defp encode_search(%Search{headers: headers, body: body}) do
[encode_headers(headers), "\n", encode_body(body), "\n"]
defp encode_search(%Search{headers: headers, body: body}, json_library) do
[encode_headers(headers, json_library), "\n", encode_body(body, json_library), "\n"]
end

defp encode_headers(headers) do
defp encode_headers(headers, json_library) do
headers
|> Enum.into(%{})
|> Jason.encode!(pretty: false)
|> json_library.encode!(pretty: false)
end

defp encode_body(body) do
Jason.encode!(body, pretty: false)
defp encode_body(body, json_library) do
json_library.encode!(body, pretty: false)
end

defp build_ids(searches) do
Expand Down
Loading
Loading