Skip to content

Commit

Permalink
Support stateless queries.
Browse files Browse the repository at this point in the history
From
https://cloud.google.com/java/docs/reference/google-cloud-bigquery/2.34.1/com.google.cloud.bigquery.BigQuery

> Stateless queries: query execution without corresponding job metadata

Stateless queries are currently in preview.

This PR introduces support for stateless queries via a new JDBC query
parameter. Set `jobcreationmode` to a string that matches one of the
standard `JobCreationMode` enum values:

- `JOB_CREATION_MODE_UNSPECIFIED`: Unspecified JobCreationMode,
   defaults to `JOB_CREATION_REQUIRED`.
- `JOB_CREATION_REQUIRED`: Default. Job creation is always
   required.
- `JOB_CREATION_OPTIONAL`: Job creation is optional. Returning immediate
   results is prioritized. BigQuery will automatically determine if a Job
   needs to be created. The conditions under which BigQuery can decide to
   not create a Job are subject to change. If Job creation is required,
   JOB_CREATION_REQUIRED mode should be used, which is the default.

See
https://github.com/googleapis/java-bigquery/blob/v2.34.0/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java#L98-L111

For example, the following create a connection with optional job
creation:

    DriverManager.getConnection(
        "jdbc:BQDriver:my-project?jobcreationmode=JOB_CREATION_OPTIONAL",
        driverProperties);

Note that this will cause queries to fail if the provided project does
not have stateless queries preview enabled.
  • Loading branch information
goomrw committed Dec 13, 2023
1 parent 8f9b83b commit 7ca2597
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 26 deletions.
48 changes: 48 additions & 0 deletions src/main/java/net/starschema/clouddb/jdbc/BQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ public class BQConnection implements Connection {
/** Boolean to determine whether or not to use legacy sql (default: false) * */
private final boolean useLegacySql;

/**
* Enum that describes whether to create a job in projects that support stateless queries. Copied
* from BigQueryImpl
*/
public static enum JobCreationMode {
/** If unspecified JOB_CREATION_REQUIRED is the default. */
JOB_CREATION_MODE_UNSPECIFIED,
/** Default. Job creation is always required. */
JOB_CREATION_REQUIRED,

/**
* Job creation is optional. Returning immediate results is prioritized. BigQuery will
* automatically determine if a Job needs to be created. The conditions under which BigQuery can
* decide to not create a Job are subject to change. If Job creation is required,
* JOB_CREATION_REQUIRED mode should be used, which is the default.
*
* <p>Note that no job ID will be created if the results were returned immediately.
*/
JOB_CREATION_OPTIONAL;

private JobCreationMode() {}
}

/** The job creation mode - */
private JobCreationMode jobCreationMode = JobCreationMode.JOB_CREATION_MODE_UNSPECIFIED;

/** getter for useLegacySql */
public boolean getUseLegacySql() {
return useLegacySql;
Expand Down Expand Up @@ -210,6 +236,9 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor
this.useQueryCache =
parseBooleanQueryParam(caseInsensitiveProps.getProperty("querycache"), true);

this.jobCreationMode =
parseJobCreationMode(caseInsensitiveProps.getProperty("jobcreationmode"));

// Create Connection to BigQuery
if (serviceAccount) {
try {
Expand Down Expand Up @@ -322,6 +351,21 @@ private static List<String> parseArrayQueryParam(@Nullable String string, Charac
: Arrays.asList(string.split(delimiter + "\\s*"));
}

/**
* Return a {@link JobCreationMode} or raise an exception if the string does not match a variant.
*/
private static JobCreationMode parseJobCreationMode(@Nullable String string)
throws BQSQLException {
if (string == null) {
return null;
}
try {
return JobCreationMode.valueOf(string);
} catch (IllegalArgumentException e) {
throw new BQSQLException("could not parse " + string + " as job creation mode", e);
}
}

/**
*
*
Expand Down Expand Up @@ -1214,4 +1258,8 @@ public Long getMaxBillingBytes() {
public Integer getTimeoutMs() {
return timeoutMs;
}

public JobCreationMode getJobCreationMode() {
return jobCreationMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public class BQForwardOnlyResultSet implements java.sql.ResultSet {
private String projectId;
/** Reference for the Job */
private @Nullable Job completedJob;
/** The BigQuery query ID; set if the query completed without a Job */
private final @Nullable String queryId;
/** The total number of bytes processed while creating this ResultSet */
private final @Nullable Long totalBytesProcessed;
/** Whether the ResultSet came from BigQuery's cache */
Expand All @@ -134,12 +136,14 @@ public BQForwardOnlyResultSet(
Bigquery bigquery,
String projectId,
@Nullable Job completedJob,
String queryId,
BQStatementRoot bqStatementRoot)
throws SQLException {
this(
bigquery,
projectId,
completedJob,
queryId,
bqStatementRoot,
null,
false,
Expand Down Expand Up @@ -167,6 +171,7 @@ public BQForwardOnlyResultSet(
Bigquery bigquery,
String projectId,
@Nullable Job completedJob,
@Nullable String queryId,
BQStatementRoot bqStatementRoot,
List<TableRow> prefetchedRows,
boolean prefetchedAllRows,
Expand All @@ -179,6 +184,7 @@ public BQForwardOnlyResultSet(
logger.debug("Created forward only resultset TYPE_FORWARD_ONLY");
this.Statementreference = (Statement) bqStatementRoot;
this.completedJob = completedJob;
this.queryId = queryId;
this.projectId = projectId;
if (bigquery == null) {
throw new BQSQLException("Failed to fetch results. Connection is closed.");
Expand Down Expand Up @@ -3048,4 +3054,8 @@ public boolean wasNull() throws SQLException {
return null;
}
}

public @Nullable String getQueryId() {
return queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public ResultSet executeQuery() throws SQLException {
this);
} else {
return new BQForwardOnlyResultSet(
this.connection.getBigquery(), this.projectId, referencedJob, this);
this.connection.getBigquery(), this.projectId, referencedJob, null, this);
}
}
// Pause execution for half second before polling job status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public class BQScrollableResultSet extends ScrollableResultset<Object>
*/
private final @Nullable List<BiEngineReason> biEngineReasons;

private final JobReference jobReference;
private final @Nullable JobReference jobReference;

/** The BigQuery query ID; set if the query completed without a Job */
private final @Nullable String queryId;

private TableSchema schema;

Expand All @@ -89,7 +92,8 @@ public BQScrollableResultSet(
bigQueryGetQueryResultResponse.getCacheHit(),
null,
null,
bigQueryGetQueryResultResponse.getJobReference());
bigQueryGetQueryResultResponse.getJobReference(),
null);

BigInteger maxrow;
try {
Expand All @@ -107,7 +111,8 @@ public BQScrollableResultSet(
@Nullable Boolean cacheHit,
@Nullable String biEngineMode,
@Nullable List<BiEngineReason> biEngineReasons,
JobReference jobReference) {
@Nullable JobReference jobReference,
@Nullable String queryId) {
logger.debug("Created Scrollable resultset TYPE_SCROLL_INSENSITIVE");
try {
maxFieldSize = bqStatementRoot.getMaxFieldSize();
Expand All @@ -129,6 +134,7 @@ public BQScrollableResultSet(
this.biEngineMode = biEngineMode;
this.biEngineReasons = biEngineReasons;
this.jobReference = jobReference;
this.queryId = queryId;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -295,4 +301,8 @@ public String getString(int columnIndex) throws SQLException {
return null;
}
}

public @Nullable String getQueryId() {
return queryId;
}
}
9 changes: 6 additions & 3 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
this.connection.getBigquery(),
projectId,
referencedJob,
qr.getQueryId(),
this,
rows,
fetchedAll,
Expand All @@ -234,7 +235,8 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
qr.getCacheHit(),
biEngineMode,
biEngineReasons,
qr.getJobReference());
qr.getJobReference(),
qr.getQueryId());
}
jobAlreadyCompleted = true;
}
Expand Down Expand Up @@ -285,7 +287,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
this);
} else {
return new BQForwardOnlyResultSet(
this.connection.getBigquery(), projectId, referencedJob, this);
this.connection.getBigquery(), projectId, referencedJob, null, this);
}
}
// Pause execution for half second before polling job status
Expand Down Expand Up @@ -345,7 +347,8 @@ protected QueryResponse runSyncQuery(String querySql, boolean unlimitedBillingBy
// socket timeouts
(long) getMaxRows(),
this.getAllLabels(),
this.connection.getUseQueryCache());
this.connection.getUseQueryCache(),
this.connection.getJobCreationMode());
syncResponseFromCurrentQuery.set(resp);
this.mostRecentJobReference.set(resp.getJobReference());
} catch (Exception e) {
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatementRoot.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ private int executeDML(String sql) throws SQLException {
(long) querytimeout * 1000,
(long) getMaxRows(),
this.getAllLabels(),
this.connection.getUseQueryCache());
this.connection.getUseQueryCache(),
this.connection.getJobCreationMode());
this.mostRecentJobReference.set(qr.getJobReference());

if (defaultValueIfNull(qr.getJobComplete(), false)) {
Expand Down Expand Up @@ -327,7 +328,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
(long) querytimeout * 1000,
(long) getMaxRows(),
this.getAllLabels(),
this.connection.getUseQueryCache());
this.connection.getUseQueryCache(),
this.connection.getJobCreationMode());
this.mostRecentJobReference.set(qr.getJobReference());

referencedJob =
Expand Down Expand Up @@ -362,7 +364,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
qr.getCacheHit(),
biEngineMode,
biEngineReasons,
referencedJob.getJobReference());
referencedJob.getJobReference(),
qr.getQueryId());
}
jobAlreadyCompleted = true;
}
Expand All @@ -384,7 +387,7 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
this);
} else {
return new BQForwardOnlyResultSet(
this.connection.getBigquery(), projectId, referencedJob, this);
this.connection.getBigquery(), projectId, referencedJob, null, this);
}
}
// Pause execution for half second before polling job status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import net.starschema.clouddb.jdbc.BQConnection.JobCreationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -644,7 +645,8 @@ static QueryResponse runSyncQuery(
Long queryTimeoutMs,
Long maxResults,
Map<String, String> labels,
boolean useQueryCache)
boolean useQueryCache,
JobCreationMode jobCreationMode)
throws IOException {
QueryRequest qr =
new QueryRequest()
Expand All @@ -654,6 +656,9 @@ static QueryResponse runSyncQuery(
.setQuery(querySql)
.setUseLegacySql(useLegacySql)
.setMaximumBytesBilled(maxBillingBytes);
if (jobCreationMode != null) {
qr = qr.setJobCreationMode(jobCreationMode.name());
}
if (dataSet != null) {
qr.setDefaultDataset(new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
}
Expand Down
Loading

0 comments on commit 7ca2597

Please sign in to comment.