Event Ingestion Architecture¶
Overview¶
The event ingestion layer is responsible for reliably capturing business events from distributed services and persisting them to Azure Data Lake Gen2 for analytics processing. This document details the Event Hub configuration, capture setup, and schema management strategies.
Ingestion Goals
- Zero Data Loss: Guaranteed event capture with automatic retry
- Schema Flexibility: Support for evolving event schemas
- Cost Efficiency: Minimal operational overhead using managed services
- Scalability: Handle 100K+ events/day without reconfiguration
Event Hub Architecture¶
Namespace Organization¶
```mermaid
graph TD
    subgraph "Event Hub Namespaces"
        NS1[CRM Namespace<br/>Standard Tier]
        NS2[Matching Namespace<br/>Standard Tier]
        NS3[Profile Namespace<br/>Standard Tier]
        NS4[Assessment Namespace<br/>Standard Tier]

        subgraph "CRM Event Hubs"
            EH1[contacts-events]
            EH2[opportunities-events]
            EH3[accounts-events]
        end

        subgraph "Matching Event Hubs"
            EH4[matches-events]
            EH5[iterations-events]
            EH6[declarations-events]
        end

        subgraph "Profile Event Hubs"
            EH7[profiles-events]
            EH8[skills-events]
            EH9[availability-events]
        end
    end

    NS1 --> EH1
    NS1 --> EH2
    NS1 --> EH3
    NS2 --> EH4
    NS2 --> EH5
    NS2 --> EH6
    NS3 --> EH7
    NS3 --> EH8
    NS3 --> EH9
```
Partitioning Strategy¶
Events are partitioned by aggregateId to ensure:
- Order preservation per aggregate
- Efficient parallel processing
- Consistent consumer scaling
```yaml
Partition Key: aggregateId
Partition Count: 4 (per Event Hub)
Retention: 7 days
Throughput Units: 2 (auto-inflate to 10)
```
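On the consumer side, this strategy pays off because each partition is owned by exactly one processor instance at a time: events for a given aggregateId are always handled in order, while the four partitions are processed in parallel. A minimal consumer sketch using `EventProcessorClient` from Azure.Messaging.EventHubs.Processor (the checkpoint container name and connection strings are illustrative):

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

// Checkpoints live in a blob container so consumer instances can scale out
// and rebalance partitions without reprocessing events.
var checkpointStore = new BlobContainerClient(
    "<storage-connection-string>", "eventhub-checkpoints");

var processor = new EventProcessorClient(
    checkpointStore,
    EventHubConsumerClient.DefaultConsumerGroupName,
    "<event-hub-connection-string>",
    "contacts-events");

processor.ProcessEventAsync += async args =>
{
    // Events from one partition (and therefore one aggregateId) arrive in order.
    var body = args.Data.EventBody.ToString();
    Console.WriteLine($"Partition {args.Partition.PartitionId}: {body}");
    await args.UpdateCheckpointAsync();
};

processor.ProcessErrorAsync += args =>
{
    Console.Error.WriteLine($"Error on partition {args.PartitionId}: {args.Exception}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
```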
Event Hub Capture Configuration¶
Capture Settings¶
```mermaid
graph LR
    subgraph "Capture Configuration"
        INTERVAL[Time Window<br/>5 minutes]
        SIZE[Size Window<br/>100 MB]
        FORMAT[File Format<br/>Avro]
        COMPRESS[Compression<br/>Snappy]
    end

    subgraph "Output Path"
        CONTAINER[Container<br/>raw-events]
        PATH[Path Pattern<br/>{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}]
        FILE[File Name<br/>{PartitionId}_{Offset}_{SequenceNumber}.avro]
    end

    INTERVAL --> PATH
    SIZE --> PATH
    FORMAT --> FILE
    COMPRESS --> FILE
```
Azure CLI Configuration¶
```bash
# Create the Event Hub namespace
az eventhubs namespace create \
  --name crm-events-namespace \
  --resource-group rg-metrics-platform \
  --location westeurope \
  --sku Standard \
  --enable-auto-inflate \
  --maximum-throughput-units 10

# Create the Event Hub
az eventhubs eventhub create \
  --name contacts-events \
  --namespace-name crm-events-namespace \
  --resource-group rg-metrics-platform \
  --partition-count 4 \
  --message-retention 7

# Enable Capture to the Data Lake (ADLS Gen2 is addressed through the blob
# endpoint, so the capture destination is EventHubArchive.AzureBlockBlob)
az eventhubs eventhub update \
  --name contacts-events \
  --namespace-name crm-events-namespace \
  --resource-group rg-metrics-platform \
  --enable-capture true \
  --capture-interval 300 \
  --capture-size-limit 104857600 \
  --destination-name EventHubArchive.AzureBlockBlob \
  --storage-account datalakemetricsplatform \
  --blob-container raw-events \
  --archive-name-format '{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}/{PartitionId}_{Offset}_{SequenceNumber}.avro'
```
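Once capture is running, each time or size window closes an Avro file whose envelope wraps the original event bytes in a `Body` field, alongside `Offset`, `SequenceNumber`, and `EnqueuedTimeUtc`. A hedged sketch for inspecting one downloaded file, assuming the Apache.Avro NuGet package (the file name is illustrative):

```csharp
using System;
using System.Text;
using Avro.File;
using Avro.Generic;

// Open a capture file downloaded locally (file name is illustrative).
using var reader = DataFileReader<GenericRecord>.OpenReader("0_0_100.avro");

while (reader.HasNext())
{
    var record = reader.Next();

    // Capture's Avro envelope stores the original event bytes in "Body".
    var body = (byte[])record["Body"];
    Console.WriteLine(
        $"seq={record["SequenceNumber"]} offset={record["Offset"]}: " +
        Encoding.UTF8.GetString(body));
}
```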
Event Schema Management¶
Base Event Contract¶
All events must conform to this base schema:
```typescript
interface BusinessEvent {
  // Event identification
  eventId: string;                  // UUID v4
  timestamp: string;                // ISO 8601 UTC
  version: string;                  // Schema version (e.g., "1.0")

  // Aggregate information
  aggregateId: string;              // Unique identifier of the aggregate
  aggregateType: string;            // Type of aggregate (e.g., "Order", "Profile")
  eventType: string;                // Specific event type (e.g., "OrderCreated")

  // Event metadata
  metadata: {
    correlationId: string;          // Trace across services
    causationId?: string;           // Previous event that caused this one
    userId?: string;                // User who triggered the event
    source: string;                 // Service that produced the event
    partitionKey: string;           // Event Hub partition key
    tags?: Record<string, string>;  // Additional metadata
  };

  // Business data
  payload: Record<string, any>;     // Event-specific data
}
```
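On the .NET side, the publisher shown later in this document works against a base class mirroring this contract. A minimal sketch, assuming System.Text.Json for serialization, with property names mapped to match the TypeScript contract exactly:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;

public class EventMetadata
{
    [JsonPropertyName("correlationId")] public string CorrelationId { get; set; }
    [JsonPropertyName("causationId")] public string? CausationId { get; set; }
    [JsonPropertyName("userId")] public string? UserId { get; set; }
    [JsonPropertyName("source")] public string Source { get; set; }
    [JsonPropertyName("partitionKey")] public string PartitionKey { get; set; }
    [JsonPropertyName("tags")] public Dictionary<string, string>? Tags { get; set; }
}

public abstract class BusinessEvent
{
    [JsonPropertyName("eventId")] public string EventId { get; set; } = Guid.NewGuid().ToString();
    [JsonPropertyName("timestamp")] public string Timestamp { get; set; } = DateTime.UtcNow.ToString("o");
    [JsonPropertyName("version")] public string Version { get; set; } = "1.0";
    [JsonPropertyName("aggregateId")] public string AggregateId { get; set; }
    [JsonPropertyName("aggregateType")] public string AggregateType { get; set; }
    [JsonPropertyName("eventType")] public string EventType { get; set; }
    [JsonPropertyName("metadata")] public EventMetadata Metadata { get; set; }
    [JsonPropertyName("payload")] public Dictionary<string, object> Payload { get; set; }
}
```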
Schema Versioning Strategy¶
```mermaid
graph TD
    subgraph "Version Evolution"
        V1[Version 1.0<br/>Initial Schema]
        V1_1[Version 1.1<br/>Added Optional Field]
        V2[Version 2.0<br/>Breaking Change]
    end

    subgraph "Compatibility Rules"
        BACK[Backward Compatible<br/>- Add optional fields<br/>- Add default values]
        BREAK[Breaking Changes<br/>- Remove fields<br/>- Change types<br/>- Rename fields]
    end

    V1 -->|Compatible| V1_1
    V1 -->|Breaking| V2
    V1_1 --> BACK
    V2 --> BREAK
```
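A consumer can apply these rules by branching on the envelope's version field before binding the payload: 1.x additions are optional, so any 1.x event binds to the original type, while a 2.0 event needs a dedicated one. The sketch below is illustrative; `ContactPayloadV1` and `ContactPayloadV2` are hypothetical payload types, not part of the platform:

```csharp
using System;
using System.Text.Json;

// Hypothetical payload types for illustration only.
public record ContactPayloadV1(string ContactId, string Email);
public record ContactPayloadV2(string ContactId, string PrimaryEmailAddress);

public static class EventDeserializer
{
    private static readonly JsonSerializerOptions Opts =
        new() { PropertyNameCaseInsensitive = true };

    // Peek at the envelope's version, then bind the payload by major version.
    public static object DeserializePayload(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var version = doc.RootElement.GetProperty("version").GetString() ?? "1.0";
        var payload = doc.RootElement.GetProperty("payload").GetRawText();

        return version.Split('.')[0] switch
        {
            // 1.x changes are additive and optional, so every 1.x event binds to V1.
            "1" => JsonSerializer.Deserialize<ContactPayloadV1>(payload, Opts)!,
            // 2.0 made breaking changes, so it gets its own type.
            "2" => JsonSerializer.Deserialize<ContactPayloadV2>(payload, Opts)!,
            _ => throw new NotSupportedException($"Unknown schema version {version}")
        };
    }
}
```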
Event Examples¶
Contact Created Event¶
```json
{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2024-01-15T14:30:00Z",
  "version": "1.0",
  "aggregateId": "contact-123",
  "aggregateType": "Contact",
  "eventType": "ContactCreated",
  "metadata": {
    "correlationId": "660e8400-e29b-41d4-a716-446655440000",
    "userId": "user-456",
    "source": "CRMService",
    "partitionKey": "contact-123"
  },
  "payload": {
    "contactId": "contact-123",
    "firstName": "John",
    "lastName": "Doe",
    "email": "john.doe@example.com",
    "company": "Acme Corp",
    "createdAt": "2024-01-15T14:30:00Z"
  }
}
```
Match Iteration Added Event¶
```json
{
  "eventId": "770e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2024-01-15T15:45:00Z",
  "version": "1.0",
  "aggregateId": "match-789",
  "aggregateType": "Match",
  "eventType": "IterationAdded",
  "metadata": {
    "correlationId": "880e8400-e29b-41d4-a716-446655440000",
    "causationId": "550e8400-e29b-41d4-a716-446655440000",
    "userId": "user-789",
    "source": "MatchingService",
    "partitionKey": "match-789"
  },
  "payload": {
    "matchId": "match-789",
    "iterationId": "iteration-456",
    "iterationNumber": 2,
    "freelancerCount": 5,
    "status": "Active",
    "createdAt": "2024-01-15T15:45:00Z"
  }
}
```
Event Publishing¶
.NET Implementation¶
```csharp
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Logging;

public interface IEventPublisher
{
    Task PublishAsync<T>(T businessEvent) where T : BusinessEvent;
}

public class EventHubPublisher : IEventPublisher
{
    private readonly EventHubProducerClient _client;
    private readonly ILogger<EventHubPublisher> _logger;

    public EventHubPublisher(
        string connectionString,
        string eventHubName,
        ILogger<EventHubPublisher> logger)
    {
        _client = new EventHubProducerClient(connectionString, eventHubName);
        _logger = logger;
    }

    public async Task PublishAsync<T>(T businessEvent) where T : BusinessEvent
    {
        try
        {
            // Serialize the event
            var json = JsonSerializer.Serialize(businessEvent);
            var eventData = new EventData(Encoding.UTF8.GetBytes(json));

            // Add metadata as properties
            eventData.Properties["EventType"] = businessEvent.EventType;
            eventData.Properties["AggregateType"] = businessEvent.AggregateType;
            eventData.Properties["Version"] = businessEvent.Version;
            eventData.Properties["CorrelationId"] = businessEvent.Metadata.CorrelationId;

            // The partition key is set on the batch so that all events for the
            // same aggregate land on the same partition, preserving order.
            var batch = await _client.CreateBatchAsync(new CreateBatchOptions
            {
                PartitionKey = businessEvent.Metadata.PartitionKey
            });

            if (!batch.TryAdd(eventData))
            {
                throw new InvalidOperationException("Event too large for batch");
            }

            await _client.SendAsync(batch);

            _logger.LogInformation(
                "Published {EventType} for {AggregateType}:{AggregateId}",
                businessEvent.EventType,
                businessEvent.AggregateType,
                businessEvent.AggregateId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to publish {EventType} for {AggregateId}",
                businessEvent.EventType,
                businessEvent.AggregateId);
            throw;
        }
    }
}
```
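Registering and using the publisher in an ASP.NET Core host might look like the following sketch. The configuration keys match the appsettings.json in the next section, but the registration style and the `ContactCreatedEvent` subclass are illustrative:

```csharp
// Program.cs (sketch): one publisher per Event Hub, wired from configuration.
builder.Services.AddSingleton<IEventPublisher>(sp =>
    new EventHubPublisher(
        builder.Configuration["EventHub:Namespaces:CRM:ConnectionString"]!,
        builder.Configuration["EventHub:Namespaces:CRM:EventHubs:Contacts"]!,
        sp.GetRequiredService<ILogger<EventHubPublisher>>()));

// Elsewhere, publishing a domain event (ContactCreatedEvent is a
// hypothetical BusinessEvent subclass used for illustration):
await publisher.PublishAsync(new ContactCreatedEvent
{
    AggregateId = "contact-123",
    AggregateType = "Contact",
    EventType = "ContactCreated",
    Metadata = new EventMetadata
    {
        CorrelationId = Guid.NewGuid().ToString(),
        Source = "CRMService",
        PartitionKey = "contact-123"
    },
    Payload = new() { ["firstName"] = "John", ["lastName"] = "Doe" }
});
```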
Configuration in appsettings.json¶
```json
{
  "EventHub": {
    "Namespaces": {
      "CRM": {
        "ConnectionString": "Endpoint=sb://crm-events.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
        "EventHubs": {
          "Contacts": "contacts-events",
          "Opportunities": "opportunities-events",
          "Accounts": "accounts-events"
        }
      },
      "Matching": {
        "ConnectionString": "Endpoint=sb://matching-events.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
        "EventHubs": {
          "Matches": "matches-events",
          "Iterations": "iterations-events",
          "Declarations": "declarations-events"
        }
      }
    },
    "RetryPolicy": {
      "MaxRetries": 3,
      "BackoffMultiplier": 2,
      "MaxBackoffSeconds": 30
    }
  }
}
```
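The RetryPolicy section maps naturally onto the SDK's built-in retry options at client construction time; in exponential mode the SDK applies the backoff multiplier internally. A sketch, assuming Azure.Messaging.EventHubs:

```csharp
using System;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Values mirror the RetryPolicy section of appsettings.json above.
var options = new EventHubProducerClientOptions
{
    RetryOptions = new EventHubsRetryOptions
    {
        Mode = EventHubsRetryMode.Exponential,   // backoff multiplier applied internally
        MaximumRetries = 3,                      // RetryPolicy:MaxRetries
        Delay = TimeSpan.FromSeconds(1),         // initial backoff delay
        MaximumDelay = TimeSpan.FromSeconds(30)  // RetryPolicy:MaxBackoffSeconds
    }
};

var client = new EventHubProducerClient(
    "<CRM connection string>", "contacts-events", options);
```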
Monitoring and Alerting¶
Key Metrics to Monitor¶
```mermaid
graph TD
    subgraph "Event Hub Metrics"
        M1[Incoming Messages<br/>Target: 1K-100K/day]
        M2[Capture Success Rate<br/>Target: >99.9%]
        M3[Throttled Requests<br/>Target: <1%]
        M4[Server Errors<br/>Target: <0.1%]
    end

    subgraph "Capture Metrics"
        C1[Files Created<br/>~288/day per partition]
        C2[Capture Latency<br/>Target: <5 min]
        C3[File Size<br/>Target: 10-100 MB]
        C4[Storage Usage<br/>Monitor growth]
    end

    subgraph "Alerts"
        A1[Capture Failure]
        A2[Throttling Detected]
        A3[High Latency]
        A4[Storage Quota]
    end

    M2 --> A1
    M3 --> A2
    C2 --> A3
    C4 --> A4
```
Azure Monitor Queries¶
```kusto
// Event Hub capture success rate
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where Category == "ArchiveLogs"
| summarize
    SuccessCount = countif(ResultType == "Success"),
    FailureCount = countif(ResultType != "Success"),
    Total = count()
    by bin(TimeGenerated, 1h), EventHub_s
| extend SuccessRate = (SuccessCount * 100.0) / Total
| order by TimeGenerated desc

// Message throughput
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where MetricName == "IncomingMessages"
| summarize
    TotalMessages = sum(Total),
    AvgMessages = avg(Total)
    by bin(TimeGenerated, 1h), Resource
| order by TimeGenerated desc

// Capture file sizes (the uploaded blob size is the request body)
StorageBlobLogs
| where OperationName == "PutBlob"
| where Uri contains "raw-events"
| extend FileSize_MB = RequestBodySize / 1048576.0
| summarize
    AvgSize_MB = avg(FileSize_MB),
    MaxSize_MB = max(FileSize_MB),
    FileCount = count()
    by bin(TimeGenerated, 1h)
```
Troubleshooting¶
Common Issues and Solutions¶
| Issue | Symptoms | Solution |
|---|---|---|
| Capture Not Working | No files in Data Lake | Check Event Hub capture settings, verify storage permissions |
| Throttling | 429 errors, slow publishing | Increase throughput units, implement exponential backoff |
| Large Events | Batch send failures | Split events across multiple batches; keep single events under the per-event size limit |
| Partition Imbalance | Uneven processing | Review partition key strategy, consider repartitioning |
| Schema Errors | Deserialization failures | Validate against schema, check version compatibility |
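For the Large Events row, one mitigation is to flush the current batch whenever `TryAdd` fails and continue with a fresh one, so a single full batch does not fail the whole publish. A minimal sketch:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

public static class BatchingSender
{
    // Spill events across as many batches as needed instead of failing
    // the whole publish when one batch fills up.
    public static async Task SendAllAsync(
        EventHubProducerClient client, IEnumerable<EventData> events, string partitionKey)
    {
        var options = new CreateBatchOptions { PartitionKey = partitionKey };
        var batch = await client.CreateBatchAsync(options);

        foreach (var e in events)
        {
            if (!batch.TryAdd(e))
            {
                // Current batch is full: send it and start a fresh one.
                await client.SendAsync(batch);
                batch.Dispose();
                batch = await client.CreateBatchAsync(options);

                if (!batch.TryAdd(e))
                    throw new InvalidOperationException(
                        "A single event exceeds the maximum batch size.");
            }
        }

        if (batch.Count > 0)
            await client.SendAsync(batch);
        batch.Dispose();
    }
}
```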
Health Check Script¶
```bash
#!/bin/bash
# Event Hub health check

NAMESPACE="crm-events-namespace"
RESOURCE_GROUP="rg-metrics-platform"

echo "Checking Event Hub namespace: $NAMESPACE"

# Namespace status and provisioned throughput
az eventhubs namespace show \
  --name "$NAMESPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --query "{Status:status, ThroughputUnits:sku.capacity}" \
  --output table

# Event Hubs in the namespace
az eventhubs eventhub list \
  --namespace-name "$NAMESPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --query "[].{Name:name, Status:status, Partitions:partitionCount, Capture:captureDescription.enabled}" \
  --output table

# Recent ingress metrics
az monitor metrics list \
  --resource "/subscriptions/{subscription}/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.EventHub/namespaces/$NAMESPACE" \
  --metric "IncomingMessages" \
  --interval PT1H \
  --output table
```
Best Practices¶
- Event Design
  - Keep events immutable
  - Include all context needed for processing
  - Use consistent naming conventions
  - Version schemas explicitly
- Publishing
  - Batch events when possible
  - Implement retry with exponential backoff
  - Use partition keys for ordering guarantees
  - Monitor publishing metrics
- Capture Configuration
  - Balance file size against latency (5 min / 100 MB windows)
  - Use compression to reduce storage costs
  - Implement lifecycle policies for old data
  - Monitor capture success rates
- Security
  - Use managed identities where possible (see the sketch below)
  - Implement least-privilege access
  - Rotate access keys regularly
  - Audit event access
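For the managed-identity recommendation, the producer can authenticate with Microsoft Entra ID instead of a shared access key. A sketch assuming Azure.Identity, with the fully qualified namespace matching the CLI example above:

```csharp
using Azure.Identity;
using Azure.Messaging.EventHubs.Producer;

// DefaultAzureCredential resolves to the managed identity when running in
// Azure and to developer credentials locally; no keys are stored in config.
var client = new EventHubProducerClient(
    "crm-events-namespace.servicebus.windows.net",  // fully qualified namespace
    "contacts-events",
    new DefaultAzureCredential());
```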
Next Steps¶
- Configure Data Storage for captured events
- Set up Data Processing pipelines
- Implement Cost Optimization strategies