Signature verification
Every Puck webhook delivery is signed with an HMAC-SHA256 digest computed over the raw request body. Verifying the signature before processing any payload protects against spoofed requests and replay attacks.
The signature header
Every delivery includes the header:
X-Puck-Signature: t=<unix_seconds>,v1=<hex_hmac>Where:
t— Unix timestamp (seconds) at the moment Puck computed the signature. Used for replay protection.v1— Hex-encoded HMAC-SHA256 over the string"${t}.${raw_body}", keyed with the subscription’s secret.
The v1 prefix allows Puck to add future signature schemes without breaking existing handlers — always validate whichever version(s) you support.
How to verify
The algorithm is:
- Parse
tandv1from theX-Puck-Signatureheader. - Check that
|now - t| ≤ 300seconds (5 minutes). Reject stale payloads. - Compute
HMAC-SHA256(secret, "${t}.${raw_body}")whereraw_bodyis the unmodified request body bytes — not a re-encoded JSON object. - Compare the computed digest against
v1using a constant-time equality function. Reject on mismatch.
The raw body requirement is critical: any middleware that parses and re-serializes JSON before your handler receives the request will produce a different byte sequence and a different HMAC. Make sure your framework captures the raw bytes before JSON parsing.
Verification code
// Verify a Puck webhook signature in Node.js.//// Drop this into your Express / Fastify / Next.js handler. The pattern is the// same in every framework: get the raw body (NOT a parsed JSON object), pull// the `X-Puck-Signature` header, and call verifyPuckSignature.//// Reject 5+ minute-old timestamps to defeat replay.
import { createHmac, timingSafeEqual } from "node:crypto";
const TOLERANCE_MS = 5 * 60 * 1_000;
export function verifyPuckSignature( header: string | null | undefined, rawBody: string, secret: string,): boolean { if (!header) return false;
let timestamp: number | undefined; const sigs: string[] = []; for (const part of header.split(",")) { const [k, v] = part.trim().split("="); if (k === "t") timestamp = parseInt(v, 10); else if (k === "v1") sigs.push(v); } if (!timestamp || sigs.length === 0) return false;
const now = Math.floor(Date.now() / 1000); if (Math.abs(now - timestamp) * 1_000 > TOLERANCE_MS) return false;
const expected = createHmac("sha256", secret) .update(`${timestamp}.${rawBody}`) .digest("hex");
for (const got of sigs) { if (got.length !== expected.length) continue; try { if (timingSafeEqual(Buffer.from(got, "hex"), Buffer.from(expected, "hex"))) { return true; } } catch { // fall through } } return false;}
// Express example://// import express from "express";// const app = express();// // IMPORTANT: capture the raw body so the HMAC can be recomputed exactly.// app.use("/puck/events", express.raw({ type: "application/json" }));// app.post("/puck/events", (req, res) => {// const raw = req.body.toString("utf8");// const ok = verifyPuckSignature(// req.header("X-Puck-Signature"),// raw,// process.env.PUCK_WEBHOOK_SECRET!,// );// if (!ok) return res.status(401).send("invalid signature");// const event = JSON.parse(raw);// // Handle event.event_type, dedupe on event.event_id, ...// res.status(204).end();// });"""Verify a Puck webhook signature in Python.
Works in any framework — the inputs are the raw request body and theX-Puck-Signature header value."""
import hashlibimport hmacimport time
TOLERANCE_SECONDS = 5 * 60
def verify_puck_signature(header: str | None, raw_body: bytes, secret: str) -> bool: if not header: return False
timestamp: int | None = None sigs: list[str] = [] for part in header.split(","): kv = part.strip().split("=", 1) if len(kv) != 2: return False k, v = kv if k == "t": try: timestamp = int(v) except ValueError: return False elif k == "v1": sigs.append(v)
if timestamp is None or not sigs: return False
now = int(time.time()) if abs(now - timestamp) > TOLERANCE_SECONDS: return False
expected = hmac.new( secret.encode("utf-8"), f"{timestamp}.".encode("utf-8") + raw_body, hashlib.sha256, ).hexdigest()
for got in sigs: if hmac.compare_digest(got, expected): return True return False
# Flask example:## from flask import Flask, request## app = Flask(__name__)# SECRET = os.environ["PUCK_WEBHOOK_SECRET"]## @app.post("/puck/events")# def puck_events():# raw = request.get_data() # raw bytes — do NOT use request.json# header = request.headers.get("X-Puck-Signature")# if not verify_puck_signature(header, raw, SECRET):# return "invalid signature", 401# event = json.loads(raw)# # Handle event["event_type"], dedupe on event["event_id"], ...# return "", 204// Package puckwebhook verifies signatures on incoming Puck webhook// deliveries. Drop into any HTTP server. The handler must read the raw// body before unmarshalling — the HMAC is computed over the bytes Puck// signed, not over a re-encoded representation.package puckwebhook
import ( "crypto/hmac" "crypto/sha256" "encoding/hex" "strconv" "strings" "time")
const toleranceSeconds = 5 * 60
// Verify returns nil on success or an error describing why verification failed.func Verify(header string, rawBody []byte, secret string) error { if header == "" { return errMissingHeader }
var timestamp int64 = -1 var sigs []string for _, part := range strings.Split(header, ",") { kv := strings.SplitN(strings.TrimSpace(part), "=", 2) if len(kv) != 2 { return errMalformed } switch kv[0] { case "t": n, err := strconv.ParseInt(kv[1], 10, 64) if err != nil { return errMalformed } timestamp = n case "v1": sigs = append(sigs, kv[1]) } } if timestamp < 0 || len(sigs) == 0 { return errMalformed }
now := time.Now().Unix() diff := now - timestamp if diff < 0 { diff = -diff } if diff > toleranceSeconds { return errStale }
mac := hmac.New(sha256.New, []byte(secret)) mac.Write([]byte(strconv.FormatInt(timestamp, 10))) mac.Write([]byte(".")) mac.Write(rawBody) expected := hex.EncodeToString(mac.Sum(nil))
for _, got := range sigs { if hmac.Equal([]byte(got), []byte(expected)) { return nil } } return errMismatch}
type verifyError string
func (e verifyError) Error() string { return string(e) }
const ( errMissingHeader = verifyError("missing X-Puck-Signature header") errMalformed = verifyError("malformed X-Puck-Signature header") errStale = verifyError("timestamp outside tolerance window") errMismatch = verifyError("signature mismatch"))
// net/http example://// func handlePuck(w http.ResponseWriter, r *http.Request) {// body, _ := io.ReadAll(r.Body)// sig := r.Header.Get("X-Puck-Signature")// if err := puckwebhook.Verify(sig, body, os.Getenv("PUCK_WEBHOOK_SECRET")); err != nil {// http.Error(w, err.Error(), http.StatusUnauthorized)// return// }// var ev struct {// EventID string `json:"event_id"`// EventType string `json:"event_type"`// }// _ = json.Unmarshal(body, &ev)// // Handle ev.EventType, dedupe on ev.EventID, ...// w.WriteHeader(http.StatusNoContent)// }# frozen_string_literal: true
# Verify a Puck webhook signature in Ruby. No external gems required.## Use Rack's `request.body.read` (rewind first if you need to re-read) to# get the raw bytes. Do NOT use `params.to_json` — the bytes Puck signed# are what came over the wire, not anything you re-encoded.
require "openssl"
module PuckWebhook TOLERANCE_SECONDS = 5 * 60
module_function
def verify(header, raw_body, secret) return false if header.nil? || header.empty?
timestamp = nil sigs = [] header.split(",").each do |part| k, v = part.strip.split("=", 2) return false if v.nil?
case k when "t" timestamp = Integer(v) rescue (return false) when "v1" sigs << v end end return false if timestamp.nil? || sigs.empty?
now = Time.now.to_i return false if (now - timestamp).abs > TOLERANCE_SECONDS
expected = OpenSSL::HMAC.hexdigest( OpenSSL::Digest.new("sha256"), secret, "#{timestamp}.#{raw_body}", )
sigs.any? { |got| secure_compare(got, expected) } end
# Constant-time string comparison. def secure_compare(a, b) return false unless a.bytesize == b.bytesize
l = a.unpack("C*") r = 0 b.each_byte { |byte| r |= byte ^ l.shift } r.zero? endend
# Sinatra example:## require "sinatra"# require "json"## post "/puck/events" do# raw = request.body.read# unless PuckWebhook.verify(request.env["HTTP_X_PUCK_SIGNATURE"], raw, ENV.fetch("PUCK_WEBHOOK_SECRET"))# halt 401, "invalid signature"# end# event = JSON.parse(raw)# # Handle event["event_type"], dedupe on event["event_id"], ...# status 204# end//! Verify Puck webhook signatures in Rust.//!//! Add to Cargo.toml://! hmac = "0.12"//! sha2 = "0.10"//! subtle = "2" // for constant-time compare//!//! Drop into any HTTP framework. The handler must capture the raw body//! before deserializing — the HMAC covers the exact bytes Puck signed.
use std::time::{SystemTime, UNIX_EPOCH};
use hmac::{Hmac, Mac};use sha2::Sha256;use subtle::ConstantTimeEq;
const TOLERANCE_SECONDS: i64 = 5 * 60;
#[derive(Debug, PartialEq, Eq)]pub enum VerifyError { MissingHeader, Malformed, StaleTimestamp, SignatureMismatch,}
pub fn verify( header: Option<&str>, raw_body: &[u8], secret: &[u8],) -> Result<(), VerifyError> { let header = header.ok_or(VerifyError::MissingHeader)?;
let mut timestamp: Option<i64> = None; let mut sigs: Vec<&str> = Vec::new(); for part in header.split(',') { let part = part.trim(); let mut kv = part.splitn(2, '='); let k = kv.next().ok_or(VerifyError::Malformed)?; let v = kv.next().ok_or(VerifyError::Malformed)?; match k { "t" => { timestamp = Some(v.parse().map_err(|_| VerifyError::Malformed)?); } "v1" => sigs.push(v), _ => {} } } let timestamp = timestamp.ok_or(VerifyError::Malformed)?; if sigs.is_empty() { return Err(VerifyError::Malformed); }
let now = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|_| VerifyError::Malformed)? .as_secs() as i64; if (now - timestamp).abs() > TOLERANCE_SECONDS { return Err(VerifyError::StaleTimestamp); }
let mut mac = <Hmac<Sha256>>::new_from_slice(secret).map_err(|_| VerifyError::Malformed)?; mac.update(format!("{timestamp}.").as_bytes()); mac.update(raw_body); let expected = mac.finalize().into_bytes(); let expected_hex = hex_encode(&expected);
for got in &sigs { if got.as_bytes().ct_eq(expected_hex.as_bytes()).into() { return Ok(()); } } Err(VerifyError::SignatureMismatch)}
fn hex_encode(bytes: &[u8]) -> String { let mut out = String::with_capacity(bytes.len() * 2); for b in bytes { use std::fmt::Write; write!(&mut out, "{b:02x}").unwrap(); } out}
// Axum example://// use axum::body::Bytes;// use axum::http::HeaderMap;//// async fn puck_events(headers: HeaderMap, body: Bytes) -> impl IntoResponse {// let sig = headers.get("X-Puck-Signature").and_then(|v| v.to_str().ok());// let secret = std::env::var("PUCK_WEBHOOK_SECRET").unwrap();// if verify(sig, &body, secret.as_bytes()).is_err() {// return (StatusCode::UNAUTHORIZED, "invalid signature").into_response();// }// // Handle event...// StatusCode::NO_CONTENT.into_response()// }Replay protection
The t field in the header is also embedded in the HMAC input, which means an attacker cannot replay a captured delivery after the tolerance window expires: the timestamp would be outside the 5-minute window, and any modification to t would invalidate the HMAC.
The tolerance of 5 minutes accommodates reasonable clock skew between Puck’s servers and your handler. If your environment has tighter requirements, use a shorter window — but make sure your system clock is synchronized (NTP).
Secret rotation
To rotate a subscription’s secret without dropping deliveries:
- Call
PATCH /v1/webhooks/subscriptions/:idwith a newsecret. - Update your handler to accept signatures from both the old and new secret during a transition window (for example, 5 minutes).
- Remove the old secret from your handler.
The subscription detail in the console (Settings → Webhooks → subscription name) provides a Rotate secret button that generates a new secret server-side and returns it once.
Related
- Subscriptions — managing the secret lifecycle
- Event types — what the
datapayload looks like for each event - Retries & deliveries — how failed deliveries are retried