Skip to content

feat: Implement correlation on event filter#1386

Draft
matheusandre1 wants to merge 1 commit into
serverlessworkflow:mainfrom
matheusandre1:feature/issue1206
Draft

feat: Implement correlation on event filter#1386
matheusandre1 wants to merge 1 commit into
serverlessworkflow:mainfrom
matheusandre1:feature/issue1206

Conversation

@matheusandre1
Copy link
Copy Markdown
Contributor

Many thanks for submitting your Pull Request ❤️!

What this PR does / why we need it: Closes: #1206

Special notes for reviewers:

Additional information (if needed):

@matheusandre1 matheusandre1 marked this pull request as ready for review May 18, 2026 21:56
@matheusandre1 matheusandre1 requested a review from fjtirado as a code owner May 18, 2026 21:56
Copilot AI review requested due to automatic review settings May 18, 2026 21:56
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one belongs to a different PR, isnt it

Copy link
Copy Markdown
Collaborator

@fjtirado fjtirado left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to separate the DSL gap PR you were working on from the Correlation one.

@matheusandre1 matheusandre1 force-pushed the feature/issue1206 branch 2 times, most recently from d8d98e5 to 77143c1 Compare May 19, 2026 16:00
@matheusandre1 matheusandre1 marked this pull request as draft May 19, 2026 16:00
@matheusandre1 matheusandre1 marked this pull request as ready for review May 19, 2026 16:01
Copilot AI review requested due to automatic review settings May 19, 2026 16:01
@matheusandre1 matheusandre1 marked this pull request as draft May 19, 2026 16:01
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.

Comment thread impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java Outdated
Comment thread impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java Outdated
Comment thread impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java Outdated
- Add CorrelationPredicate for evaluating correlation expressions
- Add correlate support in AbstractEventFilterBuilder and AbstractEventFilterSpec
- Update TypeEventRegistration and TypeEventRegistrationBuilder with correlation predicates
- Implement correlation matching in AbstractTypeConsumer
- Add CorrelationTest and listen-correlate.yaml
- Add correlate tests in WorkflowBuilderTest and DSLTest

Signed-off-by: Matheus André <matheusandr2@gmail.com>
@matheusandre1
Copy link
Copy Markdown
Contributor Author

@fjtirado I will send the other file separately after you review this one.

Comment on lines +66 to +68
return correlationPredicates.isEmpty()
? new TypeEventRegistrationBuilder(type, cePredicate)
: new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return correlationPredicates.isEmpty()
? new TypeEventRegistrationBuilder(type, cePredicate)
: new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
return new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);

}

public static CorrelationPredicate from(
String key, io.serverlessworkflow.api.types.CorrelateProperty prop, WorkflowApplication app) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use import rather than fully qualified name

Copy link
Copy Markdown
Collaborator

@fjtirado fjtirado May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should hava package protected scope and implement CloudEventPredicate

type, application.cloudEventPredicateFactory().build(application, properties));
CloudEventPredicate cePredicate =
application.cloudEventPredicateFactory().build(application, properties);
Collection<CorrelationPredicate> correlationPredicates =
Copy link
Copy Markdown
Collaborator

@fjtirado fjtirado May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Collection<CorrelationPredicate> correlationPredicates =
Collection<CloudEventPredicate> correlationPredicates =

public record TypeEventRegistrationBuilder(
String type,
CloudEventPredicate cePredicate,
Collection<CorrelationPredicate> correlationPredicates)
Copy link
Copy Markdown
Collaborator

@fjtirado fjtirado May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Collection<CorrelationPredicate> correlationPredicates)
Collection<CloudEventPredicate> correlationPredicates)

Comment on lines +82 to +84
public String key() {
return key;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not used, in fact, I do not think you need the key as member

Comment on lines +100 to +111
WorkflowModel eventModel = null;
for (TypeEventRegistration registration : registrations) {
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
if (!predicates.isEmpty()) {
if (eventModel == null) {
eventModel = modelFactory.from(ce);
}
if (!testCorrelation(eventModel, registration)) {
continue;
}
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WorkflowModel eventModel = null;
for (TypeEventRegistration registration : registrations) {
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
if (!predicates.isEmpty()) {
if (eventModel == null) {
eventModel = modelFactory.from(ce);
}
if (!testCorrelation(eventModel, registration)) {
continue;
}
}
WorkflowModel eventModel = null;
for (TypeEventRegistration registration : registrations) {
if (registration.predicate().test(ce, registration.workflow(), registration.task() && testCorrelation(ce,registration)) {

}
}

private boolean testCorrelation(WorkflowModel eventModel, TypeEventRegistration registration) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private boolean testCorrelation(WorkflowModel eventModel, TypeEventRegistration registration) {
private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) {

return new CorrelationPredicate(key, fromResolver, expectResolver);
}

public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskContext task) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskContext task) {
public boolean test(CloudEvent cloudEvent, WorkflowContext workflow, TaskContext task) {
WorkflowModel eventModel = workflow.definition().application().modelFactory().from(ce);

return true;
}
for (CorrelationPredicate pred : predicates) {
if (!pred.test(eventModel, registration.workflow(), registration.task())) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!pred.test(eventModel, registration.workflow(), registration.task())) {
if (!pred.test(ce, registration.workflow(), registration.task())) {

}

private boolean testCorrelation(WorkflowModel eventModel, TypeEventRegistration registration) {
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
Collection<CloudEventPredicate> predicates = registration.correlationPredicates();

Copy link
Copy Markdown
Collaborator

@fjtirado fjtirado left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, but I think ColrreationPredicatre should be a CloudEventPredicate

@fjtirado fjtirado self-requested a review May 20, 2026 09:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feat: Implement correlation on event filter

3 participants