1919import com .mongodb .internal .async .function .LoopState ;
2020import com .mongodb .internal .time .StartTime ;
2121import com .mongodb .lang .Nullable ;
22- import org .junit .jupiter .api .AfterAll ;
23- import org .junit .jupiter .api .BeforeAll ;
2422import org .junit .jupiter .params .ParameterizedTest ;
2523import org .junit .jupiter .params .provider .CsvSource ;
2624
3028import java .time .Duration ;
3129import java .util .Objects ;
3230import java .util .concurrent .CompletableFuture ;
33- import java .util .concurrent .Executors ;
34- import java .util .concurrent .ScheduledExecutorService ;
31+ import java .util .concurrent .ExecutionException ;
32+ import java .util .concurrent .Future ;
33+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
3534import java .util .concurrent .ThreadLocalRandom ;
3635import java .util .concurrent .TimeUnit ;
3736
3837import static com .mongodb .internal .async .AsyncRunnable .beginAsync ;
3938
40- class VakoTest {
41- private static ScheduledExecutorService executor ;
42-
43- @ BeforeAll
44- static void beforeAll () {
45- executor = Executors .newScheduledThreadPool (1 );
46- }
47-
48- @ AfterAll
49- static void afterAll () throws InterruptedException {
50- executor .shutdownNow ();
51- com .mongodb .assertions .Assertions .assertTrue (executor .awaitTermination (1 , TimeUnit .MINUTES ));
52- }
53-
39+ class AsyncLoopTest {
5440 @ ParameterizedTest
5541 @ CsvSource ({
5642 "10"
5743 })
58- void asyncCallbackLoop (final int iterations ) throws Exception {
44+ void testDemo (final int iterations ) throws Exception {
5945 System .err .printf ("baselineStackDepth=%d%n%n" , Thread .currentThread ().getStackTrace ().length );
6046 CompletableFuture <Void > join = new CompletableFuture <>();
6147 LoopState loopState = new LoopState ();
@@ -71,7 +57,7 @@ void asyncCallbackLoop(final int iterations) throws Exception {
7157 complete (join , r , t );
7258 });
7359 join .get ();
74- System .err .printf ("%n% nDONE%n%n" );
60+ System .err .printf ("%nDONE%n%n" );
7561 }
7662
7763 private enum IterationExecutionType {
@@ -81,52 +67,91 @@ private enum IterationExecutionType {
8167 MIXED_SYNC_SAME_THREAD_AND_ASYNC
8268 }
8369
70+ private enum Verbocity {
71+ VERBOSE ,
72+ COMPACT
73+ }
74+
75+ private enum ThreadManagement {
76+ NEW_THREAD_PER_TASK ,
77+ REUSE_THREADS
78+ }
79+
8480 @ ParameterizedTest ()
8581 @ CsvSource ({
86- "1_000_000, 0, SYNC_SAME_THREAD, 0, false" ,
87- // "1_000_000, 0, SYNC_DIFFERENT_THREAD, 0, false",
88- "1_000_000, 0, ASYNC, 0, false" ,
89- "1_000_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, false" ,
90- "4, 0, ASYNC, 4, true" ,
91- "4, 4, ASYNC, 0, true" ,
82+ "250_000, 0, SYNC_SAME_THREAD, 0, COMPACT, 0, REUSE_THREADS" ,
83+ "250_000, 0, ASYNC, 0, COMPACT, 0, NEW_THREAD_PER_TASK" ,
84+ "250_000, 0, ASYNC, 0, COMPACT, 1, REUSE_THREADS" ,
85+ "250_000, 0, ASYNC, 0, COMPACT, 2, REUSE_THREADS" ,
86+ "250_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, COMPACT, 0, NEW_THREAD_PER_TASK" ,
87+ "250_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, COMPACT, 1, REUSE_THREADS" ,
88+ "4, 0, ASYNC, 4, VERBOSE, 1, REUSE_THREADS" ,
89+ "4, 4, ASYNC, 0, VERBOSE, 1, REUSE_THREADS" ,
90+ "250_000, 0, SYNC_DIFFERENT_THREAD, 0, COMPACT, 0, NEW_THREAD_PER_TASK" ,
91+ "250_000, 0, SYNC_DIFFERENT_THREAD, 0, COMPACT, 1, REUSE_THREADS" ,
9292 })
93- void testThenRunDoWhileLoop (
93+ void thenRunDoWhileLoopTest (
9494 final int counterInitialValue ,
9595 final int blockSyncPartOfIterationTotalSeconds ,
9696 final IterationExecutionType executionType ,
9797 final int delayAsyncExecutionTotalSeconds ,
98- final boolean verbose ) throws Exception {
99- System .err .printf ("baselineStackDepth=%d%n%n" , Thread .currentThread ().getStackTrace ().length );
98+ final Verbocity verbocity ,
99+ final int executorSize ,
100+ final ThreadManagement threadManagement ) throws Exception {
100101 Duration blockSyncPartOfIterationTotalDuration = Duration .ofSeconds (blockSyncPartOfIterationTotalSeconds );
101- com .mongodb .assertions .Assertions .assertTrue (
102- executionType .equals (IterationExecutionType .ASYNC ) || delayAsyncExecutionTotalSeconds == 0 );
102+ if (executionType .equals (IterationExecutionType .SYNC_DIFFERENT_THREAD )) {
103+ com .mongodb .assertions .Assertions .assertTrue (
104+ (executorSize > 0 && threadManagement .equals (ThreadManagement .REUSE_THREADS ))
105+ || (executorSize == 0 && threadManagement .equals (ThreadManagement .NEW_THREAD_PER_TASK )));
106+ }
107+ if (executionType .equals (IterationExecutionType .SYNC_SAME_THREAD )) {
108+ com .mongodb .assertions .Assertions .assertTrue (executorSize == 0 );
109+ com .mongodb .assertions .Assertions .assertTrue (threadManagement .equals (ThreadManagement .REUSE_THREADS ));
110+ }
111+ if (!executionType .equals (IterationExecutionType .ASYNC )) {
112+ com .mongodb .assertions .Assertions .assertTrue (delayAsyncExecutionTotalSeconds == 0 );
113+ }
114+ if (threadManagement .equals (ThreadManagement .NEW_THREAD_PER_TASK )) {
115+ com .mongodb .assertions .Assertions .assertTrue (executorSize == 0 );
116+ }
103117 Duration delayAsyncExecutionTotalDuration = Duration .ofSeconds (delayAsyncExecutionTotalSeconds );
104- StartTime start = StartTime .now ();
105- CompletableFuture <Void > join = new CompletableFuture <>();
106- asyncLoop (new Counter (counterInitialValue , verbose ),
107- blockSyncPartOfIterationTotalDuration , executionType , delayAsyncExecutionTotalDuration , verbose ,
108- (r , t ) -> {
118+ ScheduledExecutor executor = executorSize == 0 ? null : new ScheduledExecutor (executorSize , threadManagement );
119+ try {
120+ System .err .printf ("baselineStackDepth=%d%n%n" , Thread .currentThread ().getStackTrace ().length );
121+ StartTime start = StartTime .now ();
122+ CompletableFuture <Void > join = new CompletableFuture <>();
123+ asyncLoop (new Counter (counterInitialValue , verbocity ),
124+ blockSyncPartOfIterationTotalDuration , executionType , delayAsyncExecutionTotalDuration , verbocity , executor ,
125+ (r , t ) -> {
109126 System .err .printf ("test callback completed callStackDepth=%s, r=%s, t=%s%n" ,
110127 Thread .currentThread ().getStackTrace ().length , r , exceptionToString (t ));
111128 complete (join , r , t );
112- });
113- System .err .printf ("\t asyncLoop method completed in %s%n" , start .elapsed ());
114- join .get ();
115- System .err .printf ("%n%nDONE%n%n" );
129+ });
130+ System .err .printf ("\t asyncLoop method completed in %s%n" , start .elapsed ());
131+ join .get ();
132+ System .err .printf ("%nDONE%n%n" );
133+ } finally {
134+ if (executor != null ) {
135+ executor .shutdownNow ();
136+ com .mongodb .assertions .Assertions .assertTrue (executor .awaitTermination (1 , TimeUnit .MINUTES ));
137+ }
138+ }
116139 }
117140
118141 private static void asyncLoop (
119142 final Counter counter ,
120143 final Duration blockSyncPartOfIterationTotalDuration ,
121144 final IterationExecutionType executionType ,
122145 final Duration delayAsyncExecutionTotalDuration ,
123- final boolean verbose ,
146+ final Verbocity verbocity ,
147+ @ Nullable
148+ final ScheduledExecutor executor ,
124149 final SingleResultCallback <Void > callback ) {
125150 beginAsync ().thenRunDoWhileLoop (c -> {
126151 sleep (blockSyncPartOfIterationTotalDuration .dividedBy (counter .initial ()));
127152 StartTime start = StartTime .now ();
128- asyncPartOfIteration (counter , executionType , delayAsyncExecutionTotalDuration , verbose , c );
129- if (verbose ) {
153+ asyncPartOfIteration (counter , executionType , delayAsyncExecutionTotalDuration , verbocity , executor , c );
154+ if (verbocity . equals ( Verbocity . VERBOSE ) ) {
130155 System .err .printf ("\t asyncPartOfIteration method completed in %s%n" , start .elapsed ());
131156 }
132157 }, () -> !counter .done ()).finish (callback );
@@ -136,13 +161,15 @@ private static void asyncPartOfIteration(
136161 final Counter counter ,
137162 final IterationExecutionType executionType ,
138163 final Duration delayAsyncExecutionTotalDuration ,
139- final boolean verbose ,
164+ final Verbocity verbocity ,
165+ @ Nullable
166+ final ScheduledExecutor executor ,
140167 final SingleResultCallback <Void > callback ) {
141168 Runnable asyncPartOfIteration = () -> {
142169 counter .countDown ();
143170 StartTime start = StartTime .now ();
144171 callback .complete (callback );
145- if (verbose ) {
172+ if (verbocity . equals ( Verbocity . VERBOSE ) ) {
146173 System .err .printf ("\t asyncPartOfIteration callback.complete method completed in %s%n" , start .elapsed ());
147174 }
148175 };
@@ -152,22 +179,42 @@ private static void asyncPartOfIteration(
152179 break ;
153180 }
154181 case SYNC_DIFFERENT_THREAD : {
155- Thread guaranteedDifferentThread = new Thread (asyncPartOfIteration );
156- guaranteedDifferentThread .start ();
157- join (guaranteedDifferentThread );
182+ if (executor == null ) {
183+ Thread thread = new Thread (asyncPartOfIteration );
184+ thread .start ();
185+ join (thread );
186+ } else {
187+ join (executor .submit (asyncPartOfIteration ));
188+ }
158189 break ;
159190 }
160191 case ASYNC : {
161- executor .schedule (asyncPartOfIteration ,
162- delayAsyncExecutionTotalDuration .dividedBy (counter .initial ()).toNanos (), TimeUnit .NANOSECONDS );
192+ if (executor == null ) {
193+ Thread thread = new Thread (() -> {
194+ sleep (delayAsyncExecutionTotalDuration .dividedBy (counter .initial ()));
195+ asyncPartOfIteration .run ();
196+ });
197+ thread .start ();
198+ } else {
199+ com .mongodb .assertions .Assertions .assertNotNull (executor ).schedule (asyncPartOfIteration ,
200+ delayAsyncExecutionTotalDuration .dividedBy (counter .initial ()).toNanos (), TimeUnit .NANOSECONDS );
201+ }
163202 break ;
164203 }
165204 case MIXED_SYNC_SAME_THREAD_AND_ASYNC : {
166205 if (ThreadLocalRandom .current ().nextBoolean ()) {
167206 asyncPartOfIteration .run ();
168207 } else {
169- executor .schedule (asyncPartOfIteration ,
170- delayAsyncExecutionTotalDuration .dividedBy (counter .initial ()).toNanos (), TimeUnit .NANOSECONDS );
208+ if (executor == null ) {
209+ Thread thread = new Thread (() -> {
210+ sleep (delayAsyncExecutionTotalDuration .dividedBy (counter .initial ()));
211+ asyncPartOfIteration .run ();
212+ });
213+ thread .start ();
214+ } else {
215+ com .mongodb .assertions .Assertions .assertNotNull (executor ).schedule (asyncPartOfIteration ,
216+ delayAsyncExecutionTotalDuration .dividedBy (counter .initial ()).toNanos (), TimeUnit .NANOSECONDS );
217+ }
171218 }
172219 break ;
173220 }
@@ -181,13 +228,13 @@ private static final class Counter {
181228 private final int initial ;
182229 private int current ;
183230 private boolean doneReturnedTrue ;
184- private final boolean verbose ;
231+ private final Verbocity verbocity ;
185232
186- Counter (final int initial , final boolean verbose ) {
233+ Counter (final int initial , final Verbocity verbocity ) {
187234 this .initial = initial ;
188235 this .current = initial ;
189236 this .doneReturnedTrue = false ;
190- this .verbose = verbose ;
237+ this .verbocity = verbocity ;
191238 }
192239
193240 int initial () {
@@ -198,7 +245,7 @@ void countDown() {
198245 com .mongodb .assertions .Assertions .assertTrue (current > 0 );
199246 int previous = current ;
200247 int decremented = --current ;
201- if (verbose || decremented % 100_000 == 0 ) {
248+ if (verbocity . equals ( Verbocity . VERBOSE ) || decremented % 50_000 == 0 ) {
202249 System .err .printf ("counted %d->%d tid=%d callStackDepth=%d %n" ,
203250 previous , decremented , Thread .currentThread ().getId (), Thread .currentThread ().getStackTrace ().length );
204251 }
@@ -247,6 +294,17 @@ private static void join(final Thread thread) {
247294 }
248295 }
249296
297+ private static void join (final Future <?> future ) {
298+ try {
299+ future .get ();
300+ } catch (InterruptedException e ) {
301+ Thread .currentThread ().interrupt ();
302+ throw new RuntimeException (e );
303+ } catch (ExecutionException e ) {
304+ throw new RuntimeException (e );
305+ }
306+ }
307+
250308 private static void sleep (final Duration duration ) {
251309 if (duration .isZero ()) {
252310 return ;
@@ -262,4 +320,61 @@ private static void sleep(final Duration duration) {
262320 throw new RuntimeException (e );
263321 }
264322 }
323+
324+ /**
325+ * This {@link ScheduledThreadPoolExecutor} propagates exceptions that caused termination of a task execution,
326+ * causing the thread that executed the task to be terminated.
327+ */
328+ private static final class ScheduledExecutor extends ScheduledThreadPoolExecutor {
329+ ScheduledExecutor (final int size , final ThreadManagement threadManagement ) {
330+ super (size , r -> {
331+ Thread thread = new Thread (() -> {
332+ r .run ();
333+ if (threadManagement .equals (ThreadManagement .NEW_THREAD_PER_TASK )) {
334+ terminateCurrentThread ();
335+ }
336+ });
337+ thread .setUncaughtExceptionHandler ((t , e ) -> {
338+ if (e instanceof ThreadTerminationException ) {
339+ return ;
340+ }
341+ t .getThreadGroup ().uncaughtException (t , e );
342+ });
343+ return thread ;
344+ });
345+ }
346+
347+ private static void terminateCurrentThread () {
348+ throw ThreadTerminationException .INSTANCE ;
349+ }
350+
351+ @ Override
352+ protected void afterExecute (final Runnable r , final Throwable t ) {
353+ if (t instanceof ThreadTerminationException ) {
354+ throw (ThreadTerminationException ) t ;
355+ } else if (r instanceof Future <?>) {
356+ Future <?> future = (Future <?>) r ;
357+ if (future .isDone ()) {
358+ try {
359+ future .get ();
360+ } catch (ExecutionException e ) {
361+ Throwable cause = e .getCause ();
362+ if (cause instanceof ThreadTerminationException ) {
363+ throw (ThreadTerminationException ) cause ;
364+ }
365+ } catch (Throwable e ) {
366+ // do nothing, we are not swallowing `e`, btw
367+ }
368+ }
369+ }
370+ }
371+
372+ private static final class ThreadTerminationException extends RuntimeException {
373+ static final ThreadTerminationException INSTANCE = new ThreadTerminationException ();
374+
375+ private ThreadTerminationException () {
376+ super (null , null , false , false );
377+ }
378+ }
379+ }
265380}
0 commit comments