Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scroll support #95

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions lib/snap/search.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,75 @@ defmodule Snap.Search do
end
end

@doc """
Gets the next page of results in a scroll which was initiated by passing
the scroll param into a search request, and parses the result into a
`Snap.SearchResponse`.

`Snap.SearchResponse` implements `Enumerable`, so you can count and iterate
directly on the struct.

"""
def scroll_req(cluster, scroll_id, ttl \\ "1m", params \\ [], headers \\ [], opts \\ []) do
body = %{
scroll: ttl,
scroll_id: scroll_id
}

case cluster.post("/_search/scroll", body, params, headers, opts) do
{:ok, response} -> {:ok, SearchResponse.new(response)}
err -> err
end
end

@doc """
Return all the results for a query via a set of scrolls, lazily as a stream.

It is highly recommended that you set size to something large 10k is the max
And that you sort by _doc for efficiency reasons
https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#scroll-search-results

## Examples

query = %{query: %{match_all: %{}}, size: 10_000, sort: ["_doc"]}
stream = Snap.Search.scroll(Cluster, "index", query)

"""
def scroll(cluster, index_or_alias, query, params \\ [], headers \\ [], opts \\ []) do
params = [scroll: "1m"] |> Keyword.merge(params)

Stream.resource(
fn ->
nil
end,
fn scroll_id ->
results =
if is_nil(scroll_id) do
{:ok, results} =
Snap.Search.search(cluster, index_or_alias, query, params, headers, opts)

results
else
{:ok, results} = Snap.Search.scroll_req(cluster, scroll_id)
results
end

hits = results.hits.hits

if Enum.empty?(hits) do
{:halt, nil}
else
new_scroll_id = results.scroll_id
# pass the scroll id so we can now scroll
{results, new_scroll_id}
end
end,
fn _ ->
nil
end
)
end

@doc """
Runs a count of the documents in an index, using an optional query.
"""
Expand Down