Skip to content

Commit 7cd53b1

Browse files
committed
[WIP] decoupled webhook processing
1 parent eaf5435 commit 7cd53b1

File tree

3 files changed

+87
-12
lines changed

3 files changed

+87
-12
lines changed

src/notifications/models.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,14 @@ def deregister(self) -> None:
156156

157157
self._subscription = ""
158158
self.save(update_fields=["_subscription"])
159+
160+
class NotificationRecord(models.Model):
161+
subscription = models.ForeignKey(Subscription, related_name="received_notifications", on_delete=models.CASCADE)
162+
payload = models.BinaryField()
163+
kanaal = models.CharField(max_length=256)
164+
received_at = models.DateTimeField(auto_created=True)
165+
last_processed_at = models.DateTimeField()
166+
is_valid = models.BooleanField(default=False)
167+
is_processed = models.BooleanField(default=False)
168+
is_locked = models.BooleanField(default=False)
169+
processing_output = models.TextField()

src/open_inwoner/openzaak/api/views.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from zgw_consumers.api_models.base import factory
77

88
from notifications.api.serializers import NotificatieSerializer
9+
from notifications.models import NotificationRecord
910
from open_inwoner.configurations.models import SiteConfiguration
1011
from open_inwoner.openzaak.api_models import Notification
1112
from open_inwoner.openzaak.auth import get_valid_subscription_from_request
@@ -40,6 +41,14 @@ def post(self, request):
4041
status=status.HTTP_401_UNAUTHORIZED,
4142
)
4243

44+
# Create initial log, so
45+
notification_record = NotificationRecord.objects.create(
46+
subscription=subscription,
47+
payload=request.content,
48+
is_processed=False,
49+
is_valid=False,
50+
)
51+
4352
# deserialize
4453
serializer = NotificatieSerializer(data=request.data)
4554
if not serializer.is_valid():
@@ -48,19 +57,33 @@ def post(self, request):
4857

4958
notification = factory(Notification, serializer.validated_data)
5059

51-
# verify channel
60+
# Update record with basic metadata
61+
notification_record.is_valid = True
62+
notification_record.kanaal = notification.kanaal
63+
notification_record.save(
64+
update_fields=(
65+
"is_valid",
66+
"kanaal",
67+
)
68+
)
69+
70+
# Handle test webhooks, and leave a record so it's visible for validation purposes
5271
if notification.kanaal == "test":
5372
self.log_webhook_test_channel(notification)
54-
return Response(status=status.HTTP_204_NO_CONTENT)
5573

74+
# Test webhooks don't have to be processed
75+
notification_record.is_processed = True
76+
notification_record.processing_output = "test channel, no further processing done"
77+
notification_record.save(update_fields=("is_processed", "processing_output", ))
78+
return Response(status=status.HTTP_204_NO_CONTENT)
79+
5680
if notification.kanaal not in subscription.channels:
5781
self.log_webhook_channel_not_subscribed(notification)
58-
return Response(
59-
{
60-
"detail": f"notification channel '{notification.kanaal}' not subscribed to"
61-
},
62-
status=status.HTTP_400_BAD_REQUEST,
63-
)
82+
83+
notification_record.is_processed = True
84+
notification_record.processing_output = "channel not subscribed"
85+
notification_record.save(update_fields=("is_processed", "processing_output", ))
86+
return Response({"detail": "channel not subscribed"}, status=status.HTTP_400_BAD_REQUEST)
6487

6588
if self.accept_channels and notification.kanaal not in self.accept_channels:
6689
self.log_webhook_channel_not_acceptable(notification)
@@ -97,5 +120,7 @@ def handle_notification(self, notification: Notification):
97120
if not config.notifications_cases_enabled:
98121
return
99122

123+
# If process eager, do now
124+
100125
notification_data = dataclasses.asdict(notification)
101126
process_zaken_notification.delay(notification_data)

src/open_inwoner/openzaak/tasks.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import io
22

3+
import json
34
from django.core.management import call_command
5+
from django.utils import timezone
46

57
import structlog
68
from zgw_consumers.api_models.base import factory
7-
9+
from django.db import transaction
10+
from notifications.models import NotificationRecord
811
from open_inwoner.celery import app
912
from open_inwoner.openzaak.api_models import Notification
1013
from open_inwoner.openzaak.notifications import handle_zaken_notification
14+
from open_inwoner.utils.logentry import system_action
1115

1216
logger = structlog.stdlib.get_logger(__name__)
1317

@@ -25,8 +29,43 @@ def import_zgw_data():
2529
return out.getvalue()
2630

2731

32+
# TODO: Use Celery once for locking
2833
@app.task
29-
def process_zaken_notification(notification_data: dict):
34+
def process_zaken_notification(record_pk: int):
3035
logger.info("Started process_zaken_notification() task")
31-
notification = factory(Notification, notification_data)
32-
handle_zaken_notification(notification)
36+
37+
# The database guarantees that this WHERE clause evaluation
38+
# + UPDATE happens atomically - no other transaction can see
39+
# or modify the row in between. This ensure only one task
40+
# can process at a time
41+
rows_updated = NotificationRecord.objects.filter(
42+
pk=record_pk,
43+
is_locked=False
44+
).update(is_locked=True, last_processed_at=timezone.now())
45+
46+
if rows_updated == 0:
47+
# Another task already locked it. Log and raise, so Celery marks the
48+
# task as failed
49+
logger.info("Notification record already locked, skipping", record_pk=record_pk)
50+
raise RuntimeError("This task is already being processed")
51+
52+
try:
53+
notification_record = NotificationRecord.objects.get(pk=record_pk)
54+
notification_data = json.load(notification_record.payload)
55+
notification = factory(Notification, notification_data)
56+
handle_zaken_notification(notification)
57+
except NotificationRecord.DoesNotExist:
58+
system_action(
59+
"Attempted to process unknown Notification Record",
60+
log_level=logging.ERROR,
61+
record_pk=record_pk,
62+
)
63+
except Exception:
64+
msg = "Unknown error while processing notification"
65+
system_action(msg)
66+
logger.exception(msg)
67+
finally:
68+
NotificationRecord.objects.filter(pk=record_pk).update(
69+
is_locked=False,
70+
is_processed=True,
71+
)

0 commit comments

Comments
 (0)