Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions stripe/_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from collections import OrderedDict
from hashlib import sha256
from typing import Optional

# Used for global variables
import stripe # noqa: IMP101
Expand Down Expand Up @@ -52,6 +53,40 @@ def _compute_signature(payload, secret):
)
return mac.hexdigest()

@classmethod
def generate_test_header_string(
cls,
payload: str,
secret: str,
timestamp: Optional[int] = None,
scheme: Optional[str] = None,
signature: Optional[str] = None,
) -> str:
"""
Generates a value for the `Stripe-Signature` header that can be used
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring uses a mix of markdown and RST syntax. I would suggest sticking to markdown to be consistent with the rest of the code base. You can specify parameters as:

Args:
    payload: ...
    secret: ...

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switched to a markdown Args: block. Thanks for the catch!

when testing code that calls `Webhook.construct_event` or
`WebhookSignature.verify_header`. Mirrors `generateTestHeaderString`
from stripe-node.

Args:
payload: The webhook payload to sign, as a string.
secret: The webhook signing secret (`whsec_...`).
timestamp: Unix timestamp to embed in the header. Defaults to
the current time.
scheme: Signature scheme. Defaults to
`WebhookSignature.EXPECTED_SCHEME`.
signature: Pre-computed signature to embed in the header. If
omitted, a signature is computed from `payload` and `secret`.
"""
if timestamp is None:
timestamp = int(time.time())
if scheme is None:
scheme = cls.EXPECTED_SCHEME
if signature is None:
signed_payload = "%d.%s" % (timestamp, payload)
signature = cls._compute_signature(signed_payload, secret)
return "t=%d,%s=%s" % (timestamp, scheme, signature)

@staticmethod
def _get_timestamp_and_signatures(header, scheme):
list_items = [i.split("=", 2) for i in header.split(",")]
Expand Down
70 changes: 58 additions & 12 deletions tests/test_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,13 @@


def generate_header(**kwargs):
timestamp = kwargs.get("timestamp", int(time.time()))
payload = kwargs.get("payload", DUMMY_WEBHOOK_PAYLOAD)
secret = kwargs.get("secret", DUMMY_WEBHOOK_SECRET)
scheme = kwargs.get("scheme", stripe.WebhookSignature.EXPECTED_SCHEME)
signature = kwargs.get("signature", None)
if signature is None:
payload_to_sign = "%d.%s" % (timestamp, payload)
signature = stripe.WebhookSignature._compute_signature(
payload_to_sign, secret
)
header = "t=%d,%s=%s" % (timestamp, scheme, signature)
return header
return stripe.WebhookSignature.generate_test_header_string(
payload=kwargs.get("payload", DUMMY_WEBHOOK_PAYLOAD),
secret=kwargs.get("secret", DUMMY_WEBHOOK_SECRET),
timestamp=kwargs.get("timestamp"),
scheme=kwargs.get("scheme"),
signature=kwargs.get("signature"),
)


class TestWebhook(object):
Expand Down Expand Up @@ -149,6 +144,57 @@ def test_timestamp_off_but_no_tolerance(self):
)


class TestGenerateTestHeaderString(object):
def test_uses_defaults_when_optional_args_omitted(self):
before = int(time.time())
header = stripe.WebhookSignature.generate_test_header_string(
payload=DUMMY_WEBHOOK_PAYLOAD, secret=DUMMY_WEBHOOK_SECRET
)
after = int(time.time())

parts = dict(item.split("=", 1) for item in header.split(","))
assert before <= int(parts["t"]) <= after
assert "v1" in parts

def test_header_verifies_round_trip(self):
header = stripe.WebhookSignature.generate_test_header_string(
payload=DUMMY_WEBHOOK_PAYLOAD, secret=DUMMY_WEBHOOK_SECRET
)
assert stripe.WebhookSignature.verify_header(
DUMMY_WEBHOOK_PAYLOAD,
header,
DUMMY_WEBHOOK_SECRET,
tolerance=10,
)

def test_honors_custom_timestamp_and_scheme(self):
header = stripe.WebhookSignature.generate_test_header_string(
payload=DUMMY_WEBHOOK_PAYLOAD,
secret=DUMMY_WEBHOOK_SECRET,
timestamp=12345,
scheme="v0",
)
assert header.startswith("t=12345,v0=")

def test_uses_provided_signature_verbatim(self):
header = stripe.WebhookSignature.generate_test_header_string(
payload=DUMMY_WEBHOOK_PAYLOAD,
secret=DUMMY_WEBHOOK_SECRET,
timestamp=12345,
signature="deadbeef",
)
assert header == "t=12345,v1=deadbeef"

def test_bad_secret_fails_verification(self):
header = stripe.WebhookSignature.generate_test_header_string(
payload=DUMMY_WEBHOOK_PAYLOAD, secret="whsec_wrong"
)
with pytest.raises(SignatureVerificationError):
stripe.WebhookSignature.verify_header(
DUMMY_WEBHOOK_PAYLOAD, header, DUMMY_WEBHOOK_SECRET
)


class TestStripeClientConstructEvent(object):
def test_construct_event(self, stripe_mock_stripe_client):
header = generate_header()
Expand Down