Webhook Integration

This tutorial guides you through building a webhook consumer that receives incident events from the Incidents platform. You’ll learn how to parse CloudEvents, verify HMAC signatures, and handle events reliably.

Overview

The Incidents platform publishes incident timeline events to registered webhook subscribers. These webhooks enable real-time integration with:

  • Custom notification systems
  • Analytics dashboards
  • Automation workflows
  • Third-party ITSM platforms
  • Security monitoring tools

What You’ll Build

By the end of this tutorial, you’ll have a webhook receiver that:

  1. Accepts HTTP POST requests from the Incidents platform
  2. Verifies HMAC signatures for security
  3. Parses CloudEvents payloads
  4. Processes incident events based on type
  5. Returns appropriate HTTP status codes

Prerequisites

  • An Incidents platform instance with event publishing enabled
  • One of: Go 1.21+, Python 3.9+, or Node.js 18+
  • A publicly accessible URL for your webhook (or ngrok for local testing)

Quick Start

1. Register Your Webhook

First, register your webhook endpoint with the Incidents platform:

# Generate a secure secret
SECRET=$(openssl rand -hex 32)
echo "Your webhook secret: $SECRET"

# Register the subscriber
im subscriber add \
  --name my-integration \
  --url https://your-server.example.com/webhook \
  --types "im.incident.**" \
  --secret "$SECRET"

# Verify registration
im subscriber list

2. Build Your Webhook Receiver

Choose your preferred language below.

Go Implementation

Complete Webhook Server

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strings"
	"time"
)

// CloudEvent is the JSON envelope of a CloudEvents v1.0 event as decoded by
// this receiver. Data holds the event-specific payload as a generic JSON
// object; handlers pull individual fields out of it by key.
type CloudEvent struct {
	SpecVersion     string         `json:"specversion"`
	ID              string         `json:"id"`
	Source          string         `json:"source"`
	Type            string         `json:"type"`
	Time            time.Time      `json:"time"`
	DataContentType string         `json:"datacontenttype,omitempty"`
	Data            map[string]any `json:"data,omitempty"`

	// IncidentID is the Incidents platform extension attribute "imincidentid".
	IncidentID string `json:"imincidentid,omitempty"`
}

// WebhookHandler is the http.Handler that receives webhook deliveries.
type WebhookHandler struct {
	secret string       // shared HMAC secret; empty disables signature checks
	logger *slog.Logger // structured logger used by every handler method
}

// NewWebhookHandler returns a handler that verifies signatures with secret
// and writes JSON-formatted log records to standard output.
func NewWebhookHandler(secret string) *WebhookHandler {
	jsonLogger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	return &WebhookHandler{secret: secret, logger: jsonLogger}
}

// ServeHTTP implements http.Handler for the webhook endpoint.
//
// It accepts POST requests only, buffers at most maxBodyBytes of the body,
// verifies the X-CloudEvents-Signature header when a secret is configured,
// decodes the body as a CloudEvent and passes it to processEvent.
//
// Status codes: 405 for non-POST methods; 400 when the body cannot be read
// (including bodies over the size cap) or is not valid CloudEvent JSON; 401
// for a missing or wrong signature; 500 when processEvent fails, so the
// sender retries; 200 with a small JSON acknowledgement on success.
func (h *WebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// maxBodyBytes caps how much of a request body is buffered in memory, so
	// a single oversized request cannot exhaust the process.
	const maxBodyBytes = 1 << 20 // 1 MiB

	// Only accept POST requests
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Register the close before reading so it runs on every return path.
	defer r.Body.Close()

	// Read the raw bytes: the signature is computed over exactly these bytes,
	// so they must be captured before any JSON decoding.
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes))
	if err != nil {
		h.logger.Error("Failed to read request body", "error", err)
		http.Error(w, "Failed to read body", http.StatusBadRequest)
		return
	}

	// Verify the signature if secret is configured
	if h.secret != "" {
		if !h.verifySignature(body, r.Header.Get("X-CloudEvents-Signature")) {
			h.logger.Warn("Invalid webhook signature")
			http.Error(w, "Invalid signature", http.StatusUnauthorized)
			return
		}
	}

	// Parse the CloudEvent
	var event CloudEvent
	if err := json.Unmarshal(body, &event); err != nil {
		h.logger.Error("Failed to parse CloudEvent", "error", err)
		http.Error(w, "Invalid CloudEvent", http.StatusBadRequest)
		return
	}

	// Process the event
	if err := h.processEvent(&event); err != nil {
		h.logger.Error("Failed to process event",
			"event_id", event.ID,
			"event_type", event.Type,
			"error", err,
		)
		// Return 500 to trigger retry
		http.Error(w, "Processing failed", http.StatusInternalServerError)
		return
	}

	// Success - acknowledge the event. The reply is produced by the JSON
	// encoder rather than string formatting so that an event ID containing
	// quotes or backslashes cannot corrupt the response.
	ack := struct {
		Status  string `json:"status"`
		EventID string `json:"event_id"`
	}{Status: "received", EventID: event.ID}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if err := json.NewEncoder(w).Encode(ack); err != nil {
		// The status line is already sent; all that is left is to record it.
		h.logger.Warn("Failed to write acknowledgement", "event_id", event.ID, "error", err)
	}
}

// verifySignature reports whether signatureHeader has the form
// "sha256=<hex>" and its hex part equals the hex-encoded HMAC-SHA256 of body
// keyed with the handler's secret.
func (h *WebhookHandler) verifySignature(body []byte, signatureHeader string) bool {
	const scheme = "sha256="
	if !strings.HasPrefix(signatureHeader, scheme) {
		return false
	}
	received := signatureHeader[len(scheme):]

	digest := hmac.New(sha256.New, []byte(h.secret))
	digest.Write(body)
	computed := hex.EncodeToString(digest.Sum(nil))

	// hmac.Equal compares without leaking timing information.
	return hmac.Equal([]byte(computed), []byte(received))
}

// processEvent logs the incoming event and hands it to the handler registered
// for its type. Event types without a handler are logged at debug level and
// reported as success.
func (h *WebhookHandler) processEvent(event *CloudEvent) error {
	h.logger.Info(
		"Received incident event",
		"event_id", event.ID,
		"event_type", event.Type,
		"incident_id", event.IncidentID,
		"source", event.Source,
	)

	// Dispatch table keyed by CloudEvents type.
	routes := map[string]func(*CloudEvent) error{
		"im.incident.declared.v1":     h.handleIncidentDeclared,
		"im.incident.acknowledged.v1": h.handleIncidentAcknowledged,
		"im.incident.resolved.v1":     h.handleIncidentResolved,
		"im.incident.closed.v1":       h.handleIncidentClosed,
	}

	if handle, known := routes[event.Type]; known {
		return handle(event)
	}

	h.logger.Debug("Ignoring unhandled event type", "type", event.Type)
	return nil
}

// handleIncidentDeclared logs a newly declared incident with the title,
// severity and actor taken from the event data. It always returns nil.
func (h *WebhookHandler) handleIncidentDeclared(event *CloudEvent) error {
	// field returns the named data entry as a string; a missing key or a
	// non-string value yields "".
	field := func(key string) string {
		text, _ := event.Data[key].(string)
		return text
	}

	h.logger.Info("New incident declared",
		slog.String("incident_id", event.IncidentID),
		slog.String("title", field("title")),
		slog.String("severity", field("severity")),
		slog.String("declared_by", field("actor")),
	)

	// TODO: Implement your integration logic here
	// Examples:
	// - Send Slack notification
	// - Create ticket in external system
	// - Trigger automation workflow

	return nil
}

// handleIncidentAcknowledged logs who acknowledged the incident. The actor is
// logged as "" when the data has no string "actor" entry. It always returns nil.
func (h *WebhookHandler) handleIncidentAcknowledged(event *CloudEvent) error {
	acknowledgedBy, _ := event.Data["actor"].(string)

	h.logger.Info("Incident acknowledged",
		slog.String("incident_id", event.IncidentID),
		slog.String("acknowledged_by", acknowledgedBy),
	)
	return nil
}

// handleIncidentResolved logs who resolved the incident. The actor is logged
// as "" when the data has no string "actor" entry. It always returns nil.
func (h *WebhookHandler) handleIncidentResolved(event *CloudEvent) error {
	resolvedBy, _ := event.Data["actor"].(string)

	h.logger.Info("Incident resolved",
		slog.String("incident_id", event.IncidentID),
		slog.String("resolved_by", resolvedBy),
	)
	return nil
}

// handleIncidentClosed logs that the incident was closed. It always returns nil.
func (h *WebhookHandler) handleIncidentClosed(event *CloudEvent) error {
	h.logger.Info("Incident closed", slog.String("incident_id", event.IncidentID))
	return nil
}

// main wires up the webhook and health endpoints and serves them until the
// server fails.
//
// Configuration comes from the environment:
//   - WEBHOOK_SECRET: shared HMAC secret; when empty, signature verification
//     is skipped and a warning is logged at startup.
//   - PORT: TCP port to listen on (default "8080").
func main() {
	// Get configuration from environment
	secret := os.Getenv("WEBHOOK_SECRET")
	if secret == "" {
		// Make the insecure mode visible instead of silently accepting
		// unsigned requests.
		slog.Warn("WEBHOOK_SECRET is not set; signature verification is disabled")
	}
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	handler := NewWebhookHandler(secret)

	mux := http.NewServeMux()
	mux.Handle("/webhook", handler)
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "OK")
	})

	// An http.Server with explicit timeouts keeps slow or stalled clients from
	// holding connections open indefinitely; http.ListenAndServe sets none.
	srv := &http.Server{
		Addr:              ":" + port,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      15 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	slog.Info("Starting webhook server", "port", port)
	if err := srv.ListenAndServe(); err != nil {
		slog.Error("Server failed", "error", err)
		os.Exit(1)
	}
}

Run the Go Server

# Set your webhook secret (from registration step)
export WEBHOOK_SECRET="your-secret-here"

# Run the server
go run main.go

Python Implementation

Complete Webhook Server

#!/usr/bin/env python3
"""Webhook receiver for Incidents platform CloudEvents."""

import hashlib
import hmac
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional

from flask import Flask, request, jsonify

# Root logger: INFO and above, each record prefixed with a timestamp and level.
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

# Module-level logger shared by every handler below.
logger = logging.getLogger(__name__)

# Flask application that hosts the /webhook and /health routes.
app = Flask(__name__)


@dataclass
class CloudEvent:
    """A CloudEvents v1.0 event as consumed by this receiver.

    Attributes mirror the JSON envelope: ``incident_id`` comes from the
    ``imincidentid`` extension attribute and ``data_content_type`` from
    ``datacontenttype``. ``data`` is always a dict; a missing or null ``data``
    member becomes ``{}`` so handlers can call ``.get`` on it unconditionally.
    """
    specversion: str
    id: str
    source: str
    type: str
    time: datetime
    data: dict[str, Any]
    incident_id: Optional[str] = None
    data_content_type: Optional[str] = None

    @staticmethod
    def _parse_time(value: str) -> datetime:
        """Parse an RFC 3339 timestamp into an aware ``datetime``.

        ``datetime.fromisoformat`` on Python 3.9/3.10 accepts neither a
        trailing ``Z`` nor fractional seconds that are not exactly 3 or 6
        digits, while RFC 3339 permits any number of digits. The text is
        therefore normalised first: a trailing ``Z``/``z`` becomes ``+00:00``
        and the fraction is truncated or zero-padded to microseconds.

        Raises:
            ValueError: if the normalised text is still not a valid timestamp.
        """
        text = value
        if text.endswith(('Z', 'z')):
            text = text[:-1] + '+00:00'

        head, dot, tail = text.partition('.')
        if dot:
            # Length of the run of digits right after the decimal point.
            digit_count = len(tail) - len(tail.lstrip('0123456789'))
            if digit_count:
                fraction = tail[:digit_count][:6].ljust(6, '0')
                text = f"{head}.{fraction}{tail[digit_count:]}"

        return datetime.fromisoformat(text)

    @classmethod
    def from_json(cls, payload: dict) -> 'CloudEvent':
        """Build a CloudEvent from an already-decoded JSON object.

        Raises:
            KeyError: if ``id``, ``source``, ``type`` or ``time`` is missing.
            ValueError: if ``time`` is not a valid timestamp.
        """
        return cls(
            specversion=payload.get('specversion', '1.0'),
            id=payload['id'],
            source=payload['source'],
            type=payload['type'],
            time=cls._parse_time(payload['time']),
            # `or {}` also covers an explicit JSON null.
            data=payload.get('data') or {},
            incident_id=payload.get('imincidentid'),
            data_content_type=payload.get('datacontenttype'),
        )


def verify_signature(payload: bytes, secret: str, signature_header: str) -> bool:
    """Verify the HMAC-SHA256 signature of the webhook payload.

    Args:
        payload: Raw request body, exactly as received.
        secret: Shared webhook secret.
        signature_header: Value of ``X-CloudEvents-Signature``; expected to
            look like ``sha256=<hex digest>``.

    Returns:
        True only when the header has the ``sha256=`` prefix and its digest
        matches the one computed over ``payload``. Malformed, empty or
        non-ASCII headers return False rather than raising.
    """
    if not signature_header or not signature_header.startswith('sha256='):
        return False

    received_sig = signature_header[7:]  # Remove 'sha256=' prefix

    computed_sig = hmac.new(
        secret.encode('utf-8'),
        payload,
        hashlib.sha256
    ).hexdigest()

    # compare_digest() raises TypeError when given a str containing non-ASCII
    # characters, and the header is attacker-controlled, so compare bytes.
    return hmac.compare_digest(
        received_sig.encode('utf-8'),
        computed_sig.encode('ascii'),
    )


def process_event(event: CloudEvent) -> None:
    """Log the event and run the handler registered for its type.

    Types without a registered handler are logged at debug level and ignored.
    """
    log_context = {
        'event_id': event.id,
        'event_type': event.type,
        'incident_id': event.incident_id,
    }
    logger.info("Received event", extra=log_context)

    # Dispatch table keyed by CloudEvents type.
    dispatch = {
        'im.incident.declared.v1': handle_incident_declared,
        'im.incident.acknowledged.v1': handle_incident_acknowledged,
        'im.incident.resolved.v1': handle_incident_resolved,
        'im.incident.closed.v1': handle_incident_closed,
    }

    if event.type not in dispatch:
        logger.debug(f"Ignoring unhandled event type: {event.type}")
        return

    dispatch[event.type](event)


def handle_incident_declared(event: CloudEvent) -> None:
    """Log a newly declared incident; absent data fields are shown as 'Unknown'."""
    details = event.data
    incident_title = details.get('title', 'Unknown')
    log_context = {
        'incident_id': event.incident_id,
        'severity': details.get('severity', 'Unknown'),
        'declared_by': details.get('actor', 'Unknown'),
    }

    logger.info(f"New incident declared: {incident_title}", extra=log_context)

    # TODO: Implement your integration logic here
    # Examples:
    # - Send Slack notification
    # - Create ticket in external system
    # - Trigger automation workflow


def handle_incident_acknowledged(event: CloudEvent) -> None:
    """Log who acknowledged the incident ('Unknown' when no actor is given)."""
    acknowledged_by = event.data.get('actor', 'Unknown')
    message = f"Incident {event.incident_id} acknowledged by {acknowledged_by}"
    logger.info(message)


def handle_incident_resolved(event: CloudEvent) -> None:
    """Log who resolved the incident ('Unknown' when no actor is given)."""
    resolved_by = event.data.get('actor', 'Unknown')
    message = f"Incident {event.incident_id} resolved by {resolved_by}"
    logger.info(message)


def handle_incident_closed(event: CloudEvent) -> None:
    """Log that the incident has been closed."""
    message = f"Incident {event.incident_id} closed"
    logger.info(message)


@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle incoming webhook requests.

    Returns 401 for a bad signature, 400 for a body that is not a valid
    CloudEvent, 500 when processing fails (so the sender retries) and 200 with
    a JSON acknowledgement on success.
    """
    # Get the raw body for signature verification
    body = request.get_data()

    # Verify signature if secret is configured
    secret = os.environ.get('WEBHOOK_SECRET')
    if secret:
        signature = request.headers.get('X-CloudEvents-Signature', '')
        if not verify_signature(body, secret, signature):
            logger.warning("Invalid webhook signature")
            return jsonify({'error': 'Invalid signature'}), 401

    # Parse the CloudEvent. A malformed event is a permanent failure, so every
    # way parsing can fail must map to 400 rather than escape as a 500:
    #   ValueError     - invalid JSON, undecodable bytes or a bad timestamp
    #   KeyError       - a required attribute is missing
    #   TypeError /
    #   AttributeError - the JSON is not an object, or an attribute has the
    #                    wrong type
    try:
        payload = json.loads(body)
        event = CloudEvent.from_json(payload)
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        logger.error(f"Failed to parse CloudEvent: {e}")
        return jsonify({'error': 'Invalid CloudEvent'}), 400

    # Process the event
    try:
        process_event(event)
    except Exception:
        # logger.exception records the active traceback itself.
        logger.exception(f"Failed to process event {event.id}")
        # Return 500 to trigger retry
        return jsonify({'error': 'Processing failed'}), 500

    return jsonify({'status': 'received', 'event_id': event.id}), 200


@app.route('/health', methods=['GET'])
def health():
    """Liveness probe: always answers 200 with the body 'OK'."""
    status_body = 'OK'
    return status_body, 200


if __name__ == '__main__':
    # PORT selects the listening port (default 8080); the server binds to all
    # interfaces so it is reachable from outside the host.
    listen_port = int(os.environ.get('PORT', 8080))
    logger.info(f"Starting webhook server on port {listen_port}")
    app.run(host='0.0.0.0', port=listen_port)

Run the Python Server

# Install dependencies
pip install flask

# Set your webhook secret
export WEBHOOK_SECRET="your-secret-here"

# Run the server
python webhook_server.py

Node.js Implementation

Complete Webhook Server

const express = require("express");
const crypto = require("crypto");

const app = express();
const port = process.env.PORT || 8080;
// Shared HMAC secret; when unset, signature verification is skipped.
const secret = process.env.WEBHOOK_SECRET;

// Keep the raw bytes of every /webhook request, whatever its Content-Type.
// The signature is computed over the exact bytes sent, so the body must reach
// the handler as a Buffer. Matching only "application/cloudevents+json" let
// requests with any other content type fall through to express.json(), which
// replaced the body with a parsed object and made the HMAC computation throw.
app.use("/webhook", express.raw({ type: "*/*" }));
app.use(express.json());

/**
 * Check the `sha256=<hex>` signature header against the HMAC-SHA256 of the
 * body, keyed with the module-level `secret`.
 *
 * @param {Buffer|string} body - Raw request body.
 * @param {string} signatureHeader - Value of the signature header.
 * @returns {boolean} true when the signatures match.
 */
function verifySignature(body, signatureHeader) {
  const prefix = "sha256=";
  if (!signatureHeader || !signatureHeader.startsWith(prefix)) {
    return false;
  }

  const received = Buffer.from(signatureHeader.slice(prefix.length));
  const computed = Buffer.from(
    crypto.createHmac("sha256", secret).update(body).digest("hex")
  );

  // timingSafeEqual throws on inputs of different length, so rule that out first.
  if (received.length !== computed.length) {
    return false;
  }
  return crypto.timingSafeEqual(received, computed);
}

/**
 * Decode a JSON CloudEvent and map it onto the shape used by the handlers.
 *
 * @param {Buffer|string} payload - JSON text of the event.
 * @returns {object} Event with camelCase `incidentId` / `dataContentType`,
 *   `time` as a Date, `specversion` defaulting to "1.0" and `data` to `{}`.
 * @throws {SyntaxError} when the payload is not valid JSON.
 */
function parseCloudEvent(payload) {
  const {
    specversion,
    id,
    source,
    type,
    time,
    data,
    imincidentid,
    datacontenttype
  } = JSON.parse(payload);

  return {
    specversion: specversion || "1.0",
    id,
    source,
    type,
    time: new Date(time),
    data: data || {},
    incidentId: imincidentid,
    dataContentType: datacontenttype
  };
}

/**
 * Log the event and run the handler registered for its type.
 * Types without a registered handler are logged at debug level and ignored.
 *
 * @param {object} event - Event as returned by parseCloudEvent.
 */
function processEvent(event) {
  console.log("Received event:", {
    eventId: event.id,
    eventType: event.type,
    incidentId: event.incidentId
  });

  // A Map only ever returns the entries listed here. With a plain object, a
  // type such as "constructor" or "__proto__" resolved to an inherited
  // Object.prototype member, which was then invoked as if it were a handler.
  const handlers = new Map([
    ["im.incident.declared.v1", handleIncidentDeclared],
    ["im.incident.acknowledged.v1", handleIncidentAcknowledged],
    ["im.incident.resolved.v1", handleIncidentResolved],
    ["im.incident.closed.v1", handleIncidentClosed]
  ]);

  const handler = handlers.get(event.type);
  if (handler) {
    handler(event);
  } else {
    console.debug(`Ignoring unhandled event type: ${event.type}`);
  }
}

/** Log a newly declared incident with its title, severity and actor. */
function handleIncidentDeclared(event) {
  const details = event.data;
  const summary = {
    incidentId: event.incidentId,
    title: details.title,
    severity: details.severity,
    declaredBy: details.actor
  };
  console.log("New incident declared:", summary);

  // TODO: Implement your integration logic here
  // Examples:
  // - Send Slack notification
  // - Create ticket in external system
  // - Trigger automation workflow
}

/** Log who acknowledged the incident. */
function handleIncidentAcknowledged(event) {
  const { incidentId, data } = event;
  console.log(`Incident ${incidentId} acknowledged by ${data.actor}`);
}

/** Log who resolved the incident. */
function handleIncidentResolved(event) {
  const { incidentId, data } = event;
  console.log(`Incident ${incidentId} resolved by ${data.actor}`);
}

/** Log that the incident has been closed. */
function handleIncidentClosed(event) {
  const { incidentId } = event;
  console.log(`Incident ${incidentId} closed`);
}

// Webhook endpoint.
// Responds 400 when the body is missing or is not a valid CloudEvent, 401 for
// a bad signature, 500 when processing fails (so the sender retries) and 200
// with a JSON acknowledgement on success.
app.post("/webhook", (req, res) => {
  const body = req.body;

  // The raw-body middleware leaves req.body as a Buffer. Anything else means
  // the bytes were not captured (no body, or a content type the raw parser
  // did not match), so neither the HMAC nor JSON.parse can work on it.
  // Rejecting here avoids an uncaught TypeError from hmac.update().
  if (!Buffer.isBuffer(body)) {
    console.warn("Webhook request body was not received as raw bytes");
    return res.status(400).json({ error: "Invalid CloudEvent" });
  }

  // Verify signature if secret is configured
  if (secret) {
    const signature = req.get("X-CloudEvents-Signature") || "";
    if (!verifySignature(body, signature)) {
      console.warn("Invalid webhook signature");
      return res.status(401).json({ error: "Invalid signature" });
    }
  }

  // Parse the CloudEvent
  let event;
  try {
    event = parseCloudEvent(body);
  } catch (err) {
    console.error("Failed to parse CloudEvent:", err);
    return res.status(400).json({ error: "Invalid CloudEvent" });
  }

  // Process the event
  try {
    processEvent(event);
  } catch (err) {
    console.error(`Failed to process event ${event.id}:`, err);
    // Return 500 to trigger retry
    return res.status(500).json({ error: "Processing failed" });
  }

  res.json({ status: "received", event_id: event.id });
});

// Liveness probe: always answers with the plain-text body "OK".
app.get("/health", function health(_req, res) {
  res.send("OK");
});

// Start accepting connections and report the port once listening.
app.listen(port, function onListening() {
  console.log(`Webhook server listening on port ${port}`);
});

Run the Node.js Server

# Install dependencies
npm install express

# Set your webhook secret
export WEBHOOK_SECRET="your-secret-here"

# Run the server
node webhook_server.js

Testing Your Webhook

Local Testing with ngrok

For local development, use ngrok to expose your webhook:

# Start your webhook server (a compiled Go binary is shown; otherwise use the run command from your language's section above)
./webhook-server

# In another terminal, expose it via ngrok
ngrok http 8080

# Note the HTTPS URL (e.g., https://abc123.ngrok.io)

Register with the ngrok URL:

im subscriber add \
  --name local-test \
  --url https://abc123.ngrok.io/webhook \
  --types "im.incident.**" \
  --secret "$SECRET"

Trigger a Test Event

Create an incident to trigger webhook delivery:

# Declare a test incident
im declare --title "Test incident for webhook" --severity SEV-4

# You should see the event in your webhook server logs

Verify Event Delivery

Check subscriber health to confirm delivery:

im subscriber health

Best Practices

Security

  1. Always verify signatures in production
  2. Use HTTPS for webhook endpoints
  3. Rotate secrets periodically using im subscriber update
  4. Log security events (signature failures, unauthorized access)

Reliability

  1. Handle duplicates: Events may be retried. Use the event ID for idempotency.
  2. Respond quickly: Keep processing under 10 seconds
  3. Queue for async processing: For long-running tasks, queue the event and acknowledge immediately, for example:

@app.route('/webhook', methods=['POST'])
def webhook():
    """Acknowledge the event immediately and leave processing to a task queue."""
    # ... signature verification ...

    # NOTE(review): `payload` is not defined in this excerpt; presumably it is
    # the decoded JSON body, as in the complete server above.
    event = CloudEvent.from_json(payload)

    # Queue for async processing
    # NOTE(review): `task_queue` is not defined anywhere in this tutorial;
    # presumably a placeholder for your own job-queue client.
    task_queue.enqueue(process_event, event)

    # Acknowledge immediately
    return jsonify({'status': 'queued', 'event_id': event.id}), 200

  4. Return appropriate status codes:
    • 200-299: Success (event delivered)
    • 4xx: Permanent failure (won’t retry)
    • 5xx: Transient failure (will retry)

Observability

  1. Log event IDs: Include the CloudEvents id in all log messages
  2. Track processing time: Monitor webhook processing latency
  3. Alert on failures: Set up alerts for high failure rates

Example of logging the event ID together with the processing time:

import time

def process_event(event):
    """Process an event and log how long it took, on success and on failure.

    Exceptions are logged with the elapsed time and then re-raised.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted, unlike time.time().
    start = time.perf_counter()
    try:
        # ... process event ...
        duration = time.perf_counter() - start
        logger.info(f"Event {event.id} processed in {duration:.3f}s")
    except Exception as e:
        duration = time.perf_counter() - start
        logger.error(f"Event {event.id} failed after {duration:.3f}s: {e}")
        raise

Troubleshooting

Webhook Not Receiving Events

  1. Check subscriber registration:

    im subscriber list
  2. Verify event type filters match:

    # Update to receive all events
    im subscriber update my-integration --types "im.**"
  3. Check subscriber health:

    im subscriber health

Signature Verification Failures

  1. Verify secret matches:

    • The secret cannot be retrieved after creation
    • Update with new secret: im subscriber update <name> --secret <new-secret>
  2. Check signature header:

    • Header name: X-CloudEvents-Signature
    • Format: sha256=<hex-encoded-signature>
  3. Verify raw body:

    • Don’t parse JSON before computing signature
    • Use raw request bytes

Events Being Retried

  1. Check your response code:

    • Return 2xx for success
    • 5xx triggers retry
  2. Check processing time:

    • Event deliveries time out after 10 seconds
    • Process quickly or queue async
  3. Check error logs:

    im logs --component outbox-worker

Next Steps