@@ -100,7 +100,7 @@ public Void apply(Iterable<KV<Integer, Integer>> input) {
100100 values .add (kv .getValue ());
101101 }
102102 if (timeBound ) {
103- assertTrue (values .size () > 2 );
103+ assertTrue (values .size () >= 1 );
104104 } else if (dedup ) {
105105 // Verify that at least some data came through. The chance of 90% of the input
106106 // being duplicates is essentially zero.
@@ -111,7 +111,7 @@ public Void apply(Iterable<KV<Integer, Integer>> input) {
111111 Collections .sort (values );
112112 for (int i = 0 ; i < values .size (); i ++) {
113113 assertEquals (i , (int ) values .get (i ));
114- }
114+ }
115115 if (finalizeTracker != null ) {
116116 assertThat (finalizeTracker , containsInAnyOrder (values .size () - 1 ));
117117 }
@@ -126,7 +126,7 @@ private void test(boolean dedup, boolean timeBound) throws Exception {
126126 finalizeTracker = new ArrayList <>();
127127 TestCountingSource .setFinalizeTracker (finalizeTracker );
128128 }
129- TestCountingSource source = new TestCountingSource (Integer .MAX_VALUE );
129+ TestCountingSource source = new TestCountingSource (Integer .MAX_VALUE ). withoutSplitting () ;
130130 if (dedup ) {
131131 source = source .withDedup ();
132132 }
@@ -135,11 +135,6 @@ private void test(boolean dedup, boolean timeBound) throws Exception {
135135 ? p .apply (Read .from (source ).withMaxReadTime (Duration .millis (200 )))
136136 : p .apply (Read .from (source ).withMaxNumRecords (NUM_RECORDS ));
137137
138- List <KV <Integer , Integer >> expectedOutput = new ArrayList <>();
139- for (int i = 0 ; i < NUM_RECORDS ; i ++) {
140- expectedOutput .add (KV .of (0 , i ));
141- }
142-
143138 // Because some of the NUM_RECORDS elements read are dupes, the final output
144139 // will only have output from 0 to n where n < NUM_RECORDS.
145140 DataflowAssert .that (output ).satisfies (new Checker (dedup , timeBound ));
0 commit comments