From f383ea632faeeaf8eff8a4a5a761d41e9132069e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Thales=20C=C3=A9sar=20Giriboni?=
Date: Wed, 10 Apr 2024 16:12:56 -0300
Subject: [PATCH] Add dry run support (#30)

---
 lib/req_bigquery.ex        | 33 +++++++++++++--
 lib/req_bigquery/result.ex |  8 ++--
 test/integration_test.exs  | 38 +++++++++++++++++
 test/req_bigquery_test.exs | 84 +++++++++++++++++++++++++++++++++++---
 4 files changed, 152 insertions(+), 11 deletions(-)

diff --git a/lib/req_bigquery.ex b/lib/req_bigquery.ex
index e403e07..475358a 100644
--- a/lib/req_bigquery.ex
+++ b/lib/req_bigquery.ex
@@ -10,7 +10,7 @@ defmodule ReqBigQuery do
   alias Req.Request
   alias ReqBigQuery.Result
 
-  @allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms)a
+  @allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms dry_run)a
   @base_url "https://bigquery.googleapis.com/bigquery/v2"
   @max_results 10_000
   @use_legacy_sql false
@@ -47,6 +47,8 @@ defmodule ReqBigQuery do
       returns without any results and with the 'jobComplete' flag set to false.
       The default value is 10000 milliseconds (10 seconds).
 
+    * `:dry_run` - Optional. When `true`, BigQuery validates the query without executing it and reports the total bytes it would process. Defaults to `false`.
+
   If you want to set any of these options when attaching the plugin, pass them as the second argument.
 
   ## Examples
@@ -72,6 +74,7 @@
         columns: ["title", "views"],
         job_id: "job_JDDZKquJWkY7x0LlDcmZ4nMQqshb",
         num_rows: 10,
+        total_bytes_processed: 18161868216,
         rows: %Stream{}
       }
       iex> Enum.to_list(res.rows)
@@ -108,6 +111,7 @@
         columns: ["year", "views"],
         job_id: "job_GXiJvALNsTAoAOJ39Eg3Mw94XMUQ",
         num_rows: 7,
+        total_bytes_processed: 15686357820,
         rows: %Stream{}
       }
       iex> Enum.to_list(res.rows)
@@ -143,6 +147,7 @@
       |> Map.put(:maxResults, options[:max_results])
       |> Map.put(:useLegacySql, options[:use_legacy_sql])
       |> Map.put(:timeoutMs, options[:timeout_ms])
+      |> Map.put(:dryRun, options[:dry_run] || false)
 
     %{request | url: uri}
     |> Request.merge_options(auth: {:bearer, token}, json: json)
@@ -192,13 +197,15 @@
            "kind" => "bigquery#queryResponse",
            "rows" => _rows,
            "schema" => %{"fields" => fields},
-           "totalRows" => num_rows
+           "totalRows" => num_rows,
+           "totalBytesProcessed" => total_bytes
          } = initial_response,
          request_options
        ) do
     %Result{
       job_id: job_id,
       num_rows: String.to_integer(num_rows),
+      total_bytes_processed: String.to_integer(total_bytes),
       rows: initial_response |> rows_stream(request_options) |> decode_rows(fields),
       columns: decode_columns(fields)
     }
@@ -209,13 +216,33 @@
            "jobReference" => %{"jobId" => job_id},
            "kind" => "bigquery#queryResponse",
            "schema" => %{"fields" => fields},
-           "totalRows" => num_rows
+           "totalRows" => num_rows,
+           "totalBytesProcessed" => total_bytes
          },
          _request_options
        ) do
     %Result{
       job_id: job_id,
       num_rows: String.to_integer(num_rows),
+      total_bytes_processed: String.to_integer(total_bytes),
       rows: [],
       columns: decode_columns(fields)
     }
   end
+
+  defp decode_body(
+         %{
+           "jobReference" => %{},
+           "kind" => "bigquery#queryResponse",
+           "schema" => %{"fields" => fields},
+           "totalBytesProcessed" => total_bytes
+         },
+         _request_options
+       ) do
+    %Result{
+      job_id: nil,
+      num_rows: 0,
+      total_bytes_processed: String.to_integer(total_bytes),
+      rows: [],
+      columns: decode_columns(fields)
+    }
+  end
diff --git a/lib/req_bigquery/result.ex b/lib/req_bigquery/result.ex
index dcf109e..5c03ed9 100644
--- a/lib/req_bigquery/result.ex
+++ b/lib/req_bigquery/result.ex
@@ -8,17 +8,19 @@ defmodule ReqBigQuery.Result do
     * `rows` - The result set. A list of lists, each inner list corresponding to a row, each
       element in the inner list corresponds to a column;
     * `num_rows` - The number of fetched or affected rows;
-    * `job_id` - The ID of the Google BigQuery's executed job.
+    * `total_bytes_processed` - The total number of bytes processed for the query;
+    * `job_id` - The ID of the executed Google BigQuery job. Returns `nil` for dry runs.
   """
 
   @type t :: %__MODULE__{
           columns: [String.t()],
           rows: [[term()] | binary()],
           num_rows: non_neg_integer(),
-          job_id: binary()
+          total_bytes_processed: non_neg_integer(),
+          job_id: binary() | nil
         }
 
-  defstruct [:job_id, num_rows: 0, rows: [], columns: []]
+  defstruct [:job_id, :total_bytes_processed, num_rows: 0, rows: [], columns: []]
 end
 
 if Code.ensure_loaded?(Table.Reader) do
diff --git a/test/integration_test.exs b/test/integration_test.exs
index a6596c2..9fc0d31 100644
--- a/test/integration_test.exs
+++ b/test/integration_test.exs
@@ -223,11 +223,49 @@ defmodule IntegrationTest do
     assert result.columns == ["en_description"]
     assert result.num_rows == 1
+    assert result.total_bytes_processed == 285_337
 
     rows = Enum.to_list(result.rows)
 
     assert rows == [["fruit of the apple tree"]]
   end
 
+  test "returns Google BigQuery's response with total processed bytes for a dry run request",
+       %{
+         test: goth
+       } do
+    credentials =
+      System.get_env("GOOGLE_APPLICATION_CREDENTIALS", "credentials.json")
+      |> File.read!()
+      |> Jason.decode!()
+
+    project_id = System.get_env("PROJECT_ID", credentials["project_id"])
+
+    source = {:service_account, credentials, []}
+    start_supervised!({Goth, name: goth, source: source, http_client: &Req.request/1})
+
+    query = """
+    SELECT en_description
+    FROM `bigquery-public-data.wikipedia.wikidata`
+    WHERE id = ?
+    AND numeric_id = ?
+ """ + + response = + Req.new() + |> ReqBigQuery.attach(project_id: project_id, goth: goth) + |> Req.post!(bigquery: {query, ["Q89", 89]}, dry_run: true) + + assert response.status == 200 + + result = response.body + + assert result.total_bytes_processed == 285_337 + assert result.columns == ["en_description"] + assert result.num_rows == 0 + assert result.job_id == nil + assert result.rows == [] + end + test "encodes and decodes types received from Google BigQuery's response", %{ test: goth } do diff --git a/test/req_bigquery_test.exs b/test/req_bigquery_test.exs index 5baba73..035d52c 100644 --- a/test/req_bigquery_test.exs +++ b/test/req_bigquery_test.exs @@ -20,7 +20,8 @@ defmodule ReqBigQueryTest do "query" => "select * from iris", "maxResults" => 10000, "useLegacySql" => true, - "timeoutMs" => 20000 + "timeoutMs" => 20000, + "dryRun" => false } assert URI.to_string(request.url) == @@ -42,7 +43,8 @@ defmodule ReqBigQueryTest do %{mode: "NULLABLE", name: "name", type: "STRING"} ] }, - totalRows: "2" + totalRows: "2", + totalBytesProcessed: "1547899" } {request, Req.Response.json(data)} @@ -99,7 +101,8 @@ defmodule ReqBigQueryTest do ], "useLegacySql" => false, "maxResults" => 10000, - "timeoutMs" => 10000 + "timeoutMs" => 10000, + "dryRun" => true } assert URI.to_string(request.url) == @@ -121,7 +124,8 @@ defmodule ReqBigQueryTest do %{mode: "NULLABLE", name: "name", type: "STRING"} ] }, - totalRows: "2" + totalRows: "2", + totalBytesProcessed: "1547899" } {request, Req.Response.json(data)} @@ -130,7 +134,8 @@ defmodule ReqBigQueryTest do opts = [ goth: ctx.test, project_id: "my_awesome_project_id", - default_dataset_id: "my_awesome_dataset" + default_dataset_id: "my_awesome_dataset", + dry_run: true ] assert response = @@ -150,6 +155,75 @@ defmodule ReqBigQueryTest do assert Enum.to_list(response.body.rows) == [[1, "Ale"], [2, "Wojtek"]] end + test "executes a dry run query", ctx do + fake_goth = fn request -> + data = %{access_token: "dummy", expires_in: 3599, token_type: "Bearer"} + {request, Req.Response.json(data)} + end + + start_supervised!( + {Goth, + name: ctx.test, + source: {:service_account, goth_credentials(), []}, + http_client: {&Req.request/1, adapter: fake_goth}} + ) + + fake_bigquery = fn request -> + assert Jason.decode!(request.body) == %{ + "defaultDataset" => %{"datasetId" => "my_awesome_dataset"}, + "query" => "select * from iris", + "maxResults" => 10000, + "useLegacySql" => true, + "timeoutMs" => 20000, + "dryRun" => true + } + + assert URI.to_string(request.url) == + "https://bigquery.googleapis.com/bigquery/v2/projects/my_awesome_project_id/queries" + + assert Req.Request.get_header(request, "content-type") == ["application/json"] + assert Req.Request.get_header(request, "authorization") == ["Bearer dummy"] + + data = %{ + jobReference: %{}, + kind: "bigquery#queryResponse", + schema: %{ + fields: [ + %{mode: "NULLABLE", name: "id", type: "INTEGER"}, + %{mode: "NULLABLE", name: "name", type: "STRING"} + ] + }, + totalBytesProcessed: "1547899" + } + + {request, Req.Response.json(data)} + end + + opts = [ + goth: ctx.test, + project_id: "my_awesome_project_id", + default_dataset_id: "my_awesome_dataset", + use_legacy_sql: true, + timeout_ms: 20_000, + dry_run: true + ] + + assert response = + Req.new(adapter: fake_bigquery) + |> ReqBigQuery.attach(opts) + |> Req.post!(bigquery: "select * from iris") + + assert response.status == 200 + + assert %ReqBigQuery.Result{ + columns: ["id", "name"], + job_id: nil, + num_rows: 0, + rows: [], + total_bytes_processed: 
+           } = response.body
+  end
+
   defp goth_credentials do
     private_key = :public_key.generate_key({:rsa, 2048, 65_537})
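
Note for reviewers: the sketch below shows how the new `:dry_run` option is meant to be used end to end, matching the behavior asserted in the tests above. The Goth process name (MyApp.Goth), project id, and query are placeholders, not part of this patch:

    # Attach the plugin as usual; MyApp.Goth is assumed to be an already
    # running Goth process and "my_project" a valid Google Cloud project id.
    req =
      Req.new()
      |> ReqBigQuery.attach(goth: MyApp.Goth, project_id: "my_project")

    # With dry_run: true, BigQuery only validates the query and estimates its
    # cost: no job is created, so job_id is nil and no rows are returned.
    %ReqBigQuery.Result{
      job_id: nil,
      num_rows: 0,
      rows: [],
      total_bytes_processed: bytes
    } = Req.post!(req, bigquery: "SELECT title FROM `my_dataset.my_table`", dry_run: true).body

    IO.puts("query would process #{bytes} bytes")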