Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions examples/event_notification_handler_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# File copied from our code generator; changes here will be overwritten.
"""
event_notification_handler_endpoint.py - receive and process event notifications (AKA thin events) like "v1.billing.meter.error_report_triggered" using EventNotificationHandler.

In this example, we:
- write a fallback callback to handle unrecognized event notifications
- create a StripeClient called client
- Initialize an EventNotificationHandler with the client, webhook secret, and fallback callback
- register a specific handler for the "v1.billing.meter.error_report_triggered" event notification type
- use handler.handle() to process the received notification webhook body
"""

import os
from flask import Flask, request, jsonify

from stripe import StripeClient, UnhandledNotificationDetails
from stripe.v2.core import EventNotification
from stripe.events import V1BillingMeterErrorReportTriggeredEventNotification

app = Flask(__name__)
api_key = os.environ.get("STRIPE_API_KEY", "")
webhook_secret = os.environ.get("WEBHOOK_SECRET", "")


def fallback_callback(
notif: EventNotification,
client: StripeClient,
details: UnhandledNotificationDetails,
):
print(f"Got an unhandled event of type {notif.type}!")


client = StripeClient(api_key)
handler = client.notification_handler(webhook_secret, fallback_callback)


# can be anywhere in your codebase
@handler.on_v1_billing_meter_error_report_triggered
def handle_meter_error(
notif: V1BillingMeterErrorReportTriggeredEventNotification,
client: StripeClient,
):
event = notif.fetch_event()
print(f"Err! No meter found: {event.data.developer_message_summary}")


@app.route("/webhook", methods=["POST"])
def webhook():
webhook_body = request.data
sig_header = request.headers.get("Stripe-Signature")

try:
handler.handle(webhook_body, sig_header)
return jsonify(success=True), 200
except Exception as e:
return jsonify(error=str(e)), 500

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
Comment thread
xavdid marked this conversation as resolved.
Dismissed
2 changes: 2 additions & 0 deletions examples/event_notification_webhook_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
# File copied from our code generator; changes here will be overwritten.
"""
event_notification_webhook_handler.py - receive and process event notifications (AKA thin events) like the v1.billing.meter.error_report_triggered and v1.billing.meter.no_meter_found events.

Expand Down
12 changes: 12 additions & 0 deletions stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ def add_beta_version(
OAuthErrorObject as OAuthErrorObject,
)
from stripe._event import Event as Event
from stripe._event_notification_handler import (
StripeEventNotificationHandler as StripeEventNotificationHandler,
UnhandledNotificationDetails as UnhandledNotificationDetails,
)
from stripe._event_service import EventService as EventService
from stripe._exchange_rate import ExchangeRate as ExchangeRate
from stripe._exchange_rate_service import (
Expand Down Expand Up @@ -837,6 +841,14 @@ def add_beta_version(
"ErrorObject": ("stripe._error_object", False),
"OAuthErrorObject": ("stripe._error_object", False),
"Event": ("stripe._event", False),
"StripeEventNotificationHandler": (
"stripe._event_notification_handler",
False,
),
"UnhandledNotificationDetails": (
"stripe._event_notification_handler",
False,
),
"EventService": ("stripe._event_service", False),
"ExchangeRate": ("stripe._exchange_rate", False),
"ExchangeRateService": ("stripe._exchange_rate_service", False),
Expand Down
Loading
Loading