Event Ingestion Architecture

Overview

The event ingestion layer is responsible for reliably capturing business events from distributed services and persisting them to Azure Data Lake Gen2 for analytics processing. This document details the Event Hub configuration, capture setup, and schema management strategies.

Ingestion Goals

  • Zero Data Loss: Guaranteed event capture with automatic retry
  • Schema Flexibility: Support for evolving event schemas
  • Cost Efficiency: Minimal operational overhead using managed services
  • Scalability: Handle 100K+ events/day without reconfiguration

Event Hub Architecture

Namespace Organization

graph TD
    subgraph "Event Hub Namespaces"
        NS1[CRM Namespace<br/>Standard Tier]
        NS2[Matching Namespace<br/>Standard Tier]
        NS3[Profile Namespace<br/>Standard Tier]
        NS4[Assessment Namespace<br/>Standard Tier]

        subgraph "CRM Event Hubs"
            EH1[contacts-events]
            EH2[opportunities-events]
            EH3[accounts-events]
        end

        subgraph "Matching Event Hubs"
            EH4[matches-events]
            EH5[iterations-events]
            EH6[declarations-events]
        end

        subgraph "Profile Event Hubs"
            EH7[profiles-events]
            EH8[skills-events]
            EH9[availability-events]
        end
    end

    NS1 --> EH1
    NS1 --> EH2
    NS1 --> EH3

    NS2 --> EH4
    NS2 --> EH5
    NS2 --> EH6

    NS3 --> EH7
    NS3 --> EH8
    NS3 --> EH9

Partitioning Strategy

Events are partitioned by aggregateId to ensure:

  • Order preservation per aggregate
  • Efficient parallel processing
  • Consistent consumer scaling

Partition Key: aggregateId
Partition Count: 4 (per Event Hub)
Retention: 7 days
Throughput Units: 2 (auto-scale to 10)
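Order preservation follows from the fact that a given partition key always maps to the same partition. The hash below is purely illustrative; Event Hubs applies its own internal hash to the partition key service-side, but the ordering argument is identical:

static int ToPartition(string aggregateId, int partitionCount = 4)
{
    // Illustration only: Event Hubs hashes PartitionKey internally. Any
    // stable hash (here FNV-1a) yields the property shown below.
    uint hash = 2166136261; // FNV-1a offset basis
    foreach (char c in aggregateId)
    {
        hash = (hash ^ c) * 16777619; // FNV-1a prime
    }
    return (int)(hash % (uint)partitionCount);
}

// ToPartition("contact-123") returns the same index on every call, so all
// events for that aggregate land on one partition, in publish order.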

Event Hub Capture Configuration

Capture Settings

graph LR
    subgraph "Capture Configuration"
        INTERVAL[Time Window<br/>5 minutes]
        SIZE[Size Window<br/>100 MB]
        FORMAT[File Format<br/>Avro]
        COMPRESS[Compression<br/>Snappy]
    end

    subgraph "Output Path"
        CONTAINER[Container<br/>raw-events]
        PATH[Path Pattern<br/>{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}]
        FILE[File Name<br/>{PartitionId}_{Offset}_{SequenceNumber}.avro]
    end

    INTERVAL --> PATH
    SIZE --> PATH
    FORMAT --> FILE
    COMPRESS --> FILE
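With these settings, a capture file for the contacts-events hub lands in the raw-events container at a path of the following shape (the offset and sequence number values are illustrative):

crm-events-namespace/contacts-events/2024/01/15/14/0_4295032832_125.avro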

Azure CLI Configuration

# Create Event Hub Namespace
az eventhubs namespace create \
  --name crm-events-namespace \
  --resource-group rg-metrics-platform \
  --location westeurope \
  --sku Standard \
  --enable-auto-inflate \
  --maximum-throughput-units 10

# Create Event Hub with Capture
az eventhubs eventhub create \
  --name contacts-events \
  --namespace-name crm-events-namespace \
  --resource-group rg-metrics-platform \
  --partition-count 4 \
  --message-retention 7

# Enable Capture to Data Lake
az eventhubs eventhub update \
  --name contacts-events \
  --namespace-name crm-events-namespace \
  --resource-group rg-metrics-platform \
  --enable-capture true \
  --capture-interval 300 \
  --capture-size-limit 104857600 \
  --destination-name EventHubArchive.AzureBlockBlob \
  --storage-account datalakemetricsplatform \
  --blob-container raw-events \
  --archive-name-format '{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}/{PartitionId}_{Offset}_{SequenceNumber}.avro'

Event Schema Management

Base Event Contract

All events must conform to this base schema:

interface BusinessEvent {
  // Event Identification
  eventId: string;           // UUID v4
  timestamp: string;         // ISO 8601 UTC
  version: string;           // Schema version (e.g., "1.0")

  // Aggregate Information
  aggregateId: string;       // Unique identifier of the aggregate
  aggregateType: string;     // Type of aggregate (e.g., "Order", "Profile")
  eventType: string;         // Specific event type (e.g., "OrderCreated")

  // Event Metadata
  metadata: {
    correlationId: string;   // Trace across services
    causationId?: string;    // Previous event that caused this
    userId?: string;         // User who triggered the event
    source: string;          // Service that produced the event
    partitionKey: string;    // Event Hub partition key
    tags?: Record<string, string>; // Additional metadata
  };

  // Business Data
  payload: Record<string, any>; // Event-specific data
}
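The .NET publisher shown later in this document works against a C# mirror of this contract. A minimal sketch of that base class (property names follow the TypeScript interface; the services' actual types may differ):

public abstract class BusinessEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public string Timestamp { get; init; } = DateTime.UtcNow.ToString("o"); // ISO 8601 UTC
    public string Version { get; init; } = "1.0";

    public string AggregateId { get; init; } = default!;
    public string AggregateType { get; init; } = default!;
    public string EventType { get; init; } = default!;

    public EventMetadata Metadata { get; init; } = default!;
    public Dictionary<string, object> Payload { get; init; } = new();
}

public class EventMetadata
{
    public string CorrelationId { get; init; } = default!;
    public string? CausationId { get; init; }
    public string? UserId { get; init; }
    public string Source { get; init; } = default!;
    public string PartitionKey { get; init; } = default!;
    public Dictionary<string, string>? Tags { get; init; }
}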

Schema Versioning Strategy

graph TD
    subgraph "Version Evolution"
        V1[Version 1.0<br/>Initial Schema]
        V1_1[Version 1.1<br/>Added Optional Field]
        V2[Version 2.0<br/>Breaking Change]
    end

    subgraph "Compatibility Rules"
        BACK[Backward Compatible<br/>- Add optional fields<br/>- Add default values]
        BREAK[Breaking Changes<br/>- Remove fields<br/>- Change types<br/>- Rename fields]
    end

    V1 -->|Compatible| V1_1
    V1 -->|Breaking| V2

    V1_1 --> BACK
    V2 --> BREAK
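To make the compatibility rules concrete, here is a sketch of a backward-compatible 1.0 to 1.1 payload change for ContactCreated (the phoneNumber field is hypothetical, added purely for illustration):

// Version 1.0 payload, matching the ContactCreated example below.
public class ContactCreatedPayloadV1
{
    public string ContactId { get; init; } = default!;
    public string FirstName { get; init; } = default!;
    public string LastName { get; init; } = default!;
    public string Email { get; init; } = default!;
}

// Version 1.1 adds an optional field. Consumers written against 1.0 still
// deserialize 1.1 events: the unknown field is simply ignored. Removing,
// renaming, or retyping an existing field would instead force a 2.0.
public class ContactCreatedPayloadV1_1 : ContactCreatedPayloadV1
{
    public string? PhoneNumber { get; init; } // hypothetical optional field
}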

Event Examples

Contact Created Event

{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2024-01-15T14:30:00Z",
  "version": "1.0",
  "aggregateId": "contact-123",
  "aggregateType": "Contact",
  "eventType": "ContactCreated",
  "metadata": {
    "correlationId": "660e8400-e29b-41d4-a716-446655440000",
    "userId": "user-456",
    "source": "CRMService",
    "partitionKey": "contact-123"
  },
  "payload": {
    "contactId": "contact-123",
    "firstName": "John",
    "lastName": "Doe",
    "email": "john.doe@example.com",
    "company": "Acme Corp",
    "createdAt": "2024-01-15T14:30:00Z"
  }
}

Match Iteration Added Event

{
  "eventId": "770e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2024-01-15T15:45:00Z",
  "version": "1.0",
  "aggregateId": "match-789",
  "aggregateType": "Match",
  "eventType": "IterationAdded",
  "metadata": {
    "correlationId": "880e8400-e29b-41d4-a716-446655440000",
    "causationId": "550e8400-e29b-41d4-a716-446655440000",
    "userId": "user-789",
    "source": "MatchingService",
    "partitionKey": "match-789"
  },
  "payload": {
    "matchId": "match-789",
    "iterationId": "iteration-456",
    "iterationNumber": 2,
    "freelancerCount": 5,
    "status": "Active",
    "createdAt": "2024-01-15T15:45:00Z"
  }
}

Event Publishing

.NET Implementation

using System.Text;
using System.Text.Json;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Logging;

public interface IEventPublisher
{
    Task PublishAsync<T>(T businessEvent) where T : BusinessEvent;
}

public class EventHubPublisher : IEventPublisher
{
    // camelCase names so the serialized JSON matches the event contract above
    private static readonly JsonSerializerOptions SerializerOptions =
        new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

    private readonly EventHubProducerClient _client;
    private readonly ILogger<EventHubPublisher> _logger;

    public EventHubPublisher(string connectionString, string eventHubName,
        ILogger<EventHubPublisher> logger)
    {
        _client = new EventHubProducerClient(connectionString, eventHubName);
        _logger = logger;
    }

    public async Task PublishAsync<T>(T businessEvent) where T : BusinessEvent
    {
        try
        {
            // Serialize the event with the shared camelCase options
            var json = JsonSerializer.Serialize(businessEvent, SerializerOptions);
            var eventData = new EventData(Encoding.UTF8.GetBytes(json));

            // Add metadata as properties
            eventData.Properties["EventType"] = businessEvent.EventType;
            eventData.Properties["AggregateType"] = businessEvent.AggregateType;
            eventData.Properties["Version"] = businessEvent.Version;
            eventData.Properties["CorrelationId"] = businessEvent.Metadata.CorrelationId;

            // Send to Event Hub; setting the partition key on the batch keeps
            // all events for the aggregate on one partition, in publish order
            using var batch = await _client.CreateBatchAsync(new CreateBatchOptions
            {
                PartitionKey = businessEvent.Metadata.PartitionKey
            });

            if (!batch.TryAdd(eventData))
            {
                throw new InvalidOperationException("Event too large for batch");
            }

            await _client.SendAsync(batch);

            _logger.LogInformation(
                "Published {EventType} for {AggregateType}:{AggregateId}",
                businessEvent.EventType,
                businessEvent.AggregateType,
                businessEvent.AggregateId
            );
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, 
                "Failed to publish {EventType} for {AggregateId}",
                businessEvent.EventType,
                businessEvent.AggregateId
            );
            throw;
        }
    }
}
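Registration and use might look as follows. The DI wiring here is an assumption for illustration; only EventHubPublisher and PublishAsync come from the implementation above:

// Sketch: register one publisher per Event Hub (assumed minimal-hosting setup).
builder.Services.AddSingleton<IEventPublisher>(sp =>
    new EventHubPublisher(
        builder.Configuration["EventHub:Namespaces:CRM:ConnectionString"]!,
        "contacts-events",
        sp.GetRequiredService<ILogger<EventHubPublisher>>()));

// In a producing service, publish after the aggregate change is committed:
await publisher.PublishAsync(contactCreatedEvent); // event variable is hypothetical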

Configuration in appsettings.json

{
  "EventHub": {
    "Namespaces": {
      "CRM": {
        "ConnectionString": "Endpoint=sb://crm-events.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
        "EventHubs": {
          "Contacts": "contacts-events",
          "Opportunities": "opportunities-events",
          "Accounts": "accounts-events"
        }
      },
      "Matching": {
        "ConnectionString": "Endpoint=sb://matching-events.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
        "EventHubs": {
          "Matches": "matches-events",
          "Iterations": "iterations-events",
          "Declarations": "declarations-events"
        }
      }
    },
    "RetryPolicy": {
      "MaxRetries": 3,
      "BackoffMultiplier": 2,
      "MaxBackoffSeconds": 30
    }
  }
}
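The RetryPolicy values can back a simple exponential-backoff wrapper around the publisher. A minimal sketch with the settings above hard-coded (_publisher is assumed to be an injected IEventPublisher); in practice the client's built-in EventHubsRetryOptions or a library such as Polly may be preferable:

// Sketch: MaxRetries = 3, BackoffMultiplier = 2, MaxBackoffSeconds = 30.
public async Task PublishWithRetryAsync<T>(T businessEvent) where T : BusinessEvent
{
    var delay = TimeSpan.FromSeconds(1);
    for (var attempt = 0; ; attempt++)
    {
        try
        {
            await _publisher.PublishAsync(businessEvent);
            return;
        }
        catch (Exception) when (attempt < 3) // rethrows once retries are exhausted
        {
            await Task.Delay(delay);
            delay = TimeSpan.FromSeconds(Math.Min(delay.TotalSeconds * 2, 30));
        }
    }
}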

Monitoring and Alerting

Key Metrics to Monitor

graph TD
    subgraph "Event Hub Metrics"
        M1[Incoming Messages<br/>Target: 1K-100K/day]
        M2[Capture Success Rate<br/>Target: >99.9%]
        M3[Throttled Requests<br/>Target: <1%]
        M4[Server Errors<br/>Target: <0.1%]
    end

    subgraph "Capture Metrics"
        C1[Files Created<br/>288/day per partition]
        C2[Capture Latency<br/>Target: <5 min]
        C3[File Size<br/>Target: 10-100MB]
        C4[Storage Usage<br/>Monitor growth]
    end

    subgraph "Alerts"
        A1[Capture Failure]
        A2[Throttling Detected]
        A3[High Latency]
        A4[Storage Quota]
    end

    M2 --> A1
    M3 --> A2
    C2 --> A3
    C4 --> A4

Azure Monitor Queries

// Event Hub Capture Success Rate
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where Category == "ArchiveLogs"
| summarize 
    SuccessCount = countif(ResultType == "Success"),
    FailureCount = countif(ResultType != "Success"),
    Total = count()
    by bin(TimeGenerated, 1h), EventHub_s
| extend SuccessRate = (SuccessCount * 100.0) / Total
| order by TimeGenerated desc

// Message Throughput
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where MetricName == "IncomingMessages"
| summarize 
    TotalMessages = sum(Total),
    AvgMessages = avg(Total)
    by bin(TimeGenerated, 1h), Resource
| order by TimeGenerated desc

// Capture File Sizes
StorageBlobLogs
| where OperationName == "PutBlob"
| where Uri contains "raw-events"
| extend FileSize_MB = RequestBodySize / 1048576.0
| summarize 
    AvgSize_MB = avg(FileSize_MB),
    MaxSize_MB = max(FileSize_MB),
    FileCount = count()
    by bin(TimeGenerated, 1h)

Troubleshooting

Common Issues and Solutions

  • Capture not working: no files appear in the Data Lake. Check the Event Hub capture settings and verify storage account permissions.
  • Throttling: 429 errors and slow publishing. Increase throughput units and implement exponential backoff.
  • Large events: batch send failures. Split large payloads or increase batch size limits.
  • Partition imbalance: uneven processing across consumers. Review the partition key strategy and consider repartitioning.
  • Schema errors: deserialization failures. Validate events against the schema and check version compatibility.

Health Check Script

#!/bin/bash
# Event Hub Health Check

NAMESPACE="crm-events-namespace"
RESOURCE_GROUP="rg-metrics-platform"

echo "Checking Event Hub Namespace: $NAMESPACE"

# Check namespace status
az eventhubs namespace show \
  --name $NAMESPACE \
  --resource-group $RESOURCE_GROUP \
  --query "{Status:status, ThroughputUnits:sku.capacity}" \
  --output table

# Check Event Hubs
az eventhubs eventhub list \
  --namespace-name $NAMESPACE \
  --resource-group $RESOURCE_GROUP \
  --query "[].{Name:name, Status:status, Partitions:partitionCount, Capture:captureDescription.enabled}" \
  --output table

# Check recent metrics
az monitor metrics list \
  --resource "/subscriptions/{subscription}/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.EventHub/namespaces/$NAMESPACE" \
  --metric "IncomingMessages" \
  --interval PT1H \
  --output table

Best Practices

  1. Event Design
     • Keep events immutable
     • Include all context needed for processing
     • Use consistent naming conventions
     • Version schemas explicitly

  2. Publishing
     • Batch events when possible
     • Implement retry with exponential backoff
     • Use partition keys for ordering guarantees
     • Monitor publishing metrics

  3. Capture Configuration
     • Balance file size against latency (5 min / 100 MB)
     • Use compression to reduce storage costs
     • Implement lifecycle policies for old data
     • Monitor capture success rates

  4. Security
     • Use managed identities where possible (see the sketch below)
     • Implement least-privilege access
     • Rotate access keys regularly
     • Audit event access
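For the managed-identity recommendation, the Event Hubs client accepts an Azure.Identity credential in place of a connection string; the publishing identity needs the Azure Event Hubs Data Sender role on the namespace:

// Connection-string-free publisher using the platform's managed identity
// (requires the Azure.Identity package).
var client = new EventHubProducerClient(
    "crm-events-namespace.servicebus.windows.net", // fully qualified namespace
    "contacts-events",
    new DefaultAzureCredential());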

Next Steps