Skip to content

Commit bd7d85c

Browse files
fix: make sure to fall back to old query path when query job is incomplete (#941)
* fix: make sure to fall back to old query path when query job is incomplete (takes more than 10s)
* nit
* address comments
* add comment
* nit update
1 parent 7843755 commit bd7d85c

File tree

3 files changed

+96
-7
lines changed

3 files changed

+96
-7
lines changed

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1268,20 +1268,22 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
12681268

12691269
long numRows;
12701270
Schema schema;
1271-
if (results.getSchema() == null && results.getJobComplete()) {
1272-
JobId jobId = JobId.fromPb(results.getJobReference());
1273-
Job job = getJob(jobId, options);
1274-
TableResult tableResult = job.getQueryResults();
1275-
return tableResult;
1276-
} else {
1277-
schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema());
1271+
if (results.getJobComplete() && results.getSchema() != null) {
1272+
schema = Schema.fromPb(results.getSchema());
12781273
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
12791274
numRows = 0L;
12801275
} else if (results.getNumDmlAffectedRows() != null) {
12811276
numRows = results.getNumDmlAffectedRows();
12821277
} else {
12831278
numRows = results.getTotalRows().longValue();
12841279
}
1280+
} else {
1281+
// Query is long running (> 10s) and hasn't completed yet, or query completed but didn't
1282+
// return the schema, fall back. Some operations don't return the schema and can be optimized
1283+
// here, but this is left as future work.
1284+
JobId jobId = JobId.fromPb(results.getJobReference());
1285+
Job job = getJob(jobId, options);
1286+
return job.getQueryResults();
12851287
}
12861288

12871289
if (results.getPageToken() != null) {

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

Lines changed: 57 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1955,6 +1955,63 @@ public void testFastQueryMultiplePages() throws InterruptedException {
19551955
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
19561956
}
19571957

1958+
@Test
1959+
public void testFastQuerySlowDdl() throws InterruptedException {
1960+
// mock new fast query path response when running a query that takes more than 10s
1961+
JobId queryJob = JobId.of(PROJECT, JOB);
1962+
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
1963+
new com.google.api.services.bigquery.model.QueryResponse()
1964+
.setJobComplete(false) // false when query does not complete in 10s
1965+
.setJobReference(queryJob.toPb()) // backend sends back a jobReference
1966+
.setRows(ImmutableList.of(TABLE_ROW))
1967+
.setSchema(TABLE_SCHEMA.toPb());
1968+
1969+
// mock job response from backend
1970+
com.google.api.services.bigquery.model.Job responseJob =
1971+
new com.google.api.services.bigquery.model.Job()
1972+
.setConfiguration(QUERY_JOB_CONFIGURATION_FOR_QUERY.toPb())
1973+
.setJobReference(queryJob.toPb())
1974+
.setId(JOB)
1975+
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"));
1976+
1977+
// mock old query path response when falling back
1978+
GetQueryResultsResponse queryResultsResponsePb =
1979+
new GetQueryResultsResponse()
1980+
.setJobReference(responseJob.getJobReference())
1981+
.setRows(ImmutableList.of(TABLE_ROW))
1982+
.setJobComplete(true)
1983+
.setTotalRows(BigInteger.valueOf(1L))
1984+
.setSchema(TABLE_SCHEMA.toPb());
1985+
1986+
QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
1987+
QueryRequest requestPb = requestInfo.toPb();
1988+
1989+
when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(queryResponsePb);
1990+
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
1991+
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
1992+
when(bigqueryRpcMock.getQueryResults(
1993+
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
1994+
.thenReturn(queryResultsResponsePb);
1995+
when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
1996+
.thenReturn(new TableDataList().setRows(ImmutableList.of(TABLE_ROW)).setTotalRows(1L));
1997+
1998+
bigquery = options.getService();
1999+
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
2000+
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
2001+
assertThat(result.getTotalRows()).isEqualTo(1);
2002+
for (FieldValueList row : result.getValues()) {
2003+
assertThat(row.get(0).getBooleanValue()).isFalse();
2004+
assertThat(row.get(1).getLongValue()).isEqualTo(1);
2005+
}
2006+
2007+
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
2008+
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
2009+
verify(bigqueryRpcMock)
2010+
.getQueryResults(
2011+
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
2012+
verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS);
2013+
}
2014+
19582015
@Test
19592016
public void testQueryRequestCompletedOptions() throws InterruptedException {
19602017
JobId queryJob = JobId.of(PROJECT, JOB);

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1813,6 +1813,36 @@ public void testFastDDLQuery() throws InterruptedException {
18131813
}
18141814
}
18151815

1816+
@Test
1817+
public void testFastQuerySlowDDL() throws InterruptedException {
1818+
String tableName =
1819+
"test_table_fast_query_ddl_slow_" + UUID.randomUUID().toString().substring(0, 8);
1820+
// This query takes more than 10s to run and should fall back on the old query path
1821+
String slowDdlQuery =
1822+
String.format(
1823+
"CREATE OR REPLACE TABLE %s AS SELECT unique_key, agency, complaint_type, descriptor, street_name, city, landmark FROM `bigquery-public-data.new_york.311_service_requests`",
1824+
tableName);
1825+
QueryJobConfiguration ddlConfig =
1826+
QueryJobConfiguration.newBuilder(slowDdlQuery)
1827+
.setDefaultDataset(DatasetId.of(DATASET))
1828+
.build();
1829+
TableResult result = bigquery.query(ddlConfig);
1830+
assertEquals(0, result.getTotalRows());
1831+
assertNotNull(result.getSchema());
1832+
// Verify correctness of table content
1833+
String sqlQuery = String.format("SELECT * FROM %s.%s", DATASET, tableName);
1834+
QueryJobConfiguration sqlConfig = QueryJobConfiguration.newBuilder(sqlQuery).build();
1835+
TableResult resultAfterDDL = bigquery.query(sqlConfig);
1836+
for (FieldValueList row : resultAfterDDL.getValues()) {
1837+
FieldValue unique_key = row.get(0);
1838+
assertEquals(unique_key, row.get("unique_key"));
1839+
FieldValue agency = row.get(1);
1840+
assertEquals(agency, row.get("agency"));
1841+
FieldValue complaint_type = row.get(2);
1842+
assertEquals(complaint_type, row.get("complaint_type"));
1843+
}
1844+
}
1845+
18161846
@Test
18171847
public void testFastQueryHTTPException() throws InterruptedException {
18181848
String queryInvalid =

0 commit comments

Comments
 (0)