Resilience and Retry Policies¶
using Polly;
using Polly.Extensions.Http;
namespace DataverseEventTranslator.Infrastructure.Resilience
{
public class ResilienceService
{
private readonly IAsyncPolicy _retryPolicy;
private readonly IAsyncPolicy _circuitBreakerPolicy;
private readonly ILogger<ResilienceService> _logger;
public ResilienceService(ILogger<ResilienceService> logger)
{
_logger = logger;
// Retry policy with exponential backoff
_retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<TaskCanceledException>()
.Or<TimeoutException>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (outcome, timespan, retryCount, context) =>
{
_logger.LogWarning("Retry attempt {RetryCount} after {Delay}ms for operation {OperationKey}",
retryCount, timespan.TotalMilliseconds, context.OperationKey);
});
// Circuit breaker pattern
_circuitBreakerPolicy = Policy
.Handle<Exception>()
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: 5,
durationOfBreak: TimeSpan.FromMinutes(1),
onBreak: (exception, duration) =>
{
_logger.LogError(exception, "Circuit breaker opened for {Duration}ms", duration.TotalMilliseconds);
},
onReset: () =>
{
_logger.LogInformation("Circuit breaker reset");
});
}
public async Task<T> ExecuteWithRetryAsync<T>(
Func<Task<T>> operation,
string operationName,
CancellationToken cancellationToken = default)
{
var context = new Context(operationName);
return await _retryPolicy.ExecuteAsync(async (ctx) =>
{
try
{
return await operation();
}
catch (Exception ex) when (IsRetriableException(ex))
{
_logger.LogWarning(ex, "Retriable error in operation {OperationName}", operationName);
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "Non-retriable error in operation {OperationName}", operationName);
throw;
}
}, context);
}
public async Task ExecuteWithCircuitBreakerAsync(
Func<Task> operation,
string operationName,
CancellationToken cancellationToken = default)
{
try
{
await _circuitBreakerPolicy.ExecuteAsync(operation);
}
catch (Exception ex)
{
_logger.LogError(ex, "Circuit breaker protected operation {OperationName} failed", operationName);
throw;
}
}
private bool IsRetriableException(Exception ex)
{
return ex is HttpRequestException ||
ex is TaskCanceledException ||
ex is TimeoutException ||
(ex.Message?.Contains("throttle", StringComparison.OrdinalIgnoreCase) == true);
}
}
}
Dead Letter Queue Service¶
namespace DataverseEventTranslator.Infrastructure.DeadLetter
{
public interface IDeadLetterService
{
Task SendAsync(object message, Exception exception, string correlationId);
Task<IEnumerable<DeadLetterMessage>> GetMessagesAsync(int count = 10);
Task ReprocessMessageAsync(string messageId);
}
public class ServiceBusDeadLetterService : IDeadLetterService
{
private readonly ServiceBusClient _serviceBusClient;
private readonly ILogger<ServiceBusDeadLetterService> _logger;
private readonly string _deadLetterQueueName;
public ServiceBusDeadLetterService(
ServiceBusClient serviceBusClient,
IConfiguration configuration,
ILogger<ServiceBusDeadLetterService> logger)
{
_serviceBusClient = serviceBusClient;
_logger = logger;
_deadLetterQueueName = configuration.GetValue<string>("DeadLetterQueue:Name") ?? "dead-letter-queue";
}
public async Task SendAsync(object message, Exception exception, string correlationId)
{
try
{
var sender = _serviceBusClient.CreateSender(_deadLetterQueueName);
var deadLetterMessage = new DeadLetterMessage
{
Id = Guid.NewGuid().ToString(),
OriginalMessage = JsonConvert.SerializeObject(message),
Exception = exception.ToString(),
CorrelationId = correlationId,
FailedAt = DateTime.UtcNow,
RetryCount = 0,
MaxRetries = 3
};
var serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(deadLetterMessage))
{
MessageId = deadLetterMessage.Id,
CorrelationId = correlationId,
ContentType = "application/json"
};
await sender.SendMessageAsync(serviceBusMessage);
_logger.LogInformation("Message sent to dead letter queue. MessageId: {MessageId}, CorrelationId: {CorrelationId}",
deadLetterMessage.Id, correlationId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send message to dead letter queue. CorrelationId: {CorrelationId}", correlationId);
throw;
}
}
public async Task<IEnumerable<DeadLetterMessage>> GetMessagesAsync(int count = 10)
{
var receiver = _serviceBusClient.CreateReceiver(_deadLetterQueueName);
var messages = await receiver.ReceiveMessagesAsync(count, TimeSpan.FromSeconds(10));
return messages.Select(msg => JsonConvert.DeserializeObject<DeadLetterMessage>(msg.Body.ToString()));
}
public async Task ReprocessMessageAsync(string messageI# Translating Dataverse CRUD Events to Business Events
## Overview
This guide demonstrates how to transform generic Dataverse CRUD (Create, Read, Update, Delete) events into meaningful business events using Azure Event Hub and C#. Instead of consuming low-level database operations, downstream systems receive semantically rich events that represent actual business processes.
## Understanding Dataverse Event Publishing
### How Dataverse Publishes Events
Dataverse (formerly Common Data Service) has a sophisticated event publishing system built on the concept of **Service Endpoints**. Understanding this mechanism is crucial for implementing effective event translation.
#### The Plugin Framework Foundation
Dataverse operates on a plugin-based architecture where custom code can be executed in response to data operations. The system provides several execution contexts:
- **Pre-Operation**: Before the database transaction
- **Post-Operation**: After the database transaction but within the same transaction
- **Asynchronous**: After the transaction completes, executed separately
#### Service Endpoints: The Event Publishing Mechanism
Service Endpoints are specialized plugins that instead of executing custom logic locally, forward event data to external systems. They act as bridges between Dataverse's internal event system and external messaging platforms.
**Key Components:**
1. **Plugin Registration Tool**: Administrative interface for configuring what events trigger which endpoints
2. **Event Execution Pipeline**: Dataverse's internal event processing system
3. **Service Endpoint Configuration**: Defines where and how events are sent
4. **Pre/Post Images**: Snapshots of entity data before and after operations
**Event Flow Within Dataverse:**
```mermaid
graph TD
A[User Action] --> B[Dataverse Core]
B --> C[Event Pipeline]
C --> D{Plugin Steps}
D --> E[Pre-Operation Plugins]
D --> F[Database Operation]
D --> G[Post-Operation Plugins]
D --> H[Service Endpoints]
H --> I[External System<br/>Azure Event Hub]
E --> J[Pre-Images Generated]
G --> K[Post-Images Generated]
J --> H
K --> H
What Gets Published:
When a Service Endpoint is triggered, Dataverse packages comprehensive context including: - Operation Type: Create, Update, Delete, or custom operations - Entity Information: Type, ID, and logical name - User Context: Who performed the operation - Pre-Images: Entity state before the operation (configurable fields) - Post-Images: Entity state after the operation (configurable fields) - Input Parameters: Operation-specific data - Execution Context: Transaction details, correlation IDs, etc.
Configuration Flexibility:
Administrators can configure: - Which entities trigger events (Account, Contact, Opportunity, etc.) - Which operations trigger events (Create, Update, Delete, custom actions) - Which fields are included in pre/post images - Filtering conditions (only certain status changes, value thresholds, etc.) - Execution timing (synchronous vs asynchronous)
This granular control allows organizations to publish only business-relevant events while maintaining high performance by avoiding unnecessary event generation.
The Challenge¶
Translating Dataverse Contact Events to Business Events¶
Overview¶
This guide demonstrates how to transform generic Dataverse Contact entity CRUD events into meaningful business events using Azure Event Hub and C#. We'll focus on handling Contact updates to support both generic change projections and specific business events like email address changes.
Understanding Dataverse Event Publishing¶
How Dataverse Publishes Events¶
Dataverse (formerly Common Data Service) has a sophisticated event publishing system built on the concept of Service Endpoints. Understanding this mechanism is crucial for implementing effective event translation.
The Plugin Framework Foundation¶
Dataverse operates on a plugin-based architecture where custom code can be executed in response to data operations. The system provides several execution contexts:
- Pre-Operation: Before the database transaction
- Post-Operation: After the database transaction but within the same transaction
- Asynchronous: After the transaction completes, executed separately
Service Endpoints: The Event Publishing Mechanism¶
Service Endpoints are specialized plugins that instead of executing custom logic locally, forward event data to external systems. They act as bridges between Dataverse's internal event system and external messaging platforms.
Key Components:
- Plugin Registration Tool: Administrative interface for configuring what events trigger which endpoints
- Event Execution Pipeline: Dataverse's internal event processing system
- Service Endpoint Configuration: Defines where and how events are sent
- Pre/Post Images: Snapshots of entity data before and after operations
Event Flow Within Dataverse:
graph TD
A[User Action] --> B[Dataverse Core]
B --> C[Event Pipeline]
C --> D{Plugin Steps}
D --> E[Pre-Operation Plugins]
D --> F[Database Operation]
D --> G[Post-Operation Plugins]
D --> H[Service Endpoints]
H --> I[External System<br/>Azure Event Hub]
E --> J[Pre-Images Generated]
G --> K[Post-Images Generated]
J --> H
K --> H
What Gets Published:
When a Service Endpoint is triggered, Dataverse packages comprehensive context including: - Operation Type: Create, Update, Delete, or custom operations - Entity Information: Type, ID, and logical name - User Context: Who performed the operation - Pre-Images: Entity state before the operation (configurable fields) - Post-Images: Entity state after the operation (configurable fields) - Input Parameters: Operation-specific data - Execution Context: Transaction details, correlation IDs, etc.
Configuration Flexibility:
Administrators can configure: - Which entities trigger events (Contact in our case) - Which operations trigger events (Create, Update, Delete, custom actions) - Which fields are included in pre/post images - Filtering conditions (only certain status changes, value thresholds, etc.) - Execution timing (synchronous vs asynchronous)
This granular control allows organizations to publish only business-relevant events while maintaining high performance by avoiding unnecessary event generation.
The Challenge¶
Current State: Generic CRUD Events¶
Dataverse's built-in service endpoints generate technical events that lack business context:
{
"EntityType": "contact",
"Operation": "Update",
"EntityId": "12345678-1234-1234-1234-123456789012",
"ModifiedFields": ["emailaddress1", "modifiedon"],
"Timestamp": "2024-07-11T15:30:00Z"
}
Desired State: Both Generic and Business Events¶
Transform these into two types of events:
1. Generic Contact Update Event (for projections):
{
"EventType": "ContactUpdated",
"ContactId": "12345678-1234-1234-1234-123456789012",
"FullName": "John Doe",
"ChangedFields": ["emailaddress1"],
"PreValues": {
"emailaddress1": "john.old@company.com"
},
"PostValues": {
"emailaddress1": "john.new@company.com"
},
"ChangedAt": "2024-07-11T15:30:00Z",
"ChangedBy": "admin@company.com"
}
2. Specific Business Event (for business logic):
{
"EventType": "ContactEmailChanged",
"ContactId": "12345678-1234-1234-1234-123456789012",
"FullName": "John Doe",
"OldEmailAddress": "john.old@company.com",
"NewEmailAddress": "john.new@company.com",
"ChangedAt": "2024-07-11T15:30:00Z",
"ChangedBy": "admin@company.com",
"RequiresEmailVerification": true
}
Problems with CRUD Events¶
- Lack of Business Context: "Contact Updated" doesn't convey business meaning
- Field-Level Granularity Missing: Can't determine which specific fields changed
- No Semantic Versioning: Schema changes break downstream consumers
- Noise: Every minor update generates events, even non-business-relevant ones
- Poor Developer Experience: Consumers must understand Dataverse schema
Solution Architecture¶
High-Level Architecture¶
graph TB
DV[Dataverse] --> EH1[Source Event Hub<br/>Raw Contact CRUD Events]
EH1 --> CA[Containerized .NET App<br/>Contact Event Translator]
CA --> EH2[Business Event Hub<br/>Contact Business Events]
EH2 --> C1[Consumer 1<br/>Email Service]
EH2 --> C2[Consumer 2<br/>Contact View Projector]
EH2 --> C3[Consumer 3<br/>CRM Analytics]
CA --> DL[Dead Letter Queue<br/>Failed Events]
CA --> LOG[Application Insights<br/>Monitoring]
subgraph "Container Environment"
CA
subgraph "Deployment Options"
AKS[Azure Kubernetes Service]
ACA[Azure Container Apps]
ACI[Azure Container Instances]
end
end
Event Flow Diagram¶
sequenceDiagram
participant DV as Dataverse
participant EH1 as Source Event Hub
participant CA as Container App
participant EH2 as Business Event Hub
participant P as Projection Consumer
participant B as Business Consumer
DV->>EH1: Contact Update with Pre/Post Images
EH1->>CA: Event Hub Consumer reads events
CA->>CA: Detect Field Changes
CA->>CA: Generate Generic ContactUpdated Event
CA->>CA: Generate Specific ContactEmailChanged Event
CA->>EH2: Publish ContactUpdated Event
CA->>EH2: Publish ContactEmailChanged Event
EH2->>P: ContactUpdated → Update Contact View
EH2->>B: ContactEmailChanged → Send Verification Email
Translation Process¶
flowchart TD
Start([Contact CRUD Event Received]) --> Parse[Parse Pre/Post Images]
Parse --> Detect[Detect Field Changes]
Detect --> Filter{Business<br/>Relevant?}
Filter -->|No| Ignore[Ignore Event]
Filter -->|Yes| Generic[Generate ContactUpdated Event]
Generic --> Specific{Specific Business<br/>Logic Needed?}
Specific -->|Email Changed| Email[Generate ContactEmailChanged Event]
Specific -->|Phone Changed| Phone[Generate ContactPhoneChanged Event]
Specific -->|Status Changed| Status[Generate ContactStatusChanged Event]
Specific -->|No| Publish[Publish Events]
Email --> Publish
Phone --> Publish
Status --> Publish
Publish --> Log[Log Success]
Ignore --> End([End])
Log --> End
Implementation Guide¶
Step 1: Configure Dataverse Service Endpoint for Contact Entity¶
Register Service Endpoint in Plugin Registration Tool¶
- Create new Service Endpoint pointing to your Source Event Hub
- Register Step for "Update" message on "contact" entity specifically
- Configure Pre-Image and Post-Image with Contact-specific attributes
Event Hub Service Endpoint Configuration: - Service Endpoint Type: Event Hub - Connection String: Event Hub connection string - Topic Name: Contact CRUD events topic
Pre-Image Configuration for Contact: - Alias: "PreImage" - Attributes: "firstname,lastname,fullname,emailaddress1,telephone1,mobilephone,jobtitle,statuscode,parentcustomerid"
Post-Image Configuration for Contact:
- Alias: "PostImage"
- Attributes: "firstname,lastname,fullname,emailaddress1,telephone1,mobilephone,jobtitle,statuscode,parentcustomerid"
Step 2: Contact Event Translation Logic¶
Contact-Specific Business Events¶
// Generic event for view projections
public class ContactUpdated : BusinessEvent
{
public ContactUpdated() { EventType = "ContactUpdated"; }
public string ContactId { get; set; }
public string FullName { get; set; }
public string[] ChangedFields { get; set; }
public Dictionary<string, object> PreValues { get; set; }
public Dictionary<string, object> PostValues { get; set; }
public string ChangedBy { get; set; }
}
// Specific business event for email changes
public class ContactEmailChanged : BusinessEvent
{
public ContactEmailChanged() { EventType = "ContactEmailChanged"; }
public string ContactId { get; set; }
public string FullName { get; set; }
public string OldEmailAddress { get; set; }
public string NewEmailAddress { get; set; }
public string ChangedBy { get; set; }
public bool RequiresEmailVerification { get; set; }
public string AccountId { get; set; } // Parent customer if available
}
// Other specific business events
public class ContactPhoneChanged : BusinessEvent
{
public ContactPhoneChanged() { EventType = "ContactPhoneChanged"; }
public string ContactId { get; set; }
public string FullName { get; set; }
public string PhoneType { get; set; } // "Business" or "Mobile"
public string OldPhoneNumber { get; set; }
public string NewPhoneNumber { get; set; }
public string ChangedBy { get; set; }
}
public class ContactStatusChanged : BusinessEvent
{
public ContactStatusChanged() { EventType = "ContactStatusChanged"; }
public string ContactId { get; set; }
public string FullName { get; set; }
public string OldStatus { get; set; }
public string NewStatus { get; set; }
public string StatusChangeReason { get; set; }
public string ChangedBy { get; set; }
}
Contact Event Mapping Logic¶
public async Task<List<BusinessEvent>> MapContactEventsAsync(
DataverseExecutionContext context,
CancellationToken cancellationToken)
{
var events = new List<BusinessEvent>();
var preImage = context.PreEntityImages["PreImage"];
var postImage = context.PostEntityImages["PostImage"];
var contactId = context.PrimaryEntityId.ToString();
// Always generate a generic ContactUpdated event for view projections
var genericEvent = await CreateGenericContactUpdatedEvent(preImage, postImage, contactId, context);
if (genericEvent != null)
{
events.Add(genericEvent);
}
// Generate specific business events based on field changes
await AddEmailChangeEventIfNeeded(preImage, postImage, contactId, events, context);
await AddPhoneChangeEventsIfNeeded(preImage, postImage, contactId, events, context);
await AddStatusChangeEventIfNeeded(preImage, postImage, contactId, events, context);
return events;
}
private async Task<ContactUpdated> CreateGenericContactUpdatedEvent(
DataverseEntity preImage,
DataverseEntity postImage,
string contactId,
DataverseExecutionContext context)
{
var changedFields = DetectChangedFields(preImage, postImage);
if (!changedFields.Any())
return null; // No relevant changes
var preValues = new Dictionary<string, object>();
var postValues = new Dictionary<string, object>();
foreach (var field in changedFields)
{
preValues[field] = preImage.GetAttributeValue<object>(field);
postValues[field] = postImage.GetAttributeValue<object>(field);
}
return new ContactUpdated
{
ContactId = contactId,
FullName = postImage.GetAttributeValue<string>("fullname"),
ChangedFields = changedFields.ToArray(),
PreValues = preValues,
PostValues = postValues,
ChangedBy = context.InitiatingUser?.Email
};
}
private async Task AddEmailChangeEventIfNeeded(
DataverseEntity preImage,
DataverseEntity postImage,
string contactId,
List<BusinessEvent> events,
DataverseExecutionContext context)
{
var oldEmail = preImage.GetAttributeValue<string>("emailaddress1");
var newEmail = postImage.GetAttributeValue<string>("emailaddress1");
if (oldEmail != newEmail && !string.IsNullOrEmpty(newEmail))
{
// Get parent account for context
var parentCustomerId = postImage.GetAttributeValue<Guid?>("parentcustomerid");
events.Add(new ContactEmailChanged
{
ContactId = contactId,
FullName = postImage.GetAttributeValue<string>("fullname"),
OldEmailAddress = oldEmail,
NewEmailAddress = newEmail,
ChangedBy = context.InitiatingUser?.Email,
RequiresEmailVerification = ShouldRequireEmailVerification(oldEmail, newEmail),
AccountId = parentCustomerId?.ToString()
});
_logger.LogInformation("Contact email changed: {ContactId} from {OldEmail} to {NewEmail}",
contactId, oldEmail, newEmail);
}
}
private bool ShouldRequireEmailVerification(string oldEmail, string newEmail)
{
// Business logic: require verification if this is a new email or domain change
if (string.IsNullOrEmpty(oldEmail))
return true; // New email address
var oldDomain = oldEmail.Split('@').LastOrDefault();
var newDomain = newEmail.Split('@').LastOrDefault();
return oldDomain != newDomain; // Domain changed
}
private List<string> DetectChangedFields(DataverseEntity preImage, DataverseEntity postImage)
{
var changedFields = new List<string>();
var relevantFields = new[]
{
"firstname", "lastname", "emailaddress1", "telephone1",
"mobilephone", "jobtitle", "statuscode", "parentcustomerid"
};
foreach (var field in relevantFields)
{
var preValue = preImage.GetAttributeValue<object>(field);
var postValue = postImage.GetAttributeValue<object>(field);
if (!object.Equals(preValue, postValue))
{
changedFields.Add(field);
}
}
return changedFields;
}
Step 3: Event Consumer Patterns¶
Pattern 1: View Projection Consumer (Generic Events)¶
// Consumer that updates materialized views based on ContactUpdated events
public class ContactViewProjectionConsumer
{
public async Task HandleContactUpdatedAsync(ContactUpdated contactEvent)
{
// Update search index
await _searchService.UpdateContactAsync(contactEvent.ContactId, contactEvent.PostValues);
// Update reporting database
await _reportingService.ProjectContactChangesAsync(contactEvent);
// Update cache
await _cacheService.InvalidateContactAsync(contactEvent.ContactId);
_logger.LogInformation("Projected contact changes for {ContactId} with fields: {Fields}",
contactEvent.ContactId, string.Join(", ", contactEvent.ChangedFields));
}
}
Pattern 2: Business Logic Consumer (Specific Events)¶
// Consumer that handles email change business logic
public class ContactEmailChangeConsumer
{
public async Task HandleContactEmailChangedAsync(ContactEmailChanged emailEvent)
{
if (emailEvent.RequiresEmailVerification)
{
// Send email verification
await _emailService.SendVerificationEmailAsync(emailEvent.NewEmailAddress, emailEvent.ContactId);
// Create verification record
await _verificationService.CreateVerificationRecordAsync(emailEvent.ContactId, emailEvent.NewEmailAddress);
_logger.LogInformation("Email verification sent for contact {ContactId} to {NewEmail}",
emailEvent.ContactId, emailEvent.NewEmailAddress);
}
// Update email marketing lists
await _marketingService.UpdateContactEmailAsync(emailEvent.ContactId, emailEvent.NewEmailAddress);
// Notify related systems
if (!string.IsNullOrEmpty(emailEvent.AccountId))
{
await _notificationService.NotifyAccountOfContactEmailChangeAsync(emailEvent.AccountId, emailEvent);
}
}
}
Step 4: Event Hub Message Structure¶
Source Event Hub Message (from Dataverse)¶
{
"MessageName": "Update",
"TimeStamp": "2024-07-11T15:30:00Z",
"ExecutionContext": {
"PrimaryEntityName": "contact",
"PrimaryEntityId": "12345678-1234-1234-1234-123456789012",
"PreEntityImages": {
"PreImage": {
"Id": "12345678-1234-1234-1234-123456789012",
"LogicalName": "contact",
"Attributes": {
"fullname": "John Doe",
"emailaddress1": "john.old@company.com",
"telephone1": "555-0123",
"jobtitle": "Developer",
"statuscode": { "Value": 1 }
}
}
},
"PostEntityImages": {
"PostImage": {
"Id": "12345678-1234-1234-1234-123456789012",
"LogicalName": "contact",
"Attributes": {
"fullname": "John Doe",
"emailaddress1": "john.new@company.com",
"telephone1": "555-0123",
"jobtitle": "Senior Developer",
"statuscode": { "Value": 1 }
}
}
},
"InitiatingUser": {
"UserId": "87654321-4321-4321-4321-210987654321",
"Email": "admin@company.com",
"FullName": "System Administrator"
}
}
}
Business Event Hub Messages (after translation)¶
Generic ContactUpdated Event:
{
"EventId": "11111111-1111-1111-1111-111111111111",
"EventType": "ContactUpdated",
"ContactId": "12345678-1234-1234-1234-123456789012",
"FullName": "John Doe",
"ChangedFields": ["emailaddress1", "jobtitle"],
"PreValues": {
"emailaddress1": "john.old@company.com",
"jobtitle": "Developer"
},
"PostValues": {
"emailaddress1": "john.new@company.com",
"jobtitle": "Senior Developer"
},
"ChangedBy": "admin@company.com",
"OccurredAt": "2024-07-11T15:30:00Z",
"SourceSystem": "Dataverse",
"EventVersion": "1.0",
"CorrelationId": "22222222-2222-2222-2222-222222222222"
}
Specific ContactEmailChanged Event:
{
"EventId": "33333333-3333-3333-3333-333333333333",
"EventType": "ContactEmailChanged",
"ContactId": "12345678-1234-1234-1234-123456789012",
"FullName": "John Doe",
"OldEmailAddress": "john.old@company.com",
"NewEmailAddress": "john.new@company.com",
"ChangedBy": "admin@company.com",
"RequiresEmailVerification": true,
"AccountId": null,
"OccurredAt": "2024-07-11T15:30:00Z",
"SourceSystem": "Dataverse",
"EventVersion": "1.0",
"CorrelationId": "22222222-2222-2222-2222-222222222222"
}
Step 5: Key Implementation Patterns¶
Event Filtering Strategy¶
private bool ShouldProcessContactUpdate(DataverseExecutionContext context)
{
var preImage = context.PreEntityImages["PreImage"];
var postImage = context.PostEntityImages["PostImage"];
// Filter out system-only updates (like modifiedon, modifiedby)
var businessRelevantFields = new[]
{
"firstname", "lastname", "emailaddress1", "telephone1",
"mobilephone", "jobtitle", "statuscode", "parentcustomerid"
};
return businessRelevantFields.Any(field =>
!object.Equals(preImage.GetAttributeValue<object>(field),
postImage.GetAttributeValue<object>(field)));
}
Event Enrichment¶
private async Task EnrichContactEventAsync(ContactUpdated contactEvent, DataverseEntity postImage)
{
// Add computed fields
contactEvent.Metadata.Add("HasEmailAddress", !string.IsNullOrEmpty(postImage.GetAttributeValue<string>("emailaddress1")));
contactEvent.Metadata.Add("HasPhoneNumber",
!string.IsNullOrEmpty(postImage.GetAttributeValue<string>("telephone1")) ||
!string.IsNullOrEmpty(postImage.GetAttributeValue<string>("mobilephone")));
// Add relationship context
var parentCustomerId = postImage.GetAttributeValue<Guid?>("parentcustomerid");
if (parentCustomerId.HasValue)
{
contactEvent.Metadata.Add("ParentAccountId", parentCustomerId.ToString());
// Could fetch additional account details if needed
}
// Add change categorization
contactEvent.Metadata.Add("ChangeCategory", CategorizeChanges(contactEvent.ChangedFields));
}
private string CategorizeChanges(string[] changedFields)
{
if (changedFields.Contains("emailaddress1")) return "ContactInformation";
if (changedFields.Contains("telephone1") || changedFields.Contains("mobilephone")) return "ContactInformation";
if (changedFields.Contains("jobtitle")) return "ProfessionalInformation";
if (changedFields.Contains("statuscode")) return "StatusChange";
return "GeneralUpdate";
}
Batch Processing for Performance¶
private async Task ProcessEventBatchAsync(IEnumerable<PartitionEvent> events, CancellationToken cancellationToken)
{
var businessEvents = new List<BusinessEvent>();
foreach (var partitionEvent in events)
{
try
{
var dataverseEvent = ParseDataverseEvent(partitionEvent);
if (dataverseEvent.ExecutionContext.PrimaryEntityName == "contact")
{
var contactEvents = await MapContactEventsAsync(dataverseEvent.ExecutionContext, cancellationToken);
businessEvents.AddRange(contactEvents);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing individual event in batch");
// Continue processing other events in batch
}
}
// Publish all business events in a single batch
if (businessEvents.Any())
{
await PublishBusinessEventBatchAsync(businessEvents, cancellationToken);
}
}
Testing Strategy¶
Unit Testing Contact Event Mapping¶
[TestClass]
public class ContactEventMappingTests
{
[TestMethod]
public async Task MapContactEvents_EmailChange_GeneratesBothGenericAndSpecificEvents()
{
// Arrange
var context = CreateContactUpdateContext(
preEmail: "old@company.com",
postEmail: "new@company.com");
var mapper = new BusinessEventMapper(Mock.Of<ILogger<BusinessEventMapper>>());
// Act
var events = await mapper.MapContactEventsAsync(context, CancellationToken.None);
// Assert
Assert.AreEqual(2, events.Count);
var genericEvent = events.OfType<ContactUpdated>().FirstOrDefault();
Assert.IsNotNull(genericEvent);
Assert.IsTrue(genericEvent.ChangedFields.Contains("emailaddress1"));
var specificEvent = events.OfType<ContactEmailChanged>().FirstOrDefault();
Assert.IsNotNull(specificEvent);
Assert.AreEqual("old@company.com", specificEvent.OldEmailAddress);
Assert.AreEqual("new@company.com", specificEvent.NewEmailAddress);
Assert.IsTrue(specificEvent.RequiresEmailVerification);
}
[TestMethod]
public async Task MapContactEvents_OnlyJobTitleChange_GeneratesOnlyGenericEvent()
{
// Arrange
var context = CreateContactUpdateContext(
preJobTitle: "Developer",
postJobTitle: "Senior Developer");
var mapper = new BusinessEventMapper(Mock.Of<ILogger<BusinessEventMapper>>());
// Act
var events = await mapper.MapContactEventsAsync(context, CancellationToken.None);
// Assert
Assert.AreEqual(1, events.Count);
Assert.IsInstanceOfType(events[0], typeof(ContactUpdated));
var genericEvent = (ContactUpdated)events[0];
Assert.IsTrue(genericEvent.ChangedFields.Contains("jobtitle"));
Assert.AreEqual("Developer", genericEvent.PreValues["jobtitle"]);
Assert.AreEqual("Senior Developer", genericEvent.PostValues["jobtitle"]);
}
}
Integration Testing with Test Containers¶
[TestClass]
public class ContactEventIntegrationTests
{
private EventHubTestContainer _sourceEventHub;
private EventHubTestContainer _businessEventHub;
private ContactEventTranslatorService _translatorService;
[TestInitialize]
public async Task Setup()
{
_sourceEventHub = new EventHubTestContainer();
_businessEventHub = new EventHubTestContainer();
await _sourceEventHub.StartAsync();
await _businessEventHub.StartAsync();
_translatorService = new ContactEventTranslatorService(
_sourceEventHub.ConnectionString,
_businessEventHub.ConnectionString);
}
[TestMethod]
public async Task EndToEnd_ContactEmailChange_PublishesCorrectEvents()
{
// Arrange
var contactUpdateEvent = CreateDataverseContactUpdateEvent();
// Act
await _sourceEventHub.PublishEventAsync(contactUpdateEvent);
await Task.Delay(5000); // Allow processing time
// Assert
var businessEvents = await _businessEventHub.ReadEventsAsync(timeout: TimeSpan.FromSeconds(10));
Assert.AreEqual(2, businessEvents.Count);
var contactUpdated = businessEvents.FirstOrDefault(e => e.EventType == "ContactUpdated");
var emailChanged = businessEvents.FirstOrDefault(e => e.EventType == "ContactEmailChanged");
Assert.IsNotNull(contactUpdated);
Assert.IsNotNull(emailChanged);
// Verify correlation IDs match
Assert.AreEqual(contactUpdated.CorrelationId, emailChanged.CorrelationId);
}
}
## Best Practices for Contact Event Translation
### Event Design Principles
1. **Dual Event Strategy**: Always generate both generic `ContactUpdated` events for projections and specific business events for targeted logic
2. **Field-Level Granularity**: Include both pre and post values for changed fields to support different consumer needs
3. **Business Context**: Enrich events with business-relevant computed fields and relationship data
4. **Idempotency**: Ensure events can be safely processed multiple times
5. **Correlation**: Maintain correlation IDs across related events
### Performance Optimization
1. **Field Filtering**: Only process business-relevant field changes to reduce noise
2. **Batch Processing**: Group multiple contact updates when possible to improve throughput
3. **Selective Enrichment**: Only fetch additional data when required by specific business events
4. **Caching**: Cache frequently accessed reference data (like account details for contacts)
### Security Considerations
1. **PII Handling**: Mask or exclude sensitive contact information in generic events
2. **Email Verification**: Implement proper email verification workflows for email changes
3. **Audit Trail**: Log all contact event processing activities for compliance
4. **Data Retention**: Implement appropriate retention policies for contact event data
## Troubleshooting Guide
### Common Contact Event Issues
| Issue | Symptoms | Solution |
|-------|----------|----------|
| Missing Contact Pre/Post Images | Contact events show no field changes | Verify contact-specific image configuration in Plugin Registration Tool |
| Email Verification Not Triggering | ContactEmailChanged events published but no verification emails | Check email change detection logic and verification service integration |
| Duplicate Contact Events | Same contact update generates multiple events | Implement event deduplication based on correlation ID and timestamp |
| Missing Parent Account Context | Contact events lack account relationship data | Ensure parentcustomerid is included in pre/post images |
### Debugging Contact Event Processing
1. **Check Contact-Specific Logs**: Filter logs by contact entity type and correlation ID
2. **Verify Field Detection**: Test field change detection logic with known contact updates
3. **Validate Event Schema**: Ensure contact events match expected schema
4. **Monitor Consumer Lag**: Check if contact event consumers are keeping up with volume
5. **Review Contact Business Logic**: Verify specific business event generation rules
## Conclusion
This solution transforms Dataverse's generic Contact CRUD events into meaningful business events that provide real value to downstream systems. By implementing both generic `ContactUpdated` events for view projections and specific events like `ContactEmailChanged` for business logic, you get the flexibility to support different consumer patterns while maintaining high performance and reliability.
The Contact-focused architecture supports:
- **Flexibility**: Both generic projection events and specific business events from the same source
- **Scalability**: Handle high contact update volumes with Event Hub and containerized processing
- **Reliability**: Built-in retry logic, dead letter queues, and Event Hub durability
- **Observability**: Contact-specific monitoring and alerting
- **Maintainability**: Clean separation between projection and business event logic
- **Extensibility**: Easy to add new contact field monitoring and business events
This approach enables downstream systems to either maintain synchronized contact views through generic events or react to specific contact changes through targeted business events, providing the best of both worlds for event-driven architecture.sharp
public abstract class BusinessEvent
{
public string EventId { get; set; } = Guid.NewGuid().ToString();
public string EventType { get; set; }
public string CorrelationId { get; set; }
public string SourceEventId { get; set; } // Event Hub sequence number from source event
public DateTime OccurredAt { get; set; } = DateTime.UtcNow;
public string SourceSystem { get; set; } = "Dataverse";
public string EventVersion { get; set; } = "1.0";
public Dictionary<string, object> Metadata { get; set; } = new Dictionary<string, object>();
}
// Customer Events
public class CustomerCreditLimitChanged : BusinessEvent
{
public CustomerCreditLimitChanged() { EventType = "CustomerCreditLimitChanged"; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public decimal OldCreditLimit { get; set; }
public decimal NewCreditLimit { get; set; }
public string ChangedBy { get; set; }
public string ChangeReason { get; set; }
}
public class CustomerActivated : BusinessEvent
{
public CustomerActivated() { EventType = "CustomerActivated"; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public string Industry { get; set; }
public string ActivatedBy { get; set; }
}
public class CustomerDeactivated : BusinessEvent
{
public CustomerDeactivated() { EventType = "CustomerDeactivated"; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public string DeactivationReason { get; set; }
public string DeactivatedBy { get; set; }
}
// Opportunity Events
public class OpportunityWon : BusinessEvent
{
public OpportunityWon() { EventType = "OpportunityWon"; }
public string OpportunityId { get; set; }
public string OpportunityName { get; set; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public decimal EstimatedValue { get; set; }
public decimal ActualValue { get; set; }
public DateTime CloseDate { get; set; }
public string WonBy { get; set; }
}
public class OpportunityLost : BusinessEvent
{
public OpportunityLost() { EventType = "OpportunityLost"; }
public string OpportunityId { get; set; }
public string OpportunityName { get; set; }
public string CustomerId { get; set; }
public string CustomerName { get; set; }
public decimal EstimatedValue { get; set; }
public string LossReason { get; set; }
public DateTime CloseDate { get; set; }
}
Step 4: Event Translation Logic¶
Main Translation Service¶
public async Task<List<BusinessEvent>> ProcessEntityUpdate(
DataverseExecutionContext context,
string correlationId,
ILogger log)
{
var events = new List<BusinessEvent>();
try
{
events = context.PrimaryEntityName.ToLower() switch
{
"account" => await ProcessAccountUpdate(context, log),
"opportunity" => await ProcessOpportunityUpdate(context, log),
"salesorder" => await ProcessOrderUpdate(context, log),
_ => new List<BusinessEvent>()
};
// Add correlation and metadata to all events
foreach (var evt in events)
{
evt.CorrelationId = correlationId;
evt.Metadata.Add("SourceEntityId", context.PrimaryEntityId.ToString());
evt.Metadata.Add("SourceEntityType", context.PrimaryEntityName);
if (context.InitiatingUser != null)
{
evt.Metadata.Add("InitiatingUser", context.InitiatingUser.Email);
}
}
}
catch (Exception ex)
{
log.LogError($"Error in ProcessEntityUpdate: {ex.Message}");
throw;
}
return events;
}
Account Event Processing¶
private async Task<List<BusinessEvent>> ProcessAccountUpdate(DataverseExecutionContext context, ILogger log)
{
var events = new List<BusinessEvent>();
var preImage = context.PreEntityImages["PreImage"];
var postImage = context.PostEntityImages["PostImage"];
var accountId = context.PrimaryEntityId.ToString();
// Credit Limit Changes
await ProcessCreditLimitChange(preImage, postImage, accountId, events, log);
// Status Changes
await ProcessStatusChange(preImage, postImage, accountId, events, log);
// Contact Information Changes
await ProcessContactInfoChange(preImage, postImage, accountId, events, log);
// Industry Changes
await ProcessIndustryChange(preImage, postImage, accountId, events, log);
return events;
}
private async Task ProcessCreditLimitChange(
DataverseEntity preImage,
DataverseEntity postImage,
string accountId,
List<BusinessEvent> events,
ILogger log)
{
var oldCreditLimit = preImage.GetAttributeValue<decimal>("creditlimit");
var newCreditLimit = postImage.GetAttributeValue<decimal>("creditlimit");
if (oldCreditLimit != newCreditLimit)
{
var customerName = postImage.GetAttributeValue<string>("name");
events.Add(new CustomerCreditLimitChanged
{
CustomerId = accountId,
CustomerName = customerName,
OldCreditLimit = oldCreditLimit,
NewCreditLimit = newCreditLimit,
ChangeReason = DetermineCreditLimitChangeReason(oldCreditLimit, newCreditLimit)
});
log.LogInformation($"Credit limit changed for {customerName} ({accountId}): {oldCreditLimit:C} -> {newCreditLimit:C}");
}
}
private async Task ProcessStatusChange(
DataverseEntity preImage,
DataverseEntity postImage,
string accountId,
List<BusinessEvent> events,
ILogger log)
{
var oldStatus = preImage.GetAttributeValue<int>("statuscode");
var newStatus = postImage.GetAttributeValue<int>("statuscode");
if (oldStatus != newStatus)
{
var customerName = postImage.GetAttributeValue<string>("name");
var industry = postImage.GetAttributeValue<int>("industrycode");
var industryName = GetIndustryName(industry);
// Status codes: 1=Active, 2=Inactive
if (oldStatus == 2 && newStatus == 1) // Inactive to Active
{
events.Add(new CustomerActivated
{
CustomerId = accountId,
CustomerName = customerName,
Industry = industryName
});
log.LogInformation($"Customer activated: {customerName} ({accountId})");
}
else if (oldStatus == 1 && newStatus == 2) // Active to Inactive
{
events.Add(new CustomerDeactivated
{
CustomerId = accountId,
CustomerName = customerName,
DeactivationReason = "Status changed to inactive"
});
log.LogInformation($"Customer deactivated: {customerName} ({accountId})");
}
}
}
private string DetermineCreditLimitChangeReason(decimal oldLimit, decimal newLimit)
{
if (newLimit > oldLimit)
return "Credit limit increased";
else if (newLimit < oldLimit)
return "Credit limit decreased";
else
return "Credit limit adjustment";
}
private string GetIndustryName(int industryCode)
{
return industryCode switch
{
1 => "Technology",
2 => "Healthcare",
3 => "Finance",
4 => "Manufacturing",
5 => "Retail",
_ => "Other"
};
}
Opportunity Event Processing¶
private async Task<List<BusinessEvent>> ProcessOpportunityUpdate(DataverseExecutionContext context, ILogger log)
{
var events = new List<BusinessEvent>();
var preImage = context.PreEntityImages["PreImage"];
var postImage = context.PostEntityImages["PostImage"];
var opportunityId = context.PrimaryEntityId.ToString();
// Check for opportunity status changes
var oldStatusCode = preImage.GetAttributeValue<int>("statuscode");
var newStatusCode = postImage.GetAttributeValue<int>("statuscode");
if (oldStatusCode != newStatusCode)
{
var opportunityName = postImage.GetAttributeValue<string>("name");
var customerId = postImage.GetAttributeValue<Guid>("customerid").ToString();
var estimatedValue = postImage.GetAttributeValue<decimal>("estimatedvalue");
var actualValue = postImage.GetAttributeValue<decimal>("actualvalue");
var closeDate = postImage.GetAttributeValue<DateTime>("actualclosedate");
// Status codes: 3=Won, 4=Lost
if (newStatusCode == 3) // Won
{
events.Add(new OpportunityWon
{
OpportunityId = opportunityId,
OpportunityName = opportunityName,
CustomerId = customerId,
EstimatedValue = estimatedValue,
ActualValue = actualValue,
CloseDate = closeDate
});
log.LogInformation($"Opportunity won: {opportunityName} ({opportunityId}) - Value: {actualValue:C}");
}
else if (newStatusCode == 4) // Lost
{
var lossReason = postImage.GetAttributeValue<string>("closereasoncode");
events.Add(new OpportunityLost
{
OpportunityId = opportunityId,
OpportunityName = opportunityName,
CustomerId = customerId,
EstimatedValue = estimatedValue,
LossReason = lossReason,
CloseDate = closeDate
});
log.LogInformation($"Opportunity lost: {opportunityName} ({opportunityId}) - Reason: {lossReason}");
}
}
return events;
}
Step 4: Configuration and Deployment¶
Application Configuration (appsettings.json)¶
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"ConnectionStrings": {
"SourceEventHub": "Endpoint=sb://dataverse-events.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...",
"BusinessEventHub": "Endpoint=sb://business-events.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...",
"ApplicationInsights": "InstrumentationKey=..."
},
"EventProcessing": {
"MaxConcurrentEvents": 10,
"EventBatchSize": 100,
"ProcessingTimeout": "00:05:00",
"DeadLetterRetryCount": 3
},
"HealthChecks": {
"Enabled": true,
"Port": 8080
}
}
Dockerfile¶
# Use the official .NET runtime image
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080
EXPOSE 8081
# Use the SDK image to build the app
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
# Copy project files
COPY ["src/DataverseEventTranslator.Api/DataverseEventTranslator.Api.csproj", "src/DataverseEventTranslator.Api/"]
COPY ["src/DataverseEventTranslator.Core/DataverseEventTranslator.Core.csproj", "src/DataverseEventTranslator.Core/"]
COPY ["src/DataverseEventTranslator.Infrastructure/DataverseEventTranslator.Infrastructure.csproj", "src/DataverseEventTranslator.Infrastructure/"]
# Restore dependencies
RUN dotnet restore "src/DataverseEventTranslator.Api/DataverseEventTranslator.Api.csproj"
# Copy source code
COPY . .
# Build the application
WORKDIR "/src/src/DataverseEventTranslator.Api"
RUN dotnet build "DataverseEventTranslator.Api.csproj" -c Release -o /app/build
# Publish the application
FROM build AS publish
RUN dotnet publish "DataverseEventTranslator.Api.csproj" -c Release -o /app/publish /p:UseAppHost=false
# Final stage - runtime image
FROM base AS final
WORKDIR /app
# Create non-root user for security
RUN adduser --disabled-password --gecos '' appuser && chown -R appuser /app
USER appuser
COPY --from=publish /app/publish .
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
ENTRYPOINT ["dotnet", "DataverseEventTranslator.Api.dll"]
Kubernetes Deployment (k8s/deployment.yaml)¶
apiVersion: apps/v1
kind: Deployment
metadata:
name: dataverse-event-translator
namespace: dataverse-events
labels:
app: dataverse-event-translator
version: v1
spec:
replicas: 3
selector:
matchLabels:
app: dataverse-event-translator
template:
metadata:
labels:
app: dataverse-event-translator
version: v1
spec:
containers:
- name: translator
image: myregistry.azurecr.io/dataverse-event-translator:latest
ports:
- containerPort: 8080
name: http
- containerPort: 8081
name: https
env:
- name: ASPNETCORE_ENVIRONMENT
value: "Production"
- name: ASPNETCORE_URLS
value: "http://+:8080"
envFrom:
- secretRef:
name: event-hub-secrets
- configMapRef:
name: app-config
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 10
failureThreshold: 3
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 15
periodSeconds: 15
timeoutSeconds: 5
failureThreshold: 3
securityContext:
runAsNonRoot: true
runAsUser: 1000
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
volumeMounts:
- name: tmp
mountPath: /tmp
- name: app-logs
mountPath: /app/logs
volumes:
- name: tmp
emptyDir: {}
- name: app-logs
emptyDir: {}
securityContext:
fsGroup: 1000
---
apiVersion: v1
kind: Service
metadata:
name: dataverse-event-translator-service
namespace: dataverse-events
spec:
selector:
app: dataverse-event-translator
ports:
- name: http
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: v1
kind: ConfigMap
metadata:
name: app-config
namespace: dataverse-events
data:
EventProcessing__MaxConcurrentEvents: "10"
EventProcessing__EventBatchSize: "100"
EventProcessing__ProcessingTimeout: "00:05:00"
HealthChecks__Enabled: "true"
HealthChecks__Port: "8080"
---
apiVersion: v1
kind: Secret
metadata:
name: event-hub-secrets
namespace: dataverse-events
type: Opaque
stringData:
ConnectionStrings__SourceEventHub: "Endpoint=sb://dataverse-events.servicebus.windows.net/;SharedAccessKeyName=..."
ConnectionStrings__BusinessEventHub: "Endpoint=sb://business-events.servicebus.windows.net/;SharedAccessKeyName=..."
ConnectionStrings__ApplicationInsights: "InstrumentationKey=..."
Azure Container Apps Deployment¶
# container-app.yaml
apiVersion: apps/v1alpha2
kind: ContainerApp
metadata:
name: dataverse-event-translator
resourceGroup: rg-dataverse-events
location: East US
properties:
managedEnvironmentId: /subscriptions/{subscription-id}/resourceGroups/rg-dataverse-events/providers/Microsoft.App/managedEnvironments/env-dataverse
configuration:
secrets:
- name: source-eventhub-connection
value: "Endpoint=sb://dataverse-events.servicebus.windows.net/;SharedAccessKeyName=..."
- name: business-eventhub-connection
value: "Endpoint=sb://business-events.servicebus.windows.net/;SharedAccessKeyName=..."
ingress:
external: false
targetPort: 8080
dapr:
enabled: false
template:
containers:
- name: dataverse-event-translator
image: myregistry.azurecr.io/dataverse-event-translator:latest
env:
- name: ConnectionStrings__SourceEventHub
secretRef: source-eventhub-connection
- name: ConnectionStrings__BusinessEventHub
secretRef: business-eventhub-connection
- name: ASPNETCORE_ENVIRONMENT
value: Production
resources:
cpu: 0.5
memory: 1Gi
probes:
- type: liveness
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 30
- type: readiness
httpGet:
path: /health
port: 8080
initialDelaySeconds: 15
periodSeconds: 15
scale:
minReplicas: 2
maxReplicas: 10
rules:
- name: eventhub-scaling
custom:
type: azure-eventhub
metadata:
connectionFromEnv: ConnectionStrings__SourceEventHub
eventHubName: dataverse-events
consumerGroup: $Default
auth:
- secretRef: source-eventhub-connection
triggerParameter: connection
Project File (DataverseEventTranslator.Api.csproj)¶
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<UserSecretsId>dataverse-event-translator</UserSecretsId>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.1" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.7.1" />
<PackageReference Include="Microsoft.ApplicationInsights.AspNetCore" Version="2.21.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.ApplicationInsights" Version="4.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.EventStore" Version="7.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DataverseEventTranslator.Core\DataverseEventTranslator.Core.csproj" />
<ProjectReference Include="..\DataverseEventTranslator.Infrastructure\DataverseEventTranslator.Infrastructure.csproj" />
</ItemGroup>
</Project>
Step 5: Monitoring and Observability¶
Telemetry Service¶
namespace DataverseEventTranslator.Infrastructure.Monitoring
{
public interface ITelemetryService
{
Task TrackEventProcessedAsync(string correlationId, TimeSpan duration, int businessEventCount);
Task TrackValidationFailureAsync(string correlationId);
Task TrackProcessingErrorAsync(string correlationId, Exception exception);
Task<object> GetMetricsAsync();
}
public class TelemetryService : ITelemetryService
{
private readonly ILogger<TelemetryService> _logger;
private readonly TelemetryClient _telemetryClient;
private static readonly ConcurrentDictionary<string, long> _counters = new();
private static readonly ConcurrentQueue<ProcessingMetric> _metrics = new();
public TelemetryService(ILogger<TelemetryService> logger, TelemetryClient telemetryClient)
{
_logger = logger;
_telemetryClient = telemetryClient;
}
public async Task TrackEventProcessedAsync(string correlationId, TimeSpan duration, int businessEventCount)
{
_telemetryClient.TrackDependency("EventProcessing", "ProcessDataverseEvent", correlationId,
DateTime.UtcNow.Subtract(duration), duration, true);
_telemetryClient.TrackMetric("BusinessEventsGenerated", businessEventCount);
_telemetryClient.TrackMetric("ProcessingDuration", duration.TotalMilliseconds);
_counters.AddOrUpdate("EventsProcessed", 1, (key, value) => value + 1);
_counters.AddOrUpdate("BusinessEventsGenerated", businessEventCount, (key, value) => value + businessEventCount);
_metrics.Enqueue(new ProcessingMetric
{
CorrelationId = correlationId,
Duration = duration,
BusinessEventCount = businessEventCount,
Timestamp = DateTime.UtcNow
});
// Keep only last 1000 metrics
while (_metrics.Count > 1000)
{
_metrics.TryDequeue(out _);
}
_logger.LogInformation("Event processed successfully. CorrelationId: {CorrelationId}, Duration: {Duration}ms, BusinessEvents: {Count}",
correlationId, duration.TotalMilliseconds, businessEventCount);
}
public async Task TrackValidationFailureAsync(string correlationId)
{
_telemetryClient.TrackEvent("ValidationFailure", new Dictionary<string, string>
{
["CorrelationId"] = correlationId
});
_counters.AddOrUpdate("ValidationFailures", 1, (key, value) => value + 1);
_logger.LogWarning("Event validation failed. CorrelationId: {CorrelationId}", correlationId);
}
public async Task TrackProcessingErrorAsync(string correlationId, Exception exception)
{
_telemetryClient.TrackException(exception, new Dictionary<string, string>
{
["CorrelationId"] = correlationId,
["ErrorType"] = exception.GetType().Name
});
_counters.AddOrUpdate("ProcessingErrors", 1, (key, value) => value + 1);
_logger.LogError(exception, "Event processing error. CorrelationId: {CorrelationId}", correlationId);
}
public async Task<object> GetMetricsAsync()
{
var recentMetrics = _metrics.Where(m => m.Timestamp > DateTime.UtcNow.AddMinutes(-5)).ToList();
return new
{
Counters = _counters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value),
RecentProcessing = new
{
Count = recentMetrics.Count,
AverageDuration = recentMetrics.Any() ? recentMetrics.Average(m => m.Duration.TotalMilliseconds) : 0,
TotalBusinessEvents = recentMetrics.Sum(m => m.BusinessEventCount)
},
Timestamp = DateTime.UtcNow
};
}
private class ProcessingMetric
{
public string CorrelationId { get; set; }
public TimeSpan Duration { get; set; }
public int BusinessEventCount { get; set; }
public DateTime Timestamp { get; set; }
}
}
}
Health Checks¶
namespace DataverseEventTranslator.Infrastructure.HealthChecks
{
public class EventHubHealthCheck : IHealthCheck
{
private readonly EventHubConsumerClient _consumerClient;
private readonly EventHubProducerClient _producerClient;
private readonly ILogger<EventHubHealthCheck> _logger;
public EventHubHealthCheck(
EventHubConsumerClient consumerClient,
EventHubProducerClient producerClient,
ILogger<EventHubHealthCheck> logger)
{
_consumerClient = consumerClient;
_producerClient = producerClient;
_logger = logger;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
// Check consumer client
var consumerProperties = await _consumerClient.GetEventHubPropertiesAsync(cancellationToken);
// Check producer client
var producerProperties = await _producerClient.GetEventHubPropertiesAsync(cancellationToken);
var data = new Dictionary<string, object>
{
["SourceEventHub"] = consumerProperties.Name,
["SourcePartitionCount"] = consumerProperties.PartitionIds.Length,
["BusinessEventHub"] = producerProperties.Name,
["BusinessPartitionCount"] = producerProperties.PartitionIds.Length
};
return HealthCheckResult.Healthy("Event Hub connections are healthy", data);
}
catch (Exception ex)
{
_logger.LogError(ex, "Event Hub health check failed");
return HealthCheckResult.Unhealthy("Event Hub connections failed", ex);
}
}
}
public class DeadLetterQueueHealthCheck : IHealthCheck
{
private readonly ILogger<DeadLetterQueueHealthCheck> _logger;
public DeadLetterQueueHealthCheck(ILogger<DeadLetterQueueHealthCheck> logger)
{
_logger = logger;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
// Check dead letter queue accessibility
// Implementation depends on your dead letter queue choice
await Task.Delay(1, cancellationToken); // Placeholder
return HealthCheckResult.Healthy("Dead letter queue is accessible");
}
catch (Exception ex)
{
_logger.LogError(ex, "Dead letter queue health check failed");
return HealthCheckResult.Degraded("Dead letter queue check failed", ex);
}
}
}
}
Step 6: Error Handling and Resilience¶
public class ErrorHandling
{
private readonly ILogger _logger;
public ErrorHandling(ILogger logger)
{
_logger = logger;
}
public async Task<IActionResult> HandleWithRetry(
Func<Task<IActionResult>> operation,
string operationName,
int maxRetries = 3)
{
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
return await operation();
}
catch (Exception ex) when (attempt < maxRetries && IsRetriableException(ex))
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // Exponential backoff
_logger.LogWarning($"Attempt {attempt} failed for {operationName}: {ex.Message}. Retrying in {delay.TotalSeconds}s");
await Task.Delay(delay);
}
catch (Exception ex)
{
_logger.LogError($"Operation {operationName} failed after {attempt} attempts: {ex.Message}");
// Send to dead letter queue
await SendToDeadLetterQueue(operationName, ex);
return new StatusCodeResult(500);
}
}
return new StatusCodeResult(500);
}
private bool IsRetriableException(Exception ex)
{
return ex is HttpRequestException ||
ex is TaskCanceledException ||
ex is TimeoutException ||
(ex.Message?.Contains("throttle", StringComparison.OrdinalIgnoreCase) == true);
}
private async Task SendToDeadLetterQueue(string operationName, Exception ex)
{
// Implementation for dead letter queue
_logger.LogInformation($"Sending failed operation {operationName} to dead letter queue");
}
}
Application Insights Integration¶
public class TelemetryService
{
private readonly ILogger _logger;
public void TrackEventTranslation(string sourceEvent, string targetEvent, TimeSpan duration)
{
_logger.LogInformation("Event translated: {SourceEvent} -> {TargetEvent} in {Duration}ms",
sourceEvent, targetEvent, duration.TotalMilliseconds);
}
public void TrackBusinessEventPublished(BusinessEvent businessEvent)
{
_logger.LogInformation("Business event published: {EventType} for entity {EntityId}",
businessEvent.EventType, businessEvent.Metadata.GetValueOrDefault("SourceEntityId"));
}
}
Testing Strategy¶
Unit Testing¶
[TestClass]
public class EventTranslationTests
{
[TestMethod]
public async Task ProcessAccountUpdate_CreditLimitChange_GeneratesCorrectEvent()
{
// Arrange
var context = CreateTestContext();
context.PreEntityImages["PreImage"].Attributes["creditlimit"] = new { Value = 10000 };
context.PostEntityImages["PostImage"].Attributes["creditlimit"] = new { Value = 25000 };
var processor = new DataverseEventProcessor();
// Act
var events = await processor.ProcessAccountUpdate(context, Mock.Of<ILogger>());
// Assert
Assert.AreEqual(1, events.Count);
Assert.IsInstanceOfType(events[0], typeof(CustomerCreditLimitChanged));
var creditEvent = (CustomerCreditLimitChanged)events[0];
Assert.AreEqual(10000, creditEvent.OldCreditLimit);
Assert.AreEqual(25000, creditEvent.NewCreditLimit);
}
private DataverseExecutionContext CreateTestContext()
{
return new DataverseExecutionContext
{
PrimaryEntityName = "account",
PrimaryEntityId = Guid.NewGuid(),
PreEntityImages = new Dictionary<string, DataverseEntity>
{
["PreImage"] = new DataverseEntity
{
Attributes = new Dictionary<string, object>()
}
},
PostEntityImages = new Dictionary<string, DataverseEntity>
{
["PostImage"] = new DataverseEntity
{
Attributes = new Dictionary<string, object>()
}
}
};
}
}
Integration Testing¶
[TestClass]
public class IntegrationTests
{
[TestMethod]
public async Task EndToEnd_DataverseUpdate_PublishesToEventHub()
{
// Test with actual Dataverse sandbox environment
// Verify events flow through both Event Hubs
// Validate event schema and content
}
}
Monitoring and Observability¶
Key Metrics to Track¶
graph LR
A[Events Received] --> B[Events Processed]
B --> C[Business Events Generated]
C --> D[Events Published]
D --> E[Processing Latency]
F[Error Rate] --> G[Retry Attempts]
G --> H[Dead Letter Events]
I[Field Change Detection Rate]
J[Event Type Distribution]
Monitoring Dashboard¶
Track these metrics in Application Insights:
- Volume Metrics
- Events received per minute
- Business events generated per minute
-
Event processing success rate
-
Performance Metrics
- End-to-end processing latency
- Event Hub publish latency
-
Field change detection time
-
Error Metrics
- Translation failure rate
- Event Hub connection errors
-
Schema validation failures
-
Business Metrics
- Most frequently changed entity types
- Distribution of business event types
- Peak processing times
Alerting Rules¶
// Example alerting thresholds
var alertRules = new[]
{
new { Metric = "ProcessingFailureRate", Threshold = 5, Period = "5m" },
new { Metric = "EventHubConnectionErrors", Threshold = 3, Period = "1m" },
new { Metric = "ProcessingLatency", Threshold = 30000, Period = "5m" } // 30 seconds
};
Best Practices¶
Event Design Principles¶
- Make Events Immutable: Once published, events should never change
- Include Sufficient Context: Events should be self-contained
- Use Semantic Versioning: Version events to support schema evolution
- Design for Eventual Consistency: Events may be processed out of order
- Include Correlation IDs: Enable end-to-end tracing
Performance Optimization¶
- Batch Processing: Group multiple field changes into single events when appropriate
- Async Processing: Use async/await throughout the pipeline
- Connection Pooling: Reuse Event Hub connections
- Caching: Cache frequently accessed reference data
- Filtering: Only process business-relevant changes
Security Considerations¶
- Authenticate Service Endpoints: Use proper authentication between Dynamics and Azure
- Encrypt in Transit: Use HTTPS/TLS for all communications
- Sanitize Sensitive Data: Remove or mask PII in events
- Access Control: Limit Event Hub access to authorized consumers
- Audit Trail: Log all event processing activities
Troubleshooting Guide¶
Common Issues¶
Issue | Symptoms | Solution |
---|---|---|
Missing Pre/Post Images | Events show no field changes | Verify image configuration in Plugin Registration Tool |
Source Event Hub Connection Failures | Events not reaching translator | Check source Event Hub connection string and network connectivity |
Business Event Hub Connection Failures | Translated events not reaching consumers | Check business Event Hub connection string and permissions |
High Processing Latency | Events delayed significantly | Optimize field change detection logic |
Schema Validation Errors | Malformed events published | Add comprehensive input validation |
Dead Letter Queue Filling | Many failed events | Investigate root cause of processing failures |
Debugging Steps¶
- Check Application Insights Logs: Review detailed processing logs
- Verify Dataverse Configuration: Ensure service endpoint is properly registered and pointing to source Event Hub
- Test Event Schema: Validate events match expected schema in both Event Hubs
- Monitor Event Hub Metrics: Check both source and business Event Hub metrics and consumer lag
- Review Dead Letter Queue: Analyze failed events for patterns
Conclusion¶
This solution transforms Dataverse's generic CRUD events into meaningful business events that provide real value to downstream systems. By implementing field-level change detection with pre/post images and using Event Hub for scalable event processing, you get precise control over what constitutes a business event while maintaining high performance and reliability.
The architecture supports: - Scalability: Handle high event volumes with Event Hub triggers and Azure Functions - Reliability: Built-in retry logic, dead letter queues, and Event Hub durability - Observability: Comprehensive monitoring and alerting - Maintainability: Clean separation of concerns and testable code - Extensibility: Easy to add new entity types and business events
This approach enables true event-driven architecture where business processes can react to meaningful changes rather than low-level database operations, with the added benefit of Event Hub's enterprise-grade messaging capabilities.