Skip to content

DataVerse Event Processing: From CRUD to Business Events

Overview

This guide explains how to transform generic DataVerse CRUD (Create, Read, Update, Delete) events into meaningful business events using a containerized .NET application. We'll focus on Contact entities as our primary example, demonstrating how to detect specific field changes and emit targeted business events.

Architecture Overview

Our event-driven architecture follows this flow:

graph LR
    A[DataVerse] -->|CRUD Events| B[Service Endpoint]
    B -->|Publishes| C[Azure Event Hub<br/>Generic Events]
    C -->|Consumes| D[.NET Container App<br/>Event Processor]
    D -->|Publishes| E[Azure Event Hub<br/>Business Events]
    E -->|Consumes| F[Downstream Systems]

    style A fill:#f9f,stroke:#333,stroke-width:4px
    style D fill:#9ff,stroke:#333,stroke-width:4px
    style E fill:#ff9,stroke:#333,stroke-width:4px

Understanding DataVerse Event Publishing

What is DataVerse?

Microsoft DataVerse is a cloud-based storage platform that provides a secure and scalable database for business applications. It's the underlying data platform for Microsoft Power Platform and Dynamics 365.

How DataVerse Publishes Events

DataVerse uses Service Endpoints to publish events when data changes occur. Here's how it works:

  1. Step Registration: Administrators register processing steps (typically with the Plugin Registration Tool) that trigger on specific entity operations (Create, Update, Delete)
  2. Service Endpoint Configuration: These steps are attached to a service endpoint that sends messages to Azure Service Bus or Event Hub
  3. Event Payload: DataVerse sends a structured message containing:
     - Entity metadata
     - Operation type
     - Pre and Post images (snapshots of the data before and after the change; they are only included when images are registered on the step)
     - User context

Event Structure

{
  "MessageName": "Update",
  "PrimaryEntityName": "contact",
  "PrimaryEntityId": "123e4567-e89b-12d3-a456-426614174000",
  "BusinessUnit": "Default",
  "PreEntityImages": [{
    "key": "PreImage",
    "value": {
      "contactid": "123e4567-e89b-12d3-a456-426614174000",
      "emailaddress1": "john.doe@oldcompany.com",
      "firstname": "John",
      "lastname": "Doe"
    }
  }],
  "PostEntityImages": [{
    "key": "PostImage", 
    "value": {
      "contactid": "123e4567-e89b-12d3-a456-426614174000",
      "emailaddress1": "john.doe@newcompany.com",
      "firstname": "John",
      "lastname": "Doe"
    }
  }]
}

Event Processing Architecture

Component Breakdown

graph TB
    subgraph "Azure Event Hub - Generic Events"
        EH1[Event Hub<br/>Partitioned Stream]
    end

    subgraph "Container Environment (AKS/Container Apps)"
        subgraph "Event Processor Pod"
            EP[Event Processor<br/>.NET 8]
            CD[Change Detector]
            ET[Event Translator]
            EP --> CD
            CD --> ET
        end
    end

    subgraph "Azure Event Hub - Business Events"
        EH2[Event Hub<br/>Partitioned Stream]
    end

    EH1 -->|Consume| EP
    ET -->|Publish| EH2

Key Components

  1. Event Processor: Main orchestrator that consumes from Event Hub
  2. Change Detector: Analyzes pre/post images to identify specific field changes
  3. Event Translator: Transforms generic CRUD events into business events

Implementation Guide

1. Event Models

First, define your event models:

/// <summary>
/// Generic DataVerse event as consumed from the "generic events" Event Hub.
/// </summary>
/// <remarks>
/// NOTE(review): if the incoming JSON carries the images as arrays of <c>{ key, value }</c> entries
/// (as in the sample payload), deserializing them into the flat dictionaries below will fail; a custom
/// converter or a pre-flattened payload is presumably needed — confirm against the real message shape.
/// Values of the image dictionaries are deserialized by System.Text.Json as <c>JsonElement</c>.
/// </remarks>
public class DataVerseEvent
{
    /// <summary>Operation name, e.g. "Update".</summary>
    public string MessageName { get; set; }

    /// <summary>Logical name of the entity, e.g. "contact".</summary>
    public string PrimaryEntityName { get; set; }

    /// <summary>Identifier of the affected record.</summary>
    public Guid PrimaryEntityId { get; set; }

    /// <summary>Attribute values before the operation, keyed by attribute logical name.</summary>
    public Dictionary<string, object> PreEntityImages { get; set; }

    /// <summary>Attribute values after the operation, keyed by attribute logical name.</summary>
    public Dictionary<string, object> PostEntityImages { get; set; }

    /// <summary>When the operation happened; <c>default(DateTime)</c> if the payload has no timestamp.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>User who performed the operation; <c>null</c> when absent from the payload.</summary>
    public string UserId { get; set; }

    /// <summary>Optional identifier of the source message, used for diagnostics; <c>Guid.Empty</c> when absent.</summary>
    public Guid EventId { get; set; }
}

/// <summary>
/// Business event describing a change of a contact's primary email address.
/// </summary>
public class ContactEmailChangedEvent
{
    /// <summary>Identifier of the contact whose email changed.</summary>
    public Guid ContactId { get; set; }

    /// <summary>Email address before the change.</summary>
    public string OldEmail { get; set; }

    /// <summary>Email address after the change.</summary>
    public string NewEmail { get; set; }

    /// <summary>Contact first name.</summary>
    public string FirstName { get; set; }

    /// <summary>Contact last name.</summary>
    public string LastName { get; set; }

    /// <summary>When the change happened.</summary>
    public DateTime ChangedAt { get; set; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; set; }

    // Event metadata: fixed type name and schema version, plus a fresh id per instance.
    public string EventType { get; } = "ContactEmailChanged";
    public string EventVersion { get; } = "1.0";
    public Guid EventId { get; set; } = Guid.NewGuid();
}

/// <summary>
/// Generic "contact updated" business event listing every changed attribute; intended for
/// projection systems that keep their own copy of contact data.
/// </summary>
public class ContactUpdatedEvent
{
    /// <summary>Identifier of the updated contact.</summary>
    public Guid ContactId { get; set; }

    /// <summary>Changed attributes keyed by attribute name.</summary>
    public Dictionary<string, FieldChange> Changes { get; set; }

    /// <summary>When the update happened.</summary>
    public DateTime UpdatedAt { get; set; }

    /// <summary>Who performed the update.</summary>
    public string UpdatedBy { get; set; }

    // Event metadata: fixed type name and schema version, plus a fresh id per instance.
    public string EventType { get; } = "ContactUpdated";
    public string EventVersion { get; } = "1.0";
    public Guid EventId { get; set; } = Guid.NewGuid();
}

/// <summary>
/// A single attribute whose value differs between an entity's pre-image and post-image.
/// </summary>
public class FieldChange
{
    /// <summary>Attribute (logical) name.</summary>
    public string FieldName { get; set; }

    /// <summary>Value before the change; <c>null</c> if the attribute was absent.</summary>
    public object OldValue { get; set; }

    /// <summary>Value after the change; <c>null</c> if the attribute was absent.</summary>
    public object NewValue { get; set; }
}

2. Change Detection Service

Implement a service to detect specific field changes:

/// <summary>
/// Compares DataVerse entity images to find attributes whose values differ.
/// </summary>
public interface IChangeDetectionService
{
    /// <summary>Returns every attribute whose value differs between the two images.</summary>
    /// <param name="preImage">Attribute values before the operation.</param>
    /// <param name="postImage">Attribute values after the operation.</param>
    /// <returns>Changes keyed by attribute name.</returns>
    Dictionary<string, FieldChange> DetectChanges(Dictionary<string, object> preImage, Dictionary<string, object> postImage);

    /// <summary>Returns <c>true</c> when <paramref name="fieldName"/> has different values in the two images.</summary>
    /// <param name="preImage">Attribute values before the operation.</param>
    /// <param name="postImage">Attribute values after the operation.</param>
    /// <param name="fieldName">Attribute name to compare.</param>
    bool HasFieldChanged(Dictionary<string, object> preImage, Dictionary<string, object> postImage, string fieldName);
}

/// <summary>
/// Default <see cref="IChangeDetectionService"/>: compares attribute values taken from DataVerse
/// pre- and post-images.
/// </summary>
public class ChangeDetectionService : IChangeDetectionService
{
    /// <summary>
    /// Returns every attribute whose value differs between <paramref name="preImage"/> and
    /// <paramref name="postImage"/>. An attribute present in only one image is compared against
    /// <c>null</c>; a <c>null</c> image is treated as empty.
    /// </summary>
    /// <param name="preImage">Attribute values before the operation (may be null).</param>
    /// <param name="postImage">Attribute values after the operation (may be null).</param>
    /// <returns>Changes keyed by attribute name; empty when nothing differs. Stored values are the raw image values.</returns>
    public Dictionary<string, FieldChange> DetectChanges(
        Dictionary<string, object> preImage,
        Dictionary<string, object> postImage)
    {
        var changes = new Dictionary<string, FieldChange>();

        // Pre-image attributes first, then any that only appear in the post-image.
        var allFields = (preImage?.Keys ?? Enumerable.Empty<string>())
            .Union(postImage?.Keys ?? Enumerable.Empty<string>());

        foreach (var field in allFields)
        {
            var preValue = GetValue(preImage, field);
            var postValue = GetValue(postImage, field);

            if (!AreValuesEqual(preValue, postValue))
            {
                changes[field] = new FieldChange
                {
                    FieldName = field,
                    OldValue = preValue,
                    NewValue = postValue
                };
            }
        }

        return changes;
    }

    /// <summary>
    /// Returns <c>true</c> when <paramref name="fieldName"/> has different values in the two images,
    /// using the same comparison as <see cref="DetectChanges"/>.
    /// </summary>
    /// <param name="preImage">Attribute values before the operation (may be null).</param>
    /// <param name="postImage">Attribute values after the operation (may be null).</param>
    /// <param name="fieldName">Attribute name to compare.</param>
    public bool HasFieldChanged(
        Dictionary<string, object> preImage,
        Dictionary<string, object> postImage,
        string fieldName)
    {
        return !AreValuesEqual(GetValue(preImage, fieldName), GetValue(postImage, fieldName));
    }

    // Looks up an attribute value; a missing image or a missing key both yield null.
    private static object GetValue(Dictionary<string, object> image, string fieldName) =>
        image is not null && image.TryGetValue(fieldName, out var value) ? value : null;

    private static bool AreValuesEqual(object value1, object value2)
    {
        // A JSON null deserialized as a JsonElement means the same thing as an absent value.
        value1 = NormalizeJsonNull(value1);
        value2 = NormalizeJsonNull(value2);

        if (value1 is null && value2 is null) return true;
        if (value1 is null || value2 is null) return false;

        // Handle special cases for DataVerse types
        if (value1 is DateTime dt1 && value2 is DateTime dt2)
        {
            return dt1.ToUniversalTime() == dt2.ToUniversalTime();
        }

        // System.Text.Json deserializes object-typed values as JsonElement, which does not override
        // Equals: two elements holding identical JSON generally compare unequal. Compare content instead.
        if (value1 is System.Text.Json.JsonElement json1 && value2 is System.Text.Json.JsonElement json2)
        {
            return AreJsonValuesEqual(json1, json2);
        }

        return value1.Equals(value2);
    }

    // Content comparison for JSON values.
    private static bool AreJsonValuesEqual(System.Text.Json.JsonElement left, System.Text.Json.JsonElement right)
    {
        if (left.ValueKind != right.ValueKind) return false;

        // Compare decoded strings so escaping differences ("\u00e9" vs "é") are not reported as changes.
        if (left.ValueKind == System.Text.Json.JsonValueKind.String)
        {
            return string.Equals(left.GetString(), right.GetString(), StringComparison.Ordinal);
        }

        // Numbers, booleans, objects and arrays: compare raw JSON text. Numerically equal values
        // written differently (1 vs 1.0) are still reported as different.
        return string.Equals(left.GetRawText(), right.GetRawText(), StringComparison.Ordinal);
    }

    private static object NormalizeJsonNull(object value) =>
        value is System.Text.Json.JsonElement element &&
        (element.ValueKind == System.Text.Json.JsonValueKind.Null ||
         element.ValueKind == System.Text.Json.JsonValueKind.Undefined)
            ? null
            : value;
}

3. Event Translation Service

Transform generic events into business events:

/// <summary>
/// Converts one generic DataVerse event into zero or more business events.
/// </summary>
public interface IEventTranslator
{
    /// <summary>Translates <paramref name="dataVerseEvent"/> into business events.</summary>
    /// <param name="dataVerseEvent">The generic DataVerse event to translate.</param>
    /// <returns>The business events to publish; may be empty when the event is not relevant.</returns>
    IEnumerable<object> TranslateEvent(DataVerseEvent dataVerseEvent);
}

/// <summary>
/// Translates DataVerse <c>contact</c> Update events into business events: a
/// <see cref="ContactUpdatedEvent"/> whenever any attribute changed, plus a
/// <see cref="ContactEmailChangedEvent"/> when <c>emailaddress1</c> changed.
/// </summary>
public class ContactEventTranslator : IEventTranslator
{
    private const string EmailField = "emailaddress1";

    private readonly IChangeDetectionService _changeDetection;
    private readonly ILogger<ContactEventTranslator> _logger;

    public ContactEventTranslator(
        IChangeDetectionService changeDetection,
        ILogger<ContactEventTranslator> logger)
    {
        _changeDetection = changeDetection ?? throw new ArgumentNullException(nameof(changeDetection));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Produces the business events for one DataVerse event.</summary>
    /// <param name="dataVerseEvent">The deserialized DataVerse event.</param>
    /// <returns>
    /// A list of business events. It is empty for non-contact or non-Update messages, for events that lack
    /// a pre- or post-image, and when no attribute changed.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="dataVerseEvent"/> is null.</exception>
    public IEnumerable<object> TranslateEvent(DataVerseEvent dataVerseEvent)
    {
        ArgumentNullException.ThrowIfNull(dataVerseEvent);

        var events = new List<object>();

        if (dataVerseEvent.PrimaryEntityName != "contact" ||
            dataVerseEvent.MessageName != "Update")
        {
            return events;
        }

        var preImage = dataVerseEvent.PreEntityImages;
        var postImage = dataVerseEvent.PostEntityImages;

        // Without both images no change can be detected; skip rather than fail the whole event.
        if (preImage is null || postImage is null)
        {
            _logger.LogWarning(
                "Skipping update for contact {ContactId}: pre- and post-images are both required",
                dataVerseEvent.PrimaryEntityId);
            return events;
        }

        // Detect all changes for generic update event
        var changes = _changeDetection.DetectChanges(preImage, postImage);
        if (changes.Count == 0)
        {
            return events;
        }

        var changedBy = GetUserContext(dataVerseEvent);

        // Always emit generic update event
        events.Add(new ContactUpdatedEvent
        {
            ContactId = dataVerseEvent.PrimaryEntityId,
            Changes = changes,
            UpdatedAt = dataVerseEvent.Timestamp,
            UpdatedBy = changedBy
        });

        // Check for specific business events
        if (_changeDetection.HasFieldChanged(preImage, postImage, EmailField))
        {
            events.Add(new ContactEmailChangedEvent
            {
                ContactId = dataVerseEvent.PrimaryEntityId,
                OldEmail = preImage.GetValueOrDefault(EmailField)?.ToString(),
                NewEmail = postImage.GetValueOrDefault(EmailField)?.ToString(),
                FirstName = postImage.GetValueOrDefault("firstname")?.ToString(),
                LastName = postImage.GetValueOrDefault("lastname")?.ToString(),
                ChangedAt = dataVerseEvent.Timestamp,
                ChangedBy = changedBy
            });

            // Email addresses are personal data: log only the contact id, never the addresses.
            _logger.LogInformation(
                "Email changed for contact {ContactId}",
                dataVerseEvent.PrimaryEntityId);
        }

        return events;
    }

    // User attributed to the change; "System" when the event carries no user id.
    private static string GetUserContext(DataVerseEvent dataVerseEvent)
    {
        return dataVerseEvent.UserId ?? "System";
    }
}

4. Event Processor Host

Main processing loop with at-least-once delivery handling:

/// <summary>
/// Background service that consumes generic DataVerse events from Event Hub, translates them into
/// business events, publishes those to the business-events hub and records progress.
/// </summary>
/// <remarks>
/// Delivery is at-least-once. If the process stops after publishing but before
/// <c>MarkAsProcessedAsync</c> completes, the business events may be published again when the event is redelivered.
/// NOTE(review): <c>EventHubConsumerClient.ReadEventsAsync</c> reads every partition and does not coordinate
/// with other instances, so with several replicas each instance presumably receives every event. The Azure SDK
/// recommends <c>EventProcessorClient</c> for load-balanced production consumers — confirm before scaling out.
/// </remarks>
public class EventProcessorHost : BackgroundService
{
    // Additional attempts made after the first failure before an event is given up on.
    private const int MaxRetryAttempts = 3;

    private readonly EventHubConsumerClient _consumerClient;
    private readonly EventHubProducerClient _producerClient;
    private readonly IEventTranslator _translator;
    private readonly ICheckpointService _checkpointService;
    private readonly ILogger<EventProcessorHost> _logger;
    private readonly IMetrics _metrics;

    public EventProcessorHost(
        EventHubConsumerClient consumerClient,
        EventHubProducerClient producerClient,
        IEventTranslator translator,
        ICheckpointService checkpointService,
        ILogger<EventProcessorHost> logger,
        IMetrics metrics)
    {
        _consumerClient = consumerClient ?? throw new ArgumentNullException(nameof(consumerClient));
        _producerClient = producerClient ?? throw new ArgumentNullException(nameof(producerClient));
        _translator = translator ?? throw new ArgumentNullException(nameof(translator));
        _checkpointService = checkpointService ?? throw new ArgumentNullException(nameof(checkpointService));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var partitionEvent in _consumerClient.ReadEventsAsync(
            cancellationToken: stoppingToken))
        {
            try
            {
                var eventType = await ProcessEventAsync(partitionEvent, stoppingToken);
                await _checkpointService.UpdateCheckpointAsync(partitionEvent);

                _metrics.RecordEventProcessed("success", eventType);
            }
            // Cancellation during shutdown is not a processing error; let it end the loop.
            catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
            {
                _logger.LogError(ex,
                    "Error processing event from partition {PartitionId}",
                    partitionEvent.Partition.PartitionId);

                await HandleProcessingError(partitionEvent, ex, stoppingToken);
            }
        }
    }

    // Retries a failed event with exponential back-off (1s, 2s, 4s, ...) and records the final outcome.
    // The retries block this consumer loop while they run.
    private async Task HandleProcessingError(
        PartitionEvent partitionEvent,
        Exception exception,
        CancellationToken cancellationToken = default)
    {
        var lastError = exception;

        for (var attempt = 1; attempt <= MaxRetryAttempts; attempt++)
        {
            await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt - 1)), cancellationToken);

            try
            {
                var eventType = await ProcessEventAsync(partitionEvent, cancellationToken);
                await _checkpointService.UpdateCheckpointAsync(partitionEvent);

                _metrics.RecordEventProcessed("success", eventType);
                return;
            }
            catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
            {
                lastError = ex;
                _logger.LogWarning(ex,
                    "Retry {Attempt}/{MaxAttempts} failed for event {SequenceNumber} from partition {PartitionId}",
                    attempt, MaxRetryAttempts, partitionEvent.Data.SequenceNumber, partitionEvent.Partition.PartitionId);
            }
        }

        _metrics.RecordEventProcessed("error", "unknown");

        // TODO: persist the event to a dead-letter store. With a "latest sequence number" checkpoint
        // (as in RedisCheckpointService) the next successful event on this partition moves the checkpoint
        // past this one, so it is effectively skipped.
        _logger.LogError(lastError,
            "Giving up on event {SequenceNumber} from partition {PartitionId} after {MaxAttempts} retries",
            partitionEvent.Data.SequenceNumber, partitionEvent.Partition.PartitionId, MaxRetryAttempts);
    }

    // Deserializes, de-duplicates, translates and publishes one event.
    // Returns a metrics label of the form "<entity>.<message>".
    private async Task<string> ProcessEventAsync(PartitionEvent partitionEvent, CancellationToken cancellationToken)
    {
        var dataVerseEvent = JsonSerializer.Deserialize<DataVerseEvent>(
            partitionEvent.Data.EventBody.ToString());

        // A literal JSON "null" body deserializes to null; there is nothing to process.
        if (dataVerseEvent is null)
        {
            _logger.LogWarning(
                "Skipping empty event {SequenceNumber} from partition {PartitionId}",
                partitionEvent.Data.SequenceNumber, partitionEvent.Partition.PartitionId);
            return "invalid";
        }

        var eventType = $"{dataVerseEvent.PrimaryEntityName}.{dataVerseEvent.MessageName}";

        // Check for duplicate processing
        if (await _checkpointService.IsDuplicateAsync(dataVerseEvent))
        {
            _logger.LogDebug("Skipping duplicate event for entity {EntityId} at {Timestamp}",
                dataVerseEvent.PrimaryEntityId, dataVerseEvent.Timestamp);
            return eventType;
        }

        // Translate to business events (materialized once; skip Event Hub entirely when there are none)
        var businessEvents = new List<object>(
            _translator.TranslateEvent(dataVerseEvent) ?? Array.Empty<object>());

        if (businessEvents.Count > 0)
        {
            await PublishBusinessEventsAsync(businessEvents, cancellationToken);
        }

        // Mark as processed only after publishing succeeded.
        await _checkpointService.MarkAsProcessedAsync(dataVerseEvent);
        return eventType;
    }

    // Serializes the business events and sends them in as few Event Hub batches as fit.
    private async Task PublishBusinessEventsAsync(
        IReadOnlyList<object> businessEvents,
        CancellationToken cancellationToken)
    {
        var eventBatch = await _producerClient.CreateBatchAsync(cancellationToken);
        try
        {
            foreach (var businessEvent in businessEvents)
            {
                var eventData = new EventData(JsonSerializer.SerializeToUtf8Bytes(businessEvent));
                eventData.Properties["EventType"] = businessEvent.GetType().Name;

                if (eventBatch.TryAdd(eventData))
                {
                    continue;
                }

                // Send current batch and create new one
                await _producerClient.SendAsync(eventBatch, cancellationToken);
                eventBatch.Dispose();
                eventBatch = null;
                eventBatch = await _producerClient.CreateBatchAsync(cancellationToken);

                // An event that does not fit into an empty batch can never be sent; fail loudly
                // instead of silently dropping it.
                if (!eventBatch.TryAdd(eventData))
                {
                    throw new InvalidOperationException(
                        $"Business event {businessEvent.GetType().Name} exceeds the maximum Event Hub batch size.");
                }
            }

            if (eventBatch.Count > 0)
            {
                await _producerClient.SendAsync(eventBatch, cancellationToken);
            }
        }
        finally
        {
            eventBatch?.Dispose();
        }
    }
}

5. Handling At-Least-Once Delivery

Implement idempotency to handle duplicate messages:

/// <summary>
/// Tracks which DataVerse events were already processed (for idempotency) and records
/// per-partition consumer progress.
/// </summary>
public interface ICheckpointService
{
    // Parameters cannot be named "event": it is a reserved C# keyword.

    /// <summary>Returns <c>true</c> when <paramref name="dataVerseEvent"/> was already marked as processed.</summary>
    /// <param name="dataVerseEvent">The event to check.</param>
    Task<bool> IsDuplicateAsync(DataVerseEvent dataVerseEvent);

    /// <summary>Records <paramref name="dataVerseEvent"/> as processed so later deliveries are recognized as duplicates.</summary>
    /// <param name="dataVerseEvent">The event that was processed.</param>
    Task MarkAsProcessedAsync(DataVerseEvent dataVerseEvent);

    /// <summary>Records consumer progress for the partition that delivered <paramref name="partition"/>.</summary>
    /// <param name="partition">The partition event that was just handled.</param>
    Task UpdateCheckpointAsync(PartitionEvent partition);
}

/// <summary>
/// Redis-backed <see cref="ICheckpointService"/>.
/// Processed events are remembered for 24 hours under
/// <c>processed:{entityId}:{timestamp}</c>; the partition checkpoint is stored under
/// <c>checkpoint:{partitionId}</c> as the last handled sequence number.
/// </summary>
/// <remarks>
/// <see cref="IsDuplicateAsync"/> followed by <see cref="MarkAsProcessedAsync"/> is not atomic: two
/// consumers handling the same event concurrently can both see it as new.
/// NOTE(review): the checkpoint key is not scoped by Event Hub or consumer group; two hubs sharing this
/// Redis instance would overwrite each other's checkpoints.
/// </remarks>
public class RedisCheckpointService : ICheckpointService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly TimeSpan _duplicateCheckWindow = TimeSpan.FromHours(24);

    public RedisCheckpointService(IConnectionMultiplexer redis)
    {
        _redis = redis ?? throw new ArgumentNullException(nameof(redis));
    }

    /// <summary>
    /// Returns <c>true</c> when the event's key exists. Events without a timestamp are never
    /// considered duplicates.
    /// </summary>
    public async Task<bool> IsDuplicateAsync(DataVerseEvent dataVerseEvent)
    {
        ArgumentNullException.ThrowIfNull(dataVerseEvent);

        var key = BuildProcessedKey(dataVerseEvent);
        if (key is null)
        {
            return false;
        }

        var db = _redis.GetDatabase();
        return await db.KeyExistsAsync(key);
    }

    /// <summary>
    /// Stores the event's key with a 24-hour expiry. Does nothing for events without a timestamp.
    /// </summary>
    public async Task MarkAsProcessedAsync(DataVerseEvent dataVerseEvent)
    {
        ArgumentNullException.ThrowIfNull(dataVerseEvent);

        var key = BuildProcessedKey(dataVerseEvent);
        if (key is null)
        {
            return;
        }

        var db = _redis.GetDatabase();
        await db.StringSetAsync(key, DateTime.UtcNow.ToString("O"),
            expiry: _duplicateCheckWindow);
    }

    /// <summary>Overwrites the partition's checkpoint with the event's sequence number.</summary>
    public async Task UpdateCheckpointAsync(PartitionEvent partition)
    {
        var db = _redis.GetDatabase();
        var key = $"checkpoint:{partition.Partition.PartitionId}";

        // Machine-readable value: format independently of the current culture.
        await db.StringSetAsync(key,
            partition.Data.SequenceNumber.ToString(System.Globalization.CultureInfo.InvariantCulture));
    }

    // Dedupe key for an event, or null when the event has no timestamp. Without a timestamp every
    // event for an entity would map to the same key, and all updates after the first would be
    // discarded as duplicates for the whole window.
    private static string BuildProcessedKey(DataVerseEvent dataVerseEvent) =>
        dataVerseEvent.Timestamp == default
            ? null
            : $"processed:{dataVerseEvent.PrimaryEntityId}:{dataVerseEvent.Timestamp:O}";
}

Container Configuration

Dockerfile

FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080
# The .NET runtime images do not include curl, which the HEALTHCHECK below relies on.
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["EventProcessor.csproj", "."]
RUN dotnet restore "EventProcessor.csproj"
COPY . .
RUN dotnet build "EventProcessor.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "EventProcessor.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .

# Run as the non-root "app" user that .NET 8 images define (its UID is in $APP_UID).
USER $APP_UID

# Health check endpoint (used by Docker; Kubernetes ignores HEALTHCHECK and uses its own probes)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8080/health || exit 1

ENTRYPOINT ["dotnet", "EventProcessor.dll"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: dataverse-event-processor
spec:
  # NOTE(review): EventHubConsumerClient.ReadEventsAsync does not balance partitions across instances,
  # so with more than one replica each pod presumably processes every event (duplicates are only
  # partly caught by the Redis check). Switch to EventProcessorClient before scaling out — confirm.
  replicas: 3
  selector:
    matchLabels:
      app: dataverse-event-processor
  template:
    metadata:
      labels:
        app: dataverse-event-processor
    spec:
      containers:
      - name: event-processor
        # Prefer an immutable version tag or digest over :latest so rollouts and rollbacks are reproducible.
        image: myregistry.azurecr.io/dataverse-event-processor:latest
        # Run as the non-root "app" user (UID 1654) provided by the .NET 8 base images.
        securityContext:
          runAsNonRoot: true
          runAsUser: 1654
          allowPrivilegeEscalation: false
        env:
        - name: EventHub__ConsumerConnectionString
          valueFrom:
            secretKeyRef:
              name: eventhub-secrets
              key: consumer-connection-string
        - name: EventHub__ProducerConnectionString
          valueFrom:
            secretKeyRef:
              name: eventhub-secrets
              key: producer-connection-string
        - name: Redis__ConnectionString
          valueFrom:
            secretKeyRef:
              name: redis-secrets
              key: connection-string
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            # The application must expose this endpoint in addition to /health.
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

Monitoring and Metrics

Key Metrics to Monitor

  1. Event Processing Metrics
     - events_received_total - Total events received from DataVerse
     - events_processed_total - Total events processed, labelled by status (success/error) and event type
     - events_failed_total - Total events that failed processing (equivalently, events_processed_total with status="error")
     - event_processing_duration_seconds - Time to process each event
     - business_events_published_total - Business events published by type

  2. Lag Metrics
     - consumer_lag_seconds - Time difference between event creation and processing
     - partition_lag_count - Number of unprocessed events per partition

  3. Infrastructure Metrics
     - pod_cpu_usage_percentage - CPU utilization per pod
     - pod_memory_usage_bytes - Memory usage per pod
     - eventhub_throughput_bytes - Data throughput rates

Prometheus Metrics Implementation

/// <summary>
/// <see cref="IMetrics"/> implementation backed by prometheus-net's default registry
/// (<c>Metrics.Create*</c>).
/// </summary>
public class MetricsService : IMetrics
{
    private const string UnknownLabel = "unknown";

    private readonly Counter _eventsProcessed;
    private readonly Histogram _processingDuration;
    private readonly Gauge _consumerLag;

    public MetricsService()
    {
        _eventsProcessed = Metrics.CreateCounter(
            "events_processed_total",
            "Total number of events processed",
            new CounterConfiguration
            {
                LabelNames = new[] { "status", "event_type" }
            });

        // 15 exponential buckets: 1 ms, 2 ms, 4 ms ... ~16.4 s. Event processing presumably includes
        // network calls (checkpoint store, Event Hub publish), so buckets must extend well past 100 ms.
        // Linear 1 ms buckets would stop at 100 ms, put every slower event into +Inf and emit 100 series.
        _processingDuration = Metrics.CreateHistogram(
            "event_processing_duration_seconds",
            "Event processing duration in seconds",
            new HistogramConfiguration
            {
                Buckets = Histogram.ExponentialBuckets(0.001, 2, 15)
            });

        _consumerLag = Metrics.CreateGauge(
            "consumer_lag_seconds",
            "Consumer lag in seconds",
            new GaugeConfiguration
            {
                LabelNames = new[] { "partition" }
            });
    }

    /// <summary>Increments <c>events_processed_total</c> for the given status and event type.</summary>
    /// <param name="status">Outcome label, e.g. "success" or "error"; null is recorded as "unknown".</param>
    /// <param name="eventType">Event type label; null is recorded as "unknown".</param>
    public void RecordEventProcessed(string status, string eventType)
    {
        _eventsProcessed.WithLabels(status ?? UnknownLabel, eventType ?? UnknownLabel).Inc();
    }

    /// <summary>
    /// Starts a timer that observes the elapsed time into <c>event_processing_duration_seconds</c>
    /// when disposed.
    /// </summary>
    public IDisposable MeasureProcessingDuration()
    {
        return _processingDuration.NewTimer();
    }

    /// <summary>Sets <c>consumer_lag_seconds</c> for a partition; a null partition is recorded as "unknown".</summary>
    public void SetConsumerLag(string partition, double lagSeconds)
    {
        _consumerLag.WithLabels(partition ?? UnknownLabel).Set(lagSeconds);
    }
}

Monitoring Dashboard

graph TB
    subgraph "Monitoring Stack"
        A[Prometheus<br/>Metrics Collection] --> B[Grafana<br/>Visualization]
        C[Application<br/>Metrics Endpoint] --> A
        D[Azure Monitor<br/>Event Hub Metrics] --> B
        E[Container<br/>Metrics] --> A
    end

    subgraph "Alert Rules"
        F[High Error Rate]
        G[Consumer Lag > 5min]
        H[Low Throughput]
        I[Pod Restarts]
    end

    B --> F
    B --> G
    B --> H
    B --> I

Best Practices

1. Event Schema Evolution

  • Version your business events