feat: Implement correlation on event filter#1386
Conversation
a138648 to
ba7cffc
Compare
There was a problem hiding this comment.
This one belongs to a different PR, isnt it
fjtirado
left a comment
There was a problem hiding this comment.
You need to separate the DSL gap PR you were working on from the Correlation one.
d8d98e5 to
77143c1
Compare
- Add CorrelationPredicate for evaluating correlation expressions - Add correlate support in AbstractEventFilterBuilder and AbstractEventFilterSpec - Update TypeEventRegistration and TypeEventRegistrationBuilder with correlation predicates - Implement correlation matching in AbstractTypeConsumer - Add CorrelationTest and listen-correlate.yaml - Add correlate tests in WorkflowBuilderTest and DSLTest Signed-off-by: Matheus André <matheusandr2@gmail.com>
77143c1 to
17ca1de
Compare
|
@fjtirado I will send the other file separately after you review this one. |
| return correlationPredicates.isEmpty() | ||
| ? new TypeEventRegistrationBuilder(type, cePredicate) | ||
| : new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates); |
There was a problem hiding this comment.
| return correlationPredicates.isEmpty() | |
| ? new TypeEventRegistrationBuilder(type, cePredicate) | |
| : new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates); | |
| return new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates); |
| } | ||
|
|
||
| public static CorrelationPredicate from( | ||
| String key, io.serverlessworkflow.api.types.CorrelateProperty prop, WorkflowApplication app) { |
There was a problem hiding this comment.
Use import rather than fully qualified name
There was a problem hiding this comment.
This class should hava package protected scope and implement CloudEventPredicate
| type, application.cloudEventPredicateFactory().build(application, properties)); | ||
| CloudEventPredicate cePredicate = | ||
| application.cloudEventPredicateFactory().build(application, properties); | ||
| Collection<CorrelationPredicate> correlationPredicates = |
There was a problem hiding this comment.
| Collection<CorrelationPredicate> correlationPredicates = | |
| Collection<CloudEventPredicate> correlationPredicates = |
| public record TypeEventRegistrationBuilder( | ||
| String type, | ||
| CloudEventPredicate cePredicate, | ||
| Collection<CorrelationPredicate> correlationPredicates) |
There was a problem hiding this comment.
| Collection<CorrelationPredicate> correlationPredicates) | |
| Collection<CloudEventPredicate> correlationPredicates) |
| public String key() { | ||
| return key; | ||
| } |
There was a problem hiding this comment.
This method is not used, in fact, I do not think you need the key as member
| WorkflowModel eventModel = null; | ||
| for (TypeEventRegistration registration : registrations) { | ||
| if (registration.predicate().test(ce, registration.workflow(), registration.task())) { | ||
| Collection<CorrelationPredicate> predicates = registration.correlationPredicates(); | ||
| if (!predicates.isEmpty()) { | ||
| if (eventModel == null) { | ||
| eventModel = modelFactory.from(ce); | ||
| } | ||
| if (!testCorrelation(eventModel, registration)) { | ||
| continue; | ||
| } | ||
| } |
There was a problem hiding this comment.
| WorkflowModel eventModel = null; | |
| for (TypeEventRegistration registration : registrations) { | |
| if (registration.predicate().test(ce, registration.workflow(), registration.task())) { | |
| Collection<CorrelationPredicate> predicates = registration.correlationPredicates(); | |
| if (!predicates.isEmpty()) { | |
| if (eventModel == null) { | |
| eventModel = modelFactory.from(ce); | |
| } | |
| if (!testCorrelation(eventModel, registration)) { | |
| continue; | |
| } | |
| } | |
| WorkflowModel eventModel = null; | |
| for (TypeEventRegistration registration : registrations) { | |
| if (registration.predicate().test(ce, registration.workflow(), registration.task() && testCorrelation(ce,registration)) { |
| } | ||
| } | ||
|
|
||
| private boolean testCorrelation(WorkflowModel eventModel, TypeEventRegistration registration) { |
There was a problem hiding this comment.
| private boolean testCorrelation(WorkflowModel eventModel, TypeEventRegistration registration) { | |
| private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) { |
| return new CorrelationPredicate(key, fromResolver, expectResolver); | ||
| } | ||
|
|
||
| public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskContext task) { |
There was a problem hiding this comment.
| public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskContext task) { | |
| public boolean test(CloudEvent cloudEvent, WorkflowContext workflow, TaskContext task) { | |
| WorkflowModel eventModel = workflow.definition().application().modelFactory().from(ce); |
| return true; | ||
| } | ||
| for (CorrelationPredicate pred : predicates) { | ||
| if (!pred.test(eventModel, registration.workflow(), registration.task())) { |
There was a problem hiding this comment.
| if (!pred.test(eventModel, registration.workflow(), registration.task())) { | |
| if (!pred.test(ce, registration.workflow(), registration.task())) { |
| } | ||
|
|
||
| private boolean testCorrelation(WorkflowModel eventModel, TypeEventRegistration registration) { | ||
| Collection<CorrelationPredicate> predicates = registration.correlationPredicates(); |
There was a problem hiding this comment.
| Collection<CorrelationPredicate> predicates = registration.correlationPredicates(); | |
| Collection<CloudEventPredicate> predicates = registration.correlationPredicates(); |
Many thanks for submitting your Pull Request ❤️!
What this PR does / why we need it: Closes: #1206
Special notes for reviewers:
Additional information (if needed):