Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 0fe13b9

Browse files
peihedhalperi
authored andcommitted
Fix the NPE when getting reference table location in BigQueryQuerySource (#453)
1 parent f0108a2 commit 0fe13b9

4 files changed

Lines changed: 105 additions & 18 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -931,21 +931,26 @@ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOEx
931931
protected TableReference getTableToExtract(BigQueryOptions bqOptions)
932932
throws IOException, InterruptedException {
933933
// 1. Find the location of the query.
934-
TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
935-
.getQuery()
936-
.getReferencedTables()
937-
.get(0);
934+
String location = null;
935+
List<TableReference> referencedTables =
936+
dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
938937
DatasetService tableService = bqServices.getDatasetService(bqOptions);
939-
String location = tableService.getTable(
940-
dryRunTempTable.getProjectId(),
941-
dryRunTempTable.getDatasetId(),
942-
dryRunTempTable.getTableId()).getLocation();
938+
if (referencedTables != null && !referencedTables.isEmpty()) {
939+
TableReference queryTable = referencedTables.get(0);
940+
location = tableService.getTable(
941+
queryTable.getProjectId(),
942+
queryTable.getDatasetId(),
943+
queryTable.getTableId()).getLocation();
944+
}
943945

944946
// 2. Create the temporary dataset in the query location.
945947
TableReference tableToExtract =
946948
JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
947949
tableService.createDataset(
948-
tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
950+
tableToExtract.getProjectId(),
951+
tableToExtract.getDatasetId(),
952+
location,
953+
"Dataset for BigQuery query job temporary table");
949954

950955
// 3. Execute the query.
951956
String queryJobId = jobIdToken + "-query";

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ void deleteTable(String projectId, String datasetId, String tableId)
130130
/**
131131
* Create a {@link Dataset} with the given {@code location} and {@code description}.
132132
*/
133-
void createDataset(String projectId, String datasetId, String location, String description)
133+
void createDataset(
134+
String projectId, String datasetId, @Nullable String location, @Nullable String description)
134135
throws IOException, InterruptedException;
135136

136137
/**

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public Dataset getDataset(String projectId, String datasetId)
439439
*/
440440
@Override
441441
public void createDataset(
442-
String projectId, String datasetId, String location, String description)
442+
String projectId, String datasetId, @Nullable String location, @Nullable String description)
443443
throws IOException, InterruptedException {
444444
BackOff backoff =
445445
FluentBackoff.DEFAULT
@@ -450,19 +450,22 @@ public void createDataset(
450450
private void createDataset(
451451
String projectId,
452452
String datasetId,
453-
String location,
454-
String description,
453+
@Nullable String location,
454+
@Nullable String description,
455455
Sleeper sleeper,
456456
BackOff backoff) throws IOException, InterruptedException {
457457
DatasetReference datasetRef = new DatasetReference()
458458
.setProjectId(projectId)
459459
.setDatasetId(datasetId);
460460

461-
Dataset dataset = new Dataset()
462-
.setDatasetReference(datasetRef)
463-
.setLocation(location)
464-
.setFriendlyName(location)
465-
.setDescription(description);
461+
Dataset dataset = new Dataset().setDatasetReference(datasetRef);
462+
if (location != null) {
463+
dataset.setLocation(location);
464+
}
465+
if (description != null) {
466+
dataset.setFriendlyName(description);
467+
dataset.setDescription(description);
468+
}
466469

467470
Exception lastException;
468471
do {

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,84 @@ public void testBigQueryQuerySourceInitSplit() throws Exception {
12131213
.createDataset(anyString(), anyString(), anyString(), anyString());
12141214
}
12151215

1216+
@Test
1217+
public void testBigQueryNoTableQuerySourceInitSplit() throws Exception {
1218+
TableReference dryRunTable = new TableReference();
1219+
1220+
Job queryJob = new Job();
1221+
JobStatistics queryJobStats = new JobStatistics();
1222+
JobStatistics2 queryStats = new JobStatistics2();
1223+
queryStats.setReferencedTables(ImmutableList.of(dryRunTable));
1224+
queryJobStats.setQuery(queryStats);
1225+
queryJob.setStatus(new JobStatus())
1226+
.setStatistics(queryJobStats);
1227+
1228+
Job extractJob = new Job();
1229+
JobStatistics extractJobStats = new JobStatistics();
1230+
JobStatistics4 extractStats = new JobStatistics4();
1231+
extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
1232+
extractJobStats.setExtract(extractStats);
1233+
extractJob.setStatus(new JobStatus())
1234+
.setStatistics(extractJobStats);
1235+
1236+
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
1237+
.withJobService(mockJobService)
1238+
.withDatasetService(mockDatasetService)
1239+
.readerReturns(
1240+
toJsonString(new TableRow().set("name", "a").set("number", "1")),
1241+
toJsonString(new TableRow().set("name", "b").set("number", "2")),
1242+
toJsonString(new TableRow().set("name", "c").set("number", "3")));
1243+
1244+
String jobIdToken = "testJobIdToken";
1245+
String extractDestinationDir = "mock://tempLocation";
1246+
TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
1247+
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
1248+
jobIdToken, "query", destinationTable, true /* flattenResults */, true /* useLegacySql */,
1249+
extractDestinationDir, fakeBqServices);
1250+
1251+
List<TableRow> expected = ImmutableList.of(
1252+
new TableRow().set("name", "a").set("number", "1"),
1253+
new TableRow().set("name", "b").set("number", "2"),
1254+
new TableRow().set("name", "c").set("number", "3"));
1255+
1256+
PipelineOptions options = PipelineOptionsFactory.create();
1257+
options.setTempLocation(extractDestinationDir);
1258+
1259+
when(mockJobService.dryRunQuery(anyString(), anyString()))
1260+
.thenReturn(new JobStatistics().setQuery(
1261+
new JobStatistics2()
1262+
.setTotalBytesProcessed(100L)));
1263+
when(mockDatasetService.getTable(
1264+
eq(destinationTable.getProjectId()),
1265+
eq(destinationTable.getDatasetId()),
1266+
eq(destinationTable.getTableId())))
1267+
.thenReturn(new Table().setSchema(new TableSchema()));
1268+
IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
1269+
when(mockIOChannelFactory.resolve(anyString(), anyString()))
1270+
.thenReturn("mock://tempLocation/output");
1271+
when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
1272+
.thenReturn(extractJob);
1273+
1274+
Assert.assertThat(
1275+
SourceTestUtils.readFromSource(bqSource, options),
1276+
CoreMatchers.is(expected));
1277+
SourceTestUtils.assertSplitAtFractionBehavior(
1278+
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
1279+
1280+
List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
1281+
assertEquals(1, sources.size());
1282+
BoundedSource<TableRow> actual = sources.get(0);
1283+
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
1284+
1285+
Mockito.verify(mockJobService)
1286+
.startQueryJob(
1287+
Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
1288+
Mockito.verify(mockJobService)
1289+
.startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
1290+
Mockito.verify(mockDatasetService)
1291+
.createDataset(anyString(), anyString(), anyString(), anyString());
1292+
}
1293+
12161294
@Test
12171295
public void testTransformingSource() throws Exception {
12181296
int numElements = 10000;

0 commit comments

Comments
 (0)