Transforming DataVerse CRUD Events into Business Events

Overview

This guide demonstrates how to build a containerized .NET application that transforms generic DataVerse CRUD events into meaningful business events. We'll use DataVerse Contact entities as our primary example, showing how to detect specific field changes and publish targeted business events.

Understanding DataVerse Event Publishing

DataVerse is Microsoft's cloud-based data platform that provides built-in event publishing capabilities through Service Endpoints. When entities are created, updated, or deleted, DataVerse can automatically publish these events to external systems.

Key DataVerse Concepts

Service Endpoints: Configuration points in DataVerse that define where events should be sent. These can target Azure Service Bus, Event Hubs, or webhook endpoints.

Pre/Post Images: DataVerse can include snapshots of entity data before and after an operation, enabling precise change detection at the field level.

Event Context: Rich metadata about the operation including user information, organization context, and correlation identifiers.

DataVerse Event Structure

DataVerse events contain standardized information regardless of the entity type:

{
  "MessageName": "Update",
  "PrimaryEntityName": "contact",
  "PrimaryEntityId": "12345678-1234-1234-1234-123456789012",
  "UserId": "87654321-4321-4321-4321-210987654321",
  "OrganizationId": "11111111-2222-3333-4444-555555555555",
  "CorrelationId": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
  "RequestId": "ffffffff-gggg-hhhh-iiii-jjjjjjjjjjjj",
  "InputParameters": { ... },
  "PreEntityImages": { ... },
  "PostEntityImages": { ... }
}

Solution Architecture

Our solution implements an event transformation pattern where generic DataVerse events are processed and converted into specific business events.

graph TB
    A[DataVerse Contact Entity] -->|CRUD Operations| B[DataVerse Service Endpoint]
    B -->|Generic Events| C[Azure Event Hub<br/>dataverse-events]
    C --> D[Containerized .NET App<br/>Event Processor]
    D -->|Business Events| E[Azure Event Hub<br/>business-events]

    subgraph "Event Processor Container"
        D1[Event Hub Consumer]
        D2[Event Parser]
        D3[Change Detector]
        D4[Business Event Factory]
        D5[Event Hub Producer]

        D1 --> D2
        D2 --> D3
        D3 --> D4
        D4 --> D5
    end

    E -->|Contact Updated| F[Generic Projection Systems]
    E -->|Email Changed| G[Email Verification Service]
    E -->|Contact Created| H[Welcome Campaign Service]

Event Flow Details

sequenceDiagram
    participant DV as DataVerse
    participant EH1 as Event Hub (Input)
    participant APP as .NET Processor
    participant EH2 as Event Hub (Output)
    participant SYS as Downstream Systems

    DV->>EH1: Generic CRUD Event
    EH1->>APP: Event Message
    APP->>APP: Parse & Analyze Changes
    APP->>EH2: Business Event(s)
    EH2->>SYS: Specific Business Events

Core Implementation

Event Models

First, define the core event models for both input and output:

// DataVerse Event Structure
public class DataVerseEvent
{
    public string MessageName { get; set; }
    public string PrimaryEntityName { get; set; }
    public Guid PrimaryEntityId { get; set; }
    public Guid UserId { get; set; }
    public Guid OrganizationId { get; set; }
    public Guid CorrelationId { get; set; }
    public Guid RequestId { get; set; }
    public Dictionary<string, object> InputParameters { get; set; }
    public Dictionary<string, EntityImage> PreEntityImages { get; set; }
    public Dictionary<string, EntityImage> PostEntityImages { get; set; }
}

public class EntityImage
{
    public string LogicalName { get; set; }
    public Guid Id { get; set; }
    public Dictionary<string, object> Attributes { get; set; }
}

// Business Event Base
public abstract class BusinessEvent
{
    public Guid EventId { get; set; } = Guid.NewGuid();
    public DateTime EventTime { get; set; } = DateTime.UtcNow;
    public string EventType { get; set; }
    public Guid CorrelationId { get; set; }
    public string Source { get; set; } = "DataVerse.EventProcessor";
}

// Specific Business Events
public class ContactUpdatedEvent : BusinessEvent
{
    public ContactUpdatedEvent()
    {
        EventType = "Contact.Updated";
    }

    public Guid ContactId { get; set; }
    public Dictionary<string, FieldChange> ChangedFields { get; set; }
}

public class ContactEmailChangedEvent : BusinessEvent
{
    public ContactEmailChangedEvent()
    {
        EventType = "Contact.EmailChanged";
    }

    public Guid ContactId { get; set; }
    public string PreviousEmail { get; set; }
    public string NewEmail { get; set; }
    public string FullName { get; set; }
}

public class FieldChange
{
    public object OldValue { get; set; }
    public object NewValue { get; set; }
    public string FieldType { get; set; }
}
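
As a quick sanity check of these models, serializing one of the events shows the payload that downstream consumers will receive (System.Text.Json keeps the PascalCase property names by default, which matches the producer code later in this guide; the values below are illustrative only):

var emailChanged = new ContactEmailChangedEvent
{
    ContactId = Guid.NewGuid(),
    CorrelationId = Guid.NewGuid(),
    PreviousEmail = "old@example.com",
    NewEmail = "new@example.com",
    FullName = "John Doe"
};

// Produces properties such as EventId, EventTime, EventType ("Contact.EmailChanged"),
// CorrelationId, Source, ContactId, PreviousEmail, NewEmail and FullName
Console.WriteLine(JsonSerializer.Serialize(emailChanged,
    new JsonSerializerOptions { WriteIndented = true }));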

Change Detection Engine

The core logic for detecting specific field changes using pre/post images:

public class ContactChangeDetector
{
    private readonly ILogger<ContactChangeDetector> _logger;

    public ContactChangeDetector(ILogger<ContactChangeDetector> logger)
    {
        _logger = logger;
    }

    public ContactChangeAnalysis AnalyzeChanges(DataVerseEvent dataVerseEvent)
    {
        if (dataVerseEvent.MessageName != "Update" || 
            dataVerseEvent.PrimaryEntityName != "contact")
        {
            return new ContactChangeAnalysis { HasChanges = false };
        }

        var preImage = GetContactImage(dataVerseEvent.PreEntityImages);
        var postImage = GetContactImage(dataVerseEvent.PostEntityImages);

        if (preImage == null || postImage == null)
        {
            _logger.LogWarning("Missing pre or post image for contact {ContactId}", 
                dataVerseEvent.PrimaryEntityId);
            return new ContactChangeAnalysis { HasChanges = false };
        }

        var analysis = new ContactChangeAnalysis
        {
            HasChanges = true,
            ContactId = dataVerseEvent.PrimaryEntityId,
            CorrelationId = dataVerseEvent.CorrelationId,
            ChangedFields = DetectFieldChanges(preImage.Attributes, postImage.Attributes)
        };

        // Detect specific business-relevant changes
        analysis.EmailChanged = HasEmailChanged(preImage.Attributes, postImage.Attributes);
        analysis.StatusChanged = HasStatusChanged(preImage.Attributes, postImage.Attributes);

        return analysis;
    }

    private EntityImage GetContactImage(Dictionary<string, EntityImage> images)
    {
        return images?.Values?.FirstOrDefault(img => img.LogicalName == "contact");
    }

    private Dictionary<string, FieldChange> DetectFieldChanges(
        Dictionary<string, object> preAttributes,
        Dictionary<string, object> postAttributes)
    {
        var changes = new Dictionary<string, FieldChange>();

        // Check all fields that exist in either pre or post image
        var allFields = preAttributes.Keys.Union(postAttributes.Keys);

        foreach (var field in allFields)
        {
            var oldValue = preAttributes.GetValueOrDefault(field);
            var newValue = postAttributes.GetValueOrDefault(field);

            if (!ValuesEqual(oldValue, newValue))
            {
                changes[field] = new FieldChange
                {
                    OldValue = oldValue,
                    NewValue = newValue,
                    FieldType = DetermineFieldType(newValue ?? oldValue)
                };
            }
        }

        return changes;
    }

    private bool HasEmailChanged(Dictionary<string, object> pre, Dictionary<string, object> post)
    {
        var oldEmail = pre.GetValueOrDefault("emailaddress1")?.ToString();
        var newEmail = post.GetValueOrDefault("emailaddress1")?.ToString();

        return !string.Equals(oldEmail, newEmail, StringComparison.OrdinalIgnoreCase);
    }

    private bool HasStatusChanged(Dictionary<string, object> pre, Dictionary<string, object> post)
    {
        var oldStatus = pre.GetValueOrDefault("statecode");
        var newStatus = post.GetValueOrDefault("statecode");

        return !ValuesEqual(oldStatus, newStatus);
    }

    private bool ValuesEqual(object value1, object value2)
    {
        if (value1 == null && value2 == null) return true;
        if (value1 == null || value2 == null) return false;

        return value1.Equals(value2);
    }

    private string DetermineFieldType(object value)
    {
        return value?.GetType().Name ?? "Unknown";
    }
}

public class ContactChangeAnalysis
{
    public bool HasChanges { get; set; }
    public Guid ContactId { get; set; }
    public Guid CorrelationId { get; set; }
    public Dictionary<string, FieldChange> ChangedFields { get; set; } = new();
    public bool EmailChanged { get; set; }
    public bool StatusChanged { get; set; }
}

Business Event Factory

Converts change analysis into specific business events:

public class BusinessEventFactory
{
    private readonly ILogger<BusinessEventFactory> _logger;

    public BusinessEventFactory(ILogger<BusinessEventFactory> logger)
    {
        _logger = logger;
    }

    public IEnumerable<BusinessEvent> CreateEvents(
        ContactChangeAnalysis analysis, 
        DataVerseEvent originalEvent)
    {
        var events = new List<BusinessEvent>();

        // Always create a generic update event for projection systems
        events.Add(CreateContactUpdatedEvent(analysis, originalEvent));

        // Create specific business events based on what changed
        if (analysis.EmailChanged)
        {
            var emailEvent = CreateEmailChangedEvent(analysis, originalEvent);
            if (emailEvent != null)
            {
                events.Add(emailEvent);
            }
        }

        return events;
    }

    private ContactUpdatedEvent CreateContactUpdatedEvent(
        ContactChangeAnalysis analysis, 
        DataVerseEvent originalEvent)
    {
        return new ContactUpdatedEvent
        {
            ContactId = analysis.ContactId,
            CorrelationId = analysis.CorrelationId,
            ChangedFields = analysis.ChangedFields
        };
    }

    private ContactEmailChangedEvent CreateEmailChangedEvent(
        ContactChangeAnalysis analysis, 
        DataVerseEvent originalEvent)
    {
        var preImage = originalEvent.PreEntityImages?.Values?
            .FirstOrDefault(img => img.LogicalName == "contact");
        var postImage = originalEvent.PostEntityImages?.Values?
            .FirstOrDefault(img => img.LogicalName == "contact");

        if (preImage == null || postImage == null)
        {
            _logger.LogWarning("Cannot create email changed event - missing image data");
            return null;
        }

        var previousEmail = preImage.Attributes.GetValueOrDefault("emailaddress1")?.ToString();
        var newEmail = postImage.Attributes.GetValueOrDefault("emailaddress1")?.ToString();
        var fullName = postImage.Attributes.GetValueOrDefault("fullname")?.ToString();

        return new ContactEmailChangedEvent
        {
            ContactId = analysis.ContactId,
            CorrelationId = analysis.CorrelationId,
            PreviousEmail = previousEmail,
            NewEmail = newEmail,
            FullName = fullName
        };
    }
}

Event Hub Consumer Service

Consumes DataVerse events from the input Event Hub, analyzes them, and publishes business events to the output hub, with hooks for error handling (duplicate delivery is addressed in the idempotency section below):

public class DataVerseEventProcessor : BackgroundService
{
    private readonly EventHubConsumerClient _consumerClient;
    private readonly EventHubProducerClient _producerClient;
    private readonly ContactChangeDetector _changeDetector;
    private readonly BusinessEventFactory _eventFactory;
    private readonly ILogger<DataVerseEventProcessor> _logger;
    private readonly IServiceProvider _serviceProvider;

    public DataVerseEventProcessor(
        EventHubConsumerClient consumerClient,
        EventHubProducerClient producerClient,
        ContactChangeDetector changeDetector,
        BusinessEventFactory eventFactory,
        ILogger<DataVerseEventProcessor> logger,
        IServiceProvider serviceProvider)
    {
        _consumerClient = consumerClient;
        _producerClient = producerClient;
        _changeDetector = changeDetector;
        _eventFactory = eventFactory;
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (PartitionEvent partitionEvent in _consumerClient.ReadEventsAsync(stoppingToken))
        {
            try
            {
                await ProcessEvent(partitionEvent, stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing event {EventId}", 
                    partitionEvent.Data?.MessageId);

                // Implement dead letter or retry logic here
                await HandleProcessingError(partitionEvent, ex);
            }
        }
    }

    private async Task ProcessEvent(PartitionEvent partitionEvent, CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var metricsCollector = scope.ServiceProvider.GetRequiredService<IMetricsCollector>();

        var stopwatch = Stopwatch.StartNew();

        try
        {
            // Parse the DataVerse event
            var dataVerseEvent = JsonSerializer.Deserialize<DataVerseEvent>(
                partitionEvent.Data.EventBody.ToArray());

            metricsCollector.IncrementCounter("events.received", 
                ("entity", dataVerseEvent.PrimaryEntityName),
                ("operation", dataVerseEvent.MessageName));

            // Analyze changes
            var analysis = _changeDetector.AnalyzeChanges(dataVerseEvent);

            if (!analysis.HasChanges)
            {
                _logger.LogDebug("No relevant changes detected for contact {ContactId}", 
                    dataVerseEvent.PrimaryEntityId);
                return;
            }

            // Create business events
            var businessEvents = _eventFactory.CreateEvents(analysis, dataVerseEvent);

            // Publish business events
            await PublishBusinessEvents(businessEvents, cancellationToken);

            metricsCollector.IncrementCounter("events.processed.success");
            metricsCollector.RecordValue("events.processing.duration", stopwatch.ElapsedMilliseconds);

            _logger.LogInformation("Successfully processed DataVerse event for contact {ContactId}, " +
                "generated {EventCount} business events", 
                analysis.ContactId, businessEvents.Count());
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "Failed to deserialize DataVerse event");
            metricsCollector.IncrementCounter("events.processed.error", ("error_type", "deserialization"));
            throw;
        }
        catch (Exception)
        {
            metricsCollector.IncrementCounter("events.processed.error", ("error_type", "processing"));
            throw;
        }
    }

    private async Task PublishBusinessEvents(IEnumerable<BusinessEvent> events, CancellationToken cancellationToken)
    {
        var eventDataBatch = await _producerClient.CreateBatchAsync(cancellationToken);

        foreach (var businessEvent in events)
        {
            var eventJson = JsonSerializer.Serialize(businessEvent);
            var eventData = new EventData(Encoding.UTF8.GetBytes(eventJson))
            {
                MessageId = businessEvent.EventId.ToString(),
                CorrelationId = businessEvent.CorrelationId.ToString(),
                ContentType = "application/json"
            };

            // Add event type as a property for routing
            eventData.Properties.Add("EventType", businessEvent.EventType);

            if (!eventDataBatch.TryAdd(eventData))
            {
                // Batch is full, send it and create a new one
                await _producerClient.SendAsync(eventDataBatch, cancellationToken);
                eventDataBatch = await _producerClient.CreateBatchAsync(cancellationToken);

                if (!eventDataBatch.TryAdd(eventData))
                {
                    throw new InvalidOperationException("Event too large for batch");
                }
            }
        }

        if (eventDataBatch.Count > 0)
        {
            await _producerClient.SendAsync(eventDataBatch, cancellationToken);
        }
    }

    private async Task HandleProcessingError(PartitionEvent partitionEvent, Exception exception)
    {
        // Implement your error handling strategy here
        // Options: Dead letter queue, retry with exponential backoff, alerting

        _logger.LogError(exception, "Failed to process event {EventId}, implementing error handling",
            partitionEvent.Data?.MessageId);

        // Example: Send to dead letter queue after certain retry attempts
        // await _deadLetterService.SendAsync(partitionEvent.Data);
    }
}

Handling At-Least-Once Delivery

Event Hub guarantees at-least-once delivery, meaning duplicate events are possible. Implement idempotency to handle this:

public class IdempotencyService
{
    private readonly IMemoryCache _cache;
    private readonly ILogger<IdempotencyService> _logger;

    public IdempotencyService(IMemoryCache cache, ILogger<IdempotencyService> logger)
    {
        _cache = cache;
        _logger = logger;
    }

    public Task<bool> IsProcessedAsync(string eventId, string operationType)
    {
        var key = $"processed:{operationType}:{eventId}";

        if (_cache.TryGetValue(key, out _))
        {
            _logger.LogDebug("Event {EventId} already processed for operation {Operation}", 
                eventId, operationType);
            return Task.FromResult(true);
        }

        return Task.FromResult(false);
    }

    public Task MarkAsProcessedAsync(string eventId, string operationType)
    {
        var key = $"processed:{operationType}:{eventId}";

        // Cache for 24 hours to handle duplicate delivery within a reasonable window
        _cache.Set(key, true, TimeSpan.FromHours(24));

        _logger.LogDebug("Marked event {EventId} as processed for operation {Operation}", 
            eventId, operationType);

        return Task.CompletedTask;
    }
}

// Usage in the event processor (assumes an IdempotencyService field, _idempotencyService, has been injected)
private async Task ProcessEvent(PartitionEvent partitionEvent, CancellationToken cancellationToken)
{
    var eventId = partitionEvent.Data.MessageId ?? Guid.NewGuid().ToString();

    if (await _idempotencyService.IsProcessedAsync(eventId, "contact_transformation"))
    {
        _logger.LogDebug("Skipping duplicate event {EventId}", eventId);
        return;
    }

    // ... process event ...

    await _idempotencyService.MarkAsProcessedAsync(eventId, "contact_transformation");
}
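
Note that IMemoryCache only deduplicates within a single replica; the deployment shown later runs at least two replicas, so a shared store is needed for cross-instance idempotency. A minimal sketch using IDistributedCache (backed, for example, by Azure Cache for Redis); the class name and key format below are assumptions, not part of the original design:

public class DistributedIdempotencyService
{
    private readonly IDistributedCache _cache;

    public DistributedIdempotencyService(IDistributedCache cache)
    {
        _cache = cache;
    }

    public async Task<bool> IsProcessedAsync(string eventId, string operationType)
    {
        var key = $"processed:{operationType}:{eventId}";

        // A non-null entry means the event was already handled by some replica
        return await _cache.GetAsync(key) != null;
    }

    public async Task MarkAsProcessedAsync(string eventId, string operationType)
    {
        var key = $"processed:{operationType}:{eventId}";

        // Keep the marker for 24 hours, mirroring the in-memory variant
        await _cache.SetAsync(key, new byte[] { 1 }, new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
        });
    }
}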

Container Configuration

Dockerfile

FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["EventProcessor.csproj", "."]
RUN dotnet restore "EventProcessor.csproj"
COPY . .
WORKDIR "/src"
RUN dotnet build "EventProcessor.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "EventProcessor.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "EventProcessor.dll"]

appsettings.json

{
  "EventHub": {
    "InputConnection": "#{EventHub.Input.ConnectionString}#",
    "InputHubName": "dataverse-events",
    "OutputConnection": "#{EventHub.Output.ConnectionString}#",
    "OutputHubName": "business-events",
    "ConsumerGroup": "event-processor"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "HealthChecks": {
    "Enabled": true,
    "Port": 8080
  }
}

Service Registration

public class Program
{
    public static void Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);

        // Event Hub clients
        builder.Services.AddSingleton(serviceProvider =>
        {
            var configuration = serviceProvider.GetRequiredService<IConfiguration>();
            var connectionString = configuration["EventHub:InputConnection"];
            var hubName = configuration["EventHub:InputHubName"];
            var consumerGroup = configuration["EventHub:ConsumerGroup"];

            return new EventHubConsumerClient(consumerGroup, connectionString, hubName);
        });

        builder.Services.AddSingleton(serviceProvider =>
        {
            var configuration = serviceProvider.GetRequiredService<IConfiguration>();
            var connectionString = configuration["EventHub:OutputConnection"];
            var hubName = configuration["EventHub:OutputHubName"];

            return new EventHubProducerClient(connectionString, hubName);
        });

        // Application services (registered as singletons because they are
        // injected into the singleton hosted service below)
        builder.Services.AddSingleton<ContactChangeDetector>();
        builder.Services.AddSingleton<BusinessEventFactory>();
        builder.Services.AddSingleton<IdempotencyService>();
        builder.Services.AddScoped<IMetricsCollector, MetricsCollector>();

        // Background service
        builder.Services.AddHostedService<DataVerseEventProcessor>();

        // Health checks
        builder.Services.AddHealthChecks()
            .AddCheck<EventHubHealthCheck>("eventhub");

        // Memory cache for idempotency
        builder.Services.AddMemoryCache();

        var app = builder.Build();

        app.MapHealthChecks("/health");

        app.Run();
    }
}

Monitoring and Metrics

Implement comprehensive monitoring to ensure system reliability:

public interface IMetricsCollector
{
    void IncrementCounter(string name, params (string key, string value)[] tags);
    void RecordValue(string name, double value, params (string key, string value)[] tags);
}

public class MetricsCollector : IMetricsCollector
{
    private readonly ILogger<MetricsCollector> _logger;

    public MetricsCollector(ILogger<MetricsCollector> logger)
    {
        _logger = logger;
    }

    public void IncrementCounter(string name, params (string key, string value)[] tags)
    {
        // Integration with your metrics system (Application Insights, Prometheus, etc.)
        var tagString = string.Join(", ", tags.Select(t => $"{t.key}={t.value}"));
        _logger.LogInformation("METRIC: Counter {Name} incremented. Tags: {Tags}", name, tagString);
    }

    public void RecordValue(string name, double value, params (string key, string value)[] tags)
    {
        var tagString = string.Join(", ", tags.Select(t => $"{t.key}={t.value}"));
        _logger.LogInformation("METRIC: Value {Name} = {Value}. Tags: {Tags}", name, value, tagString);
    }
}
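
For production use you would back this interface with a real metrics pipeline rather than log lines. A minimal sketch using System.Diagnostics.Metrics, which OpenTelemetry-based exporters (including Azure Monitor) can collect; the meter name "DataVerse.EventProcessor" is an assumption, not from the original:

using System.Collections.Concurrent;
using System.Diagnostics.Metrics;

public class MeterMetricsCollector : IMetricsCollector, IDisposable
{
    private readonly Meter _meter = new("DataVerse.EventProcessor");
    private readonly ConcurrentDictionary<string, Counter<long>> _counters = new();
    private readonly ConcurrentDictionary<string, Histogram<double>> _histograms = new();

    public void IncrementCounter(string name, params (string key, string value)[] tags)
    {
        // Create each counter once and reuse it on subsequent calls
        var counter = _counters.GetOrAdd(name, n => _meter.CreateCounter<long>(n));
        counter.Add(1, ToTagArray(tags));
    }

    public void RecordValue(string name, double value, params (string key, string value)[] tags)
    {
        var histogram = _histograms.GetOrAdd(name, n => _meter.CreateHistogram<double>(n));
        histogram.Record(value, ToTagArray(tags));
    }

    private static KeyValuePair<string, object?>[] ToTagArray((string key, string value)[] tags) =>
        tags.Select(t => new KeyValuePair<string, object?>(t.key, t.value)).ToArray();

    public void Dispose() => _meter.Dispose();
}

If you adopt this variant, register it as a singleton so the Meter instance and its counters are shared across the application.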

Critical Metrics to Monitor

Event Processing Metrics:

  • events.received - Total events received from DataVerse
      • Why Important: Baseline measurement of system load and DataVerse activity
      • Tags: entity type, operation type

  • events.processed.success - Successfully processed events
      • Why Important: Measures processing success rate and system health
      • Tags: entity type, business event type generated

  • events.processed.error - Processing errors by type
      • Why Important: Early detection of parsing, connectivity, or logic errors
      • Tags: error_type (deserialization, processing, publishing)

Business Logic Metrics:

  • business_events.generated - Count of business events created by type
      • Why Important: Tracks business logic effectiveness and downstream impact
      • Tags: event_type (Contact.Updated, Contact.EmailChanged)

  • field_changes.detected - Specific field changes detected
      • Why Important: Validates change detection logic and identifies hot fields
      • Tags: field_name, entity_type

Performance Metrics:

  • events.processing.duration - Event processing latency (milliseconds)
      • Why Important: Ensures processing keeps pace with incoming events
      • Threshold: Alert if p95 > 5000ms

  • eventhub.lag - Consumer lag behind Event Hub
      • Why Important: Prevents message loss due to retention limits
      • Threshold: Alert if lag > 100,000 messages

System Health Metrics:

  • duplicate_events.detected - Duplicate events caught by idempotency
      • Why Important: Validates at-least-once delivery handling
      • Normal Range: 1-5% of total events

  • memory.usage - Application memory consumption
      • Why Important: Prevents container out-of-memory conditions
      • Threshold: Alert if > 80% of container limit

Business Event Schema Design

Design your business events with versioning and backward compatibility. The base class below extends the earlier BusinessEvent definition with a schema version and shared metadata:

public abstract class BusinessEvent
{
    public string SchemaVersion { get; set; } = "1.0";
    public Guid EventId { get; set; } = Guid.NewGuid();
    public DateTime EventTime { get; set; } = DateTime.UtcNow;
    public string EventType { get; set; }
    public Guid CorrelationId { get; set; }
    public string Source { get; set; } = "DataVerse.EventProcessor";

    // Add common metadata for all business events
    public EventMetadata Metadata { get; set; } = new EventMetadata();
}

public class EventMetadata
{
    public string TenantId { get; set; }
    public string UserId { get; set; }
    public Dictionary<string, string> CustomProperties { get; set; } = new();
}

// Version 1.0 of Contact Email Changed Event
public class ContactEmailChangedEvent : BusinessEvent
{
    public ContactEmailChangedEvent()
    {
        EventType = "Contact.EmailChanged";
        SchemaVersion = "1.0";
    }

    public Guid ContactId { get; set; }
    public string PreviousEmail { get; set; }
    public string NewEmail { get; set; }
    public string FullName { get; set; }

    // Consider future expansion
    public ContactVerificationStatus VerificationStatus { get; set; }
}

public enum ContactVerificationStatus
{
    Unknown,
    Verified,
    Pending,
    Failed
}
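
On the consuming side, downstream services can inspect the EventType routing property and the SchemaVersion field before binding the payload to a concrete type. A minimal sketch, assuming the default PascalCase property names produced by the publisher code above; the routing rules shown are illustrative:

public static BusinessEvent? ParseBusinessEvent(string json)
{
    using var document = JsonDocument.Parse(json);
    var root = document.RootElement;

    var eventType = root.GetProperty("EventType").GetString();
    var schemaVersion = root.TryGetProperty("SchemaVersion", out var version)
        ? version.GetString()
        : "1.0";

    // Route to a concrete type based on event type and schema version
    return (eventType, schemaVersion) switch
    {
        ("Contact.EmailChanged", "1.0") => JsonSerializer.Deserialize<ContactEmailChangedEvent>(json),
        ("Contact.Updated", "1.0") => JsonSerializer.Deserialize<ContactUpdatedEvent>(json),
        _ => null // Unknown type or version: log, ignore, or dead-letter
    };
}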

Deployment Considerations

Azure Container Apps Environment

# container-app.yaml
apiVersion: containerapps.azure.com/v1beta2
kind: ContainerApp
metadata:
  name: dataverse-event-processor
spec:
  managedEnvironmentId: /subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.App/managedEnvironments/{environment}
  configuration:
    secrets:
    - name: eventhub-input-connection
      value: "{connection-string}"
    - name: eventhub-output-connection
      value: "{connection-string}"
    ingress:
      external: false
      targetPort: 8080
      traffic:
      - weight: 100
        latestRevision: true
  template:
    containers:
    - image: myregistry.azurecr.io/dataverse-event-processor:latest
      name: event-processor
      env:
      - name: EventHub__InputConnection
        secretRef: eventhub-input-connection
      - name: EventHub__OutputConnection
        secretRef: eventhub-output-connection
      resources:
        cpu: 1
        memory: 2Gi
      probes:
      - type: liveness
        httpGet:
          path: /health
          port: 8080
        initialDelaySeconds: 30
        periodSeconds: 30
    scale:
      minReplicas: 2
      maxReplicas: 10
      rules:
      - name: eventhub-scaling
        custom:
          type: azure-eventhub
          metadata:
            consumerGroup: event-processor
            connectionFromEnv: EventHub__InputConnection
            eventHubName: dataverse-events
            threshold: '100'

Health Checks

public class EventHubHealthCheck : IHealthCheck
{
    private readonly EventHubConsumerClient _consumerClient;
    private readonly EventHubProducerClient _producerClient;

    public EventHubHealthCheck(
        EventHubConsumerClient consumerClient,
        EventHubProducerClient producerClient)
    {
        _consumerClient = consumerClient;
        _producerClient = producerClient;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Test connectivity to both Event Hubs
            var inputProperties = await _consumerClient.GetEventHubPropertiesAsync(cancellationToken);
            var outputProperties = await _producerClient.GetEventHubPropertiesAsync(cancellationToken);

            var data = new Dictionary<string, object>
            {
                ["input_partitions"] = inputProperties.PartitionIds.Length,
                ["output_partitions"] = outputProperties.PartitionIds.Length
            };

            return HealthCheckResult.Healthy("Event Hub connections are healthy", data);
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("Event Hub connectivity issues", ex);
        }
    }
}

Best Practices and Recommendations

Error Handling Strategy

  1. Implement Circuit Breaker Pattern: Prevent cascading failures when downstream systems are unavailable
  2. Dead Letter Queues: Route persistently failing messages for manual investigation
  3. Exponential Backoff: Retry transient failures with increasing delays (see the sketch after this list)
  4. Correlation IDs: Maintain traceability across all systems
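
A minimal, framework-free sketch of the exponential backoff in item 3; in practice a retry library such as Polly would typically be used, and the helper name below is hypothetical:

public static class RetryHelper
{
    public static async Task ExecuteWithRetryAsync(
        Func<CancellationToken, Task> operation,
        int maxAttempts,
        TimeSpan baseDelay,
        ILogger logger,
        CancellationToken cancellationToken)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation(cancellationToken);
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts)
            {
                // Delay grows exponentially: baseDelay * 2^(attempt - 1)
                var delay = TimeSpan.FromMilliseconds(
                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));

                logger.LogWarning(ex, "Attempt {Attempt} failed, retrying in {Delay}", attempt, delay);
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}

HandleProcessingError could wrap the publish call with a helper like this before falling back to a dead letter queue.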

Performance Optimization

  1. Batch Processing: Process multiple events together when possible
  2. Async/Await: Use asynchronous patterns throughout the pipeline
  3. Memory Management: Dispose of resources properly to prevent memory leaks
  4. Connection Pooling: Reuse Event Hub connections across operations

Security Considerations

  1. Managed Identity: Use Azure Managed Identity for Event Hub authentication (see the sketch after this list)
  2. Secrets Management: Store connection strings in Azure Key Vault
  3. Network Isolation: Deploy containers in private networks with service endpoints
  4. Data Classification: Ensure sensitive contact data is properly classified and protected
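
With managed identity, the Event Hub clients can be constructed from the namespace and a DefaultAzureCredential instead of a connection string. The namespace below is a placeholder; the hub and consumer group names mirror the earlier configuration:

// Requires the Azure.Identity package; the identity needs the
// "Azure Event Hubs Data Receiver" and "Azure Event Hubs Data Sender" roles.
var credential = new DefaultAzureCredential();

var consumerClient = new EventHubConsumerClient(
    "event-processor",                        // consumer group
    "my-namespace.servicebus.windows.net",    // fully qualified namespace (placeholder)
    "dataverse-events",
    credential);

var producerClient = new EventHubProducerClient(
    "my-namespace.servicebus.windows.net",
    "business-events",
    credential);

These constructions can replace the connection-string-based singleton registrations in Program.cs.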

Testing Strategy

[Fact]
public void Should_Detect_Email_Change_From_PrePost_Images()
{
    // Arrange
    var dataVerseEvent = new DataVerseEvent
    {
        MessageName = "Update",
        PrimaryEntityName = "contact",
        PrimaryEntityId = Guid.NewGuid(),
        PreEntityImages = new Dictionary<string, EntityImage>
        {
            ["preimage"] = new EntityImage
            {
                LogicalName = "contact",
                Attributes = new Dictionary<string, object>
                {
                    ["emailaddress1"] = "old@example.com",
                    ["fullname"] = "John Doe"
                }
            }
        },
        PostEntityImages = new Dictionary<string, EntityImage>
        {
            ["postimage"] = new EntityImage
            {
                LogicalName = "contact",
                Attributes = new Dictionary<string, object>
                {
                    ["emailaddress1"] = "new@example.com",
                    ["fullname"] = "John Doe"
                }
            }
        }
    };

    var detector = new ContactChangeDetector(Mock.Of<ILogger<ContactChangeDetector>>());

    // Act
    var analysis = detector.AnalyzeChanges(dataVerseEvent);

    // Assert
    Assert.True(analysis.HasChanges);
    Assert.True(analysis.EmailChanged);
    Assert.Contains("emailaddress1", analysis.ChangedFields.Keys);
    Assert.Equal("old@example.com", analysis.ChangedFields["emailaddress1"].OldValue);
    Assert.Equal("new@example.com", analysis.ChangedFields["emailaddress1"].NewValue);
}
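
A companion test can verify the negative path, confirming that an unrelated field change does not trigger an email-changed event (the telephone values below are illustrative only):

[Fact]
public void Should_Not_Flag_Email_Change_When_Email_Is_Unchanged()
{
    // Arrange: identical email in pre/post images, different telephone number
    var dataVerseEvent = new DataVerseEvent
    {
        MessageName = "Update",
        PrimaryEntityName = "contact",
        PrimaryEntityId = Guid.NewGuid(),
        PreEntityImages = new Dictionary<string, EntityImage>
        {
            ["preimage"] = new EntityImage
            {
                LogicalName = "contact",
                Attributes = new Dictionary<string, object>
                {
                    ["emailaddress1"] = "same@example.com",
                    ["telephone1"] = "555-0100"
                }
            }
        },
        PostEntityImages = new Dictionary<string, EntityImage>
        {
            ["postimage"] = new EntityImage
            {
                LogicalName = "contact",
                Attributes = new Dictionary<string, object>
                {
                    ["emailaddress1"] = "same@example.com",
                    ["telephone1"] = "555-0199"
                }
            }
        }
    };

    var detector = new ContactChangeDetector(Mock.Of<ILogger<ContactChangeDetector>>());

    // Act
    var analysis = detector.AnalyzeChanges(dataVerseEvent);

    // Assert: a field changed, but it was not the email address
    Assert.True(analysis.HasChanges);
    Assert.False(analysis.EmailChanged);
    Assert.Contains("telephone1", analysis.ChangedFields.Keys);
}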

Conclusion

This architecture provides a robust, scalable solution for transforming generic DataVerse CRUD events into meaningful business events. The containerized approach ensures easy deployment and scaling, while the comprehensive monitoring and error handling strategies provide production-ready reliability.

Key benefits of this approach:

  • Decoupling: Downstream systems receive focused business events rather than raw DataVerse data
  • Scalability: Container-based architecture scales automatically based on Event Hub message volume
  • Reliability: At-least-once delivery prevents message loss, while idempotency handling prevents duplicate processing
  • Observability: Comprehensive metrics provide insight into system health and business activity
  • Maintainability: Clear separation of concerns makes the system easy to extend and modify

The solution transforms the generic nature of DataVerse events into actionable business intelligence, enabling downstream systems to react appropriately to specific business scenarios while maintaining a unified event-driven architecture.