DataVerse Event Processing: From CRUD to Business Events
Overview
This guide explains how to transform generic DataVerse CRUD (Create, Read, Update, Delete) events into meaningful business events using a containerized .NET application. We'll focus on Contact entities as our primary example, demonstrating how to detect specific field changes and emit targeted business events.
Architecture Overview
Our event-driven architecture follows this flow:
graph LR
A[DataVerse] -->|CRUD Events| B[Service Endpoint]
B -->|Publishes| C[Azure Event Hub<br/>Generic Events]
C -->|Consumes| D[.NET Container App<br/>Event Processor]
D -->|Publishes| E[Azure Event Hub<br/>Business Events]
E -->|Consumes| F[Downstream Systems]
style A fill:#f9f,stroke:#333,stroke-width:4px
style D fill:#9ff,stroke:#333,stroke-width:4px
style E fill:#ff9,stroke:#333,stroke-width:4px
Understanding DataVerse Event Publishing
What is DataVerse?
Microsoft DataVerse is a cloud-based storage platform that provides a secure and scalable database for business applications. It's the underlying data platform for Microsoft Power Platform and Dynamics 365.
How DataVerse Publishes Events
DataVerse uses Service Endpoints to publish events when data changes occur. Here's how it works:
- Step Registration: Administrators use the Plugin Registration Tool to register steps that fire on specific table operations (Create, Update, Delete)
- Service Endpoint Configuration: Each step is bound to a service endpoint that sends the message to Azure Service Bus or Azure Event Hubs
- Event Payload: DataVerse sends a serialized execution context (RemoteExecutionContext) containing:
  - Entity metadata
  - Operation type
  - Pre and Post images (snapshots of data before and after the change), which are included only if they are registered on the step
  - User context
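Event Structure
The simplified Update message below is for a contact. In the serialized RemoteExecutionContext that DataVerse actually sends, each image's attributes are nested as a list of key/value pairs rather than a flat object.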
{
  "MessageName": "Update",
  "PrimaryEntityName": "contact",
  "PrimaryEntityId": "123e4567-e89b-12d3-a456-426614174000",
  "BusinessUnit": "Default",
  "UserId": "5b7a9c2e-1f3d-4e6a-8b9c-0d1e2f3a4b5c",
  "OperationCreatedOn": "2024-01-15T10:30:00Z",
  "PreEntityImages": [{
    "key": "PreImage",
    "value": {
      "contactid": "123e4567-e89b-12d3-a456-426614174000",
      "emailaddress1": "john.doe@oldcompany.com",
      "firstname": "John",
      "lastname": "Doe"
    }
  }],
  "PostEntityImages": [{
    "key": "PostImage",
    "value": {
      "contactid": "123e4567-e89b-12d3-a456-426614174000",
      "emailaddress1": "john.doe@newcompany.com",
      "firstname": "John",
      "lastname": "Doe"
    }
  }]
}
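Images arrive as an array of entries keyed by the alias registered on the step (here `PreImage` and `PostImage`). A consumer therefore has to look an image up by alias before it can read attributes. The sketch below does this with `System.Text.Json`'s `JsonDocument` against the simplified payload above. `ImageReader` and `ReadImage` are hypothetical names. A real RemoteExecutionContext payload nests attributes one level deeper, so the lookup path would need adjusting.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: extracts one entity image from the simplified payload shape,
// where each image is { "key": <alias>, "value": { <field>: <value> } }.
public static class ImageReader
{
    public static Dictionary<string, string?> ReadImage(string payloadJson, string collection, string alias)
    {
        var result = new Dictionary<string, string?>();
        using var doc = JsonDocument.Parse(payloadJson);

        // A missing collection (e.g. no images registered on the step) yields an empty result.
        if (!doc.RootElement.TryGetProperty(collection, out var images) ||
            images.ValueKind != JsonValueKind.Array)
        {
            return result;
        }

        foreach (var image in images.EnumerateArray())
        {
            if (image.TryGetProperty("key", out var key) && key.GetString() == alias &&
                image.TryGetProperty("value", out var value) && value.ValueKind == JsonValueKind.Object)
            {
                foreach (var field in value.EnumerateObject())
                {
                    // Strings are unwrapped, JSON null stays null, other kinds keep raw JSON text.
                    result[field.Name] = field.Value.ValueKind switch
                    {
                        JsonValueKind.String => field.Value.GetString(),
                        JsonValueKind.Null => null,
                        _ => field.Value.GetRawText()
                    };
                }
            }
        }

        return result;
    }
}
```

For the payload above, `ImageReader.ReadImage(json, "PreEntityImages", "PreImage")["emailaddress1"]` yields the old address. An unknown alias yields an empty dictionary.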
Event Processing Architecture
Component Breakdown
graph TB
subgraph "Azure Event Hub - Generic Events"
EH1[Event Hub<br/>Partitioned Stream]
end
subgraph "Container Environment (AKS/Container Apps)"
subgraph "Event Processor Pod"
EP[Event Processor<br/>.NET 8]
CD[Change Detector]
ET[Event Translator]
EP --> CD
CD --> ET
end
end
subgraph "Azure Event Hub - Business Events"
EH2[Event Hub<br/>Partitioned Stream]
end
EH1 -->|Consume| EP
ET -->|Publish| EH2
Key Components
- Event Processor: Main orchestrator that consumes from Event Hub
- Change Detector: Analyzes pre/post images to identify specific field changes
- Event Translator: Transforms generic CRUD events into business events
Implementation Guide
1. Event Models
First, define your event models:
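// Requires: using System; using System.Collections.Generic; using System.Linq;
//           using System.Text.Json.Serialization;

// Generic DataVerse event (simplified shape of the payload shown above)
public class DataVerseEvent
{
    public string MessageName { get; set; }
    public string PrimaryEntityName { get; set; }
    public Guid PrimaryEntityId { get; set; }

    // User the operation ran as; null if absent from the payload
    public string UserId { get; set; }

    // Time the operation ran in DataVerse
    [JsonPropertyName("OperationCreatedOn")]
    public DateTime Timestamp { get; set; }

    // Raw image collections: lists of { key, value } entries keyed by the image alias
    [JsonPropertyName("PreEntityImages")]
    public List<EntityImage> PreImageEntries { get; set; } = new();

    [JsonPropertyName("PostEntityImages")]
    public List<EntityImage> PostImageEntries { get; set; } = new();

    // Attribute dictionaries, assuming the step registered the aliases "PreImage" and "PostImage"
    [JsonIgnore]
    public Dictionary<string, object> PreEntityImages => GetImage(PreImageEntries, "PreImage");

    [JsonIgnore]
    public Dictionary<string, object> PostEntityImages => GetImage(PostImageEntries, "PostImage");

    private static Dictionary<string, object> GetImage(List<EntityImage> images, string alias) =>
        images?.FirstOrDefault(i => i.Key == alias)?.Value ?? new Dictionary<string, object>();
}

public class EntityImage
{
    [JsonPropertyName("key")]
    public string Key { get; set; }

    // Values deserialize as JsonElement because the declared type is object
    [JsonPropertyName("value")]
    public Dictionary<string, object> Value { get; set; }
}

// Business event for email changes
public class ContactEmailChangedEvent
{
    public Guid ContactId { get; set; }
    public string OldEmail { get; set; }
    public string NewEmail { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTime ChangedAt { get; set; }
    public string ChangedBy { get; set; }

    // Event metadata
    public string EventType => "ContactEmailChanged";
    public string EventVersion => "1.0";
    public Guid EventId { get; set; } = Guid.NewGuid();
}

// Generic contact update event for projection systems
public class ContactUpdatedEvent
{
    public Guid ContactId { get; set; }
    public Dictionary<string, FieldChange> Changes { get; set; }
    public DateTime UpdatedAt { get; set; }
    public string UpdatedBy { get; set; }
    public string EventType => "ContactUpdated";
    public string EventVersion => "1.0";
    public Guid EventId { get; set; } = Guid.NewGuid();
}

public class FieldChange
{
    public string FieldName { get; set; }
    public object OldValue { get; set; }
    public object NewValue { get; set; }
}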
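The models above assign `EventId = Guid.NewGuid()`. If the processor crashes after publishing but before it records the source event as processed, the retry republishes the same business fact under a new ID, and downstream consumers cannot deduplicate it. One option, sketched here as an assumption rather than part of the original design, is to derive the ID deterministically from the source identity: entity ID, operation timestamp and event type. `DeterministicEventId.Create` is a hypothetical helper. It takes the first 16 bytes of a SHA-256 hash, so it is not an RFC 4122 name-based UUID.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: derives a stable event ID so that reprocessing the same
// DataVerse change produces the same business event ID (enabling downstream dedup).
public static class DeterministicEventId
{
    public static Guid Create(Guid entityId, DateTime operationTimestamp, string eventType)
    {
        // Assumption: timestamps with Unspecified kind are already UTC.
        DateTime utc = operationTimestamp.Kind == DateTimeKind.Unspecified
            ? DateTime.SpecifyKind(operationTimestamp, DateTimeKind.Utc)
            : operationTimestamp.ToUniversalTime();

        // Fixed, culture-invariant formatting so equal inputs always hash identically.
        string identity = string.Join("|",
            entityId.ToString("D"),
            utc.ToString("O", CultureInfo.InvariantCulture),
            eventType);

        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(identity));

        // Keep the first 16 bytes of the hash as the GUID value.
        return new Guid(hash.AsSpan(0, 16));
    }
}
```

A business event would then set `EventId = DeterministicEventId.Create(contactId, timestamp, "ContactEmailChanged")` instead of relying on the `Guid.NewGuid()` default.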
2. Change Detection Service
Implement a service to detect specific field changes:
// Requires: using System; using System.Collections.Generic; using System.Linq; using System.Text.Json;
public interface IChangeDetectionService
{
    Dictionary<string, FieldChange> DetectChanges(
        Dictionary<string, object> preImage,
        Dictionary<string, object> postImage);

    bool HasFieldChanged(
        Dictionary<string, object> preImage,
        Dictionary<string, object> postImage,
        string fieldName);
}

public class ChangeDetectionService : IChangeDetectionService
{
    public Dictionary<string, FieldChange> DetectChanges(
        Dictionary<string, object> preImage,
        Dictionary<string, object> postImage)
    {
        var changes = new Dictionary<string, FieldChange>();

        // Compare every field that appears in either image
        var allFields = preImage.Keys.Union(postImage.Keys);

        foreach (var field in allFields)
        {
            var preValue = GetValue(preImage, field);
            var postValue = GetValue(postImage, field);

            if (!AreValuesEqual(preValue, postValue))
            {
                changes[field] = new FieldChange
                {
                    FieldName = field,
                    OldValue = preValue,
                    NewValue = postValue
                };
            }
        }

        return changes;
    }

    public bool HasFieldChanged(
        Dictionary<string, object> preImage,
        Dictionary<string, object> postImage,
        string fieldName)
    {
        return !AreValuesEqual(GetValue(preImage, fieldName), GetValue(postImage, fieldName));
    }

    // Missing fields and JSON nulls are both treated as null
    private static object GetValue(Dictionary<string, object> image, string field)
    {
        if (!image.TryGetValue(field, out var value)) return null;
        return value is JsonElement { ValueKind: JsonValueKind.Null } ? null : value;
    }

    private static bool AreValuesEqual(object value1, object value2)
    {
        if (value1 == null && value2 == null) return true;
        if (value1 == null || value2 == null) return false;

        // System.Text.Json deserializes object-typed values as JsonElement, which has no
        // value equality (Equals would report every field as changed); compare raw JSON instead
        if (value1 is JsonElement json1 && value2 is JsonElement json2)
        {
            return json1.ValueKind == json2.ValueKind &&
                   json1.GetRawText() == json2.GetRawText();
        }

        // Handle DateTime values (e.g. when images are built from SDK types)
        if (value1 is DateTime dt1 && value2 is DateTime dt2)
        {
            return dt1.ToUniversalTime() == dt2.ToUniversalTime();
        }

        return value1.Equals(value2);
    }
}
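The detector compares every field present in either image. Depending on which columns the registered images include, system-managed columns such as `modifiedon` and `versionnumber` can change on every update, which would make every update look meaningful. One way to handle this, sketched below as an assumption, is to filter the detected changes through an ignore list before translating them. `ChangeFilter` is hypothetical, and the column names in `DefaultIgnored` are illustrative; adjust them to the columns your images actually carry.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical filter that drops bookkeeping columns from a change set so that
// only business-relevant field changes trigger downstream events.
public static class ChangeFilter
{
    // Illustrative defaults: columns that typically change on every update.
    public static readonly IReadOnlySet<string> DefaultIgnored =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase)
        {
            "modifiedon", "modifiedby", "modifiedonbehalfby", "versionnumber"
        };

    // Returns a copy of the change set without the ignored columns.
    public static Dictionary<string, TChange> WithoutIgnored<TChange>(
        IReadOnlyDictionary<string, TChange> changes,
        IReadOnlySet<string>? ignored = null)
    {
        var skip = ignored ?? DefaultIgnored;
        return changes
            .Where(kv => !skip.Contains(kv.Key))
            .ToDictionary(kv => kv.Key, kv => kv.Value);
    }
}
```

Applied to the result of `DetectChanges`, the filter leaves an empty dictionary for an update that touched only bookkeeping columns, so no `ContactUpdatedEvent` would be emitted.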
3. Event Translation Service
Transform generic events into business events:
public interface IEventTranslator
{
    IEnumerable<object> TranslateEvent(DataVerseEvent dataVerseEvent);
}

public class ContactEventTranslator : IEventTranslator
{
    private readonly IChangeDetectionService _changeDetection;
    private readonly ILogger<ContactEventTranslator> _logger;

    public ContactEventTranslator(
        IChangeDetectionService changeDetection,
        ILogger<ContactEventTranslator> logger)
    {
        _changeDetection = changeDetection;
        _logger = logger;
    }

    public IEnumerable<object> TranslateEvent(DataVerseEvent dataVerseEvent)
    {
        var events = new List<object>();

        if (dataVerseEvent.PrimaryEntityName != "contact" ||
            dataVerseEvent.MessageName != "Update")
        {
            return events;
        }

        var preImage = dataVerseEvent.PreEntityImages;
        var postImage = dataVerseEvent.PostEntityImages;

        // Detect all changes for generic update event
        var changes = _changeDetection.DetectChanges(preImage, postImage);

        if (changes.Any())
        {
            // Always emit generic update event
            events.Add(new ContactUpdatedEvent
            {
                ContactId = dataVerseEvent.PrimaryEntityId,
                Changes = changes,
                UpdatedAt = dataVerseEvent.Timestamp,
                UpdatedBy = GetUserContext(dataVerseEvent)
            });

            // Check for specific business events
            if (_changeDetection.HasFieldChanged(preImage, postImage, "emailaddress1"))
            {
                events.Add(new ContactEmailChangedEvent
                {
                    ContactId = dataVerseEvent.PrimaryEntityId,
                    OldEmail = preImage.GetValueOrDefault("emailaddress1")?.ToString(),
                    NewEmail = postImage.GetValueOrDefault("emailaddress1")?.ToString(),
                    FirstName = postImage.GetValueOrDefault("firstname")?.ToString(),
                    LastName = postImage.GetValueOrDefault("lastname")?.ToString(),
                    ChangedAt = dataVerseEvent.Timestamp,
                    ChangedBy = GetUserContext(dataVerseEvent)
                });

                _logger.LogInformation(
                    "Email changed for contact {ContactId}: {OldEmail} -> {NewEmail}",
                    dataVerseEvent.PrimaryEntityId,
                    preImage.GetValueOrDefault("emailaddress1"),
                    postImage.GetValueOrDefault("emailaddress1"));
            }
        }

        return events;
    }

    private string GetUserContext(DataVerseEvent dataVerseEvent)
    {
        // Extract user context from event metadata
        return dataVerseEvent.UserId ?? "System";
    }
}
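The translator hard-codes one field-specific rule (`emailaddress1`). If more field-specific events are expected, one option is a small table that maps each watched column to the event type it should raise, so adding a rule does not require editing `TranslateEvent`. The sketch is self-contained and uses hypothetical names (`FieldRule`, `RuleTable`, `ContactPhoneChanged`). To stay short, it works on string-valued images and returns event-type names rather than event objects.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical rule: raise EventType when Column differs between the pre and post images.
public sealed record FieldRule(string Column, string EventType);

public static class RuleTable
{
    public static List<string> Match(
        IReadOnlyList<FieldRule> rules,
        IReadOnlyDictionary<string, string?> pre,
        IReadOnlyDictionary<string, string?> post)
    {
        var raised = new List<string>();
        foreach (var rule in rules)
        {
            // A column absent from an image is treated as null, mirroring HasFieldChanged.
            pre.TryGetValue(rule.Column, out var before);
            post.TryGetValue(rule.Column, out var after);

            if (!string.Equals(before, after, StringComparison.Ordinal))
            {
                raised.Add(rule.EventType);
            }
        }
        return raised;
    }
}
```

With rules for `emailaddress1` and `telephone1`, an update that changes only the email raises just `ContactEmailChanged`.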
4. Event Processor Host
Main processing loop with at-least-once delivery handling:
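// Requires: Azure.Messaging.EventHubs, Microsoft.Extensions.Hosting, Microsoft.Extensions.Logging
// Note: EventHubConsumerClient reads every partition in each replica and does not resume
// from the checkpoints stored by ICheckpointService. For multi-replica production use, the
// Azure SDK recommends EventProcessorClient, which balances partitions across instances
// and checkpoints to Blob Storage.
public class EventProcessorHost : BackgroundService
{
    private readonly EventHubConsumerClient _consumerClient;
    private readonly EventHubProducerClient _producerClient;
    private readonly IEventTranslator _translator;
    private readonly ICheckpointService _checkpointService;
    private readonly ILogger<EventProcessorHost> _logger;
    private readonly IMetrics _metrics;

    public EventProcessorHost(
        EventHubConsumerClient consumerClient,
        EventHubProducerClient producerClient,
        IEventTranslator translator,
        ICheckpointService checkpointService,
        ILogger<EventProcessorHost> logger,
        IMetrics metrics)
    {
        _consumerClient = consumerClient;
        _producerClient = producerClient;
        _translator = translator;
        _checkpointService = checkpointService;
        _logger = logger;
        _metrics = metrics;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var partitionEvent in _consumerClient.ReadEventsAsync(stoppingToken))
        {
            try
            {
                using (_metrics.MeasureProcessingDuration())
                {
                    await ProcessEventAsync(partitionEvent, stoppingToken);
                }
                await _checkpointService.UpdateCheckpointAsync(partitionEvent);
                _metrics.RecordEventProcessed("success", "dataverse");
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError(ex,
                    "Error processing event from partition {PartitionId}",
                    partitionEvent.Partition.PartitionId);
                _metrics.RecordEventProcessed("error", "dataverse");

                // The checkpoint is not advanced here, but the next successful event on this
                // partition will move past this one, so retry or dead-letter it now
                await HandleProcessingErrorAsync(partitionEvent, ex);
            }
        }
    }

    private async Task ProcessEventAsync(PartitionEvent partitionEvent, CancellationToken cancellationToken)
    {
        var dataVerseEvent = JsonSerializer.Deserialize<DataVerseEvent>(
            partitionEvent.Data.EventBody.ToString())
            ?? throw new InvalidOperationException("Event body deserialized to null");

        // Check for duplicate processing
        if (await _checkpointService.IsDuplicateAsync(dataVerseEvent))
        {
            _logger.LogDebug("Skipping duplicate event for {EntityId} at {Timestamp}",
                dataVerseEvent.PrimaryEntityId, dataVerseEvent.Timestamp);
            return;
        }

        // Translate to business events
        var businessEvents = _translator.TranslateEvent(dataVerseEvent);

        // Use the record ID as partition key so events for one contact stay in order
        var batchOptions = new CreateBatchOptions
        {
            PartitionKey = dataVerseEvent.PrimaryEntityId.ToString()
        };

        var eventBatch = await _producerClient.CreateBatchAsync(batchOptions, cancellationToken);
        try
        {
            foreach (var businessEvent in businessEvents)
            {
                var eventData = new EventData(JsonSerializer.SerializeToUtf8Bytes(businessEvent));
                eventData.Properties["EventType"] = businessEvent.GetType().Name;

                if (!eventBatch.TryAdd(eventData))
                {
                    // Batch is full: send it and start a new one
                    await _producerClient.SendAsync(eventBatch, cancellationToken);
                    eventBatch.Dispose();
                    eventBatch = await _producerClient.CreateBatchAsync(batchOptions, cancellationToken);

                    // An event that does not fit an empty batch can never be sent
                    if (!eventBatch.TryAdd(eventData))
                    {
                        throw new InvalidOperationException(
                            $"{businessEvent.GetType().Name} is too large for a single batch");
                    }
                }
            }

            if (eventBatch.Count > 0)
            {
                await _producerClient.SendAsync(eventBatch, cancellationToken);
            }
        }
        finally
        {
            eventBatch.Dispose();
        }

        // Mark as processed only after publishing succeeded
        await _checkpointService.MarkAsProcessedAsync(dataVerseEvent);
    }

    // Placeholder: implement retry with backoff and/or dead-lettering here
    private Task HandleProcessingErrorAsync(PartitionEvent partitionEvent, Exception ex) =>
        Task.CompletedTask;
}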
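`EventDataBatch.TryAdd` returns `false` when the batch is full, and the loop then sends the batch and starts a new one. If a single event is larger than an empty batch allows, no batch can hold it, so that case must surface as an error rather than being ignored. The size-bounded splitting logic can be sketched without the Azure SDK as below. `SizeBatcher` is hypothetical. It measures payloads by byte length only and ignores the per-event framing overhead that the real batch accounts for.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, SDK-free model of the batching loop: greedily pack payloads into
// batches whose total size stays within maxBatchBytes, preserving order.
public static class SizeBatcher
{
    public static List<List<byte[]>> Split(IEnumerable<byte[]> payloads, int maxBatchBytes)
    {
        var batches = new List<List<byte[]>>();
        var current = new List<byte[]>();
        int currentBytes = 0;

        foreach (var payload in payloads)
        {
            // An event that cannot fit even an empty batch is rejected, not dropped.
            if (payload.Length > maxBatchBytes)
            {
                throw new InvalidOperationException(
                    $"Event of {payload.Length} bytes exceeds the batch limit of {maxBatchBytes} bytes.");
            }

            // Equivalent of TryAdd returning false: close the current batch, start a new one.
            if (currentBytes + payload.Length > maxBatchBytes)
            {
                batches.Add(current);
                current = new List<byte[]>();
                currentBytes = 0;
            }

            current.Add(payload);
            currentBytes += payload.Length;
        }

        if (current.Count > 0)
        {
            batches.Add(current);
        }
        return batches;
    }
}
```

Three 4-byte payloads with an 8-byte limit split into batches of two and one. A 10-byte payload throws.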
5. Handling At-Least-Once Delivery
Implement idempotency to handle duplicate messages:
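// Requires: StackExchange.Redis, Azure.Messaging.EventHubs
public interface ICheckpointService
{
    // "event" is a reserved word in C#, so the parameter is named dataVerseEvent
    Task<bool> IsDuplicateAsync(DataVerseEvent dataVerseEvent);
    Task MarkAsProcessedAsync(DataVerseEvent dataVerseEvent);
    Task UpdateCheckpointAsync(PartitionEvent partitionEvent);
}

public class RedisCheckpointService : ICheckpointService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly TimeSpan _duplicateCheckWindow = TimeSpan.FromHours(24);

    public RedisCheckpointService(IConnectionMultiplexer redis)
    {
        _redis = redis;
    }

    // One dedup entry per entity per operation timestamp
    private static string GetProcessedKey(DataVerseEvent dataVerseEvent) =>
        $"processed:{dataVerseEvent.PrimaryEntityId}:{dataVerseEvent.Timestamp:O}";

    public async Task<bool> IsDuplicateAsync(DataVerseEvent dataVerseEvent)
    {
        var db = _redis.GetDatabase();
        return await db.KeyExistsAsync(GetProcessedKey(dataVerseEvent));
    }

    public async Task MarkAsProcessedAsync(DataVerseEvent dataVerseEvent)
    {
        var db = _redis.GetDatabase();
        // Entries expire after the window, so redeliveries older than 24 hours are not caught
        await db.StringSetAsync(GetProcessedKey(dataVerseEvent), DateTime.UtcNow.ToString("O"),
            expiry: _duplicateCheckWindow);
    }

    public async Task UpdateCheckpointAsync(PartitionEvent partitionEvent)
    {
        var db = _redis.GetDatabase();
        // Stores the last processed sequence number per partition; the consumer must read it
        // on startup (e.g. via EventPosition.FromSequenceNumber) for it to take effect
        var key = $"checkpoint:{partitionEvent.Partition.PartitionId}";
        await db.StringSetAsync(key, partitionEvent.Data.SequenceNumber.ToString());
    }
}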
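`IsDuplicateAsync` followed later by `MarkAsProcessedAsync` is a check-then-act sequence, so two replicas that receive the same event can both pass the check. Redis can make the claim atomic with a conditional set; in StackExchange.Redis this is `StringSetAsync` with an expiry and `when: When.NotExists`. The in-memory sketch below models that claim-with-expiry behavior for local testing. `InMemoryIdempotencyStore` and its injectable clock are hypothetical, and the sketch cannot replace a shared store across pods. Because a claim is taken before processing, it should be released if processing fails so that a redelivery can retry.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical in-memory model of "SET key value NX EX ttl": the first caller to
// claim a key wins until the entry expires.
public sealed class InMemoryIdempotencyStore
{
    private readonly ConcurrentDictionary<string, DateTime> _claims = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTime> _utcNow;

    public InMemoryIdempotencyStore(TimeSpan ttl, Func<DateTime>? utcNow = null)
    {
        _ttl = ttl;
        _utcNow = utcNow ?? (() => DateTime.UtcNow);
    }

    // Returns true if this caller now owns the key, false if it is already claimed.
    public bool TryClaim(string key)
    {
        DateTime now = _utcNow();
        while (true)
        {
            if (_claims.TryAdd(key, now + _ttl))
            {
                return true;
            }

            if (_claims.TryGetValue(key, out var expiresAt) && expiresAt > now)
            {
                return false; // still held by an earlier, unexpired claim
            }

            // Expired (or removed concurrently): remove only that exact stale entry, then retry.
            _claims.TryRemove(new KeyValuePair<string, DateTime>(key, expiresAt));
        }
    }

    // Release a claim when processing fails so a redelivered event can be retried.
    public void Release(string key) => _claims.TryRemove(key, out _);
}
```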
Container Configuration
Dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["EventProcessor.csproj", "."]
RUN dotnet restore "EventProcessor.csproj"
COPY . .
RUN dotnet build "EventProcessor.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "EventProcessor.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
# No Docker HEALTHCHECK: Kubernetes ignores it in favor of the liveness and
# readiness probes defined in the Deployment, and the aspnet:8.0 base image
# does not include curl
ENTRYPOINT ["dotnet", "EventProcessor.dll"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dataverse-event-processor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: dataverse-event-processor
  template:
    metadata:
      labels:
        app: dataverse-event-processor
    spec:
      containers:
        - name: event-processor
          image: myregistry.azurecr.io/dataverse-event-processor:latest
          env:
            - name: EventHub__ConsumerConnectionString
              valueFrom:
                secretKeyRef:
                  name: eventhub-secrets
                  key: consumer-connection-string
            - name: EventHub__ProducerConnectionString
              valueFrom:
                secretKeyRef:
                  name: eventhub-secrets
                  key: producer-connection-string
            - name: Redis__ConnectionString
              valueFrom:
                secretKeyRef:
                  name: redis-secrets
                  key: connection-string
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
Monitoring and Metrics
Key Metrics to Monitor
- Event Processing Metrics
  - events_received_total - Total events received from DataVerse
  - events_processed_total - Total events successfully processed
  - events_failed_total - Total events that failed processing
  - event_processing_duration_seconds - Time to process each event
  - business_events_published_total - Business events published by type
- Lag Metrics
  - consumer_lag_seconds - Time difference between event creation and processing
  - partition_lag_count - Number of unprocessed events per partition
- Infrastructure Metrics
  - pod_cpu_usage_percentage - CPU utilization per pod
  - pod_memory_usage_bytes - Memory usage per pod
  - eventhub_throughput_bytes - Data throughput rates
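The two lag metrics above can be derived from values the Event Hubs client exposes. Each received `EventData` carries an `EnqueuedTime` and a `SequenceNumber`, and `GetPartitionPropertiesAsync` returns a `PartitionProperties` whose `LastEnqueuedSequenceNumber` marks the head of the partition. The pure functions below show only the arithmetic. `LagCalculator` is a hypothetical name. Which timestamp counts as "event creation" (the enqueued time, or the DataVerse operation time in the payload) is a choice left to the reader.

```csharp
using System;

// Hypothetical helpers for the lag metrics listed above.
public static class LagCalculator
{
    // consumer_lag_seconds: time between event creation and processing, clamped at zero
    // because clock skew between hosts can otherwise produce small negative values.
    public static double ConsumerLagSeconds(DateTimeOffset createdAt, DateTimeOffset processedAt) =>
        Math.Max(0, (processedAt - createdAt).TotalSeconds);

    // partition_lag_count: events enqueued on the partition beyond the last one processed.
    public static long PartitionLagCount(long lastEnqueuedSequenceNumber, long lastProcessedSequenceNumber) =>
        Math.Max(0L, lastEnqueuedSequenceNumber - lastProcessedSequenceNumber);
}
```

For example, if the last enqueued sequence number is 1500 and the last processed one is 1200, the partition lag is 300 events.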
Prometheus Metrics Implementation
// Requires: prometheus-net (using Prometheus;)
public interface IMetrics
{
    void RecordEventProcessed(string status, string eventType);
    IDisposable MeasureProcessingDuration();
    void SetConsumerLag(string partition, double lagSeconds);
}

public class MetricsService : IMetrics
{
    private readonly Counter _eventsProcessed;
    private readonly Histogram _processingDuration;
    private readonly Gauge _consumerLag;

    public MetricsService()
    {
        _eventsProcessed = Metrics.CreateCounter(
            "events_processed_total",
            "Total number of events processed",
            new CounterConfiguration
            {
                LabelNames = new[] { "status", "event_type" }
            });

        _processingDuration = Metrics.CreateHistogram(
            "event_processing_duration_seconds",
            "Event processing duration in seconds",
            new HistogramConfiguration
            {
                // 14 exponential buckets from 1 ms to ~8.2 s; linear 1 ms steps would
                // stop at 100 ms and create 100 bucket series
                Buckets = Histogram.ExponentialBuckets(0.001, 2, 14)
            });

        _consumerLag = Metrics.CreateGauge(
            "consumer_lag_seconds",
            "Consumer lag in seconds",
            new GaugeConfiguration
            {
                LabelNames = new[] { "partition" }
            });
    }

    public void RecordEventProcessed(string status, string eventType)
    {
        _eventsProcessed.WithLabels(status, eventType).Inc();
    }

    // Dispose the returned timer to observe the elapsed time
    public IDisposable MeasureProcessingDuration()
    {
        return _processingDuration.NewTimer();
    }

    public void SetConsumerLag(string partition, double lagSeconds)
    {
        _consumerLag.WithLabels(partition).Set(lagSeconds);
    }
}
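Bucket boundaries decide which latencies the histogram can distinguish. For example, `Histogram.LinearBuckets(0.001, 0.001, 100)` produces 100 upper bounds from 1 ms to 100 ms, so anything slower, such as an Event Hubs publish under load, lands in the `+Inf` bucket. `Histogram.ExponentialBuckets(0.001, 2, 14)` covers 1 ms to about 8.2 s with 14 buckets. The standalone helper below reproduces both layouts for comparison. It assumes prometheus-net generates linear bounds as `start + i * width` and exponential bounds as `start * factor^i`, as the Prometheus client libraries conventionally do.

```csharp
using System;

// Standalone reproduction of two common bucket layouts (upper bounds in seconds).
public static class BucketLayouts
{
    // start, start + width, start + 2*width, ...
    public static double[] Linear(double start, double width, int count)
    {
        var bounds = new double[count];
        for (int i = 0; i < count; i++) bounds[i] = start + i * width;
        return bounds;
    }

    // start, start*factor, start*factor^2, ...
    public static double[] Exponential(double start, double factor, int count)
    {
        var bounds = new double[count];
        for (int i = 0; i < count; i++) bounds[i] = start * Math.Pow(factor, i);
        return bounds;
    }
}
```

Comparing the last element of each array (about 0.1 s versus 8.192 s) shows how much more of the latency range the exponential layout resolves with far fewer series.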
Monitoring Dashboard
graph TB
subgraph "Monitoring Stack"
A[Prometheus<br/>Metrics Collection] --> B[Grafana<br/>Visualization]
C[Application<br/>Metrics Endpoint] --> A
D[Azure Monitor<br/>Event Hub Metrics] --> B
E[Container<br/>Metrics] --> A
end
subgraph "Alert Rules"
F[High Error Rate]
G[Consumer Lag > 5min]
H[Low Throughput]
I[Pod Restarts]
end
B --> F
B --> G
B --> H
B --> I
Best Practices
1. Event Schema Evolution
- Version your business events