diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQConnection.java b/src/main/java/net/starschema/clouddb/jdbc/BQConnection.java index 48d0b8e..363b701 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQConnection.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQConnection.java @@ -72,6 +72,32 @@ public class BQConnection implements Connection { /** Boolean to determine whether or not to use legacy sql (default: false) * */ private final boolean useLegacySql; + /** + * Enum that describes whether to create a job in projects that support stateless queries. Copied + * from BigQueryImpl + */ + public static enum JobCreationMode { + /** If unspecified JOB_CREATION_REQUIRED is the default. */ + JOB_CREATION_MODE_UNSPECIFIED, + /** Default. Job creation is always required. */ + JOB_CREATION_REQUIRED, + + /** + * Job creation is optional. Returning immediate results is prioritized. BigQuery will + * automatically determine if a Job needs to be created. The conditions under which BigQuery can + * decide to not create a Job are subject to change. If Job creation is required, + * JOB_CREATION_REQUIRED mode should be used, which is the default. + * + *

Note that no job ID will be created if the results were returned immediately. + */ + JOB_CREATION_OPTIONAL; + + private JobCreationMode() {} + } + + /** The job creation mode - */ + private JobCreationMode jobCreationMode = JobCreationMode.JOB_CREATION_MODE_UNSPECIFIED; + /** getter for useLegacySql */ public boolean getUseLegacySql() { return useLegacySql; @@ -210,6 +236,9 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor this.useQueryCache = parseBooleanQueryParam(caseInsensitiveProps.getProperty("querycache"), true); + this.jobCreationMode = + parseJobCreationMode(caseInsensitiveProps.getProperty("jobcreationmode")); + // Create Connection to BigQuery if (serviceAccount) { try { @@ -322,6 +351,21 @@ private static List parseArrayQueryParam(@Nullable String string, Charac : Arrays.asList(string.split(delimiter + "\\s*")); } + /** + * Return a {@link JobCreationMode} or raise an exception if the string does not match a variant. + */ + private static JobCreationMode parseJobCreationMode(@Nullable String string) + throws BQSQLException { + if (string == null) { + return null; + } + try { + return JobCreationMode.valueOf(string); + } catch (IllegalArgumentException e) { + throw new BQSQLException("could not parse " + string + " as job creation mode", e); + } + } + /** * * @@ -1214,4 +1258,8 @@ public Long getMaxBillingBytes() { public Integer getTimeoutMs() { return timeoutMs; } + + public JobCreationMode getJobCreationMode() { + return jobCreationMode; + } } diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSet.java b/src/main/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSet.java index de1664d..febb809 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSet.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSet.java @@ -109,6 +109,8 @@ public class BQForwardOnlyResultSet implements java.sql.ResultSet { private String projectId; /** Reference for the Job */ private @Nullable Job completedJob; + /** The BigQuery query ID; set if the query completed without a Job */ + private final @Nullable String queryId; /** The total number of bytes processed while creating this ResultSet */ private final @Nullable Long totalBytesProcessed; /** Whether the ResultSet came from BigQuery's cache */ @@ -134,12 +136,14 @@ public BQForwardOnlyResultSet( Bigquery bigquery, String projectId, @Nullable Job completedJob, + String queryId, BQStatementRoot bqStatementRoot) throws SQLException { this( bigquery, projectId, completedJob, + queryId, bqStatementRoot, null, false, @@ -167,6 +171,7 @@ public BQForwardOnlyResultSet( Bigquery bigquery, String projectId, @Nullable Job completedJob, + @Nullable String queryId, BQStatementRoot bqStatementRoot, List prefetchedRows, boolean prefetchedAllRows, @@ -179,6 +184,7 @@ public BQForwardOnlyResultSet( logger.debug("Created forward only resultset TYPE_FORWARD_ONLY"); this.Statementreference = (Statement) bqStatementRoot; this.completedJob = completedJob; + this.queryId = queryId; this.projectId = projectId; if (bigquery == null) { throw new BQSQLException("Failed to fetch results. Connection is closed."); @@ -3048,4 +3054,8 @@ public boolean wasNull() throws SQLException { return null; } } + + public @Nullable String getQueryId() { + return queryId; + } } diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQPreparedStatement.java b/src/main/java/net/starschema/clouddb/jdbc/BQPreparedStatement.java index 476e2da..a191caa 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQPreparedStatement.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQPreparedStatement.java @@ -254,7 +254,7 @@ public ResultSet executeQuery() throws SQLException { this); } else { return new BQForwardOnlyResultSet( - this.connection.getBigquery(), this.projectId, referencedJob, this); + this.connection.getBigquery(), this.projectId, referencedJob, null, this); } } // Pause execution for half second before polling job status diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQScrollableResultSet.java b/src/main/java/net/starschema/clouddb/jdbc/BQScrollableResultSet.java index 816d0b4..fdf0e90 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQScrollableResultSet.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQScrollableResultSet.java @@ -69,7 +69,10 @@ public class BQScrollableResultSet extends ScrollableResultset */ private final @Nullable List biEngineReasons; - private final JobReference jobReference; + private final @Nullable JobReference jobReference; + + /** The BigQuery query ID; set if the query completed without a Job */ + private final @Nullable String queryId; private TableSchema schema; @@ -89,7 +92,8 @@ public BQScrollableResultSet( bigQueryGetQueryResultResponse.getCacheHit(), null, null, - bigQueryGetQueryResultResponse.getJobReference()); + bigQueryGetQueryResultResponse.getJobReference(), + null); BigInteger maxrow; try { @@ -107,7 +111,8 @@ public BQScrollableResultSet( @Nullable Boolean cacheHit, @Nullable String biEngineMode, @Nullable List biEngineReasons, - JobReference jobReference) { + @Nullable JobReference jobReference, + @Nullable String queryId) { logger.debug("Created Scrollable resultset TYPE_SCROLL_INSENSITIVE"); try { maxFieldSize = bqStatementRoot.getMaxFieldSize(); @@ -129,6 +134,7 @@ public BQScrollableResultSet( this.biEngineMode = biEngineMode; this.biEngineReasons = biEngineReasons; this.jobReference = jobReference; + this.queryId = queryId; } /** {@inheritDoc} */ @@ -295,4 +301,8 @@ public String getString(int columnIndex) throws SQLException { return null; } } + + public @Nullable String getQueryId() { + return queryId; + } } diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQStatement.java b/src/main/java/net/starschema/clouddb/jdbc/BQStatement.java index 2350a89..6310a99 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQStatement.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQStatement.java @@ -214,6 +214,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy this.connection.getBigquery(), projectId, referencedJob, + qr.getQueryId(), this, rows, fetchedAll, @@ -234,7 +235,8 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy qr.getCacheHit(), biEngineMode, biEngineReasons, - qr.getJobReference()); + qr.getJobReference(), + qr.getQueryId()); } jobAlreadyCompleted = true; } @@ -285,7 +287,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy this); } else { return new BQForwardOnlyResultSet( - this.connection.getBigquery(), projectId, referencedJob, this); + this.connection.getBigquery(), projectId, referencedJob, null, this); } } // Pause execution for half second before polling job status @@ -345,7 +347,8 @@ protected QueryResponse runSyncQuery(String querySql, boolean unlimitedBillingBy // socket timeouts (long) getMaxRows(), this.getAllLabels(), - this.connection.getUseQueryCache()); + this.connection.getUseQueryCache(), + this.connection.getJobCreationMode()); syncResponseFromCurrentQuery.set(resp); this.mostRecentJobReference.set(resp.getJobReference()); } catch (Exception e) { diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQStatementRoot.java b/src/main/java/net/starschema/clouddb/jdbc/BQStatementRoot.java index 627dbf1..bf8d08d 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQStatementRoot.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQStatementRoot.java @@ -265,7 +265,8 @@ private int executeDML(String sql) throws SQLException { (long) querytimeout * 1000, (long) getMaxRows(), this.getAllLabels(), - this.connection.getUseQueryCache()); + this.connection.getUseQueryCache(), + this.connection.getJobCreationMode()); this.mostRecentJobReference.set(qr.getJobReference()); if (defaultValueIfNull(qr.getJobComplete(), false)) { @@ -327,7 +328,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes) (long) querytimeout * 1000, (long) getMaxRows(), this.getAllLabels(), - this.connection.getUseQueryCache()); + this.connection.getUseQueryCache(), + this.connection.getJobCreationMode()); this.mostRecentJobReference.set(qr.getJobReference()); referencedJob = @@ -362,7 +364,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes) qr.getCacheHit(), biEngineMode, biEngineReasons, - referencedJob.getJobReference()); + referencedJob.getJobReference(), + qr.getQueryId()); } jobAlreadyCompleted = true; } @@ -384,7 +387,7 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes) this); } else { return new BQForwardOnlyResultSet( - this.connection.getBigquery(), projectId, referencedJob, this); + this.connection.getBigquery(), projectId, referencedJob, null, this); } } // Pause execution for half second before polling job status diff --git a/src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java b/src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java index 4f39d0a..bc86999 100644 --- a/src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java +++ b/src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java @@ -44,6 +44,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import net.starschema.clouddb.jdbc.BQConnection.JobCreationMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -644,7 +645,8 @@ static QueryResponse runSyncQuery( Long queryTimeoutMs, Long maxResults, Map labels, - boolean useQueryCache) + boolean useQueryCache, + JobCreationMode jobCreationMode) throws IOException { QueryRequest qr = new QueryRequest() @@ -654,6 +656,9 @@ static QueryResponse runSyncQuery( .setQuery(querySql) .setUseLegacySql(useLegacySql) .setMaximumBytesBilled(maxBillingBytes); + if (jobCreationMode != null) { + qr = qr.setJobCreationMode(jobCreationMode.name()); + } if (dataSet != null) { qr.setDefaultDataset(new DatasetReference().setDatasetId(dataSet).setProjectId(projectId)); } diff --git a/src/test/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSetFunctionTest.java b/src/test/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSetFunctionTest.java index f51be8e..207779e 100644 --- a/src/test/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSetFunctionTest.java +++ b/src/test/java/net/starschema/clouddb/jdbc/BQForwardOnlyResultSetFunctionTest.java @@ -28,7 +28,11 @@ import com.google.gson.Gson; import java.io.IOException; import java.math.BigDecimal; -import java.sql.*; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; @@ -36,6 +40,8 @@ import java.util.Map; import java.util.Properties; import junit.framework.Assert; +import org.assertj.core.api.Assertions; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -68,6 +74,14 @@ public void setup() throws SQLException, IOException { this.defaultConn = new BQConnection(url, new Properties()); } + @After + public void teardown() throws SQLException { + if (defaultConn != null) { + defaultConn.close(); + defaultConn = null; + } + } + private BQConnection conn() throws SQLException, IOException { return this.defaultConn; } @@ -198,21 +212,26 @@ public void isClosedValidtest() { */ @Before public void NewConnection() { - NewConnection(true); + NewConnection("&useLegacySql=true"); } - void NewConnection(boolean useLegacySql) { - + void NewConnection(String extraUrl) { this.logger.info("Testing the JDBC driver"); try { Class.forName("net.starschema.clouddb.jdbc.BQDriver"); Properties props = BQSupportFuncts.readFromPropFile( getClass().getResource("/installedaccount1.properties").getFile()); - props.setProperty("useLegacySql", String.valueOf(useLegacySql)); + String jdcbUrl = BQSupportFuncts.constructUrlFromPropertiesFile(props); + if (extraUrl != null) { + jdcbUrl += extraUrl; + } + if (BQForwardOnlyResultSetFunctionTest.con != null) { + BQForwardOnlyResultSetFunctionTest.con.close(); + } BQForwardOnlyResultSetFunctionTest.con = DriverManager.getConnection( - BQSupportFuncts.constructUrlFromPropertiesFile(props), + jdcbUrl, BQSupportFuncts.readFromPropFile( getClass().getResource("/installedaccount1.properties").getFile())); } catch (Exception e) { @@ -434,7 +453,7 @@ public void testResultSetTypesInGetString() throws SQLException { + "STRUCT(1 as a, ['an', 'array'] as b)," + "TIMESTAMP('2012-01-01 00:00:03.032') as t"; - this.NewConnection(false); + this.NewConnection("&useLegacySql=false"); java.sql.ResultSet result = null; try { Statement stmt = @@ -499,7 +518,7 @@ public void testResultSetTypesInGetObject() throws SQLException, ParseException + "CAST('2011-04-03' AS DATE), " + "CAST('nan' AS FLOAT)"; - this.NewConnection(true); + this.NewConnection("&useLegacySql=true"); java.sql.ResultSet result = null; try { Statement stmt = @@ -532,7 +551,7 @@ public void testResultSetTypesInGetObject() throws SQLException, ParseException public void testResultSetArraysInGetObject() throws SQLException, ParseException { final String sql = "SELECT [1, 2, 3], [TIMESTAMP(\"2010-09-07 15:30:00 America/Los_Angeles\")]"; - this.NewConnection(false); + this.NewConnection("&useLegacySql=false"); java.sql.ResultSet result = null; try { Statement stmt = @@ -566,7 +585,7 @@ public void testResultSetArraysInGetObject() throws SQLException, ParseException @Test public void testResultSetTimeType() throws SQLException, ParseException { final String sql = "select current_time(), CAST('00:00:02.123455' AS TIME)"; - this.NewConnection(false); + this.NewConnection("&useLegacySql=false"); java.sql.ResultSet result = null; try { Statement stmt = @@ -609,7 +628,7 @@ public void testResultSetProcedures() throws SQLException, ParseException { final String sql = "CREATE PROCEDURE looker_test.procedure_test(target_id INT64)\n" + "BEGIN\n" + "END;"; - this.NewConnection(false); + this.NewConnection("&useLegacySql=false"); java.sql.ResultSet result = null; try { Statement stmt = @@ -636,7 +655,7 @@ public void testResultSetProcedures() throws SQLException, ParseException { public void testResultSetProceduresAsync() throws SQLException { final String sql = "CREATE PROCEDURE looker_test.long_procedure(target_id INT64)\n" + "BEGIN\n" + "END;"; - this.NewConnection(false); + this.NewConnection("&useLegacySql=false"); try { BQConnection bq = conn(); @@ -679,7 +698,7 @@ public void testBQForwardOnlyResultSetDoesntThrowNPE() throws Exception { // before the results have been fetched. This was throwing a NPE. bq.close(); try { - new BQForwardOnlyResultSet(bq.getBigquery(), defaultProjectId, ref, stmt); + new BQForwardOnlyResultSet(bq.getBigquery(), defaultProjectId, ref, null, stmt); Assert.fail("Initalizing BQForwardOnlyResultSet should throw something other than a NPE."); } catch (SQLException e) { Assert.assertEquals(e.getMessage(), "Failed to fetch results. Connection is closed."); @@ -730,4 +749,21 @@ private void mockResponse(String jsonResponse) throws Exception { results.getBiEngineMode(); results.getBiEngineReasons(); } + + @Test + public void testStatelessQuery() throws SQLException { + NewConnection("&useLegacySql=false&jobcreationmode=JOB_CREATION_OPTIONAL"); + StatelessQuery.assumeStatelessQueriesEnabled( + BQForwardOnlyResultSetFunctionTest.con.getCatalog()); + final Statement stmt = + BQForwardOnlyResultSetFunctionTest.con.createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + final ResultSet result = stmt.executeQuery(StatelessQuery.exampleQuery()); + final String[][] rows = BQSupportMethods.GetQueryResult(result); + Assertions.assertThat(rows).isEqualTo(StatelessQuery.exampleValues()); + + final BQForwardOnlyResultSet bqResultSet = (BQForwardOnlyResultSet) result; + Assertions.assertThat(bqResultSet.getJobId()).isNull(); + Assertions.assertThat(bqResultSet.getQueryId()).contains("!"); + } } diff --git a/src/test/java/net/starschema/clouddb/jdbc/BQScrollableResultSetFunctionTest.java b/src/test/java/net/starschema/clouddb/jdbc/BQScrollableResultSetFunctionTest.java index 2f40868..c7e8190 100644 --- a/src/test/java/net/starschema/clouddb/jdbc/BQScrollableResultSetFunctionTest.java +++ b/src/test/java/net/starschema/clouddb/jdbc/BQScrollableResultSetFunctionTest.java @@ -28,6 +28,8 @@ import java.sql.Statement; import java.util.Properties; import junit.framework.Assert; +import org.assertj.core.api.Assertions; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -242,7 +244,16 @@ public void isClosedValidtest() { */ @Before public void NewConnection() { + NewConnection("&useLegacySql=true"); + } + + @After + public void closeConnection() throws SQLException { + BQScrollableResultSetFunctionTest.con.close(); + BQScrollableResultSetFunctionTest.con = null; + } + public void NewConnection(String extraUrl) { try { if (BQScrollableResultSetFunctionTest.con == null || !BQScrollableResultSetFunctionTest.con.isValid(0)) { @@ -253,7 +264,9 @@ public void NewConnection() { BQSupportFuncts.constructUrlFromPropertiesFile( BQSupportFuncts.readFromPropFile( getClass().getResource("/installedaccount1.properties").getFile())); - jdbcUrl += "&useLegacySql=true"; + if (jdbcUrl != null) { + jdbcUrl += extraUrl; + } BQScrollableResultSetFunctionTest.con = DriverManager.getConnection( jdbcUrl, @@ -714,4 +727,22 @@ private void mockResponse(String jsonResponse) throws Exception { results.getBiEngineMode(); results.getBiEngineReasons(); } + + @Test + public void testStatelessQuery() throws SQLException { + closeConnection(); + NewConnection("&useLegacySql=true&jobcreationmode=JOB_CREATION_OPTIONAL"); + StatelessQuery.assumeStatelessQueriesEnabled( + BQScrollableResultSetFunctionTest.con.getCatalog()); + final Statement stmt = + BQScrollableResultSetFunctionTest.con.createStatement( + ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); + final ResultSet result = stmt.executeQuery(StatelessQuery.exampleQuery()); + final String[][] rows = BQSupportMethods.GetQueryResult(result); + Assertions.assertThat(rows).isEqualTo(StatelessQuery.exampleValues()); + + final BQScrollableResultSet bqResultSet = (BQScrollableResultSet) result; + Assertions.assertThat(bqResultSet.getJobId()).isNull(); + Assertions.assertThat(bqResultSet.getQueryId()).contains("!"); + } } diff --git a/src/test/java/net/starschema/clouddb/jdbc/JdbcUrlTest.java b/src/test/java/net/starschema/clouddb/jdbc/JdbcUrlTest.java index 3732de3..6b32a9d 100644 --- a/src/test/java/net/starschema/clouddb/jdbc/JdbcUrlTest.java +++ b/src/test/java/net/starschema/clouddb/jdbc/JdbcUrlTest.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.Properties; import junit.framework.Assert; +import net.starschema.clouddb.jdbc.BQConnection.JobCreationMode; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Rule; @@ -508,4 +509,27 @@ private Properties getProperties(String pathToProp) throws IOException { private String getUrl(String pathToProp, String dataset) throws IOException { return BQSupportFuncts.constructUrlFromPropertiesFile(getProperties(pathToProp), true, dataset); } + + @Test + public void missingJobCreationModeDefaultsToNull() throws Exception { + final String url = getUrl("/protectedaccount.properties", null); + Assertions.assertThat(url).doesNotContain("jobcreationmode"); + bq = new BQConnection(url, new Properties()); + final JobCreationMode mode = bq.getJobCreationMode(); + Assertions.assertThat(mode).isNull(); + } + + @Test + public void jobCreationModeTest() throws Exception { + final String url = getUrl("/protectedaccount.properties", null); + Assertions.assertThat(url).doesNotContain("jobcreationmode"); + final JobCreationMode[] modes = JobCreationMode.values(); + for (JobCreationMode mode : modes) { + final String fullURL = String.format("%s&jobcreationmode=%s", url, mode.name()); + try (BQConnection bq = new BQConnection(fullURL, new Properties())) { + final JobCreationMode parsedMode = bq.getJobCreationMode(); + Assertions.assertThat(parsedMode).isEqualTo(mode); + } + } + } } diff --git a/src/test/java/net/starschema/clouddb/jdbc/StatelessQuery.java b/src/test/java/net/starschema/clouddb/jdbc/StatelessQuery.java new file mode 100644 index 0000000..f86f2a6 --- /dev/null +++ b/src/test/java/net/starschema/clouddb/jdbc/StatelessQuery.java @@ -0,0 +1,42 @@ +package net.starschema.clouddb.jdbc; + +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.junit.Assume; + +/** Helpers for tests that require projects with stateless queries enabled */ +public final class StatelessQuery { + + private StatelessQuery() {} + + private static final Set ENABLED_PROJECTS = + ImmutableSet.of("disco-parsec-659", "looker-db-test"); + + /** + * Raise an {@link org.junit.AssumptionViolatedException} if the provided project isn't one that's + * known to have stateless queries enabled + * + * @param project the project to check - get it from {@link BQConnection.getCatalog() } + */ + public static void assumeStatelessQueriesEnabled(String project) { + Assume.assumeTrue(ENABLED_PROJECTS.contains(project)); + } + + /** + * A small query that should run statelessly (that is, without a job). + * + * @return the query + */ + public static String exampleQuery() { + return "SELECT 9876"; + } + + /** + * The values returned by {@link StatelessQuery} + * + * @return An array of strings representing the returned values + */ + public static String[][] exampleValues() { + return new String[][] {new String[] {"9876"}}; + } +}