Webhook Integration
This tutorial guides you through building a webhook consumer that receives incident events from the Incidents platform. You’ll learn how to parse CloudEvents, verify HMAC signatures, and handle events reliably.
Overview
The Incidents platform publishes incident timeline events to registered webhook subscribers. These webhooks enable real-time integration with:
- Custom notification systems
- Analytics dashboards
- Automation workflows
- Third-party ITSM platforms
- Security monitoring tools
What You’ll Build
By the end of this tutorial, you’ll have a webhook receiver that:
- Accepts HTTP POST requests from the Incidents platform
- Verifies HMAC signatures for security
- Parses CloudEvents payloads
- Processes incident events based on type
- Returns appropriate HTTP status codes
Prerequisites
- An Incidents platform instance with event publishing enabled
- One of: Go 1.21+, Python 3.9+, or Node.js 18+
- A publicly accessible URL for your webhook (or ngrok for local testing)
Quick Start
1. Register Your Webhook
First, register your webhook endpoint with the Incidents platform:
# Generate a secure secret
SECRET=$(openssl rand -hex 32)
echo "Your webhook secret: $SECRET"
# Register the subscriber
im subscriber add \
  --name my-integration \
  --url https://your-server.example.com/webhook \
  --types "im.incident.**" \
  --secret "$SECRET"
# Verify registration
im subscriber list
2. Build Your Webhook Receiver
Choose your preferred language below.
Go Implementation
Complete Webhook Server
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strings"
	"time"
)

// CloudEvent represents a CloudEvents v1.0 event
type CloudEvent struct {
	SpecVersion     string                 `json:"specversion"`
	ID              string                 `json:"id"`
	Source          string                 `json:"source"`
	Type            string                 `json:"type"`
	Time            time.Time              `json:"time"`
	DataContentType string                 `json:"datacontenttype,omitempty"`
	Data            map[string]interface{} `json:"data,omitempty"`
	// Incidents platform extensions
	IncidentID string `json:"imincidentid,omitempty"`
}

// WebhookHandler handles incoming webhook requests
type WebhookHandler struct {
	secret string
	logger *slog.Logger
}

func NewWebhookHandler(secret string) *WebhookHandler {
	return &WebhookHandler{
		secret: secret,
		logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)),
	}
}

func (h *WebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Only accept POST requests
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Read the raw request body; the signature is computed over these exact bytes
	defer r.Body.Close()
	body, err := io.ReadAll(r.Body)
	if err != nil {
		h.logger.Error("Failed to read request body", "error", err)
		http.Error(w, "Failed to read body", http.StatusBadRequest)
		return
	}

	// Verify the signature if secret is configured
	if h.secret != "" {
		if !h.verifySignature(body, r.Header.Get("X-CloudEvents-Signature")) {
			h.logger.Warn("Invalid webhook signature")
			http.Error(w, "Invalid signature", http.StatusUnauthorized)
			return
		}
	}

	// Parse the CloudEvent
	var event CloudEvent
	if err := json.Unmarshal(body, &event); err != nil {
		h.logger.Error("Failed to parse CloudEvent", "error", err)
		http.Error(w, "Invalid CloudEvent", http.StatusBadRequest)
		return
	}
	// Reject events without an ID or type; both are needed for routing and idempotency
	if event.ID == "" || event.Type == "" {
		h.logger.Error("CloudEvent is missing id or type")
		http.Error(w, "Invalid CloudEvent", http.StatusBadRequest)
		return
	}

	// Process the event
	if err := h.processEvent(&event); err != nil {
		h.logger.Error("Failed to process event",
			"event_id", event.ID,
			"event_type", event.Type,
			"error", err,
		)
		// Return 500 to trigger retry
		http.Error(w, "Processing failed", http.StatusInternalServerError)
		return
	}

	// Success - acknowledge the event.
	// Encoding with encoding/json keeps the response valid even if the ID contains quotes.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(map[string]string{"status": "received", "event_id": event.ID})
}

// verifySignature checks the "sha256=<hex>" header against an HMAC-SHA256 of the raw body.
func (h *WebhookHandler) verifySignature(body []byte, signatureHeader string) bool {
	if !strings.HasPrefix(signatureHeader, "sha256=") {
		return false
	}
	expectedSig := strings.TrimPrefix(signatureHeader, "sha256=")
	mac := hmac.New(sha256.New, []byte(h.secret))
	mac.Write(body)
	actualSig := hex.EncodeToString(mac.Sum(nil))
	// hmac.Equal compares without leaking timing information
	return hmac.Equal([]byte(expectedSig), []byte(actualSig))
}

func (h *WebhookHandler) processEvent(event *CloudEvent) error {
	h.logger.Info("Received incident event",
		"event_id", event.ID,
		"event_type", event.Type,
		"incident_id", event.IncidentID,
		"source", event.Source,
	)

	// Route by event type
	switch event.Type {
	case "im.incident.declared.v1":
		return h.handleIncidentDeclared(event)
	case "im.incident.acknowledged.v1":
		return h.handleIncidentAcknowledged(event)
	case "im.incident.resolved.v1":
		return h.handleIncidentResolved(event)
	case "im.incident.closed.v1":
		return h.handleIncidentClosed(event)
	default:
		h.logger.Debug("Ignoring unhandled event type", "type", event.Type)
		return nil
	}
}

func (h *WebhookHandler) handleIncidentDeclared(event *CloudEvent) error {
	title, _ := event.Data["title"].(string)
	severity, _ := event.Data["severity"].(string)
	actor, _ := event.Data["actor"].(string)
	h.logger.Info("New incident declared",
		"incident_id", event.IncidentID,
		"title", title,
		"severity", severity,
		"declared_by", actor,
	)
	// TODO: Implement your integration logic here
	// Examples:
	// - Send Slack notification
	// - Create ticket in external system
	// - Trigger automation workflow
	return nil
}

func (h *WebhookHandler) handleIncidentAcknowledged(event *CloudEvent) error {
	actor, _ := event.Data["actor"].(string)
	h.logger.Info("Incident acknowledged",
		"incident_id", event.IncidentID,
		"acknowledged_by", actor,
	)
	return nil
}

func (h *WebhookHandler) handleIncidentResolved(event *CloudEvent) error {
	actor, _ := event.Data["actor"].(string)
	h.logger.Info("Incident resolved",
		"incident_id", event.IncidentID,
		"resolved_by", actor,
	)
	return nil
}

func (h *WebhookHandler) handleIncidentClosed(event *CloudEvent) error {
	h.logger.Info("Incident closed", "incident_id", event.IncidentID)
	return nil
}

func main() {
	// Get configuration from environment
	secret := os.Getenv("WEBHOOK_SECRET")
	if secret == "" {
		slog.Warn("WEBHOOK_SECRET is not set; signature verification is disabled")
	}
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	handler := NewWebhookHandler(secret)
	mux := http.NewServeMux()
	mux.Handle("/webhook", handler)
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "OK")
	})

	slog.Info("Starting webhook server", "port", port)
	if err := http.ListenAndServe(":"+port, mux); err != nil {
		slog.Error("Server failed", "error", err)
		os.Exit(1)
	}
}
Run the Go Server
# Set your webhook secret (from registration step)
export WEBHOOK_SECRET="your-secret-here"
# Run the server
go run main.go
Python Implementation
Complete Webhook Server
#!/usr/bin/env python3
"""Webhook receiver for Incidents platform CloudEvents."""
import hashlib
import hmac
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional

from flask import Flask, request, jsonify

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s'
)
logger = logging.getLogger(__name__)

app = Flask(__name__)


@dataclass
class CloudEvent:
    """Represents a CloudEvents v1.0 event."""
    specversion: str
    id: str
    source: str
    type: str
    time: datetime
    data: dict[str, Any]
    incident_id: Optional[str] = None
    data_content_type: Optional[str] = None

    @classmethod
    def from_json(cls, payload: dict) -> 'CloudEvent':
        """Parse CloudEvent from JSON payload."""
        return cls(
            specversion=payload.get('specversion', '1.0'),
            id=payload['id'],
            source=payload['source'],
            type=payload['type'],
            time=datetime.fromisoformat(payload['time'].replace('Z', '+00:00')),
            data=payload.get('data', {}),
            incident_id=payload.get('imincidentid'),
            data_content_type=payload.get('datacontenttype'),
        )


def verify_signature(payload: bytes, secret: str, signature_header: str) -> bool:
    """Verify the HMAC-SHA256 signature of the webhook payload."""
    if not signature_header.startswith('sha256='):
        return False
    expected_sig = signature_header[7:]  # Remove 'sha256=' prefix
    actual_sig = hmac.new(
        secret.encode('utf-8'),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input
    return hmac.compare_digest(expected_sig.encode('utf-8'), actual_sig.encode('utf-8'))


def process_event(event: CloudEvent) -> None:
    """Process the CloudEvent based on its type."""
    logger.info(
        "Received event",
        extra={
            'event_id': event.id,
            'event_type': event.type,
            'incident_id': event.incident_id,
        }
    )
    handlers = {
        'im.incident.declared.v1': handle_incident_declared,
        'im.incident.acknowledged.v1': handle_incident_acknowledged,
        'im.incident.resolved.v1': handle_incident_resolved,
        'im.incident.closed.v1': handle_incident_closed,
    }
    handler = handlers.get(event.type)
    if handler:
        handler(event)
    else:
        logger.debug(f"Ignoring unhandled event type: {event.type}")


def handle_incident_declared(event: CloudEvent) -> None:
    """Handle new incident declaration."""
    title = event.data.get('title', 'Unknown')
    severity = event.data.get('severity', 'Unknown')
    actor = event.data.get('actor', 'Unknown')
    logger.info(
        f"New incident declared: {title}",
        extra={
            'incident_id': event.incident_id,
            'severity': severity,
            'declared_by': actor,
        }
    )
    # TODO: Implement your integration logic here
    # Examples:
    # - Send Slack notification
    # - Create ticket in external system
    # - Trigger automation workflow


def handle_incident_acknowledged(event: CloudEvent) -> None:
    """Handle incident acknowledgment."""
    actor = event.data.get('actor', 'Unknown')
    logger.info(
        f"Incident {event.incident_id} acknowledged by {actor}"
    )


def handle_incident_resolved(event: CloudEvent) -> None:
    """Handle incident resolution."""
    actor = event.data.get('actor', 'Unknown')
    logger.info(
        f"Incident {event.incident_id} resolved by {actor}"
    )


def handle_incident_closed(event: CloudEvent) -> None:
    """Handle incident closure."""
    logger.info(f"Incident {event.incident_id} closed")


@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle incoming webhook requests."""
    # Get the raw body for signature verification
    body = request.get_data()

    # Verify signature if secret is configured
    secret = os.environ.get('WEBHOOK_SECRET')
    if secret:
        signature = request.headers.get('X-CloudEvents-Signature', '')
        if not verify_signature(body, secret, signature):
            logger.warning("Invalid webhook signature")
            return jsonify({'error': 'Invalid signature'}), 401

    # Parse the CloudEvent
    try:
        payload = json.loads(body)
        event = CloudEvent.from_json(payload)
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers invalid JSON and unparsable timestamps;
        # TypeError/AttributeError cover payloads that are not JSON objects
        logger.error(f"Failed to parse CloudEvent: {e}")
        return jsonify({'error': 'Invalid CloudEvent'}), 400

    # Process the event
    try:
        process_event(event)
    except Exception:
        logger.exception(f"Failed to process event {event.id}")
        # Return 500 to trigger retry
        return jsonify({'error': 'Processing failed'}), 500

    return jsonify({'status': 'received', 'event_id': event.id}), 200


@app.route('/health', methods=['GET'])
def health():
    """Health check endpoint."""
    return 'OK', 200


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    logger.info(f"Starting webhook server on port {port}")
    app.run(host='0.0.0.0', port=port)
Run the Python Server
# Install dependencies
pip install flask
# Set your webhook secret
export WEBHOOK_SECRET="your-secret-here"
# Run the server
python webhook_server.py
Node.js Implementation
Complete Webhook Server
const express = require("express");
const crypto = require("crypto");

const app = express();
const port = process.env.PORT || 8080;
const secret = process.env.WEBHOOK_SECRET;

// Keep the raw body for signature verification.
// Matching every content type ensures req.body is a Buffer on /webhook
// regardless of the Content-Type the sender uses.
app.use("/webhook", express.raw({ type: () => true }));
app.use(express.json());

/**
 * Verify the HMAC-SHA256 signature of the webhook payload.
 */
function verifySignature(body, signatureHeader) {
  if (!signatureHeader || !signatureHeader.startsWith("sha256=")) {
    return false;
  }
  const expectedSig = signatureHeader.slice(7);
  const actualSig = crypto.createHmac("sha256", secret).update(body).digest("hex");
  try {
    // timingSafeEqual throws when the lengths differ; treat that as a mismatch
    return crypto.timingSafeEqual(Buffer.from(expectedSig), Buffer.from(actualSig));
  } catch {
    return false;
  }
}

/**
 * Parse CloudEvent from JSON payload.
 */
function parseCloudEvent(payload) {
  const data = JSON.parse(payload.toString("utf8"));
  // Reject events without an ID or type; both are needed for routing and idempotency
  if (!data.id || !data.type) {
    throw new Error("CloudEvent is missing id or type");
  }
  return {
    specversion: data.specversion || "1.0",
    id: data.id,
    source: data.source,
    type: data.type,
    time: new Date(data.time),
    data: data.data || {},
    incidentId: data.imincidentid,
    dataContentType: data.datacontenttype
  };
}

/**
 * Process the CloudEvent based on its type.
 */
function processEvent(event) {
  console.log("Received event:", {
    eventId: event.id,
    eventType: event.type,
    incidentId: event.incidentId
  });
  const handlers = {
    "im.incident.declared.v1": handleIncidentDeclared,
    "im.incident.acknowledged.v1": handleIncidentAcknowledged,
    "im.incident.resolved.v1": handleIncidentResolved,
    "im.incident.closed.v1": handleIncidentClosed
  };
  const handler = handlers[event.type];
  if (handler) {
    handler(event);
  } else {
    console.debug(`Ignoring unhandled event type: ${event.type}`);
  }
}

function handleIncidentDeclared(event) {
  const { title, severity, actor } = event.data;
  console.log("New incident declared:", {
    incidentId: event.incidentId,
    title,
    severity,
    declaredBy: actor
  });
  // TODO: Implement your integration logic here
  // Examples:
  // - Send Slack notification
  // - Create ticket in external system
  // - Trigger automation workflow
}

function handleIncidentAcknowledged(event) {
  console.log(`Incident ${event.incidentId} acknowledged by ${event.data.actor}`);
}

function handleIncidentResolved(event) {
  console.log(`Incident ${event.incidentId} resolved by ${event.data.actor}`);
}

function handleIncidentClosed(event) {
  console.log(`Incident ${event.incidentId} closed`);
}

// Webhook endpoint
app.post("/webhook", (req, res) => {
  const body = req.body;
  // An empty request leaves req.body unparsed, so require a Buffer here
  if (!Buffer.isBuffer(body)) {
    return res.status(400).json({ error: "Invalid CloudEvent" });
  }

  // Verify signature if secret is configured
  if (secret) {
    const signature = req.get("X-CloudEvents-Signature") || "";
    if (!verifySignature(body, signature)) {
      console.warn("Invalid webhook signature");
      return res.status(401).json({ error: "Invalid signature" });
    }
  }

  // Parse the CloudEvent
  let event;
  try {
    event = parseCloudEvent(body);
  } catch (err) {
    console.error("Failed to parse CloudEvent:", err);
    return res.status(400).json({ error: "Invalid CloudEvent" });
  }

  // Process the event
  try {
    processEvent(event);
  } catch (err) {
    console.error(`Failed to process event ${event.id}:`, err);
    // Return 500 to trigger retry
    return res.status(500).json({ error: "Processing failed" });
  }

  res.json({ status: "received", event_id: event.id });
});

// Health check endpoint
app.get("/health", (req, res) => {
  res.send("OK");
});

app.listen(port, () => {
  console.log(`Webhook server listening on port ${port}`);
});
Run the Node.js Server
# Install dependencies
npm install express
# Set your webhook secret
export WEBHOOK_SECRET="your-secret-here"
# Run the server
node webhook_server.js
Testing Your Webhook
Local Testing with ngrok
For local development, use ngrok to expose your webhook:
# Start your webhook server
./webhook-server
# In another terminal, expose it via ngrok
ngrok http 8080
# Note the HTTPS URL (e.g., https://abc123.ngrok.io)
Register with the ngrok URL:
im subscriber add \
  --name local-test \
  --url https://abc123.ngrok.io/webhook \
  --types "im.incident.**" \
  --secret "$SECRET"
Trigger a Test Event
Create an incident to trigger webhook delivery:
# Declare a test incident
im declare --title "Test incident for webhook" --severity SEV-4
# You should see the event in your webhook server logs
Verify Event Delivery
Check subscriber health to confirm delivery:
im subscriber health
Best Practices
Security
- Always verify signatures in production
- Use HTTPS for webhook endpoints
- Rotate secrets periodically using im subscriber update
- Log security events (signature failures, unauthorized access)
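While a secret is being rotated, deliveries already in flight may still be signed with the previous secret. One way a receiver could tolerate that window is to accept a signature that matches either secret. The sketch below is an illustration under that assumption, not a documented platform feature; the function name `verify_with_any_secret` and the idea of keeping a short list of secrets are hypothetical.

```python
import hashlib
import hmac


def verify_with_any_secret(body: bytes, signature_header: str, secrets: list[str]) -> bool:
    """Return True if the header matches an HMAC-SHA256 of body under any listed secret."""
    prefix = "sha256="
    if not signature_header.startswith(prefix):
        return False
    received = signature_header[len(prefix):].encode("utf-8")
    matched = False
    for secret in secrets:
        expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
        # compare_digest avoids leaking where the two values first differ.
        # Every secret is checked, so the loop does not exit early on a match.
        if hmac.compare_digest(received, expected.encode("utf-8")):
            matched = True
    return matched
```

Once the rotation is complete and no more deliveries arrive with the old signature, drop the old secret from the list so that only the current one is accepted.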
Reliability
- Handle duplicates: Events may be retried. Use the event ID for idempotency.
- Respond quickly: Keep processing under 10 seconds
- Queue for async processing: For long-running tasks, queue and acknowledge immediately
@app.route('/webhook', methods=['POST'])
def webhook():
    # ... signature verification ...
    event = CloudEvent.from_json(payload)
    # Queue for async processing
    task_queue.enqueue(process_event, event)
    # Acknowledge immediately
    return jsonify({'status': 'queued', 'event_id': event.id}), 200
- Return appropriate status codes:
  - 200-299: Success (event delivered)
  - 4xx: Permanent failure (won’t retry)
  - 5xx: Transient failure (will retry)
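The duplicate-handling and queueing advice above can be combined in one small component. The following is a minimal in-memory sketch using only the standard library; the class name `DedupingQueue` and its methods are hypothetical, and a production receiver would more likely keep seen IDs and queued work in a durable store, since this state is lost on restart.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class DedupingQueue:
    """In-memory work queue that drops events whose ID was already accepted."""

    def __init__(self, max_seen: int = 10_000):
        self._queue = queue.Queue()
        self._seen = {}  # dicts keep insertion order, so this acts as an ordered set
        self._max_seen = max_seen
        self._lock = threading.Lock()

    def enqueue(self, event_id: str, event) -> bool:
        """Queue the event and return True, or return False for a duplicate ID."""
        with self._lock:
            if event_id in self._seen:
                return False
            self._seen[event_id] = None
            if len(self._seen) > self._max_seen:
                # Forget the oldest remembered ID to bound memory use.
                self._seen.pop(next(iter(self._seen)))
        self._queue.put(event)
        return True

    def run_worker(self, handler) -> threading.Thread:
        """Start one background thread that passes queued events to handler."""
        def loop():
            while True:
                event = self._queue.get()
                try:
                    handler(event)
                except Exception:
                    logger.exception("Event handler failed")
                finally:
                    self._queue.task_done()

        thread = threading.Thread(target=loop, daemon=True)
        thread.start()
        return thread

    def join(self) -> None:
        """Block until every queued event has been handled."""
        self._queue.join()
```

In a receiver, the webhook route would call `enqueue(event.id, event)` and return 200 in both cases, because a duplicate has already been accepted. Note the trade-off: once the request is acknowledged, a failure inside the worker is no longer retried by the platform, so the worker needs its own retry or dead-letter handling.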
Observability
- Log event IDs: Include the CloudEvents id in all log messages
- Track processing time: Monitor webhook processing latency
- Alert on failures: Set up alerts for high failure rates
import time

def process_event(event):
    start = time.time()
    try:
        # ... process event ...
        duration = time.time() - start
        logger.info(f"Event {event.id} processed in {duration:.3f}s")
    except Exception as e:
        duration = time.time() - start
        logger.error(f"Event {event.id} failed after {duration:.3f}s: {e}")
        raise
Troubleshooting
Webhook Not Receiving Events
- Check subscriber registration:
  im subscriber list
- Verify event type filters match:
  # Update to receive all events
  im subscriber update my-integration --types "im.**"
- Check subscriber health:
  im subscriber health
Signature Verification Failures
- Verify secret matches:
  - The secret cannot be retrieved after creation
  - Update with new secret: im subscriber update <name> --secret <new-secret>
- Check signature header:
  - Header name: X-CloudEvents-Signature
  - Format: sha256=<hex-encoded-signature>
- Verify raw body:
  - Don’t parse JSON before computing signature
  - Use raw request bytes
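To narrow down a signature failure, it can help to sign a test payload yourself and compare the result with what your receiver computes. The sketch below builds the header in the sha256=<hex> format described above and shows why the raw bytes matter: re-serializing the same JSON changes the bytes and therefore the signature. The helper name `sign_payload`, the secret, and the sample event values are made up for illustration.

```python
import hashlib
import hmac
import json


def sign_payload(body: bytes, secret: str) -> str:
    """Build an X-CloudEvents-Signature value for the exact bytes in body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


# Hypothetical sample event using the attribute names from this tutorial.
event = {
    "specversion": "1.0",
    "id": "evt-test-1",
    "source": "local-test",
    "type": "im.incident.declared.v1",
    "time": "2024-01-01T00:00:00Z",
    "imincidentid": "inc-test-1",
    "data": {"title": "Test incident", "severity": "SEV-4", "actor": "tester"},
}

body = json.dumps(event).encode("utf-8")
header = sign_payload(body, "test-secret")

# Parsing and re-serializing yields the same JSON value but different bytes,
# so a signature computed over the re-serialized form no longer matches.
reserialized = json.dumps(json.loads(body), indent=2).encode("utf-8")
mismatch = sign_payload(reserialized, "test-secret") != header
```

Sending `body` with `header` to a local receiver started with WEBHOOK_SECRET=test-secret should be accepted; if it is rejected, the receiver is probably hashing something other than the raw request bytes.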
Events Being Retried
- Check your response code:
  - Return 2xx for success
  - 5xx triggers retry
- Check processing time:
  - Events time out after 10 seconds
  - Process quickly or queue for async processing
- Check error logs:
  im logs --component outbox-worker
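Unwanted retries often come from returning 5xx for errors that will never succeed. The retry rules above can be sketched as a small mapping from handler outcome to status code. `PermanentError` and `response_status` are hypothetical names introduced for this example; which of your own errors count as permanent is a decision for your integration.

```python
class PermanentError(Exception):
    """Hypothetical marker for failures that a retry cannot fix (for example, bad data)."""


def response_status(handler, event) -> int:
    """Run handler(event) and choose the HTTP status the webhook should return."""
    try:
        handler(event)
    except PermanentError:
        # 4xx: the platform treats this as permanent and does not retry.
        return 400
    except Exception:
        # 5xx: treated as transient, so the delivery is retried.
        return 500
    # 2xx: the event is considered delivered.
    return 200
```

A handler would raise `PermanentError` for input it can never process, and let other exceptions, such as a downstream service being unavailable, surface as 500 so the event is delivered again.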
Next Steps
- Review the Event Distribution Guide for subscriber management
- See the Configuration Reference for server settings
- Explore the REST API Reference for programmatic access