Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit b697307

Browse files
authored
Merge pull request #425 from lukecwik/backport-pr-934
Backport Apache Beam PR/934
2 parents b726911 + 57db857 commit b697307

2 files changed

Lines changed: 42 additions & 46 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import com.google.common.annotations.VisibleForTesting;
3737
import com.google.common.base.Preconditions;
3838
import com.google.protobuf.ByteString;
39-
4039
import java.io.IOException;
4140
import java.io.OutputStream;
4241
import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@
4746
import java.nio.charset.StandardCharsets;
4847
import java.util.NoSuchElementException;
4948
import java.util.regex.Pattern;
50-
5149
import javax.annotation.Nullable;
5250

5351
/**
@@ -473,25 +471,25 @@ public static Bound<String> withoutValidation() {
473471

474472
/**
475473
* Returns a transform for writing to text files that adds a header string to the files
476-
* it writes.
474+
* it writes. Note that a newline character will be added after the header.
477475
*
478476
* <p>A {@code null} value will clear any previously configured header.
479477
*
480478
* @param header the string to be added as file header
481479
*/
482-
public static Bound<String> withHeader(String header) {
480+
public static Bound<String> withHeader(@Nullable String header) {
483481
return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
484482
}
485483

486484
/**
487485
* Returns a transform for writing to text files that adds a footer string to the files
488-
* it writes.
486+
* it writes. Note that a newline character will be added after the footer.
489487
*
490488
* <p>A {@code null} value will clear any previously configured footer.
491489
*
492490
* @param footer the string to be added as file footer
493491
*/
494-
public static Bound<String> withFooter(String footer) {
492+
public static Bound<String> withFooter(@Nullable String footer) {
495493
return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
496494
}
497495

@@ -513,10 +511,10 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
513511
private final String filenameSuffix;
514512

515513
/** An optional header to add to each file. */
516-
private final String header;
514+
@Nullable private final String header;
517515

518516
/** An optional footer to add to each file. */
519-
private final String footer;
517+
@Nullable private final String footer;
520518

521519
/** The Coder to use to decode each line. */
522520
private final Coder<T> coder;
@@ -534,9 +532,9 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
534532
this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
535533
}
536534

537-
private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
538-
String footer, Coder<T> coder, int numShards, String shardTemplate,
539-
boolean validate) {
535+
private Bound(String name, String filenamePrefix, String filenameSuffix,
536+
@Nullable String header, @Nullable String footer, Coder<T> coder, int numShards,
537+
String shardTemplate, boolean validate) {
540538
super(name);
541539
this.header = header;
542540
this.footer = footer;
@@ -653,30 +651,30 @@ public <X> Bound<X> withCoder(Coder<X> coder) {
653651

654652
/**
655653
* Returns a transform for writing to text files that adds a header string to the files
656-
* it writes.
654+
* it writes. Note that a newline character will be added after the header.
657655
*
658656
* <p>A {@code null} value will clear any previously configured header.
659657
*
660658
* <p>Does not modify this object.
661659
*
662660
* @param header the string to be added as file header
663661
*/
664-
public Bound<T> withHeader(String header) {
662+
public Bound<T> withHeader(@Nullable String header) {
665663
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
666664
shardTemplate, false);
667665
}
668666

669667
/**
670668
* Returns a transform for writing to text files that adds a footer string to the files
671-
* it writes.
669+
* it writes. Note that a newline character will be added after the footer.
672670
*
673671
* <p>A {@code null} value will clear any previously configured footer.
674672
*
675673
* <p>Does not modify this object.
676674
*
677675
* @param footer the string to be added as file footer
678676
*/
679-
public Bound<T> withFooter(String footer) {
677+
public Bound<T> withFooter(@Nullable String footer) {
680678
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
681679
shardTemplate, false);
682680
}
@@ -767,10 +765,12 @@ public Coder<T> getCoder() {
767765
return coder;
768766
}
769767

768+
@Nullable
770769
public String getHeader() {
771770
return header;
772771
}
773772

773+
@Nullable
774774
public String getFooter() {
775775
return footer;
776776
}
@@ -835,7 +835,7 @@ private static void validateOutputComponent(String partialFilePattern) {
835835
private TextIO() {}
836836

837837
/**
838-
* A {@link FileBasedSource} which can decode records delimited by new line characters.
838+
* A {@link FileBasedSource} which can decode records delimited by newline characters.
839839
*
840840
* <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
841841
* {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
@@ -883,7 +883,7 @@ public Coder<T> getDefaultOutputCoder() {
883883

884884
/**
885885
* A {@link com.google.cloud.dataflow.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
886-
* which can decode records delimited by new line characters.
886+
* which can decode records delimited by newline characters.
887887
*
888888
* See {@link TextSource} for further details.
889889
*/
@@ -1046,19 +1046,20 @@ private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOExcep
10461046
}
10471047

10481048
/**
1049-
* A {@link FileBasedSink} for text files. Produces text files with the new line separator
1049+
* A {@link FileBasedSink} for text files. Produces text files with the newline separator
10501050
* {@code '\n'} represented in {@code UTF-8} format as the record separator.
10511051
* Each record (including the last) is terminated.
10521052
*/
10531053
@VisibleForTesting
10541054
static class TextSink<T> extends FileBasedSink<T> {
10551055
private final Coder<T> coder;
1056-
private final String header;
1057-
private final String footer;
1056+
@Nullable private final String header;
1057+
@Nullable private final String footer;
10581058

10591059
@VisibleForTesting
10601060
TextSink(
1061-
String baseOutputFilename, String extension, String header, String footer,
1061+
String baseOutputFilename, String extension,
1062+
@Nullable String header, @Nullable String footer,
10621063
String fileNameTemplate, Coder<T> coder) {
10631064
super(baseOutputFilename, extension, fileNameTemplate);
10641065
this.coder = coder;
@@ -1077,10 +1078,11 @@ public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOpt
10771078
*/
10781079
private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
10791080
private final Coder<T> coder;
1080-
private final String header;
1081-
private final String footer;
1081+
@Nullable private final String header;
1082+
@Nullable private final String footer;
10821083

1083-
private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
1084+
private TextWriteOperation(TextSink<T> sink, Coder<T> coder,
1085+
@Nullable String header, @Nullable String footer) {
10841086
super(sink);
10851087
this.coder = coder;
10861088
this.header = header;
@@ -1100,30 +1102,25 @@ public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception
11001102
private static class TextWriter<T> extends FileBasedWriter<T> {
11011103
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
11021104
private final Coder<T> coder;
1103-
private final String header;
1104-
private final String footer;
1105+
@Nullable private final String header;
1106+
@Nullable private final String footer;
11051107
private OutputStream out;
11061108

1107-
public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
1108-
this(writeOperation, coder, null, null);
1109-
}
1110-
1111-
public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
1112-
this(writeOperation, coder, header, null);
1113-
}
1114-
1115-
public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
1116-
String footer) {
1109+
public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder,
1110+
@Nullable String header, @Nullable String footer) {
11171111
super(writeOperation);
11181112
this.header = header;
11191113
this.footer = footer;
11201114
this.mimeType = MimeTypes.TEXT;
11211115
this.coder = coder;
11221116
}
11231117

1124-
private void writeLine(String line) throws IOException {
1125-
if (line != null) {
1126-
out.write(line.getBytes(StandardCharsets.UTF_8));
1118+
/**
1119+
* Writes {@code value} followed by a newline if {@code value} is not null.
1120+
*/
1121+
private void writeIfNotNull(@Nullable String value) throws IOException {
1122+
if (value != null) {
1123+
out.write(value.getBytes(StandardCharsets.UTF_8));
11271124
out.write(NEWLINE);
11281125
}
11291126
}
@@ -1133,14 +1130,14 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
11331130
out = Channels.newOutputStream(channel);
11341131
}
11351132

1136-
@Override
1133+
@Override
11371134
protected void writeHeader() throws Exception {
1138-
writeLine(header);
1135+
writeIfNotNull(header);
11391136
}
11401137

11411138
@Override
11421139
protected void writeFooter() throws Exception {
1143-
writeLine(footer);
1140+
writeIfNotNull(footer);
11441141
}
11451142

11461143
@Override

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@
9191
import java.nio.file.StandardOpenOption;
9292
import java.util.ArrayList;
9393
import java.util.Arrays;
94-
import java.util.LinkedList;
9594
import java.util.List;
9695
import java.util.Set;
9796
import java.util.zip.GZIPOutputStream;
@@ -310,7 +309,7 @@ public static <T> void assertOutputFiles(
310309
List<List<String>> actual = new ArrayList<>();
311310
for (File tmpFile : expectedFiles) {
312311
try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
313-
List<String> currentFile = Lists.newArrayList();
312+
List<String> currentFile = new ArrayList<>();
314313
for (;;) {
315314
String line = reader.readLine();
316315
if (line == null) {
@@ -322,15 +321,15 @@ public static <T> void assertOutputFiles(
322321
}
323322
}
324323

325-
LinkedList<String> expectedElements = Lists.newLinkedList();
324+
List<String> expectedElements = new ArrayList<>(elems.length);
326325
for (int i = 0; i < elems.length; i++) {
327326
T elem = elems[i];
328327
byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
329328
String line = new String(encodedElem);
330329
expectedElements.add(line);
331330
}
332331

333-
ArrayList<String> actualElements =
332+
List<String> actualElements =
334333
Lists.newArrayList(
335334
Iterables.concat(
336335
FluentIterable

0 commit comments

Comments
 (0)