Skip to content

Commit 9af4051

Browse files
committed
[Fix #1395] Improving algoritm
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent ad1eb92 commit 9af4051

22 files changed

Lines changed: 449 additions & 37 deletions

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.function.Supplier;
1920

2021
public interface WorkflowInstance extends WorkflowInstanceData {
2122
CompletableFuture<WorkflowModel> start();
@@ -49,4 +50,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
4950
boolean cancel();
5051

5152
boolean resume();
53+
54+
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
5255
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.time.Instant;
19+
import java.util.Optional;
1920

2021
public interface WorkflowInstanceData {
2122
String id();
@@ -29,4 +30,6 @@ public interface WorkflowInstanceData {
2930
WorkflowStatus status();
3031

3132
WorkflowModel context();
33+
34+
<T> Optional<T> findMetadata(String key, Class<T> objectClass);
3235
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,16 @@ public void addCancelable(CompletableFuture<?> cancelable) {
350350
}
351351
}
352352

353-
public <T> T additionalObject(String key, Supplier<T> supplier) {
353+
@Override
354+
public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
354355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
355356
}
356357

358+
@Override
359+
public <T> Optional<T> findMetadata(String key, Class<T> objectClass) {
360+
Object value = additionalObjects.get(key);
361+
return objectClass.isInstance(value) ? Optional.of(objectClass.cast(value)) : Optional.empty();
362+
}
363+
357364
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
358365
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,21 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
1920
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20-
import java.util.Collection;
21+
import java.util.Map;
2122
import java.util.function.Consumer;
2223

2324
public interface AllStrategyCorrelationInfo extends AutoCloseable {
2425
void correlate(
25-
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter);
26+
EventRegistrationBuilder reg,
27+
CloudEvent event,
28+
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter);
2629

2730
void register(EventRegistrationBuilder reg);
2831

32+
default void addMetadata(
33+
WorkflowInstance instance, Map<EventRegistrationBuilder, CloudEvent> events) {}
34+
2935
default void close() {}
3036
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinition;
19-
import io.serverlessworkflow.impl.WorkflowModel;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import java.util.concurrent.ScheduledFuture;
@@ -83,10 +83,10 @@ protected CronResolverIntanceRunner(WorkflowDefinition definition) {
8383
}
8484

8585
@Override
86-
public void accept(WorkflowModel model) {
86+
public void accept(WorkflowInstance instance) {
8787
if (!cancelled.get()) {
8888
scheduleNext();
89-
super.accept(model);
89+
super.accept(instance);
9090
}
9191
}
9292
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2020
import java.util.ArrayList;
21-
import java.util.Collection;
2221
import java.util.HashMap;
2322
import java.util.List;
2423
import java.util.Map;
@@ -30,21 +29,23 @@ public class InMemoryAllStrategyCorrelationInfo implements AllStrategyCorrelatio
3029

3130
@Override
3231
public void correlate(
33-
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter) {
34-
Collection<CloudEvent> collection = new ArrayList<>();
32+
EventRegistrationBuilder reg,
33+
CloudEvent event,
34+
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
35+
Map<EventRegistrationBuilder, CloudEvent> result = new HashMap<>();
3536
// to minimize the critical section, conversion is done later, here we are
3637
// performing just collection, if any
3738
synchronized (correlatedEvents) {
3839
correlatedEvents.get(reg).add(event);
39-
Collection<List<CloudEvent>> events = correlatedEvents.values();
40-
if (satisfyCondition(events)) {
41-
for (List<CloudEvent> values : events) {
42-
collection.add(values.remove(0));
40+
if (satisfyCondition(correlatedEvents)) {
41+
for (java.util.Map.Entry<EventRegistrationBuilder, List<CloudEvent>> values :
42+
correlatedEvents.entrySet()) {
43+
result.put(values.getKey(), values.getValue().remove(0));
4344
}
4445
}
4546
}
46-
if (!collection.isEmpty()) {
47-
starter.accept(collection);
47+
if (!result.isEmpty()) {
48+
starter.accept(result);
4849
}
4950
}
5051

@@ -56,8 +57,8 @@ public void register(EventRegistrationBuilder reg) {
5657
correlatedEvents.put(reg, new ArrayList<CloudEvent>());
5758
}
5859

59-
private boolean satisfyCondition(Collection<List<CloudEvent>> events) {
60-
for (List<CloudEvent> values : events) {
60+
private boolean satisfyCondition(Map<EventRegistrationBuilder, List<CloudEvent>> events) {
61+
for (List<CloudEvent> values : events.values()) {
6162
if (values.isEmpty()) {
6263
return false;
6364
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
import io.cloudevents.CloudEvent;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22+
import io.serverlessworkflow.impl.WorkflowInstance;
2223
import io.serverlessworkflow.impl.WorkflowModel;
2324
import io.serverlessworkflow.impl.WorkflowModelCollection;
2425
import io.serverlessworkflow.impl.events.EventConsumer;
2526
import io.serverlessworkflow.impl.events.EventRegistration;
27+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2628
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2729
import java.util.ArrayList;
2830
import java.util.Collection;
31+
import java.util.Map;
2932
import java.util.function.Function;
3033

3134
public class ScheduledEventConsumer implements AutoCloseable {
@@ -78,13 +81,15 @@ public ScheduledEventConsumer(
7881
protected void start(CloudEvent ce) {
7982
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
8083
model.add(converter.apply(ce));
81-
instanceRunner.accept(model);
84+
instanceRunner.accept(definition.instance(model));
8285
}
8386

84-
protected void start(Collection<CloudEvent> ces) {
87+
protected void start(Map<EventRegistrationBuilder, CloudEvent> ces) {
8588
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
86-
ces.forEach(ce -> model.add(converter.apply(ce)));
87-
instanceRunner.accept(model);
89+
ces.values().forEach(ce -> model.add(converter.apply(ce)));
90+
WorkflowInstance instance = definition.instance(model);
91+
allStrategyCorrelationInfo.addMetadata(instance, ces);
92+
instanceRunner.accept(instance);
8893
}
8994

9095
public void close() {

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.serverlessworkflow.impl.WorkflowModel;
2121
import java.util.function.Consumer;
2222

23-
public class ScheduledInstanceRunnable implements Runnable, Consumer<WorkflowModel> {
23+
public class ScheduledInstanceRunnable implements Runnable, Consumer<WorkflowInstance> {
2424

2525
protected final WorkflowDefinition definition;
2626

@@ -30,16 +30,20 @@ public ScheduledInstanceRunnable(WorkflowDefinition definition) {
3030

3131
@Override
3232
public void run() {
33-
accept(definition.application().modelFactory().fromNull());
33+
accept(definition.instance());
3434
}
3535

3636
@Override
37-
public void accept(WorkflowModel model) {
38-
runScheduledInstance(definition, model);
37+
public void accept(WorkflowInstance instance) {
38+
runScheduledInstance(definition, instance);
3939
}
4040

4141
public static void runScheduledInstance(WorkflowDefinition definition, WorkflowModel model) {
42-
WorkflowInstance instance = definition.instance(model);
42+
runScheduledInstance(definition, definition.instance(model));
43+
}
44+
45+
private static void runScheduledInstance(
46+
WorkflowDefinition definition, WorkflowInstance instance) {
4347
definition.addScheduledInstance(instance);
4448
definition.application().executorService().execute(() -> instance.start());
4549
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.WorkflowDefinition;
20+
import io.serverlessworkflow.impl.WorkflowInstance;
21+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
22+
import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfo;
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Map.Entry;
29+
import java.util.function.Consumer;
30+
import java.util.stream.Collectors;
31+
32+
public abstract class AbstractAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo {
33+
34+
protected final WorkflowDefinition definition;
35+
private final PersistenceExecutor executor;
36+
private final Map<EventRegistrationBuilder, String> reg2IdMapping = new HashMap<>();
37+
private final Map<String, EventRegistrationBuilder> id2RegMapping = new HashMap<>();
38+
39+
private int counter;
40+
41+
public AbstractAllStrategyCorrelationInfo(
42+
WorkflowDefinition definition, PersistenceExecutor executor) {
43+
this.definition = definition;
44+
this.executor = executor;
45+
}
46+
47+
@Override
48+
public void correlate(
49+
EventRegistrationBuilder reg,
50+
CloudEvent event,
51+
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
52+
executor
53+
.execute(() -> operation(reg2IdMapping.get(reg), event), definition)
54+
.thenAccept(events -> events.forEach(starter));
55+
}
56+
57+
@Override
58+
public void register(EventRegistrationBuilder reg) {
59+
String id = generateIdFromReg(reg);
60+
id2RegMapping.put(id, reg);
61+
reg2IdMapping.put(reg, id);
62+
}
63+
64+
protected final Collection<Map<EventRegistrationBuilder, CloudEvent>> operation(
65+
CorrelationOperations operations, String reg, CloudEvent event) {
66+
operations.storeEvent(reg, event);
67+
Map<String, List<CloudEvent>> events = operations.retrieveEvents(id2RegMapping.keySet());
68+
Map<String, Collection<String>> processed = new HashMap<>();
69+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>();
70+
boolean notDone = true;
71+
while (notDone) {
72+
Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>();
73+
for (Entry<String, List<CloudEvent>> item : events.entrySet()) {
74+
List<CloudEvent> list = item.getValue();
75+
if (list.isEmpty()) {
76+
notDone = false;
77+
break;
78+
}
79+
CloudEvent retrieved = item.getValue().remove(0);
80+
processed.computeIfAbsent(item.getKey(), k -> new ArrayList<>()).add(retrieved.getId());
81+
row.put(id2RegMapping.get(item.getKey()), retrieved);
82+
}
83+
result.add(row);
84+
}
85+
operations.markAsProcessed(processed);
86+
return result;
87+
}
88+
89+
public void addMetadata(
90+
WorkflowInstance instance, Map<EventRegistrationBuilder, CloudEvent> events) {
91+
instance.addMetadataIfAbsent(
92+
AbstractPersistenceInstanceWriter.CLOUD_EVENT_IDS,
93+
() ->
94+
events.entrySet().stream()
95+
.collect(
96+
Collectors.toMap(
97+
e -> reg2IdMapping.get(e.getKey()), v -> v.getValue().getId())));
98+
}
99+
100+
protected abstract Collection<Map<EventRegistrationBuilder, CloudEvent>> operation(
101+
String reg, CloudEvent event);
102+
103+
protected String generateIdFromReg(EventRegistrationBuilder reg) {
104+
final String separator = ":";
105+
return definition.id().toString(separator) + separator + ++counter;
106+
}
107+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,24 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18-
import io.serverlessworkflow.impl.WorkflowContextData;
18+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
1919
import java.util.Optional;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ExecutorService;
22+
import java.util.function.Supplier;
2223

2324
public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor {
2425
@Override
25-
public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
26+
public CompletableFuture<Void> execute(Runnable runnable, WorkflowDefinitionData definition) {
27+
2628
return CompletableFuture.runAsync(
27-
runnable, executorService().orElse(context.definition().application().executorService()));
29+
runnable, executorService().orElse(definition.application().executorService()));
30+
}
31+
32+
@Override
33+
public <T> CompletableFuture<T> execute(Supplier<T> runnable, WorkflowDefinitionData definition) {
34+
return CompletableFuture.supplyAsync(
35+
runnable, executorService().orElse(definition.application().executorService()));
2836
}
2937

3038
protected abstract Optional<ExecutorService> executorService();

0 commit comments

Comments
 (0)