Add dry run support (#30)

brosquinha authored Apr 10, 2024
1 parent 5a1c1fb commit f383ea6
Showing 4 changed files with 152 additions and 11 deletions.
33 changes: 30 additions & 3 deletions lib/req_bigquery.ex
@@ -10,7 +10,7 @@ defmodule ReqBigQuery do
alias Req.Request
alias ReqBigQuery.Result

- @allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms)a
+ @allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms dry_run)a
@base_url "https://bigquery.googleapis.com/bigquery/v2"
@max_results 10_000
@use_legacy_sql false
@@ -47,6 +47,8 @@ defmodule ReqBigQuery do
returns without any results and with the 'jobComplete' flag set to false. The default
value is 10000 milliseconds (10 seconds).
* `:dry_run` - Optional. Specifies whether to run the given query in dry run mode, in which
BigQuery validates the query and estimates the total bytes it would process without
executing it. Defaults to `false`.
If you want to set any of these options when attaching the plugin, pass them as the second argument.
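For example, a dry run estimates a query's cost without executing it. A usage sketch (the Goth process name, project, and table are placeholders; the byte count is illustrative):

    iex> req = Req.new() |> ReqBigQuery.attach(goth: MyGoth, project_id: "my_project")
    iex> res = Req.post!(req, bigquery: "SELECT title FROM `my_dataset.my_table`", dry_run: true).body
    iex> {res.job_id, res.num_rows, res.rows}
    {nil, 0, []}
    iex> res.total_bytes_processed
    18161868216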
## Examples
@@ -72,6 +74,7 @@
columns: ["title", "views"],
job_id: "job_JDDZKquJWkY7x0LlDcmZ4nMQqshb",
num_rows: 10,
total_bytes_processed: 18161868216,
rows: %Stream{}
}
iex> Enum.to_list(res.rows)
@@ -108,6 +111,7 @@
columns: ["year", "views"],
job_id: "job_GXiJvALNsTAoAOJ39Eg3Mw94XMUQ",
num_rows: 7,
total_bytes_processed: 15686357820,
rows: %Stream{}
}
iex> Enum.to_list(res.rows)
@@ -143,6 +147,7 @@
|> Map.put(:maxResults, options[:max_results])
|> Map.put(:useLegacySql, options[:use_legacy_sql])
|> Map.put(:timeoutMs, options[:timeout_ms])
|> Map.put(:dryRun, options[:dry_run] || false)

%{request | url: uri}
|> Request.merge_options(auth: {:bearer, token}, json: json)
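With `dry_run: true`, the pipeline above produces a request body like the one asserted in the dry-run unit test further down:

    %{
      "defaultDataset" => %{"datasetId" => "my_awesome_dataset"},
      "query" => "select * from iris",
      "maxResults" => 10000,
      "useLegacySql" => true,
      "timeoutMs" => 20000,
      "dryRun" => true
    }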
@@ -192,13 +197,15 @@
"kind" => "bigquery#queryResponse",
"rows" => _rows,
"schema" => %{"fields" => fields},
"totalRows" => num_rows
"totalRows" => num_rows,
"totalBytesProcessed" => total_bytes
} = initial_response,
request_options
) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
total_bytes_processed: String.to_integer(total_bytes),
rows: initial_response |> rows_stream(request_options) |> decode_rows(fields),
columns: decode_columns(fields)
}
@@ -209,13 +216,33 @@
"jobReference" => %{"jobId" => job_id},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
"totalRows" => num_rows
"totalRows" => num_rows,
"totalBytesProcessed" => total_bytes
},
_request_options
) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
total_bytes_processed: String.to_integer(total_bytes),
rows: [],
columns: decode_columns(fields)
}
end

defp decode_body(
%{
"jobReference" => %{},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
"totalBytesProcessed" => total_bytes
},
_request_options
) do
%Result{
job_id: nil,
num_rows: 0,
total_bytes_processed: String.to_integer(total_bytes),
rows: [],
columns: decode_columns(fields)
}
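This clause decodes dry-run responses: BigQuery returns an empty `jobReference` and omits `totalRows` and `rows`, leaving only the schema and the bytes estimate. A representative payload, mirroring the fake response in the unit test below (string keys, as received after JSON decoding):

    %{
      "jobReference" => %{},
      "kind" => "bigquery#queryResponse",
      "schema" => %{
        "fields" => [
          %{"mode" => "NULLABLE", "name" => "id", "type" => "INTEGER"},
          %{"mode" => "NULLABLE", "name" => "name", "type" => "STRING"}
        ]
      },
      "totalBytesProcessed" => "1547899"
    }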
8 changes: 5 additions & 3 deletions lib/req_bigquery/result.ex
@@ -8,17 +8,19 @@ defmodule ReqBigQuery.Result do
* `rows` - The result set. A list of lists, each inner list corresponding to a
row, and each element in the inner list corresponding to a column;
* `num_rows` - The number of fetched or affected rows;
- * `job_id` - The ID of the Google BigQuery's executed job.
+ * `total_bytes_processed` - The total number of bytes processed for the query;
+ * `job_id` - The ID of the executed Google BigQuery job; `nil` for dry runs.
"""

@type t :: %__MODULE__{
columns: [String.t()],
rows: [[term()] | binary()],
num_rows: non_neg_integer(),
- job_id: binary()
+ total_bytes_processed: non_neg_integer(),
+ job_id: binary() | nil
}

- defstruct [:job_id, num_rows: 0, rows: [], columns: []]
+ defstruct [:job_id, :total_bytes_processed, num_rows: 0, rows: [], columns: []]
end
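Decoded from a dry-run response, the struct carries only the schema-derived columns and the bytes estimate, mirroring the unit-test assertion below:

    %ReqBigQuery.Result{
      columns: ["id", "name"],
      job_id: nil,
      num_rows: 0,
      rows: [],
      total_bytes_processed: 1_547_899
    }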

if Code.ensure_loaded?(Table.Reader) do
38 changes: 38 additions & 0 deletions test/integration_test.exs
@@ -223,11 +223,49 @@ defmodule IntegrationTest do

assert result.columns == ["en_description"]
assert result.num_rows == 1
assert result.total_bytes_processed == 285_337

rows = Enum.to_list(result.rows)
assert rows == [["fruit of the apple tree"]]
end

test "returns the Google BigQuery's response with total processed bytes for a dry run request",
%{
test: goth
} do
credentials =
System.get_env("GOOGLE_APPLICATION_CREDENTIALS", "credentials.json")
|> File.read!()
|> Jason.decode!()

project_id = System.get_env("PROJECT_ID", credentials["project_id"])

source = {:service_account, credentials, []}
start_supervised!({Goth, name: goth, source: source, http_client: &Req.request/1})

query = """
SELECT en_description
FROM `bigquery-public-data.wikipedia.wikidata`
WHERE id = ?
AND numeric_id = ?
"""

response =
Req.new()
|> ReqBigQuery.attach(project_id: project_id, goth: goth)
|> Req.post!(bigquery: {query, ["Q89", 89]}, dry_run: true)

assert response.status == 200

result = response.body

assert result.total_bytes_processed == 285_337
assert result.columns == ["en_description"]
assert result.num_rows == 0
assert result.job_id == nil
assert result.rows == []
end

test "encodes and decodes types received from Google BigQuery's response", %{
test: goth
} do
84 changes: 79 additions & 5 deletions test/req_bigquery_test.exs
@@ -20,7 +20,7 @@ defmodule ReqBigQueryTest do
"query" => "select * from iris",
"maxResults" => 10000,
"useLegacySql" => true,
"timeoutMs" => 20000
"timeoutMs" => 20000,
"dryRun" => false
}

assert URI.to_string(request.url) ==
@@ -42,7 +43,8 @@
%{mode: "NULLABLE", name: "name", type: "STRING"}
]
},
totalRows: "2"
totalRows: "2",
totalBytesProcessed: "1547899"
}

{request, Req.Response.json(data)}
@@ -99,7 +101,8 @@
],
"useLegacySql" => false,
"maxResults" => 10000,
"timeoutMs" => 10000
"timeoutMs" => 10000,
"dryRun" => true
}

assert URI.to_string(request.url) ==
@@ -121,7 +124,8 @@
%{mode: "NULLABLE", name: "name", type: "STRING"}
]
},
totalRows: "2"
totalRows: "2",
totalBytesProcessed: "1547899"
}

{request, Req.Response.json(data)}
@@ -130,7 +134,8 @@
opts = [
goth: ctx.test,
project_id: "my_awesome_project_id",
- default_dataset_id: "my_awesome_dataset"
+ default_dataset_id: "my_awesome_dataset",
+ dry_run: true
]

assert response =
@@ -150,6 +155,75 @@
assert Enum.to_list(response.body.rows) == [[1, "Ale"], [2, "Wojtek"]]
end

test "executes a dry run query", ctx do
fake_goth = fn request ->
data = %{access_token: "dummy", expires_in: 3599, token_type: "Bearer"}
{request, Req.Response.json(data)}
end

start_supervised!(
{Goth,
name: ctx.test,
source: {:service_account, goth_credentials(), []},
http_client: {&Req.request/1, adapter: fake_goth}}
)

fake_bigquery = fn request ->
assert Jason.decode!(request.body) == %{
"defaultDataset" => %{"datasetId" => "my_awesome_dataset"},
"query" => "select * from iris",
"maxResults" => 10000,
"useLegacySql" => true,
"timeoutMs" => 20000,
"dryRun" => true
}

assert URI.to_string(request.url) ==
"https://bigquery.googleapis.com/bigquery/v2/projects/my_awesome_project_id/queries"

assert Req.Request.get_header(request, "content-type") == ["application/json"]
assert Req.Request.get_header(request, "authorization") == ["Bearer dummy"]

data = %{
jobReference: %{},
kind: "bigquery#queryResponse",
schema: %{
fields: [
%{mode: "NULLABLE", name: "id", type: "INTEGER"},
%{mode: "NULLABLE", name: "name", type: "STRING"}
]
},
totalBytesProcessed: "1547899"
}

{request, Req.Response.json(data)}
end

opts = [
goth: ctx.test,
project_id: "my_awesome_project_id",
default_dataset_id: "my_awesome_dataset",
use_legacy_sql: true,
timeout_ms: 20_000,
dry_run: true
]

assert response =
Req.new(adapter: fake_bigquery)
|> ReqBigQuery.attach(opts)
|> Req.post!(bigquery: "select * from iris")

assert response.status == 200

assert %ReqBigQuery.Result{
columns: ["id", "name"],
job_id: nil,
num_rows: 0,
rows: [],
total_bytes_processed: 1_547_899
} = response.body
end

defp goth_credentials do
private_key = :public_key.generate_key({:rsa, 2048, 65_537})
