Transforming DataVerse CRUD Events into Business Events¶
Overview¶
This guide demonstrates how to build a containerized .NET application that transforms generic DataVerse CRUD events into meaningful business events. We'll use DataVerse Contact entities as our primary example, showing how to detect specific field changes and publish targeted business events.
Understanding DataVerse Event Publishing¶
DataVerse is Microsoft's cloud-based data platform that provides built-in event publishing capabilities through Service Endpoints. When entities are created, updated, or deleted, DataVerse can automatically publish these events to external systems.
Key DataVerse Concepts¶
Service Endpoints: Configuration points in DataVerse that define where events should be sent. These can target Azure Service Bus, Event Hubs, or webhook endpoints.
Pre/Post Images: DataVerse can include snapshots of entity data before and after an operation, enabling precise change detection at the field level.
Event Context: Rich metadata about the operation including user information, organization context, and correlation identifiers.
DataVerse Event Structure¶
DataVerse events contain standardized information regardless of the entity type:
{
"MessageName": "Update",
"PrimaryEntityName": "contact",
"PrimaryEntityId": "12345678-1234-1234-1234-123456789012",
"UserId": "87654321-4321-4321-4321-210987654321",
"OrganizationId": "11111111-2222-3333-4444-555555555555",
"CorrelationId": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"RequestId": "ffffffff-gggg-hhhh-iiii-jjjjjjjjjjjj",
"InputParameters": { ... },
"PreEntityImages": { ... },
"PostEntityImages": { ... }
}
Solution Architecture¶
Our solution implements an event transformation pattern where generic DataVerse events are processed and converted into specific business events.
graph TB
A[DataVerse Contact Entity] -->|CRUD Operations| B[DataVerse Service Endpoint]
B -->|Generic Events| C[Azure Event Hub<br/>dataverse-events]
C --> D[Containerized .NET App<br/>Event Processor]
D -->|Business Events| E[Azure Event Hub<br/>business-events]
subgraph "Event Processor Container"
D1[Event Hub Consumer]
D2[Event Parser]
D3[Change Detector]
D4[Business Event Factory]
D5[Event Hub Producer]
D1 --> D2
D2 --> D3
D3 --> D4
D4 --> D5
end
E -->|Contact Updated| F[Generic Projection Systems]
E -->|Email Changed| G[Email Verification Service]
E -->|Contact Created| H[Welcome Campaign Service]
Event Flow Details¶
sequenceDiagram
participant DV as DataVerse
participant EH1 as Event Hub (Input)
participant APP as .NET Processor
participant EH2 as Event Hub (Output)
participant SYS as Downstream Systems
DV->>EH1: Generic CRUD Event
EH1->>APP: Event Message
APP->>APP: Parse & Analyze Changes
APP->>EH2: Business Event(s)
EH2->>SYS: Specific Business Events
Core Implementation¶
Event Models¶
First, define the core event models for both input and output:
// DataVerse Event Structure
public class DataVerseEvent
{
public string MessageName { get; set; }
public string PrimaryEntityName { get; set; }
public Guid PrimaryEntityId { get; set; }
public Guid UserId { get; set; }
public Guid OrganizationId { get; set; }
public Guid CorrelationId { get; set; }
public Guid RequestId { get; set; }
public Dictionary<string, object> InputParameters { get; set; }
public Dictionary<string, EntityImage> PreEntityImages { get; set; }
public Dictionary<string, EntityImage> PostEntityImages { get; set; }
}
public class EntityImage
{
public string LogicalName { get; set; }
public Guid Id { get; set; }
public Dictionary<string, object> Attributes { get; set; }
}
// Business Event Base
public abstract class BusinessEvent
{
public Guid EventId { get; set; } = Guid.NewGuid();
public DateTime EventTime { get; set; } = DateTime.UtcNow;
public string EventType { get; set; }
public Guid CorrelationId { get; set; }
public string Source { get; set; } = "DataVerse.EventProcessor";
}
// Specific Business Events
public class ContactUpdatedEvent : BusinessEvent
{
public ContactUpdatedEvent()
{
EventType = "Contact.Updated";
}
public Guid ContactId { get; set; }
public Dictionary<string, FieldChange> ChangedFields { get; set; }
}
public class ContactEmailChangedEvent : BusinessEvent
{
public ContactEmailChangedEvent()
{
EventType = "Contact.EmailChanged";
}
public Guid ContactId { get; set; }
public string PreviousEmail { get; set; }
public string NewEmail { get; set; }
public string FullName { get; set; }
}
public class FieldChange
{
public object OldValue { get; set; }
public object NewValue { get; set; }
public string FieldType { get; set; }
}
Change Detection Engine¶
The core logic for detecting specific field changes using pre/post images:
public class ContactChangeDetector
{
private readonly ILogger<ContactChangeDetector> _logger;
public ContactChangeDetector(ILogger<ContactChangeDetector> logger)
{
_logger = logger;
}
public ContactChangeAnalysis AnalyzeChanges(DataVerseEvent dataVerseEvent)
{
if (dataVerseEvent.MessageName != "Update" ||
dataVerseEvent.PrimaryEntityName != "contact")
{
return new ContactChangeAnalysis { HasChanges = false };
}
var preImage = GetContactImage(dataVerseEvent.PreEntityImages);
var postImage = GetContactImage(dataVerseEvent.PostEntityImages);
if (preImage == null || postImage == null)
{
_logger.LogWarning("Missing pre or post image for contact {ContactId}",
dataVerseEvent.PrimaryEntityId);
return new ContactChangeAnalysis { HasChanges = false };
}
var analysis = new ContactChangeAnalysis
{
HasChanges = true,
ContactId = dataVerseEvent.PrimaryEntityId,
CorrelationId = dataVerseEvent.CorrelationId,
ChangedFields = DetectFieldChanges(preImage.Attributes, postImage.Attributes)
};
// Detect specific business-relevant changes
analysis.EmailChanged = HasEmailChanged(preImage.Attributes, postImage.Attributes);
analysis.StatusChanged = HasStatusChanged(preImage.Attributes, postImage.Attributes);
return analysis;
}
private EntityImage GetContactImage(Dictionary<string, EntityImage> images)
{
return images?.Values?.FirstOrDefault(img => img.LogicalName == "contact");
}
private Dictionary<string, FieldChange> DetectFieldChanges(
Dictionary<string, object> preAttributes,
Dictionary<string, object> postAttributes)
{
var changes = new Dictionary<string, FieldChange>();
// Check all fields that exist in either pre or post image
var allFields = preAttributes.Keys.Union(postAttributes.Keys);
foreach (var field in allFields)
{
var oldValue = preAttributes.GetValueOrDefault(field);
var newValue = postAttributes.GetValueOrDefault(field);
if (!ValuesEqual(oldValue, newValue))
{
changes[field] = new FieldChange
{
OldValue = oldValue,
NewValue = newValue,
FieldType = DetermineFieldType(newValue ?? oldValue)
};
}
}
return changes;
}
private bool HasEmailChanged(Dictionary<string, object> pre, Dictionary<string, object> post)
{
var oldEmail = pre.GetValueOrDefault("emailaddress1")?.ToString();
var newEmail = post.GetValueOrDefault("emailaddress1")?.ToString();
return !string.Equals(oldEmail, newEmail, StringComparison.OrdinalIgnoreCase);
}
private bool HasStatusChanged(Dictionary<string, object> pre, Dictionary<string, object> post)
{
var oldStatus = pre.GetValueOrDefault("statecode");
var newStatus = post.GetValueOrDefault("statecode");
return !ValuesEqual(oldStatus, newStatus);
}
private bool ValuesEqual(object value1, object value2)
{
if (value1 == null && value2 == null) return true;
if (value1 == null || value2 == null) return false;
return value1.Equals(value2);
}
private string DetermineFieldType(object value)
{
return value?.GetType().Name ?? "Unknown";
}
}
public class ContactChangeAnalysis
{
public bool HasChanges { get; set; }
public Guid ContactId { get; set; }
public Guid CorrelationId { get; set; }
public Dictionary<string, FieldChange> ChangedFields { get; set; } = new();
public bool EmailChanged { get; set; }
public bool StatusChanged { get; set; }
}
Business Event Factory¶
Converts change analysis into specific business events:
public class BusinessEventFactory
{
private readonly ILogger<BusinessEventFactory> _logger;
public BusinessEventFactory(ILogger<BusinessEventFactory> logger)
{
_logger = logger;
}
public IEnumerable<BusinessEvent> CreateEvents(
ContactChangeAnalysis analysis,
DataVerseEvent originalEvent)
{
var events = new List<BusinessEvent>();
// Always create a generic update event for projection systems
events.Add(CreateContactUpdatedEvent(analysis, originalEvent));
// Create specific business events based on what changed
if (analysis.EmailChanged)
{
var emailEvent = CreateEmailChangedEvent(analysis, originalEvent);
if (emailEvent != null)
{
events.Add(emailEvent);
}
}
return events;
}
private ContactUpdatedEvent CreateContactUpdatedEvent(
ContactChangeAnalysis analysis,
DataVerseEvent originalEvent)
{
return new ContactUpdatedEvent
{
ContactId = analysis.ContactId,
CorrelationId = analysis.CorrelationId,
ChangedFields = analysis.ChangedFields
};
}
private ContactEmailChangedEvent CreateEmailChangedEvent(
ContactChangeAnalysis analysis,
DataVerseEvent originalEvent)
{
var preImage = originalEvent.PreEntityImages?.Values?
.FirstOrDefault(img => img.LogicalName == "contact");
var postImage = originalEvent.PostEntityImages?.Values?
.FirstOrDefault(img => img.LogicalName == "contact");
if (preImage == null || postImage == null)
{
_logger.LogWarning("Cannot create email changed event - missing image data");
return null;
}
var previousEmail = preImage.Attributes.GetValueOrDefault("emailaddress1")?.ToString();
var newEmail = postImage.Attributes.GetValueOrDefault("emailaddress1")?.ToString();
var fullName = postImage.Attributes.GetValueOrDefault("fullname")?.ToString();
return new ContactEmailChangedEvent
{
ContactId = analysis.ContactId,
CorrelationId = analysis.CorrelationId,
PreviousEmail = previousEmail,
NewEmail = newEmail,
FullName = fullName
};
}
}
Event Hub Consumer Service¶
Implements the at-least-once delivery pattern with proper error handling:
public class DataVerseEventProcessor : BackgroundService
{
private readonly EventHubConsumerClient _consumerClient;
private readonly EventHubProducerClient _producerClient;
private readonly ContactChangeDetector _changeDetector;
private readonly BusinessEventFactory _eventFactory;
private readonly ILogger<DataVerseEventProcessor> _logger;
private readonly IServiceProvider _serviceProvider;
public DataVerseEventProcessor(
EventHubConsumerClient consumerClient,
EventHubProducerClient producerClient,
ContactChangeDetector changeDetector,
BusinessEventFactory eventFactory,
ILogger<DataVerseEventProcessor> logger,
IServiceProvider serviceProvider)
{
_consumerClient = consumerClient;
_producerClient = producerClient;
_changeDetector = changeDetector;
_eventFactory = eventFactory;
_logger = logger;
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (PartitionEvent partitionEvent in _consumerClient.ReadEventsAsync(stoppingToken))
{
try
{
await ProcessEvent(partitionEvent, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing event {EventId}",
partitionEvent.Data?.MessageId);
// Implement dead letter or retry logic here
await HandleProcessingError(partitionEvent, ex);
}
}
}
private async Task ProcessEvent(PartitionEvent partitionEvent, CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var metricsCollector = scope.ServiceProvider.GetRequiredService<IMetricsCollector>();
var stopwatch = Stopwatch.StartNew();
try
{
// Parse the DataVerse event
var dataVerseEvent = JsonSerializer.Deserialize<DataVerseEvent>(
partitionEvent.Data.EventBody.ToArray());
metricsCollector.IncrementCounter("events.received",
("entity", dataVerseEvent.PrimaryEntityName),
("operation", dataVerseEvent.MessageName));
// Analyze changes
var analysis = _changeDetector.AnalyzeChanges(dataVerseEvent);
if (!analysis.HasChanges)
{
_logger.LogDebug("No relevant changes detected for contact {ContactId}",
dataVerseEvent.PrimaryEntityId);
return;
}
// Create business events
var businessEvents = _eventFactory.CreateEvents(analysis, dataVerseEvent);
// Publish business events
await PublishBusinessEvents(businessEvents, cancellationToken);
metricsCollector.IncrementCounter("events.processed.success");
metricsCollector.RecordValue("events.processing.duration", stopwatch.ElapsedMilliseconds);
_logger.LogInformation("Successfully processed DataVerse event for contact {ContactId}, " +
"generated {EventCount} business events",
analysis.ContactId, businessEvents.Count());
}
catch (JsonException ex)
{
_logger.LogError(ex, "Failed to deserialize DataVerse event");
metricsCollector.IncrementCounter("events.processed.error", ("error_type", "deserialization"));
throw;
}
catch (Exception ex)
{
metricsCollector.IncrementCounter("events.processed.error", ("error_type", "processing"));
throw;
}
}
private async Task PublishBusinessEvents(IEnumerable<BusinessEvent> events, CancellationToken cancellationToken)
{
var eventDataBatch = await _producerClient.CreateBatchAsync(cancellationToken);
foreach (var businessEvent in events)
{
var eventJson = JsonSerializer.Serialize(businessEvent);
var eventData = new EventData(Encoding.UTF8.GetBytes(eventJson))
{
MessageId = businessEvent.EventId.ToString(),
CorrelationId = businessEvent.CorrelationId.ToString(),
ContentType = "application/json"
};
// Add event type as a property for routing
eventData.Properties.Add("EventType", businessEvent.EventType);
if (!eventDataBatch.TryAdd(eventData))
{
// Batch is full, send it and create a new one
await _producerClient.SendAsync(eventDataBatch, cancellationToken);
eventDataBatch = await _producerClient.CreateBatchAsync(cancellationToken);
if (!eventDataBatch.TryAdd(eventData))
{
throw new InvalidOperationException("Event too large for batch");
}
}
}
if (eventDataBatch.Count > 0)
{
await _producerClient.SendAsync(eventDataBatch, cancellationToken);
}
}
private async Task HandleProcessingError(PartitionEvent partitionEvent, Exception exception)
{
// Implement your error handling strategy here
// Options: Dead letter queue, retry with exponential backoff, alerting
_logger.LogError(exception, "Failed to process event {EventId}, implementing error handling",
partitionEvent.Data?.MessageId);
// Example: Send to dead letter queue after certain retry attempts
// await _deadLetterService.SendAsync(partitionEvent.Data);
}
}
Handling At-Least-Once Delivery¶
Event Hub guarantees at-least-once delivery, meaning duplicate events are possible. Implement idempotency to handle this:
public class IdempotencyService
{
private readonly IMemoryCache _cache;
private readonly ILogger<IdempotencyService> _logger;
public IdempotencyService(IMemoryCache cache, ILogger<IdempotencyService> logger)
{
_cache = cache;
_logger = logger;
}
public async Task<bool> IsProcessedAsync(string eventId, string operationType)
{
var key = $"processed:{operationType}:{eventId}";
if (_cache.TryGetValue(key, out _))
{
_logger.LogDebug("Event {EventId} already processed for operation {Operation}",
eventId, operationType);
return true;
}
return false;
}
public async Task MarkAsProcessedAsync(string eventId, string operationType)
{
var key = $"processed:{operationType}:{eventId}";
// Cache for 24 hours to handle duplicate delivery within reasonable window
_cache.Set(key, true, TimeSpan.FromHours(24));
_logger.LogDebug("Marked event {EventId} as processed for operation {Operation}",
eventId, operationType);
}
}
// Usage in event processor
private async Task ProcessEvent(PartitionEvent partitionEvent, CancellationToken cancellationToken)
{
var eventId = partitionEvent.Data.MessageId ?? Guid.NewGuid().ToString();
if (await _idempotencyService.IsProcessedAsync(eventId, "contact_transformation"))
{
_logger.LogDebug("Skipping duplicate event {EventId}", eventId);
return;
}
// ... process event ...
await _idempotencyService.MarkAsProcessedAsync(eventId, "contact_transformation");
}
Container Configuration¶
Dockerfile¶
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["EventProcessor.csproj", "."]
RUN dotnet restore "EventProcessor.csproj"
COPY . .
WORKDIR "/src"
RUN dotnet build "EventProcessor.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "EventProcessor.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "EventProcessor.dll"]
appsettings.json¶
{
"EventHub": {
"InputConnection": "#{EventHub.Input.ConnectionString}#",
"InputHubName": "dataverse-events",
"OutputConnection": "#{EventHub.Output.ConnectionString}#",
"OutputHubName": "business-events",
"ConsumerGroup": "event-processor"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"HealthChecks": {
"Enabled": true,
"Port": 8080
}
}
Service Registration¶
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Event Hub clients
builder.Services.AddSingleton(serviceProvider =>
{
var configuration = serviceProvider.GetRequiredService<IConfiguration>();
var connectionString = configuration["EventHub:InputConnection"];
var hubName = configuration["EventHub:InputHubName"];
var consumerGroup = configuration["EventHub:ConsumerGroup"];
return new EventHubConsumerClient(consumerGroup, connectionString, hubName);
});
builder.Services.AddSingleton(serviceProvider =>
{
var configuration = serviceProvider.GetRequiredService<IConfiguration>();
var connectionString = configuration["EventHub:OutputConnection"];
var hubName = configuration["EventHub:OutputHubName"];
return new EventHubProducerClient(connectionString, hubName);
});
// Application services
builder.Services.AddScoped<ContactChangeDetector>();
builder.Services.AddScoped<BusinessEventFactory>();
builder.Services.AddSingleton<IdempotencyService>();
builder.Services.AddScoped<IMetricsCollector, MetricsCollector>();
// Background service
builder.Services.AddHostedService<DataVerseEventProcessor>();
// Health checks
builder.Services.AddHealthChecks()
.AddCheck<EventHubHealthCheck>("eventhub");
// Memory cache for idempotency
builder.Services.AddMemoryCache();
var app = builder.Build();
app.MapHealthChecks("/health");
app.Run();
}
}
Monitoring and Metrics¶
Implement comprehensive monitoring to ensure system reliability:
public interface IMetricsCollector
{
void IncrementCounter(string name, params (string key, string value)[] tags);
void RecordValue(string name, double value, params (string key, string value)[] tags);
}
public class MetricsCollector : IMetricsCollector
{
private readonly ILogger<MetricsCollector> _logger;
public MetricsCollector(ILogger<MetricsCollector> logger)
{
_logger = logger;
}
public void IncrementCounter(string name, params (string key, string value)[] tags)
{
// Integration with your metrics system (Application Insights, Prometheus, etc.)
var tagString = string.Join(", ", tags.Select(t => $"{t.key}={t.value}"));
_logger.LogInformation("METRIC: Counter {Name} incremented. Tags: {Tags}", name, tagString);
}
public void RecordValue(string name, double value, params (string key, string value)[] tags)
{
var tagString = string.Join(", ", tags.Select(t => $"{t.key}={t.value}"));
_logger.LogInformation("METRIC: Value {Name} = {Value}. Tags: {Tags}", name, value, tagString);
}
}
Critical Metrics to Monitor¶
Event Processing Metrics:
events.received- Total events received from DataVerse- Why Important: Baseline measurement of system load and DataVerse activity
-
Tags: entity type, operation type
-
events.processed.success- Successfully processed events - Why Important: Measures processing success rate and system health
-
Tags: entity type, business event type generated
-
events.processed.error- Processing errors by type - Why Important: Early detection of parsing, connectivity, or logic errors
- Tags: error_type (deserialization, processing, publishing)
Business Logic Metrics:
business_events.generated- Count of business events created by type- Why Important: Tracks business logic effectiveness and downstream impact
-
Tags: event_type (Contact.Updated, Contact.EmailChanged)
-
field_changes.detected- Specific field changes detected - Why Important: Validates change detection logic and identifies hot fields
- Tags: field_name, entity_type
Performance Metrics:
events.processing.duration- Event processing latency (milliseconds)- Why Important: Ensures processing keeps pace with incoming events
-
Threshold: Alert if p95 > 5000ms
-
eventhub.lag- Consumer lag behind Event Hub - Why Important: Prevents message loss due to retention limits
- Threshold: Alert if lag > 100,000 messages
System Health Metrics:
duplicate_events.detected- Duplicate events caught by idempotency- Why Important: Validates at-least-once delivery handling
-
Normal Range: 1-5% of total events
-
memory.usage- Application memory consumption - Why Important: Prevents container out-of-memory conditions
- Threshold: Alert if > 80% of container limit
Business Event Schema Design¶
Design your business events with versioning and backward compatibility:
public abstract class BusinessEvent
{
public string SchemaVersion { get; set; } = "1.0";
public Guid EventId { get; set; } = Guid.NewGuid();
public DateTime EventTime { get; set; } = DateTime.UtcNow;
public string EventType { get; set; }
public Guid CorrelationId { get; set; }
public string Source { get; set; } = "DataVerse.EventProcessor";
// Add common metadata for all business events
public EventMetadata Metadata { get; set; } = new EventMetadata();
}
public class EventMetadata
{
public string TenantId { get; set; }
public string UserId { get; set; }
public Dictionary<string, string> CustomProperties { get; set; } = new();
}
// Version 1.0 of Contact Email Changed Event
public class ContactEmailChangedEvent : BusinessEvent
{
public ContactEmailChangedEvent()
{
EventType = "Contact.EmailChanged";
SchemaVersion = "1.0";
}
public Guid ContactId { get; set; }
public string PreviousEmail { get; set; }
public string NewEmail { get; set; }
public string FullName { get; set; }
// Consider future expansion
public ContactVerificationStatus VerificationStatus { get; set; }
}
public enum ContactVerificationStatus
{
Unknown,
Verified,
Pending,
Failed
}
Deployment Considerations¶
Azure Container Apps Environment¶
# container-app.yaml
apiVersion: containerapps.azure.com/v1beta2
kind: ContainerApp
metadata:
name: dataverse-event-processor
spec:
managedEnvironmentId: /subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.App/managedEnvironments/{environment}
configuration:
secrets:
- name: eventhub-input-connection
value: "{connection-string}"
- name: eventhub-output-connection
value: "{connection-string}"
ingress:
external: false
targetPort: 8080
traffic:
- weight: 100
latestRevision: true
template:
containers:
- image: myregistry.azurecr.io/dataverse-event-processor:latest
name: event-processor
env:
- name: EventHub__InputConnection
secretRef: eventhub-input-connection
- name: EventHub__OutputConnection
secretRef: eventhub-output-connection
resources:
cpu: 1
memory: 2Gi
probes:
- type: liveness
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 30
scale:
minReplicas: 2
maxReplicas: 10
rules:
- name: eventhub-scaling
custom:
type: azure-eventhub
metadata:
consumerGroup: event-processor
connectionFromEnv: EventHub__InputConnection
eventHubName: dataverse-events
threshold: '100'
Health Checks¶
public class EventHubHealthCheck : IHealthCheck
{
private readonly EventHubConsumerClient _consumerClient;
private readonly EventHubProducerClient _producerClient;
public EventHubHealthCheck(
EventHubConsumerClient consumerClient,
EventHubProducerClient producerClient)
{
_consumerClient = consumerClient;
_producerClient = producerClient;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
// Test connectivity to both Event Hubs
var inputProperties = await _consumerClient.GetEventHubPropertiesAsync(cancellationToken);
var outputProperties = await _producerClient.GetEventHubPropertiesAsync(cancellationToken);
var data = new Dictionary<string, object>
{
["input_partitions"] = inputProperties.PartitionIds.Length,
["output_partitions"] = outputProperties.PartitionIds.Length
};
return HealthCheckResult.Healthy("Event Hub connections are healthy", data);
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("Event Hub connectivity issues", ex);
}
}
}
Best Practices and Recommendations¶
Error Handling Strategy¶
- Implement Circuit Breaker Pattern: Prevent cascading failures when downstream systems are unavailable
- Dead Letter Queues: Route persistently failing messages for manual investigation
- Exponential Backoff: Retry transient failures with increasing delays
- Correlation IDs: Maintain traceability across all systems
Performance Optimization¶
- Batch Processing: Process multiple events together when possible
- Async/Await: Use asynchronous patterns throughout the pipeline
- Memory Management: Dispose of resources properly to prevent memory leaks
- Connection Pooling: Reuse Event Hub connections across operations
Security Considerations¶
- Managed Identity: Use Azure Managed Identity for Event Hub authentication
- Secrets Management: Store connection strings in Azure Key Vault
- Network Isolation: Deploy containers in private networks with service endpoints
- Data Classification: Ensure sensitive contact data is properly classified and protected
Testing Strategy¶
[Test]
public async Task Should_Detect_Email_Change_From_PrePost_Images()
{
// Arrange
var dataVerseEvent = new DataVerseEvent
{
MessageName = "Update",
PrimaryEntityName = "contact",
PrimaryEntityId = Guid.NewGuid(),
PreEntityImages = new Dictionary<string, EntityImage>
{
["preimage"] = new EntityImage
{
LogicalName = "contact",
Attributes = new Dictionary<string, object>
{
["emailaddress1"] = "old@example.com",
["fullname"] = "John Doe"
}
}
},
PostEntityImages = new Dictionary<string, EntityImage>
{
["postimage"] = new EntityImage
{
LogicalName = "contact",
Attributes = new Dictionary<string, object>
{
["emailaddress1"] = "new@example.com",
["fullname"] = "John Doe"
}
}
}
};
var detector = new ContactChangeDetector(Mock.Of<ILogger<ContactChangeDetector>>());
// Act
var analysis = detector.AnalyzeChanges(dataVerseEvent);
// Assert
Assert.True(analysis.HasChanges);
Assert.True(analysis.EmailChanged);
Assert.Contains("emailaddress1", analysis.ChangedFields.Keys);
Assert.Equal("old@example.com", analysis.ChangedFields["emailaddress1"].OldValue);
Assert.Equal("new@example.com", analysis.ChangedFields["emailaddress1"].NewValue);
}
Conclusion¶
This architecture provides a robust, scalable solution for transforming generic DataVerse CRUD events into meaningful business events. The containerized approach ensures easy deployment and scaling, while the comprehensive monitoring and error handling strategies provide production-ready reliability.
Key benefits of this approach:
- Decoupling: Downstream systems receive focused business events rather than raw DataVerse data
- Scalability: Container-based architecture scales automatically based on Event Hub message volume
- Reliability: At-least-once processing with idempotency ensures no message loss
- Observability: Comprehensive metrics provide insight into system health and business activity
- Maintainability: Clear separation of concerns makes the system easy to extend and modify
The solution transforms the generic nature of DataVerse events into actionable business intelligence, enabling downstream systems to react appropriately to specific business scenarios while maintaining a unified event-driven architecture.