Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 3212b63

Browse files
peihedhalperi
authored andcommitted
BigQueryQuerySource: support queries that reference no tables (#454)
1 parent 0fe13b9 commit 3212b63

4 files changed

Lines changed: 32 additions & 22 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -957,10 +957,7 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions)
957957
executeQuery(
958958
executingProject,
959959
queryJobId,
960-
query,
961960
tableToExtract,
962-
flattenResults,
963-
useLegacySql,
964961
bqServices.getJobService(bqOptions));
965962
return tableToExtract;
966963
}
@@ -983,37 +980,33 @@ public void populateDisplayData(DisplayData.Builder builder) {
983980
super.populateDisplayData(builder);
984981
builder.add(DisplayData.item("query", query));
985982
}
983+
986984
private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
987985
throws InterruptedException, IOException {
988986
if (dryRunJobStats.get() == null) {
989-
JobStatistics jobStats =
990-
bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query);
987+
JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
988+
executingProject, createBasicQueryConfig());
991989
dryRunJobStats.compareAndSet(null, jobStats);
992990
}
993991
return dryRunJobStats.get();
994992
}
995993

996-
private static void executeQuery(
994+
private void executeQuery(
997995
String executingProject,
998996
String jobId,
999-
String query,
1000997
TableReference destinationTable,
1001-
boolean flattenResults,
1002-
boolean useLegacySql,
1003998
JobService jobService) throws IOException, InterruptedException {
1004999
JobReference jobRef = new JobReference()
10051000
.setProjectId(executingProject)
10061001
.setJobId(jobId);
1007-
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
1008-
queryConfig
1009-
.setQuery(query)
1002+
1003+
JobConfigurationQuery queryConfig = createBasicQueryConfig()
10101004
.setAllowLargeResults(true)
10111005
.setCreateDisposition("CREATE_IF_NEEDED")
10121006
.setDestinationTable(destinationTable)
1013-
.setFlattenResults(flattenResults)
1014-
.setUseLegacySql(useLegacySql)
10151007
.setPriority("BATCH")
10161008
.setWriteDisposition("WRITE_EMPTY");
1009+
10171010
jobService.startQueryJob(jobRef, queryConfig);
10181011
Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
10191012
if (parseStatus(job) != Status.SUCCEEDED) {
@@ -1022,6 +1015,13 @@ private static void executeQuery(
10221015
return;
10231016
}
10241017

1018+
private JobConfigurationQuery createBasicQueryConfig() {
1019+
return new JobConfigurationQuery()
1020+
.setQuery(query)
1021+
.setFlattenResults(flattenResults)
1022+
.setUseLegacySql(useLegacySql);
1023+
}
1024+
10251025
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
10261026
in.defaultReadObject();
10271027
dryRunJobStats = new AtomicReference<>();

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ Job pollJob(JobReference jobRef, int maxAttempts)
9999
/**
100100
* Dry runs the query in the given project.
101101
*/
102-
JobStatistics dryRunQuery(String projectId, String query)
102+
JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
103103
throws InterruptedException, IOException;
104104

105105
/**

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,11 @@ Job pollJob(
252252
}
253253

254254
@Override
255-
public JobStatistics dryRunQuery(String projectId, String query)
255+
public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
256256
throws InterruptedException, IOException {
257257
Job job = new Job()
258258
.setConfiguration(new JobConfiguration()
259-
.setQuery(new JobConfigurationQuery()
260-
.setQuery(query))
259+
.setQuery(queryConfig)
261260
.setDryRun(true));
262261
BackOff backoff =
263262
FluentBackoff.DEFAULT
@@ -266,7 +265,7 @@ public JobStatistics dryRunQuery(String projectId, String query)
266265
client.jobs().insert(projectId, job),
267266
String.format(
268267
"Unable to dry run query: %s, aborting after %d retries.",
269-
query, MAX_RPC_RETRIES),
268+
queryConfig, MAX_RPC_RETRIES),
270269
Sleeper.DEFAULT,
271270
backoff).getStatistics();
272271
}

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import org.junit.rules.TemporaryFolder;
114114
import org.junit.runner.RunWith;
115115
import org.junit.runners.JUnit4;
116+
import org.mockito.ArgumentCaptor;
116117
import org.mockito.Mock;
117118
import org.mockito.Mockito;
118119
import org.mockito.MockitoAnnotations;
@@ -360,7 +361,7 @@ private void startJob(JobReference jobRef) throws IOException, InterruptedExcept
360361
}
361362

362363
@Override
363-
public JobStatistics dryRunQuery(String projectId, String query)
364+
public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query)
364365
throws InterruptedException, IOException {
365366
throw new UnsupportedOperationException();
366367
}
@@ -1174,7 +1175,7 @@ public void testBigQueryQuerySourceInitSplit() throws Exception {
11741175
.setProjectId("testProejct")
11751176
.setDatasetId("testDataset")
11761177
.setTableId("testTable");
1177-
when(mockJobService.dryRunQuery(anyString(), anyString()))
1178+
when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
11781179
.thenReturn(new JobStatistics().setQuery(
11791180
new JobStatistics2()
11801181
.setTotalBytesProcessed(100L)
@@ -1211,6 +1212,11 @@ public void testBigQueryQuerySourceInitSplit() throws Exception {
12111212
.startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
12121213
Mockito.verify(mockDatasetService)
12131214
.createDataset(anyString(), anyString(), anyString(), anyString());
1215+
ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
1216+
ArgumentCaptor.forClass(JobConfigurationQuery.class);
1217+
Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
1218+
assertEquals(true, queryConfigArg.getValue().getFlattenResults());
1219+
assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
12141220
}
12151221

12161222
@Test
@@ -1256,7 +1262,7 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception {
12561262
PipelineOptions options = PipelineOptionsFactory.create();
12571263
options.setTempLocation(extractDestinationDir);
12581264

1259-
when(mockJobService.dryRunQuery(anyString(), anyString()))
1265+
when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
12601266
.thenReturn(new JobStatistics().setQuery(
12611267
new JobStatistics2()
12621268
.setTotalBytesProcessed(100L)));
@@ -1289,6 +1295,11 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception {
12891295
.startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
12901296
Mockito.verify(mockDatasetService)
12911297
.createDataset(anyString(), anyString(), anyString(), anyString());
1298+
ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
1299+
ArgumentCaptor.forClass(JobConfigurationQuery.class);
1300+
Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
1301+
assertEquals(true, queryConfigArg.getValue().getFlattenResults());
1302+
assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
12921303
}
12931304

12941305
@Test

0 commit comments

Comments
 (0)