Skip to content

Resilience and Retry Policies

using Polly;
using Polly.Extensions.Http;

namespace DataverseEventTranslator.Infrastructure.Resilience
{
    /// <summary>
    /// Runs asynchronous operations under Polly policies: a retry policy with exponential
    /// backoff and a circuit breaker.
    /// </summary>
    public class ResilienceService
    {
        private readonly IAsyncPolicy _retryPolicy;
        private readonly IAsyncPolicy _circuitBreakerPolicy;
        private readonly ILogger<ResilienceService> _logger;

        /// <summary>Builds the retry and circuit-breaker policies.</summary>
        /// <param name="logger">Logger used for retry, break, reset and failure messages.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
        public ResilienceService(ILogger<ResilienceService> logger)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));

            // Retry policy with exponential backoff: 3 retries, waiting 2^attempt seconds.
            // The predicate is the same IsRetriableException check used for logging below, so
            // every failure reported as "retriable" (including throttling errors) is retried.
            _retryPolicy = Policy
                .Handle<Exception>(IsRetriableException)
                .WaitAndRetryAsync(
                    retryCount: 3,
                    sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    onRetry: (exception, timespan, retryCount, context) =>
                    {
                        _logger.LogWarning("Retry attempt {RetryCount} after {Delay}ms for operation {OperationKey}",
                            retryCount, timespan.TotalMilliseconds, context.OperationKey);
                    });

            // Circuit breaker pattern: opens for one minute after 5 handled exceptions.
            _circuitBreakerPolicy = Policy
                .Handle<Exception>()
                .CircuitBreakerAsync(
                    handledEventsAllowedBeforeBreaking: 5,
                    durationOfBreak: TimeSpan.FromMinutes(1),
                    onBreak: (exception, duration) =>
                    {
                        _logger.LogError(exception, "Circuit breaker opened for {Duration}ms", duration.TotalMilliseconds);
                    },
                    onReset: () =>
                    {
                        _logger.LogInformation("Circuit breaker reset");
                    });
        }

        /// <summary>
        /// Executes <paramref name="operation"/> under the retry policy and returns its result.
        /// </summary>
        /// <typeparam name="T">Result type of the operation.</typeparam>
        /// <param name="operation">The work to run; it is invoked again on each retry.</param>
        /// <param name="operationName">Name used as the Polly operation key and in log messages.</param>
        /// <param name="cancellationToken">
        /// Passed to the policy so that pending retries and back-off waits stop once it is cancelled.
        /// The delegate itself takes no token, so an in-flight call is not interrupted.
        /// </param>
        /// <returns>The value produced by <paramref name="operation"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
        public async Task<T> ExecuteWithRetryAsync<T>(
            Func<Task<T>> operation,
            string operationName,
            CancellationToken cancellationToken = default)
        {
            if (operation == null)
            {
                throw new ArgumentNullException(nameof(operation));
            }

            var context = new Context(operationName);
            return await _retryPolicy.ExecuteAsync(async (ctx, ct) =>
            {
                try
                {
                    return await operation();
                }
                catch (Exception ex) when (IsRetriableException(ex))
                {
                    // Rethrown so the retry policy can decide whether another attempt is left.
                    _logger.LogWarning(ex, "Retriable error in operation {OperationName}", operationName);
                    throw;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Non-retriable error in operation {OperationName}", operationName);
                    throw;
                }
            }, context, cancellationToken);
        }

        /// <summary>
        /// Executes <paramref name="operation"/> under the circuit-breaker policy.
        /// </summary>
        /// <param name="operation">The work to run.</param>
        /// <param name="operationName">Name used in the failure log message.</param>
        /// <param name="cancellationToken">Passed to the policy so execution is not started once cancelled.</param>
        /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
        public async Task ExecuteWithCircuitBreakerAsync(
            Func<Task> operation,
            string operationName,
            CancellationToken cancellationToken = default)
        {
            if (operation == null)
            {
                throw new ArgumentNullException(nameof(operation));
            }

            try
            {
                await _circuitBreakerPolicy.ExecuteAsync(ct => operation(), cancellationToken);
            }
            catch (Exception ex)
            {
                // Also reached when the circuit is open and the policy rejects the call.
                _logger.LogError(ex, "Circuit breaker protected operation {OperationName} failed", operationName);
                throw;
            }
        }

        /// <summary>
        /// Returns true for HTTP, cancellation/timeout exceptions and for any exception whose
        /// message mentions "throttle" (case-insensitive).
        /// </summary>
        private static bool IsRetriableException(Exception ex)
        {
            return ex is HttpRequestException ||
                   ex is TaskCanceledException ||
                   ex is TimeoutException ||
                   (ex.Message?.Contains("throttle", StringComparison.OrdinalIgnoreCase) == true);
        }
    }
}

Dead Letter Queue Service

namespace DataverseEventTranslator.Infrastructure.DeadLetter
{
    /// <summary>
    /// Contract for parking failed messages on a dead letter queue and working with them afterwards.
    /// </summary>
    public interface IDeadLetterService
    {
        /// <summary>Sends a failed message, together with the exception that caused the failure, to the dead letter queue.</summary>
        /// <param name="message">The message that could not be processed.</param>
        /// <param name="exception">The exception associated with the failure.</param>
        /// <param name="correlationId">Correlation identifier to attach to the dead-lettered message.</param>
        Task SendAsync(object message, Exception exception, string correlationId);

        /// <summary>Retrieves dead-lettered messages.</summary>
        /// <param name="count">Maximum number of messages to retrieve; defaults to 10.</param>
        Task<IEnumerable<DeadLetterMessage>> GetMessagesAsync(int count = 10);

        /// <summary>Reprocesses the dead-lettered message identified by <paramref name="messageId"/>.</summary>
        /// <param name="messageId">Identifier of the message to reprocess.</param>
        Task ReprocessMessageAsync(string messageId);
    }

    public class ServiceBusDeadLetterService : IDeadLetterService
    {
        private readonly ServiceBusClient _serviceBusClient;
        private readonly ILogger<ServiceBusDeadLetterService> _logger;
        private readonly string _deadLetterQueueName;

        /// <summary>
        /// Creates the service and resolves the dead letter queue name from configuration.
        /// </summary>
        /// <param name="serviceBusClient">Client used to create senders and receivers for the queue.</param>
        /// <param name="configuration">
        /// Configuration source; the queue name is read from "DeadLetterQueue:Name" and falls back
        /// to "dead-letter-queue" when the setting is missing or blank.
        /// </param>
        /// <param name="logger">Logger for send results and failures.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public ServiceBusDeadLetterService(
            ServiceBusClient serviceBusClient,
            IConfiguration configuration,
            ILogger<ServiceBusDeadLetterService> logger)
        {
            // Fail fast on missing dependencies instead of a NullReferenceException on first use.
            _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            if (configuration == null)
            {
                throw new ArgumentNullException(nameof(configuration));
            }

            var configuredName = configuration.GetValue<string>("DeadLetterQueue:Name");
            _deadLetterQueueName = string.IsNullOrWhiteSpace(configuredName) ? "dead-letter-queue" : configuredName;
        }

        /// <summary>
        /// Wraps the failed message and its exception in a <see cref="DeadLetterMessage"/>,
        /// serializes it as JSON and sends it to the configured dead letter queue.
        /// </summary>
        /// <param name="message">The message that failed; serialized with JsonConvert.</param>
        /// <param name="exception">The failure; its full text is stored, or null text when no exception is supplied.</param>
        /// <param name="correlationId">Correlation identifier copied onto the envelope and the Service Bus message.</param>
        /// <remarks>Any failure while creating the sender, serializing or sending is logged and rethrown.</remarks>
        public async Task SendAsync(object message, Exception exception, string correlationId)
        {
            ServiceBusSender sender = null;
            try
            {
                sender = _serviceBusClient.CreateSender(_deadLetterQueueName);

                var deadLetterMessage = new DeadLetterMessage
                {
                    Id = Guid.NewGuid().ToString(),
                    OriginalMessage = JsonConvert.SerializeObject(message),
                    // Null-safe: a missing exception must not prevent dead-lettering.
                    Exception = exception?.ToString(),
                    CorrelationId = correlationId,
                    FailedAt = DateTime.UtcNow,
                    RetryCount = 0,
                    MaxRetries = 3
                };

                var serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(deadLetterMessage))
                {
                    MessageId = deadLetterMessage.Id,
                    CorrelationId = correlationId,
                    ContentType = "application/json"
                };

                await sender.SendMessageAsync(serviceBusMessage);

                _logger.LogInformation("Message sent to dead letter queue. MessageId: {MessageId}, CorrelationId: {CorrelationId}",
                    deadLetterMessage.Id, correlationId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to send message to dead letter queue. CorrelationId: {CorrelationId}", correlationId);
                throw;
            }
            finally
            {
                // A sender is created per call, so it must be released per call as well.
                if (sender != null)
                {
                    await sender.DisposeAsync();
                }
            }
        }

        /// <summary>
        /// Receives up to <paramref name="count"/> messages from the dead letter queue, waiting at
        /// most 10 seconds, and deserializes each body into a <see cref="DeadLetterMessage"/>.
        /// </summary>
        /// <param name="count">Maximum number of messages to receive; defaults to 10.</param>
        /// <returns>The deserialized messages, fully materialized before the receiver is released.</returns>
        public async Task<IEnumerable<DeadLetterMessage>> GetMessagesAsync(int count = 10)
        {
            var receiver = _serviceBusClient.CreateReceiver(_deadLetterQueueName);
            try
            {
                // NOTE(review): the received messages are never completed or abandoned here —
                // confirm they are settled elsewhere, or consider peeking instead of receiving.
                var messages = await receiver.ReceiveMessagesAsync(count, TimeSpan.FromSeconds(10));

                // Materialize now so deserialization does not run lazily after the receiver is disposed.
                return messages
                    .Select(msg => JsonConvert.DeserializeObject<DeadLetterMessage>(msg.Body.ToString()))
                    .ToList();
            }
            finally
            {
                // A receiver is created per call, so it must be released per call as well.
                await receiver.DisposeAsync();
            }
        }

        public async Task ReprocessMessageAsync(string messageI# Translating Dataverse CRUD Events to Business Events

## Overview

This guide demonstrates how to transform generic Dataverse CRUD (Create, Read, Update, Delete) events into meaningful business events using Azure Event Hub and C#. Instead of consuming low-level database operations, downstream systems receive semantically rich events that represent actual business processes.

## Understanding Dataverse Event Publishing

### How Dataverse Publishes Events

Dataverse (formerly Common Data Service) has a sophisticated event publishing system built on the concept of **Service Endpoints**. Understanding this mechanism is crucial for implementing effective event translation.

#### The Plugin Framework Foundation

Dataverse operates on a plugin-based architecture where custom code can be executed in response to data operations. The system provides several execution contexts:

- **Pre-Operation**: Before the main database operation
- **Post-Operation**: After the main database operation, but still within the same transaction
- **Asynchronous**: After the transaction completes, executed separately

#### Service Endpoints: The Event Publishing Mechanism

Service Endpoints are specialized plugins that, instead of executing custom logic locally, forward event data to external systems. They act as bridges between Dataverse's internal event system and external messaging platforms.

**Key Components:**

1. **Plugin Registration Tool**: Administrative interface for configuring what events trigger which endpoints
2. **Event Execution Pipeline**: Dataverse's internal event processing system
3. **Service Endpoint Configuration**: Defines where and how events are sent
4. **Pre/Post Images**: Snapshots of entity data before and after operations

**Event Flow Within Dataverse:**

```mermaid
graph TD
    A[User Action] --> B[Dataverse Core]
    B --> C[Event Pipeline]
    C --> D{Plugin Steps}
    D --> E[Pre-Operation Plugins]
    D --> F[Database Operation]
    D --> G[Post-Operation Plugins]
    D --> H[Service Endpoints]
    H --> I[External System<br/>Azure Event Hub]

    E --> J[Pre-Images Generated]
    G --> K[Post-Images Generated]
    J --> H
    K --> H
```

What Gets Published:

When a Service Endpoint is triggered, Dataverse packages comprehensive context including:

- **Operation Type**: Create, Update, Delete, or custom operations
- **Entity Information**: Type, ID, and logical name
- **User Context**: Who performed the operation
- **Pre-Images**: Entity state before the operation (configurable fields)
- **Post-Images**: Entity state after the operation (configurable fields)
- **Input Parameters**: Operation-specific data
- **Execution Context**: Transaction details, correlation IDs, etc.

Configuration Flexibility:

Administrators can configure:

- Which entities trigger events (Account, Contact, Opportunity, etc.)
- Which operations trigger events (Create, Update, Delete, custom actions)
- Which fields are included in pre/post images
- Filtering conditions (only certain status changes, value thresholds, etc.)
- Execution timing (synchronous vs asynchronous)

This granular control allows organizations to publish only business-relevant events while maintaining high performance by avoiding unnecessary event generation.

The Challenge

Translating Dataverse Contact Events to Business Events

Overview

This guide demonstrates how to transform generic Dataverse Contact entity CRUD events into meaningful business events using Azure Event Hub and C#. We'll focus on handling Contact updates to support both generic change projections and specific business events like email address changes.

Understanding Dataverse Event Publishing

How Dataverse Publishes Events

Dataverse (formerly Common Data Service) has a sophisticated event publishing system built on the concept of Service Endpoints. Understanding this mechanism is crucial for implementing effective event translation.

The Plugin Framework Foundation

Dataverse operates on a plugin-based architecture where custom code can be executed in response to data operations. The system provides several execution contexts:

  • Pre-Operation: Before the main database operation
  • Post-Operation: After the main database operation, but still within the same transaction
  • Asynchronous: After the transaction completes, executed separately

Service Endpoints: The Event Publishing Mechanism

Service Endpoints are specialized plugins that, instead of executing custom logic locally, forward event data to external systems. They act as bridges between Dataverse's internal event system and external messaging platforms.

Key Components:

  1. Plugin Registration Tool: Administrative interface for configuring what events trigger which endpoints
  2. Event Execution Pipeline: Dataverse's internal event processing system
  3. Service Endpoint Configuration: Defines where and how events are sent
  4. Pre/Post Images: Snapshots of entity data before and after operations

Event Flow Within Dataverse:

graph TD
    A[User Action] --> B[Dataverse Core]
    B --> C[Event Pipeline]
    C --> D{Plugin Steps}
    D --> E[Pre-Operation Plugins]
    D --> F[Database Operation]
    D --> G[Post-Operation Plugins]
    D --> H[Service Endpoints]
    H --> I[External System<br/>Azure Event Hub]

    E --> J[Pre-Images Generated]
    G --> K[Post-Images Generated]
    J --> H
    K --> H

What Gets Published:

When a Service Endpoint is triggered, Dataverse packages comprehensive context including:

  • Operation Type: Create, Update, Delete, or custom operations
  • Entity Information: Type, ID, and logical name
  • User Context: Who performed the operation
  • Pre-Images: Entity state before the operation (configurable fields)
  • Post-Images: Entity state after the operation (configurable fields)
  • Input Parameters: Operation-specific data
  • Execution Context: Transaction details, correlation IDs, etc.

Configuration Flexibility:

Administrators can configure:

  • Which entities trigger events (Contact in our case)
  • Which operations trigger events (Create, Update, Delete, custom actions)
  • Which fields are included in pre/post images
  • Filtering conditions (only certain status changes, value thresholds, etc.)
  • Execution timing (synchronous vs asynchronous)

This granular control allows organizations to publish only business-relevant events while maintaining high performance by avoiding unnecessary event generation.

The Challenge

Current State: Generic CRUD Events

Dataverse's built-in service endpoints generate technical events that lack business context:

{
  "EntityType": "contact",
  "Operation": "Update",
  "EntityId": "12345678-1234-1234-1234-123456789012",
  "ModifiedFields": ["emailaddress1", "modifiedon"],
  "Timestamp": "2024-07-11T15:30:00Z"
}

Desired State: Both Generic and Business Events

Transform these into two types of events:

1. Generic Contact Update Event (for projections):

{
  "EventType": "ContactUpdated",
  "ContactId": "12345678-1234-1234-1234-123456789012",
  "FullName": "John Doe",
  "ChangedFields": ["emailaddress1"],
  "PreValues": {
    "emailaddress1": "john.old@company.com"
  },
  "PostValues": {
    "emailaddress1": "john.new@company.com"
  },
  "ChangedAt": "2024-07-11T15:30:00Z",
  "ChangedBy": "admin@company.com"
}

2. Specific Business Event (for business logic):

{
  "EventType": "ContactEmailChanged",
  "ContactId": "12345678-1234-1234-1234-123456789012",
  "FullName": "John Doe",
  "OldEmailAddress": "john.old@company.com",
  "NewEmailAddress": "john.new@company.com",
  "ChangedAt": "2024-07-11T15:30:00Z",
  "ChangedBy": "admin@company.com",
  "RequiresEmailVerification": true
}

Problems with CRUD Events

  1. Lack of Business Context: "Contact Updated" doesn't convey business meaning
  2. Field-Level Granularity Missing: Can't determine which specific fields changed
  3. No Semantic Versioning: Schema changes break downstream consumers
  4. Noise: Every minor update generates events, even non-business-relevant ones
  5. Poor Developer Experience: Consumers must understand Dataverse schema

Solution Architecture

High-Level Architecture

graph TB
    DV[Dataverse] --> EH1[Source Event Hub<br/>Raw Contact CRUD Events]
    EH1 --> CA[Containerized .NET App<br/>Contact Event Translator]
    CA --> EH2[Business Event Hub<br/>Contact Business Events]
    EH2 --> C1[Consumer 1<br/>Email Service]
    EH2 --> C2[Consumer 2<br/>Contact View Projector]
    EH2 --> C3[Consumer 3<br/>CRM Analytics]

    CA --> DL[Dead Letter Queue<br/>Failed Events]
    CA --> LOG[Application Insights<br/>Monitoring]

    subgraph "Container Environment"
        CA
        subgraph "Deployment Options"
            AKS[Azure Kubernetes Service]
            ACA[Azure Container Apps]
            ACI[Azure Container Instances]
        end
    end

Event Flow Diagram

sequenceDiagram
    participant DV as Dataverse
    participant EH1 as Source Event Hub
    participant CA as Container App
    participant EH2 as Business Event Hub
    participant P as Projection Consumer
    participant B as Business Consumer

    DV->>EH1: Contact Update with Pre/Post Images
    EH1->>CA: Event Hub Consumer reads events
    CA->>CA: Detect Field Changes
    CA->>CA: Generate Generic ContactUpdated Event
    CA->>CA: Generate Specific ContactEmailChanged Event
    CA->>EH2: Publish ContactUpdated Event
    CA->>EH2: Publish ContactEmailChanged Event
    EH2->>P: ContactUpdated → Update Contact View
    EH2->>B: ContactEmailChanged → Send Verification Email

Translation Process

flowchart TD
    Start([Contact CRUD Event Received]) --> Parse[Parse Pre/Post Images]
    Parse --> Detect[Detect Field Changes]
    Detect --> Filter{Business<br/>Relevant?}
    Filter -->|No| Ignore[Ignore Event]
    Filter -->|Yes| Generic[Generate ContactUpdated Event]
    Generic --> Specific{Specific Business<br/>Logic Needed?}
    Specific -->|Email Changed| Email[Generate ContactEmailChanged Event]
    Specific -->|Phone Changed| Phone[Generate ContactPhoneChanged Event]
    Specific -->|Status Changed| Status[Generate ContactStatusChanged Event]
    Specific -->|No| Publish[Publish Events]
    Email --> Publish
    Phone --> Publish
    Status --> Publish
    Publish --> Log[Log Success]
    Ignore --> End([End])
    Log --> End

Implementation Guide

Step 1: Configure Dataverse Service Endpoint for Contact Entity

Register Service Endpoint in Plugin Registration Tool

  1. Create new Service Endpoint pointing to your Source Event Hub
  2. Register Step for "Update" message on "contact" entity specifically
  3. Configure Pre-Image and Post-Image with Contact-specific attributes

Event Hub Service Endpoint Configuration:

  • Service Endpoint Type: Event Hub
  • Connection String: Event Hub connection string
  • Topic Name: Contact CRUD events topic

Pre-Image Configuration for Contact:

  • Alias: "PreImage"
  • Attributes: "firstname,lastname,fullname,emailaddress1,telephone1,mobilephone,jobtitle,statuscode,parentcustomerid"

Post-Image Configuration for Contact:

  • Alias: "PostImage"
  • Attributes: "firstname,lastname,fullname,emailaddress1,telephone1,mobilephone,jobtitle,statuscode,parentcustomerid"

Step 2: Contact Event Translation Logic

Contact-Specific Business Events

/// <summary>
/// Generic contact-change event intended for view projections. Carries the list of changed
/// fields together with their values before and after the update.
/// </summary>
public class ContactUpdated : BusinessEvent
{
    /// <summary>Creates the event with <c>EventType</c> set to "ContactUpdated".</summary>
    public ContactUpdated() => EventType = "ContactUpdated";

    /// <summary>Identifier of the contact that changed.</summary>
    public string ContactId { get; set; }

    /// <summary>Full name of the contact.</summary>
    public string FullName { get; set; }

    /// <summary>Names of the fields that changed.</summary>
    public string[] ChangedFields { get; set; }

    /// <summary>Values of the changed fields before the update, keyed by field name.</summary>
    public Dictionary<string, object> PreValues { get; set; }

    /// <summary>Values of the changed fields after the update, keyed by field name.</summary>
    public Dictionary<string, object> PostValues { get; set; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; set; }
}

/// <summary>
/// Specific business event raised when a contact's email address changes.
/// </summary>
public class ContactEmailChanged : BusinessEvent
{
    /// <summary>Creates the event with <c>EventType</c> set to "ContactEmailChanged".</summary>
    public ContactEmailChanged() => EventType = "ContactEmailChanged";

    /// <summary>Identifier of the contact whose email changed.</summary>
    public string ContactId { get; set; }

    /// <summary>Full name of the contact.</summary>
    public string FullName { get; set; }

    /// <summary>Email address before the change.</summary>
    public string OldEmailAddress { get; set; }

    /// <summary>Email address after the change.</summary>
    public string NewEmailAddress { get; set; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; set; }

    /// <summary>Whether the new address needs to be verified.</summary>
    public bool RequiresEmailVerification { get; set; }

    /// <summary>Parent customer, if available.</summary>
    public string AccountId { get; set; }
}

/// <summary>
/// Specific business event raised when one of a contact's phone numbers changes.
/// </summary>
public class ContactPhoneChanged : BusinessEvent
{
    /// <summary>Creates the event with <c>EventType</c> set to "ContactPhoneChanged".</summary>
    public ContactPhoneChanged() => EventType = "ContactPhoneChanged";

    /// <summary>Identifier of the contact whose phone number changed.</summary>
    public string ContactId { get; set; }

    /// <summary>Full name of the contact.</summary>
    public string FullName { get; set; }

    /// <summary>Which number changed: "Business" or "Mobile".</summary>
    public string PhoneType { get; set; }

    /// <summary>Phone number before the change.</summary>
    public string OldPhoneNumber { get; set; }

    /// <summary>Phone number after the change.</summary>
    public string NewPhoneNumber { get; set; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; set; }
}

/// <summary>
/// Specific business event raised when a contact's status changes.
/// </summary>
public class ContactStatusChanged : BusinessEvent
{
    /// <summary>Creates the event with <c>EventType</c> set to "ContactStatusChanged".</summary>
    public ContactStatusChanged() => EventType = "ContactStatusChanged";

    /// <summary>Identifier of the contact whose status changed.</summary>
    public string ContactId { get; set; }

    /// <summary>Full name of the contact.</summary>
    public string FullName { get; set; }

    /// <summary>Status before the change.</summary>
    public string OldStatus { get; set; }

    /// <summary>Status after the change.</summary>
    public string NewStatus { get; set; }

    /// <summary>Reason given for the status change.</summary>
    public string StatusChangeReason { get; set; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; set; }
}

Contact Event Mapping Logic

/// <summary>
/// Translates one Dataverse contact update into business events: a generic ContactUpdated
/// event (when relevant fields changed) followed by any specific email, phone and status events.
/// </summary>
/// <param name="context">Execution context carrying the "PreImage" and "PostImage" entity images.</param>
/// <param name="cancellationToken">Checked before the mapping starts and between the mapping steps.</param>
/// <returns>The generated events; empty when nothing relevant changed.</returns>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
public async Task<List<BusinessEvent>> MapContactEventsAsync(
    DataverseExecutionContext context, 
    CancellationToken cancellationToken)
{
    // The helpers below take no token, so cancellation is observed at the step boundaries.
    cancellationToken.ThrowIfCancellationRequested();

    var events = new List<BusinessEvent>();
    var preImage = context.PreEntityImages["PreImage"];
    var postImage = context.PostEntityImages["PostImage"];
    var contactId = context.PrimaryEntityId.ToString();

    // Always generate a generic ContactUpdated event for view projections
    var genericEvent = await CreateGenericContactUpdatedEvent(preImage, postImage, contactId, context);
    if (genericEvent != null)
    {
        events.Add(genericEvent);
    }

    // Generate specific business events based on field changes
    cancellationToken.ThrowIfCancellationRequested();
    await AddEmailChangeEventIfNeeded(preImage, postImage, contactId, events, context);

    cancellationToken.ThrowIfCancellationRequested();
    await AddPhoneChangeEventsIfNeeded(preImage, postImage, contactId, events, context);

    cancellationToken.ThrowIfCancellationRequested();
    await AddStatusChangeEventIfNeeded(preImage, postImage, contactId, events, context);

    return events;
}

/// <summary>
/// Builds the generic ContactUpdated event from the pre/post images.
/// </summary>
/// <param name="preImage">Entity image before the update.</param>
/// <param name="postImage">Entity image after the update.</param>
/// <param name="contactId">Identifier of the contact.</param>
/// <param name="context">Execution context; the initiating user's email becomes ChangedBy.</param>
/// <returns>The event, or null when DetectChangedFields reports no changes.</returns>
private async Task<ContactUpdated> CreateGenericContactUpdatedEvent(
    DataverseEntity preImage, 
    DataverseEntity postImage, 
    string contactId,
    DataverseExecutionContext context)
{
    var changed = DetectChangedFields(preImage, postImage);
    if (changed.Count == 0)
    {
        // No relevant changes
        return null;
    }

    // Capture the before/after value of every changed field, keyed by field name.
    var before = new Dictionary<string, object>();
    var after = new Dictionary<string, object>();
    changed.ForEach(name =>
    {
        before[name] = preImage.GetAttributeValue<object>(name);
        after[name] = postImage.GetAttributeValue<object>(name);
    });

    var updated = new ContactUpdated();
    updated.ContactId = contactId;
    updated.FullName = postImage.GetAttributeValue<string>("fullname");
    updated.ChangedFields = changed.ToArray();
    updated.PreValues = before;
    updated.PostValues = after;
    updated.ChangedBy = context.InitiatingUser?.Email;
    return updated;
}

/// <summary>
/// Appends a ContactEmailChanged event to <paramref name="events"/> when "emailaddress1"
/// differs between the images and the new value is not empty.
/// </summary>
/// <param name="preImage">Entity image before the update.</param>
/// <param name="postImage">Entity image after the update.</param>
/// <param name="contactId">Identifier of the contact.</param>
/// <param name="events">List that receives the new event.</param>
/// <param name="context">Execution context; the initiating user's email becomes ChangedBy.</param>
private async Task AddEmailChangeEventIfNeeded(
    DataverseEntity preImage, 
    DataverseEntity postImage, 
    string contactId,
    List<BusinessEvent> events,
    DataverseExecutionContext context)
{
    var previousAddress = preImage.GetAttributeValue<string>("emailaddress1");
    var currentAddress = postImage.GetAttributeValue<string>("emailaddress1");

    // Nothing to publish when the address is unchanged or was cleared.
    if (previousAddress == currentAddress || string.IsNullOrEmpty(currentAddress))
    {
        return;
    }

    // Get parent account for context
    var parentId = postImage.GetAttributeValue<Guid?>("parentcustomerid");

    var emailChanged = new ContactEmailChanged();
    emailChanged.ContactId = contactId;
    emailChanged.FullName = postImage.GetAttributeValue<string>("fullname");
    emailChanged.OldEmailAddress = previousAddress;
    emailChanged.NewEmailAddress = currentAddress;
    emailChanged.ChangedBy = context.InitiatingUser?.Email;
    emailChanged.RequiresEmailVerification = ShouldRequireEmailVerification(previousAddress, currentAddress);
    emailChanged.AccountId = parentId?.ToString();
    events.Add(emailChanged);

    _logger.LogInformation("Contact email changed: {ContactId} from {OldEmail} to {NewEmail}",
        contactId, previousAddress, currentAddress);
}

/// <summary>
/// Decides whether a changed email address must be verified: yes when there was no previous
/// address, or when the domain part (text after the last '@') changed.
/// </summary>
/// <param name="oldEmail">Previous address; may be null or empty.</param>
/// <param name="newEmail">New address; expected to be non-empty.</param>
/// <returns>True when verification is required.</returns>
private bool ShouldRequireEmailVerification(string oldEmail, string newEmail)
{
    // Business logic: require verification if this is a new email or domain change
    if (string.IsNullOrEmpty(oldEmail))
        return true; // New email address

    // With no '@', LastIndexOf returns -1 and the whole string is treated as the domain.
    var oldDomain = oldEmail.Substring(oldEmail.LastIndexOf('@') + 1);
    var newDomain = newEmail.Substring(newEmail.LastIndexOf('@') + 1);

    // Domain names are case-insensitive, so "Company.com" and "company.com" are the same domain.
    return !string.Equals(oldDomain, newDomain, StringComparison.OrdinalIgnoreCase);
}

/// <summary>
/// Returns the names of the tracked contact fields whose values differ between the two images.
/// </summary>
/// <param name="preImage">Entity image before the update.</param>
/// <param name="postImage">Entity image after the update.</param>
/// <returns>Changed field names, in the order the tracked fields are listed.</returns>
private List<string> DetectChangedFields(DataverseEntity preImage, DataverseEntity postImage)
{
    string[] trackedFields =
    {
        "firstname", "lastname", "emailaddress1", "telephone1",
        "mobilephone", "jobtitle", "statuscode", "parentcustomerid"
    };

    // NOTE(review): values are compared with object.Equals — confirm the attribute value types
    // implement value equality, otherwise unchanged fields could be reported as changed.
    return trackedFields
        .Where(name => !object.Equals(
            preImage.GetAttributeValue<object>(name),
            postImage.GetAttributeValue<object>(name)))
        .ToList();
}

Step 3: Event Consumer Patterns

Pattern 1: View Projection Consumer (Generic Events)

/// <summary>
/// Consumer that updates materialized views based on ContactUpdated events.
/// </summary>
/// <remarks>
/// NOTE(review): _searchService, _reportingService, _cacheService and _logger are not declared
/// in this excerpt; they are assumed to be injected fields defined elsewhere.
/// </remarks>
public class ContactViewProjectionConsumer
{
    /// <summary>
    /// Projects a contact change into the search index and reporting store, then invalidates
    /// the cached contact.
    /// </summary>
    /// <param name="contactEvent">The generic contact update event to project.</param>
    /// <exception cref="ArgumentNullException"><paramref name="contactEvent"/> is null.</exception>
    public async Task HandleContactUpdatedAsync(ContactUpdated contactEvent)
    {
        if (contactEvent == null)
        {
            throw new ArgumentNullException(nameof(contactEvent));
        }

        // Update search index
        await _searchService.UpdateContactAsync(contactEvent.ContactId, contactEvent.PostValues);

        // Update reporting database
        await _reportingService.ProjectContactChangesAsync(contactEvent);

        // Update cache
        await _cacheService.InvalidateContactAsync(contactEvent.ContactId);

        // Null-safe: a missing field list must not fail the handler after the projections succeeded.
        var changedFields = contactEvent.ChangedFields ?? new string[0];
        _logger.LogInformation("Projected contact changes for {ContactId} with fields: {Fields}",
            contactEvent.ContactId, string.Join(", ", changedFields));
    }
}

Pattern 2: Business Logic Consumer (Specific Events)

/// <summary>
/// Consumer that handles email change business logic.
/// </summary>
public class ContactEmailChangeConsumer
{
    /// <summary>
    /// Sends a verification email and records it when the event requires verification, updates
    /// the marketing lists, and notifies the parent account when one is present.
    /// </summary>
    /// <param name="emailEvent">The email change event to handle.</param>
    public async Task HandleContactEmailChangedAsync(ContactEmailChanged emailEvent)
    {
        var contactId = emailEvent.ContactId;
        var newAddress = emailEvent.NewEmailAddress;

        if (emailEvent.RequiresEmailVerification)
        {
            // Send the verification mail first, then persist the verification record.
            await _emailService.SendVerificationEmailAsync(newAddress, contactId);
            await _verificationService.CreateVerificationRecordAsync(contactId, newAddress);

            _logger.LogInformation("Email verification sent for contact {ContactId} to {NewEmail}",
                contactId, newAddress);
        }

        // Marketing lists are updated regardless of verification.
        await _marketingService.UpdateContactEmailAsync(contactId, newAddress);

        // Related systems are only notified when the contact has a parent account.
        var accountId = emailEvent.AccountId;
        if (string.IsNullOrEmpty(accountId))
        {
            return;
        }

        await _notificationService.NotifyAccountOfContactEmailChangeAsync(accountId, emailEvent);
    }
}

Step 4: Event Hub Message Structure

Source Event Hub Message (from Dataverse)

{
  "MessageName": "Update",
  "TimeStamp": "2024-07-11T15:30:00Z",
  "ExecutionContext": {
    "PrimaryEntityName": "contact",
    "PrimaryEntityId": "12345678-1234-1234-1234-123456789012",
    "PreEntityImages": {
      "PreImage": {
        "Id": "12345678-1234-1234-1234-123456789012",
        "LogicalName": "contact",
        "Attributes": {
          "fullname": "John Doe",
          "emailaddress1": "john.old@company.com",
          "telephone1": "555-0123",
          "jobtitle": "Developer",
          "statuscode": { "Value": 1 }
        }
      }
    },
    "PostEntityImages": {
      "PostImage": {
        "Id": "12345678-1234-1234-1234-123456789012",
        "LogicalName": "contact",
        "Attributes": {
          "fullname": "John Doe",
          "emailaddress1": "john.new@company.com",
          "telephone1": "555-0123",
          "jobtitle": "Senior Developer",
          "statuscode": { "Value": 1 }
        }
      }
    },
    "InitiatingUser": {
      "UserId": "87654321-4321-4321-4321-210987654321",
      "Email": "admin@company.com",
      "FullName": "System Administrator"
    }
  }
}

Business Event Hub Messages (after translation)

Generic ContactUpdated Event:

{
  "EventId": "11111111-1111-1111-1111-111111111111",
  "EventType": "ContactUpdated",
  "ContactId": "12345678-1234-1234-1234-123456789012",
  "FullName": "John Doe",
  "ChangedFields": ["emailaddress1", "jobtitle"],
  "PreValues": {
    "emailaddress1": "john.old@company.com",
    "jobtitle": "Developer"
  },
  "PostValues": {
    "emailaddress1": "john.new@company.com",
    "jobtitle": "Senior Developer"
  },
  "ChangedBy": "admin@company.com",
  "OccurredAt": "2024-07-11T15:30:00Z",
  "SourceSystem": "Dataverse",
  "EventVersion": "1.0",
  "CorrelationId": "22222222-2222-2222-2222-222222222222"
}

Specific ContactEmailChanged Event:

{
  "EventId": "33333333-3333-3333-3333-333333333333",
  "EventType": "ContactEmailChanged",
  "ContactId": "12345678-1234-1234-1234-123456789012",
  "FullName": "John Doe",
  "OldEmailAddress": "john.old@company.com",
  "NewEmailAddress": "john.new@company.com",
  "ChangedBy": "admin@company.com",
  "RequiresEmailVerification": true,
  "AccountId": null,
  "OccurredAt": "2024-07-11T15:30:00Z",
  "SourceSystem": "Dataverse",
  "EventVersion": "1.0",
  "CorrelationId": "22222222-2222-2222-2222-222222222222"
}

Step 5: Key Implementation Patterns

Event Filtering Strategy

/// <summary>
/// Tells whether a contact update touched at least one business-relevant field, so that
/// system-only updates (like modifiedon, modifiedby) can be filtered out.
/// </summary>
/// <param name="context">Execution context carrying the "PreImage" and "PostImage" entity images.</param>
/// <returns>True when any business-relevant field differs between the images.</returns>
private bool ShouldProcessContactUpdate(DataverseExecutionContext context)
{
    var before = context.PreEntityImages["PreImage"];
    var after = context.PostEntityImages["PostImage"];

    string[] businessRelevantFields =
    {
        "firstname", "lastname", "emailaddress1", "telephone1",
        "mobilephone", "jobtitle", "statuscode", "parentcustomerid"
    };

    // Stop at the first field whose value differs.
    foreach (var name in businessRelevantFields)
    {
        var oldValue = before.GetAttributeValue<object>(name);
        var newValue = after.GetAttributeValue<object>(name);
        if (!object.Equals(oldValue, newValue))
        {
            return true;
        }
    }

    return false;
}

Event Enrichment

/// <summary>
/// Adds computed metadata to a ContactUpdated event: email/phone presence flags, the parent
/// account id when present, and a change category.
/// </summary>
/// <param name="contactEvent">Event whose Metadata collection is extended.</param>
/// <param name="postImage">Entity image after the update, used as the source of the values.</param>
private async Task EnrichContactEventAsync(ContactUpdated contactEvent, DataverseEntity postImage)
{
    var metadata = contactEvent.Metadata;

    // Computed presence flags
    var hasEmail = !string.IsNullOrEmpty(postImage.GetAttributeValue<string>("emailaddress1"));
    metadata.Add("HasEmailAddress", hasEmail);

    var hasPhone =
        !string.IsNullOrEmpty(postImage.GetAttributeValue<string>("telephone1")) ||
        !string.IsNullOrEmpty(postImage.GetAttributeValue<string>("mobilephone"));
    metadata.Add("HasPhoneNumber", hasPhone);

    // Relationship context; additional account details could be fetched here if needed.
    var parentId = postImage.GetAttributeValue<Guid?>("parentcustomerid");
    if (parentId.HasValue)
    {
        metadata.Add("ParentAccountId", parentId.ToString());
    }

    // Change categorization
    var category = CategorizeChanges(contactEvent.ChangedFields);
    metadata.Add("ChangeCategory", category);
}

/// <summary>
/// Maps a set of changed field names to a single category. Email and phone changes take
/// precedence, then job title, then status; anything else is a general update.
/// </summary>
/// <param name="changedFields">Names of the fields that changed.</param>
/// <returns>"ContactInformation", "ProfessionalInformation", "StatusChange" or "GeneralUpdate".</returns>
private string CategorizeChanges(string[] changedFields)
{
    bool Has(string fieldName) => changedFields.Contains(fieldName);

    if (Has("emailaddress1") || Has("telephone1") || Has("mobilephone"))
    {
        return "ContactInformation";
    }

    if (Has("jobtitle"))
    {
        return "ProfessionalInformation";
    }

    return Has("statuscode") ? "StatusChange" : "GeneralUpdate";
}

Batch Processing for Performance

/// <summary>
/// Translates a batch of partition events into business events and publishes them in a single
/// batch. A failure on one event is logged and does not stop the rest of the batch.
/// </summary>
/// <param name="events">Raw events read from the source Event Hub.</param>
/// <param name="cancellationToken">Checked before each event and passed to mapping and publishing.</param>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
private async Task ProcessEventBatchAsync(IEnumerable<PartitionEvent> events, CancellationToken cancellationToken)
{
    var businessEvents = new List<BusinessEvent>();

    foreach (var partitionEvent in events)
    {
        // Stop promptly on shutdown instead of working through the remainder of the batch.
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            var dataverseEvent = ParseDataverseEvent(partitionEvent);

            if (dataverseEvent.ExecutionContext.PrimaryEntityName == "contact")
            {
                var contactEvents = await MapContactEventsAsync(dataverseEvent.ExecutionContext, cancellationToken);
                businessEvents.AddRange(contactEvents);
            }
        }
        // Caller-requested cancellation must propagate; every other failure is isolated to its event.
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            _logger.LogError(ex, "Error processing individual event in batch");
            // Continue processing other events in batch
        }
    }

    // Publish all business events in a single batch
    if (businessEvents.Any())
    {
        await PublishBusinessEventBatchAsync(businessEvents, cancellationToken);
    }
}

Testing Strategy

Unit Testing Contact Event Mapping

/// <summary>
/// Unit tests for the contact event mapping: which business events are produced for a given
/// pre/post image difference.
/// </summary>
[TestClass]
public class ContactEventMappingTests
{
    /// <summary>
    /// An email change yields both the generic ContactUpdated event and the specific
    /// ContactEmailChanged event.
    /// </summary>
    [TestMethod]
    public async Task MapContactEvents_EmailChange_GeneratesBothGenericAndSpecificEvents()
    {
        // Arrange
        // The new address uses a different domain: verification is only required for a new
        // address or a domain change, so a same-domain change would make the
        // RequiresEmailVerification assertion below fail.
        var context = CreateContactUpdateContext(
            preEmail: "old@company.com",
            postEmail: "new@othercompany.com");

        var mapper = new BusinessEventMapper(Mock.Of<ILogger<BusinessEventMapper>>());

        // Act
        var events = await mapper.MapContactEventsAsync(context, CancellationToken.None);

        // Assert
        Assert.AreEqual(2, events.Count);

        var genericEvent = events.OfType<ContactUpdated>().FirstOrDefault();
        Assert.IsNotNull(genericEvent);
        Assert.IsTrue(genericEvent.ChangedFields.Contains("emailaddress1"));

        var specificEvent = events.OfType<ContactEmailChanged>().FirstOrDefault();
        Assert.IsNotNull(specificEvent);
        Assert.AreEqual("old@company.com", specificEvent.OldEmailAddress);
        Assert.AreEqual("new@othercompany.com", specificEvent.NewEmailAddress);
        Assert.IsTrue(specificEvent.RequiresEmailVerification);
    }

    /// <summary>
    /// A job title change alone yields only the generic ContactUpdated event, carrying the
    /// old and new title.
    /// </summary>
    [TestMethod]
    public async Task MapContactEvents_OnlyJobTitleChange_GeneratesOnlyGenericEvent()
    {
        // Arrange
        var context = CreateContactUpdateContext(
            preJobTitle: "Developer",
            postJobTitle: "Senior Developer");

        var mapper = new BusinessEventMapper(Mock.Of<ILogger<BusinessEventMapper>>());

        // Act
        var events = await mapper.MapContactEventsAsync(context, CancellationToken.None);

        // Assert
        Assert.AreEqual(1, events.Count);
        Assert.IsInstanceOfType(events[0], typeof(ContactUpdated));

        var genericEvent = (ContactUpdated)events[0];
        Assert.IsTrue(genericEvent.ChangedFields.Contains("jobtitle"));
        Assert.AreEqual("Developer", genericEvent.PreValues["jobtitle"]);
        Assert.AreEqual("Senior Developer", genericEvent.PostValues["jobtitle"]);
    }
}

Integration Testing with Test Containers

/// <summary>
/// End-to-end tests that push a Dataverse contact update through the translator and
/// inspect what arrives on the business Event Hub.
/// </summary>
/// <remarks>
/// NOTE(review): no [TestCleanup] is visible in this class, so the containers started in
/// <see cref="Setup"/> are not stopped here — confirm they are disposed elsewhere.
/// </remarks>
[TestClass]
public class ContactEventIntegrationTests
{
    private EventHubTestContainer _sourceEventHub;
    private EventHubTestContainer _businessEventHub;
    private ContactEventTranslatorService _translatorService;

    /// <summary>
    /// Starts a source and a business Event Hub container and wires the translator to both.
    /// </summary>
    [TestInitialize]
    public async Task Setup()
    {
        _sourceEventHub = new EventHubTestContainer();
        _businessEventHub = new EventHubTestContainer();

        // Source hub is started first, then the business hub.
        await _sourceEventHub.StartAsync();
        await _businessEventHub.StartAsync();

        _translatorService = new ContactEventTranslatorService(
            _sourceEventHub.ConnectionString,
            _businessEventHub.ConnectionString);
    }

    /// <summary>
    /// A contact e-mail change must produce a ContactUpdated and a ContactEmailChanged
    /// event that share one correlation ID.
    /// </summary>
    [TestMethod]
    public async Task EndToEnd_ContactEmailChange_PublishesCorrectEvents()
    {
        // Arrange
        var sourceEvent = CreateDataverseContactUpdateEvent();

        // Act
        await _sourceEventHub.PublishEventAsync(sourceEvent);
        await Task.Delay(TimeSpan.FromSeconds(5)); // Allow processing time

        // Assert
        var published = await _businessEventHub.ReadEventsAsync(timeout: TimeSpan.FromSeconds(10));
        Assert.AreEqual(2, published.Count);

        var genericEvent = published.FirstOrDefault(e => e.EventType == "ContactUpdated");
        var emailEvent = published.FirstOrDefault(e => e.EventType == "ContactEmailChanged");
        Assert.IsNotNull(genericEvent);
        Assert.IsNotNull(emailEvent);

        // Both events stem from the same source update, so their correlation IDs must match.
        Assert.AreEqual(genericEvent.CorrelationId, emailEvent.CorrelationId);
    }
}

## Best Practices for Contact Event Translation

### Event Design Principles

1. **Dual Event Strategy**: Always generate both generic `ContactUpdated` events for projections and specific business events for targeted logic
2. **Field-Level Granularity**: Include both pre and post values for changed fields to support different consumer needs
3. **Business Context**: Enrich events with business-relevant computed fields and relationship data
4. **Idempotency**: Ensure events can be safely processed multiple times
5. **Correlation**: Maintain correlation IDs across related events

### Performance Optimization

1. **Field Filtering**: Only process business-relevant field changes to reduce noise
2. **Batch Processing**: Group multiple contact updates when possible to improve throughput
3. **Selective Enrichment**: Only fetch additional data when required by specific business events
4. **Caching**: Cache frequently accessed reference data (like account details for contacts)

### Security Considerations

1. **PII Handling**: Mask or exclude sensitive contact information in generic events
2. **Email Verification**: Implement proper email verification workflows for email changes
3. **Audit Trail**: Log all contact event processing activities for compliance
4. **Data Retention**: Implement appropriate retention policies for contact event data

## Troubleshooting Guide

### Common Contact Event Issues

| Issue | Symptoms | Solution |
|-------|----------|----------|
| Missing Contact Pre/Post Images | Contact events show no field changes | Verify contact-specific image configuration in Plugin Registration Tool |
| Email Verification Not Triggering | ContactEmailChanged events published but no verification emails | Check email change detection logic and verification service integration |
| Duplicate Contact Events | Same contact update generates multiple events | Implement event deduplication based on correlation ID and timestamp |
| Missing Parent Account Context | Contact events lack account relationship data | Ensure parentcustomerid is included in pre/post images |

### Debugging Contact Event Processing

1. **Check Contact-Specific Logs**: Filter logs by contact entity type and correlation ID
2. **Verify Field Detection**: Test field change detection logic with known contact updates
3. **Validate Event Schema**: Ensure contact events match expected schema
4. **Monitor Consumer Lag**: Check if contact event consumers are keeping up with volume
5. **Review Contact Business Logic**: Verify specific business event generation rules

## Conclusion

This solution transforms Dataverse's generic Contact CRUD events into meaningful business events that provide real value to downstream systems. By implementing both generic `ContactUpdated` events for view projections and specific events like `ContactEmailChanged` for business logic, you get the flexibility to support different consumer patterns while maintaining high performance and reliability.

The Contact-focused architecture supports:

- **Flexibility**: Both generic projection events and specific business events from the same source
- **Scalability**: Handle high contact update volumes with Event Hub and containerized processing
- **Reliability**: Built-in retry logic, dead letter queues, and Event Hub durability
- **Observability**: Contact-specific monitoring and alerting
- **Maintainability**: Clean separation between projection and business event logic
- **Extensibility**: Easy to add new contact field monitoring and business events

This approach enables downstream systems to either maintain synchronized contact views through generic events or react to specific contact changes through targeted business events, providing the best of both worlds for event-driven architecture.
/// <summary>
/// Common envelope shared by every business event produced by the translator.
/// </summary>
public abstract class BusinessEvent
{
    /// <summary>Unique event identifier; defaults to a newly generated GUID string.</summary>
    public string EventId { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Name of the concrete event type; set by derived classes.</summary>
    public string EventType { get; set; }

    /// <summary>Identifier that ties together events derived from the same source.</summary>
    public string CorrelationId { get; set; }

    /// <summary>Event Hub sequence number from the source event.</summary>
    public string SourceEventId { get; set; }

    /// <summary>UTC timestamp; defaults to the moment the instance is created.</summary>
    public DateTime OccurredAt { get; set; } = DateTime.UtcNow;

    /// <summary>Originating system; defaults to "Dataverse".</summary>
    public string SourceSystem { get; set; } = "Dataverse";

    /// <summary>Schema version of the event; defaults to "1.0".</summary>
    public string EventVersion { get; set; } = "1.0";

    /// <summary>Free-form key/value context; starts out empty.</summary>
    public Dictionary<string, object> Metadata { get; set; } = new();
}

// Customer Events
/// <summary>
/// Business event describing a change of a customer's credit limit.
/// </summary>
public class CustomerCreditLimitChanged : BusinessEvent
{
    /// <summary>Stamps the event type name on construction.</summary>
    public CustomerCreditLimitChanged() => EventType = "CustomerCreditLimitChanged";

    /// <summary>Identifier of the affected customer.</summary>
    public string CustomerId { get; set; }

    /// <summary>Display name of the affected customer.</summary>
    public string CustomerName { get; set; }

    /// <summary>Credit limit before the change.</summary>
    public decimal OldCreditLimit { get; set; }

    /// <summary>Credit limit after the change.</summary>
    public decimal NewCreditLimit { get; set; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; set; }

    /// <summary>Textual reason for the change.</summary>
    public string ChangeReason { get; set; }
}

/// <summary>
/// Business event describing a customer becoming active.
/// </summary>
public class CustomerActivated : BusinessEvent
{
    /// <summary>Stamps the event type name on construction.</summary>
    public CustomerActivated() => EventType = "CustomerActivated";

    /// <summary>Identifier of the activated customer.</summary>
    public string CustomerId { get; set; }

    /// <summary>Display name of the activated customer.</summary>
    public string CustomerName { get; set; }

    /// <summary>Industry name of the customer.</summary>
    public string Industry { get; set; }

    /// <summary>Who activated the customer.</summary>
    public string ActivatedBy { get; set; }
}

/// <summary>
/// Business event describing a customer becoming inactive.
/// </summary>
public class CustomerDeactivated : BusinessEvent
{
    /// <summary>Stamps the event type name on construction.</summary>
    public CustomerDeactivated() => EventType = "CustomerDeactivated";

    /// <summary>Identifier of the deactivated customer.</summary>
    public string CustomerId { get; set; }

    /// <summary>Display name of the deactivated customer.</summary>
    public string CustomerName { get; set; }

    /// <summary>Textual reason for the deactivation.</summary>
    public string DeactivationReason { get; set; }

    /// <summary>Who deactivated the customer.</summary>
    public string DeactivatedBy { get; set; }
}

// Opportunity Events
/// <summary>
/// Business event describing an opportunity that was won.
/// </summary>
public class OpportunityWon : BusinessEvent
{
    /// <summary>Stamps the event type name on construction.</summary>
    public OpportunityWon() => EventType = "OpportunityWon";

    /// <summary>Identifier of the opportunity.</summary>
    public string OpportunityId { get; set; }

    /// <summary>Display name of the opportunity.</summary>
    public string OpportunityName { get; set; }

    /// <summary>Identifier of the customer the opportunity belongs to.</summary>
    public string CustomerId { get; set; }

    /// <summary>Display name of that customer.</summary>
    public string CustomerName { get; set; }

    /// <summary>Estimated value of the opportunity.</summary>
    public decimal EstimatedValue { get; set; }

    /// <summary>Actual value of the opportunity.</summary>
    public decimal ActualValue { get; set; }

    /// <summary>Date the opportunity was closed.</summary>
    public DateTime CloseDate { get; set; }

    /// <summary>Who won the opportunity.</summary>
    public string WonBy { get; set; }
}

/// <summary>
/// Business event describing an opportunity that was lost.
/// </summary>
public class OpportunityLost : BusinessEvent
{
    /// <summary>Stamps the event type name on construction.</summary>
    public OpportunityLost() => EventType = "OpportunityLost";

    /// <summary>Identifier of the opportunity.</summary>
    public string OpportunityId { get; set; }

    /// <summary>Display name of the opportunity.</summary>
    public string OpportunityName { get; set; }

    /// <summary>Identifier of the customer the opportunity belongs to.</summary>
    public string CustomerId { get; set; }

    /// <summary>Display name of that customer.</summary>
    public string CustomerName { get; set; }

    /// <summary>Estimated value of the opportunity.</summary>
    public decimal EstimatedValue { get; set; }

    /// <summary>Textual reason the opportunity was lost.</summary>
    public string LossReason { get; set; }

    /// <summary>Date the opportunity was closed.</summary>
    public DateTime CloseDate { get; set; }
}

Step 4: Event Translation Logic

Main Translation Service

/// <summary>
/// Dispatches an entity update to the entity-specific processor and stamps correlation
/// and source metadata on every resulting business event.
/// </summary>
/// <param name="context">Execution context of the Dataverse update.</param>
/// <param name="correlationId">Correlation ID assigned to every produced event.</param>
/// <param name="log">Logger used for error reporting.</param>
/// <returns>
/// The produced business events; an empty list when the entity type is not handled
/// (including when the entity name is null).
/// </returns>
/// <exception cref="Exception">Any processor failure is logged and rethrown unchanged.</exception>
public async Task<List<BusinessEvent>> ProcessEntityUpdate(
    DataverseExecutionContext context,
    string correlationId,
    ILogger log)
{
    List<BusinessEvent> events;

    try
    {
        // ToLowerInvariant: entity logical names are identifiers, so matching must not
        // depend on the host culture (e.g. Turkish casing of 'I'). A null name falls
        // through to the default arm instead of throwing.
        events = context.PrimaryEntityName?.ToLowerInvariant() switch
        {
            "account" => await ProcessAccountUpdate(context, log),
            "opportunity" => await ProcessOpportunityUpdate(context, log),
            "salesorder" => await ProcessOrderUpdate(context, log),
            _ => new List<BusinessEvent>()
        };

        // Add correlation and metadata to all events
        foreach (var evt in events)
        {
            evt.CorrelationId = correlationId;

            // Indexer assignment rather than Add: a key already present on the event
            // (e.g. set by a processor) is overwritten instead of throwing ArgumentException.
            evt.Metadata["SourceEntityId"] = context.PrimaryEntityId.ToString();
            evt.Metadata["SourceEntityType"] = context.PrimaryEntityName;

            if (context.InitiatingUser != null)
            {
                evt.Metadata["InitiatingUser"] = context.InitiatingUser.Email;
            }
        }
    }
    catch (Exception ex)
    {
        // Pass the exception object so the stack trace reaches the log sink; the rendered
        // message text is the same as before.
        log.LogError(ex, "Error in ProcessEntityUpdate: {Message}", ex.Message);
        throw;
    }

    return events;
}

Account Event Processing

/// <summary>
/// Runs every account-level change detector against the pre/post images of an account
/// update and returns the business events they produced.
/// </summary>
/// <param name="context">Execution context carrying the "PreImage" and "PostImage" entity images.</param>
/// <param name="log">Logger handed to each detector.</param>
/// <returns>Events appended by the detectors, in detector order.</returns>
private async Task<List<BusinessEvent>> ProcessAccountUpdate(DataverseExecutionContext context, ILogger log)
{
    var collected = new List<BusinessEvent>();
    var before = context.PreEntityImages["PreImage"];
    var after = context.PostEntityImages["PostImage"];
    var id = context.PrimaryEntityId.ToString();

    // Detectors run strictly one after another; each appends to the shared list.
    await ProcessCreditLimitChange(before, after, id, collected, log);   // credit limit
    await ProcessStatusChange(before, after, id, collected, log);        // status
    await ProcessContactInfoChange(before, after, id, collected, log);   // contact information
    await ProcessIndustryChange(before, after, id, collected, log);      // industry

    return collected;
}

/// <summary>
/// Emits a <see cref="CustomerCreditLimitChanged"/> event when the "creditlimit" attribute
/// differs between the pre and post image.
/// </summary>
/// <param name="preImage">Entity image before the update.</param>
/// <param name="postImage">Entity image after the update.</param>
/// <param name="accountId">Identifier of the account, used as the customer ID.</param>
/// <param name="events">List the new event is appended to.</param>
/// <param name="log">Logger for the informational change message.</param>
/// <returns>An already-completed task; the method does no asynchronous work.</returns>
/// <remarks>
/// Not declared <c>async</c>: the body contains no <c>await</c>, so an async state machine
/// would only add overhead and a CS1998 warning. Callers can still <c>await</c> it.
/// </remarks>
private Task ProcessCreditLimitChange(
    DataverseEntity preImage,
    DataverseEntity postImage,
    string accountId,
    List<BusinessEvent> events,
    ILogger log)
{
    var oldCreditLimit = preImage.GetAttributeValue<decimal>("creditlimit");
    var newCreditLimit = postImage.GetAttributeValue<decimal>("creditlimit");

    if (oldCreditLimit == newCreditLimit)
    {
        return Task.CompletedTask;
    }

    var customerName = postImage.GetAttributeValue<string>("name");

    // NOTE(review): ChangedBy is not populated here — confirm whether it is set elsewhere.
    events.Add(new CustomerCreditLimitChanged
    {
        CustomerId = accountId,
        CustomerName = customerName,
        OldCreditLimit = oldCreditLimit,
        NewCreditLimit = newCreditLimit,
        ChangeReason = DetermineCreditLimitChangeReason(oldCreditLimit, newCreditLimit)
    });

    log.LogInformation($"Credit limit changed for {customerName} ({accountId}): {oldCreditLimit:C} -> {newCreditLimit:C}");

    return Task.CompletedTask;
}

/// <summary>
/// Emits <see cref="CustomerActivated"/> or <see cref="CustomerDeactivated"/> when the
/// "statuscode" attribute flips between active and inactive.
/// </summary>
/// <param name="preImage">Entity image before the update.</param>
/// <param name="postImage">Entity image after the update.</param>
/// <param name="accountId">Identifier of the account, used as the customer ID.</param>
/// <param name="events">List the new event is appended to.</param>
/// <param name="log">Logger for the informational change message.</param>
/// <returns>An already-completed task; the method does no asynchronous work.</returns>
/// <remarks>
/// Only the transitions inactive-to-active and active-to-inactive produce an event; any
/// other status change is ignored. Not declared <c>async</c> because the body contains no
/// <c>await</c>.
/// </remarks>
private Task ProcessStatusChange(
    DataverseEntity preImage,
    DataverseEntity postImage,
    string accountId,
    List<BusinessEvent> events,
    ILogger log)
{
    // Status codes: 1=Active, 2=Inactive
    const int ActiveStatus = 1;
    const int InactiveStatus = 2;

    var oldStatus = preImage.GetAttributeValue<int>("statuscode");
    var newStatus = postImage.GetAttributeValue<int>("statuscode");

    if (oldStatus == newStatus)
    {
        return Task.CompletedTask;
    }

    var customerName = postImage.GetAttributeValue<string>("name");

    if (oldStatus == InactiveStatus && newStatus == ActiveStatus)
    {
        // The industry is only reported on activation, so it is looked up only here.
        var industryName = GetIndustryName(postImage.GetAttributeValue<int>("industrycode"));

        events.Add(new CustomerActivated
        {
            CustomerId = accountId,
            CustomerName = customerName,
            Industry = industryName
        });

        log.LogInformation($"Customer activated: {customerName} ({accountId})");
    }
    else if (oldStatus == ActiveStatus && newStatus == InactiveStatus)
    {
        events.Add(new CustomerDeactivated
        {
            CustomerId = accountId,
            CustomerName = customerName,
            DeactivationReason = "Status changed to inactive"
        });

        log.LogInformation($"Customer deactivated: {customerName} ({accountId})");
    }

    return Task.CompletedTask;
}

/// <summary>
/// Describes the direction of a credit-limit change.
/// </summary>
/// <param name="oldLimit">Credit limit before the change.</param>
/// <param name="newLimit">Credit limit after the change.</param>
/// <returns>
/// "Credit limit increased" when the new limit is higher, "Credit limit decreased" when it
/// is lower, and "Credit limit adjustment" when both are equal.
/// </returns>
private string DetermineCreditLimitChangeReason(decimal oldLimit, decimal newLimit)
{
    // Positive: new limit is higher; negative: lower; zero: unchanged.
    var direction = decimal.Compare(newLimit, oldLimit);

    return direction > 0 ? "Credit limit increased"
         : direction < 0 ? "Credit limit decreased"
         : "Credit limit adjustment";
}

/// <summary>
/// Translates a numeric industry code into its display name.
/// </summary>
/// <param name="industryCode">Industry code read from the entity image.</param>
/// <returns>The industry name for codes 1 through 5; "Other" for every other value.</returns>
private string GetIndustryName(int industryCode)
{
    switch (industryCode)
    {
        case 1:
            return "Technology";
        case 2:
            return "Healthcare";
        case 3:
            return "Finance";
        case 4:
            return "Manufacturing";
        case 5:
            return "Retail";
        default:
            return "Other";
    }
}

Opportunity Event Processing

/// <summary>
/// Emits <see cref="OpportunityWon"/> or <see cref="OpportunityLost"/> when the
/// "statuscode" attribute of an opportunity changes to won or lost.
/// </summary>
/// <param name="context">Execution context carrying the "PreImage" and "PostImage" entity images.</param>
/// <param name="log">Logger for the informational outcome message.</param>
/// <returns>
/// A completed task holding zero or one event. The method does no asynchronous work, so it
/// is not declared <c>async</c> (the body contains no <c>await</c>).
/// </returns>
private Task<List<BusinessEvent>> ProcessOpportunityUpdate(DataverseExecutionContext context, ILogger log)
{
    // Status codes: 3=Won, 4=Lost
    const int WonStatus = 3;
    const int LostStatus = 4;

    var events = new List<BusinessEvent>();
    var preImage = context.PreEntityImages["PreImage"];
    var postImage = context.PostEntityImages["PostImage"];
    var opportunityId = context.PrimaryEntityId.ToString();

    // Check for opportunity status changes
    var oldStatusCode = preImage.GetAttributeValue<int>("statuscode");
    var newStatusCode = postImage.GetAttributeValue<int>("statuscode");

    if (oldStatusCode != newStatusCode)
    {
        var opportunityName = postImage.GetAttributeValue<string>("name");
        var customerId = postImage.GetAttributeValue<Guid>("customerid").ToString();
        var estimatedValue = postImage.GetAttributeValue<decimal>("estimatedvalue");
        var actualValue = postImage.GetAttributeValue<decimal>("actualvalue");
        var closeDate = postImage.GetAttributeValue<DateTime>("actualclosedate");

        // NOTE(review): CustomerName (and WonBy) are never populated on these events —
        // confirm whether consumers expect them.
        if (newStatusCode == WonStatus)
        {
            events.Add(new OpportunityWon
            {
                OpportunityId = opportunityId,
                OpportunityName = opportunityName,
                CustomerId = customerId,
                EstimatedValue = estimatedValue,
                ActualValue = actualValue,
                CloseDate = closeDate
            });

            log.LogInformation($"Opportunity won: {opportunityName} ({opportunityId}) - Value: {actualValue:C}");
        }
        else if (newStatusCode == LostStatus)
        {
            var lossReason = postImage.GetAttributeValue<string>("closereasoncode");

            events.Add(new OpportunityLost
            {
                OpportunityId = opportunityId,
                OpportunityName = opportunityName,
                CustomerId = customerId,
                EstimatedValue = estimatedValue,
                LossReason = lossReason,
                CloseDate = closeDate
            });

            log.LogInformation($"Opportunity lost: {opportunityName} ({opportunityId}) - Reason: {lossReason}");
        }
    }

    return Task.FromResult(events);
}

Step 4: Configuration and Deployment

Application Configuration (appsettings.json)

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "ConnectionStrings": {
    "SourceEventHub": "Endpoint=sb://dataverse-events.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...",
    "BusinessEventHub": "Endpoint=sb://business-events.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...",
    "ApplicationInsights": "InstrumentationKey=..."
  },
  "EventProcessing": {
    "MaxConcurrentEvents": 10,
    "EventBatchSize": 100,
    "ProcessingTimeout": "00:05:00",
    "DeadLetterRetryCount": 3
  },
  "HealthChecks": {
    "Enabled": true,
    "Port": 8080
  }
}

Dockerfile

# Use the official .NET runtime image
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080
EXPOSE 8081

# The HEALTHCHECK in the final stage shells out to curl, which the aspnet runtime
# image does not ship. Install it here so the probe can actually run; the apt
# lists are removed again to keep the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Use the SDK image to build the app
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src

# Copy project files first so the restore layer is cached until a .csproj changes
COPY ["src/DataverseEventTranslator.Api/DataverseEventTranslator.Api.csproj", "src/DataverseEventTranslator.Api/"]
COPY ["src/DataverseEventTranslator.Core/DataverseEventTranslator.Core.csproj", "src/DataverseEventTranslator.Core/"]
COPY ["src/DataverseEventTranslator.Infrastructure/DataverseEventTranslator.Infrastructure.csproj", "src/DataverseEventTranslator.Infrastructure/"]

# Restore dependencies
RUN dotnet restore "src/DataverseEventTranslator.Api/DataverseEventTranslator.Api.csproj"

# Copy source code
COPY . .

# Build the application
WORKDIR "/src/src/DataverseEventTranslator.Api"
RUN dotnet build "DataverseEventTranslator.Api.csproj" -c Release -o /app/build

# Publish the application
FROM build AS publish
RUN dotnet publish "DataverseEventTranslator.Api.csproj" -c Release -o /app/publish /p:UseAppHost=false

# Final stage - runtime image (inherits curl from the base stage)
FROM base AS final
WORKDIR /app

# Create non-root user for security
RUN adduser --disabled-password --gecos '' appuser && chown -R appuser /app
USER appuser

COPY --from=publish /app/publish .

# Health check: fails the container when /health does not answer with a 2xx/3xx status
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
  CMD curl -f http://localhost:8080/health || exit 1

ENTRYPOINT ["dotnet", "DataverseEventTranslator.Api.dll"]

Kubernetes Deployment (k8s/deployment.yaml)

# Deployment running three replicas of the translator container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dataverse-event-translator
  namespace: dataverse-events
  labels:
    app: dataverse-event-translator
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: dataverse-event-translator
  template:
    metadata:
      labels:
        app: dataverse-event-translator
        version: v1
    spec:
      containers:
      - name: translator
        # NOTE(review): ":latest" is a mutable tag — consider pinning a version or digest.
        image: myregistry.azurecr.io/dataverse-event-translator:latest
        ports:
        - containerPort: 8080
          name: http
        # NOTE(review): ASPNETCORE_URLS below binds only http on 8080 — confirm
        # whether anything actually listens on 8081.
        - containerPort: 8081
          name: https
        env:
        - name: ASPNETCORE_ENVIRONMENT
          value: "Production"
        - name: ASPNETCORE_URLS
          value: "http://+:8080"
        # Connection strings come from the Secret, tuning values from the ConfigMap.
        envFrom:
        - secretRef:
            name: event-hub-secrets
        - configMapRef:
            name: app-config
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        # Liveness and readiness both probe the same /health endpoint on the http port.
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 30
          timeoutSeconds: 10
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 15
          timeoutSeconds: 5
          failureThreshold: 3
        securityContext:
          runAsNonRoot: true
          # NOTE(review): presumably matches the UID of the image's non-root user — verify.
          runAsUser: 1000
          allowPrivilegeEscalation: false
          # Root filesystem is read-only; writable paths are the emptyDir mounts below.
          readOnlyRootFilesystem: true
        volumeMounts:
        - name: tmp
          mountPath: /tmp
        - name: app-logs
          mountPath: /app/logs
      volumes:
      - name: tmp
        emptyDir: {}
      - name: app-logs
        emptyDir: {}
      securityContext:
        fsGroup: 1000
---
# Cluster-internal Service: exposes port 80 and forwards to container port 8080
# on pods labelled app=dataverse-event-translator.
apiVersion: v1
kind: Service
metadata:
  name: dataverse-event-translator-service
  namespace: dataverse-events
spec:
  selector:
    app: dataverse-event-translator
  ports:
  - name: http
    port: 80
    targetPort: 8080
  type: ClusterIP
---
# Non-secret settings injected as environment variables through the Deployment's envFrom.
# The double underscore is the .NET configuration separator for nested sections
# (EventProcessing__EventBatchSize maps to EventProcessing:EventBatchSize).
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
  namespace: dataverse-events
data:
  EventProcessing__MaxConcurrentEvents: "10"
  EventProcessing__EventBatchSize: "100"
  EventProcessing__ProcessingTimeout: "00:05:00"
  HealthChecks__Enabled: "true"
  HealthChecks__Port: "8080"
---
# Connection strings injected as environment variables through the Deployment's envFrom.
# The values below are elided placeholders ("...").
# NOTE(review): real connection strings should not be committed in a manifest —
# confirm how this Secret is populated in practice.
apiVersion: v1
kind: Secret
metadata:
  name: event-hub-secrets
  namespace: dataverse-events
type: Opaque
stringData:
  ConnectionStrings__SourceEventHub: "Endpoint=sb://dataverse-events.servicebus.windows.net/;SharedAccessKeyName=..."
  ConnectionStrings__BusinessEventHub: "Endpoint=sb://business-events.servicebus.windows.net/;SharedAccessKeyName=..."
  ConnectionStrings__ApplicationInsights: "InstrumentationKey=..."

Azure Container Apps Deployment

# container-app.yaml
# Azure Container Apps definition for the translator: internal ingress on 8080,
# 2-10 replicas scaled by an Event Hub rule.
# NOTE(review): verify apiVersion/kind and the overall layout against the current
# Container Apps YAML schema before deploying.
apiVersion: apps/v1alpha2
kind: ContainerApp
metadata:
  name: dataverse-event-translator
  resourceGroup: rg-dataverse-events
location: East US
properties:
  # {subscription-id} is a placeholder to be substituted.
  managedEnvironmentId: /subscriptions/{subscription-id}/resourceGroups/rg-dataverse-events/providers/Microsoft.App/managedEnvironments/env-dataverse
  configuration:
    # Secret values are elided placeholders ("...").
    secrets:
    - name: source-eventhub-connection
      value: "Endpoint=sb://dataverse-events.servicebus.windows.net/;SharedAccessKeyName=..."
    - name: business-eventhub-connection
      value: "Endpoint=sb://business-events.servicebus.windows.net/;SharedAccessKeyName=..."
    ingress:
      external: false
      targetPort: 8080
    dapr:
      enabled: false
  template:
    containers:
    - name: dataverse-event-translator
      # NOTE(review): ":latest" is a mutable tag — consider pinning a version or digest.
      image: myregistry.azurecr.io/dataverse-event-translator:latest
      env:
      # Connection strings are resolved from the secrets declared above.
      - name: ConnectionStrings__SourceEventHub
        secretRef: source-eventhub-connection
      - name: ConnectionStrings__BusinessEventHub
        secretRef: business-eventhub-connection
      - name: ASPNETCORE_ENVIRONMENT
        value: Production
      resources:
        cpu: 0.5
        memory: 1Gi
      # Liveness and readiness both probe /health on port 8080.
      probes:
      - type: liveness
        httpGet:
          path: /health
          port: 8080
        initialDelaySeconds: 30
        periodSeconds: 30
      - type: readiness
        httpGet:
          path: /health
          port: 8080
        initialDelaySeconds: 15
        periodSeconds: 15
    scale:
      minReplicas: 2
      maxReplicas: 10
      rules:
      # Custom scale rule keyed on the source Event Hub's default consumer group.
      - name: eventhub-scaling
        custom:
          type: azure-eventhub
          metadata:
            connectionFromEnv: ConnectionStrings__SourceEventHub
            eventHubName: dataverse-events
            consumerGroup: $Default
          # NOTE(review): the connection is supplied both via connectionFromEnv above and
          # via this secretRef — confirm whether both are needed.
          auth:
          - secretRef: source-eventhub-connection
            triggerParameter: connection

Project File (DataverseEventTranslator.Api.csproj)

<!-- Web project for the translator API: targets .NET 8 with nullable reference types
     and implicit usings enabled. -->
<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
    <DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
    <UserSecretsId>dataverse-event-translator</UserSecretsId>
  </PropertyGroup>

  <!-- Event Hubs client, telemetry, health checks, JSON and logging packages. -->
  <ItemGroup>
    <PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.1" />
    <PackageReference Include="Microsoft.Extensions.Azure" Version="1.7.1" />
    <PackageReference Include="Microsoft.ApplicationInsights.AspNetCore" Version="2.21.0" />
    <PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.0" />
    <PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
    <PackageReference Include="Serilog.AspNetCore" Version="8.0.0" />
    <PackageReference Include="Serilog.Sinks.ApplicationInsights" Version="4.0.0" />
    <!-- NOTE(review): this package name refers to EventStore, not Azure Event Hubs;
         confirm it is intended, since this solution targets Event Hubs. -->
    <PackageReference Include="AspNetCore.HealthChecks.EventStore" Version="7.0.0" />
  </ItemGroup>

  <!-- Sibling projects of the solution. -->
  <ItemGroup>
    <ProjectReference Include="..\DataverseEventTranslator.Core\DataverseEventTranslator.Core.csproj" />
    <ProjectReference Include="..\DataverseEventTranslator.Infrastructure\DataverseEventTranslator.Infrastructure.csproj" />
  </ItemGroup>

</Project>

Step 5: Monitoring and Observability

Telemetry Service

namespace DataverseEventTranslator.Infrastructure.Monitoring
{
    /// <summary>
    /// Records processing outcomes of translated events and exposes aggregated metrics.
    /// </summary>
    public interface ITelemetryService
    {
        /// <summary>Records a successfully processed event.</summary>
        /// <param name="correlationId">Correlation ID of the processed event.</param>
        /// <param name="duration">How long processing took.</param>
        /// <param name="businessEventCount">Number of business events generated.</param>
        Task TrackEventProcessedAsync(string correlationId, TimeSpan duration, int businessEventCount);

        /// <summary>Records that an event failed validation.</summary>
        /// <param name="correlationId">Correlation ID of the rejected event.</param>
        Task TrackValidationFailureAsync(string correlationId);

        /// <summary>Records an error raised while processing an event.</summary>
        /// <param name="correlationId">Correlation ID of the failing event.</param>
        /// <param name="exception">The exception that was raised.</param>
        Task TrackProcessingErrorAsync(string correlationId, Exception exception);

        /// <summary>Returns a snapshot of the collected metrics.</summary>
        Task<object> GetMetricsAsync();
    }

    /// <summary>
    /// <see cref="ITelemetryService"/> implementation that forwards telemetry to Application
    /// Insights and keeps process-wide counters plus a bounded queue of recent samples.
    /// </summary>
    /// <remarks>
    /// Counters and samples are static, so they are shared by every instance in the process.
    /// None of the methods performs asynchronous work; they return already-completed tasks
    /// instead of being declared <c>async</c> without an <c>await</c>.
    /// </remarks>
    public class TelemetryService : ITelemetryService
    {
        // Maximum number of processing samples retained in memory.
        private const int MaxRetainedMetrics = 1000;

        // Look-back window used by GetMetricsAsync for the "recent" aggregate.
        private static readonly TimeSpan RecentWindow = TimeSpan.FromMinutes(5);

        private readonly ILogger<TelemetryService> _logger;
        private readonly TelemetryClient _telemetryClient;
        private static readonly ConcurrentDictionary<string, long> _counters = new();
        private static readonly ConcurrentQueue<ProcessingMetric> _metrics = new();

        public TelemetryService(ILogger<TelemetryService> logger, TelemetryClient telemetryClient)
        {
            _logger = logger;
            _telemetryClient = telemetryClient;
        }

        /// <summary>
        /// Records a successfully processed event: dependency and metric telemetry, counters,
        /// and one in-memory sample.
        /// </summary>
        /// <param name="correlationId">Correlation ID of the processed event.</param>
        /// <param name="duration">How long processing took.</param>
        /// <param name="businessEventCount">Number of business events generated.</param>
        public Task TrackEventProcessedAsync(string correlationId, TimeSpan duration, int businessEventCount)
        {
            // Start time is derived by subtracting the duration from "now".
            _telemetryClient.TrackDependency("EventProcessing", "ProcessDataverseEvent", correlationId,
                DateTime.UtcNow.Subtract(duration), duration, true);

            _telemetryClient.TrackMetric("BusinessEventsGenerated", businessEventCount);
            _telemetryClient.TrackMetric("ProcessingDuration", duration.TotalMilliseconds);

            _counters.AddOrUpdate("EventsProcessed", 1, (key, value) => value + 1);
            _counters.AddOrUpdate("BusinessEventsGenerated", businessEventCount, (key, value) => value + businessEventCount);

            _metrics.Enqueue(new ProcessingMetric
            {
                CorrelationId = correlationId,
                Duration = duration,
                BusinessEventCount = businessEventCount,
                Timestamp = DateTime.UtcNow
            });

            // Drop the oldest samples once the retention bound is exceeded.
            while (_metrics.Count > MaxRetainedMetrics)
            {
                _metrics.TryDequeue(out _);
            }

            _logger.LogInformation("Event processed successfully. CorrelationId: {CorrelationId}, Duration: {Duration}ms, BusinessEvents: {Count}",
                correlationId, duration.TotalMilliseconds, businessEventCount);

            return Task.CompletedTask;
        }

        /// <summary>Records that an event failed validation.</summary>
        /// <param name="correlationId">Correlation ID of the rejected event.</param>
        public Task TrackValidationFailureAsync(string correlationId)
        {
            _telemetryClient.TrackEvent("ValidationFailure", new Dictionary<string, string>
            {
                ["CorrelationId"] = correlationId
            });

            _counters.AddOrUpdate("ValidationFailures", 1, (key, value) => value + 1);

            _logger.LogWarning("Event validation failed. CorrelationId: {CorrelationId}", correlationId);

            return Task.CompletedTask;
        }

        /// <summary>Records an exception raised while processing an event.</summary>
        /// <param name="correlationId">Correlation ID of the failing event.</param>
        /// <param name="exception">The exception that was raised.</param>
        public Task TrackProcessingErrorAsync(string correlationId, Exception exception)
        {
            _telemetryClient.TrackException(exception, new Dictionary<string, string>
            {
                ["CorrelationId"] = correlationId,
                ["ErrorType"] = exception.GetType().Name
            });

            _counters.AddOrUpdate("ProcessingErrors", 1, (key, value) => value + 1);

            _logger.LogError(exception, "Event processing error. CorrelationId: {CorrelationId}", correlationId);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Returns all counters plus an aggregate over the samples recorded within the
        /// recent window.
        /// </summary>
        /// <returns>An anonymous object with Counters, RecentProcessing and Timestamp.</returns>
        public Task<object> GetMetricsAsync()
        {
            // Compute the cutoff once so every sample is compared against the same instant.
            var cutoff = DateTime.UtcNow - RecentWindow;
            var recentMetrics = _metrics.Where(m => m.Timestamp > cutoff).ToList();

            object snapshot = new
            {
                Counters = _counters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value),
                RecentProcessing = new
                {
                    Count = recentMetrics.Count,
                    // Average throws on an empty sequence, so report 0 when nothing is recent.
                    AverageDuration = recentMetrics.Count > 0 ? recentMetrics.Average(m => m.Duration.TotalMilliseconds) : 0,
                    TotalBusinessEvents = recentMetrics.Sum(m => m.BusinessEventCount)
                },
                Timestamp = DateTime.UtcNow
            };

            return Task.FromResult(snapshot);
        }

        // One in-memory sample per successfully processed event.
        private class ProcessingMetric
        {
            public string CorrelationId { get; set; }
            public TimeSpan Duration { get; set; }
            public int BusinessEventCount { get; set; }
            public DateTime Timestamp { get; set; }
        }
    }
}

Health Checks

namespace DataverseEventTranslator.Infrastructure.HealthChecks
{
    /// <summary>
    /// Health check that queries the properties of the source (consumer) and business
    /// (producer) Event Hubs.
    /// </summary>
    public class EventHubHealthCheck : IHealthCheck
    {
        private readonly EventHubConsumerClient _consumerClient;
        private readonly EventHubProducerClient _producerClient;
        private readonly ILogger<EventHubHealthCheck> _logger;

        public EventHubHealthCheck(
            EventHubConsumerClient consumerClient,
            EventHubProducerClient producerClient,
            ILogger<EventHubHealthCheck> logger)
        {
            (_consumerClient, _producerClient, _logger) = (consumerClient, producerClient, logger);
        }

        /// <summary>
        /// Fetches the properties of both hubs, consumer first.
        /// </summary>
        /// <param name="context">Health check context (not read by this check).</param>
        /// <param name="cancellationToken">Token passed to both property requests.</param>
        /// <returns>
        /// Healthy with hub names and partition counts when both requests succeed;
        /// Unhealthy, carrying the exception, when either one throws.
        /// </returns>
        public async Task<HealthCheckResult> CheckHealthAsync(
            HealthCheckContext context,
            CancellationToken cancellationToken = default)
        {
            try
            {
                var source = await _consumerClient.GetEventHubPropertiesAsync(cancellationToken);
                var target = await _producerClient.GetEventHubPropertiesAsync(cancellationToken);

                var details = new Dictionary<string, object>();
                details.Add("SourceEventHub", source.Name);
                details.Add("SourcePartitionCount", source.PartitionIds.Length);
                details.Add("BusinessEventHub", target.Name);
                details.Add("BusinessPartitionCount", target.PartitionIds.Length);

                return HealthCheckResult.Healthy("Event Hub connections are healthy", details);
            }
            catch (Exception failure)
            {
                _logger.LogError(failure, "Event Hub health check failed");
                return HealthCheckResult.Unhealthy("Event Hub connections failed", failure);
            }
        }
    }

    /// <summary>
    /// Placeholder health check for the dead letter queue; it performs no real probe yet.
    /// </summary>
    public class DeadLetterQueueHealthCheck : IHealthCheck
    {
        private readonly ILogger<DeadLetterQueueHealthCheck> _logger;

        public DeadLetterQueueHealthCheck(ILogger<DeadLetterQueueHealthCheck> logger) => _logger = logger;

        /// <summary>
        /// Waits one millisecond as a stand-in for a real accessibility check.
        /// </summary>
        /// <param name="context">Health check context (not read by this check).</param>
        /// <param name="cancellationToken">Token observed by the placeholder delay.</param>
        /// <returns>
        /// Healthy when the delay completes; Degraded, carrying the exception, when it throws.
        /// </returns>
        public async Task<HealthCheckResult> CheckHealthAsync(
            HealthCheckContext context,
            CancellationToken cancellationToken = default)
        {
            try
            {
                // Placeholder: the real check depends on the dead letter queue that is chosen.
                await Task.Delay(1, cancellationToken);
            }
            catch (Exception failure)
            {
                _logger.LogError(failure, "Dead letter queue health check failed");
                return HealthCheckResult.Degraded("Dead letter queue check failed", failure);
            }

            return HealthCheckResult.Healthy("Dead letter queue is accessible");
        }
    }
}

Step 6: Error Handling and Resilience

/// <summary>
/// Runs an operation with a bounded number of attempts and exponential backoff,
/// logging each failure and handing the final failure to the dead letter queue.
/// </summary>
public class ErrorHandling
{
    private readonly ILogger _logger;

    /// <summary>Creates the handler with the logger used for retry and failure messages.</summary>
    public ErrorHandling(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Invokes <paramref name="operation"/> up to <paramref name="maxRetries"/> times.
    /// </summary>
    /// <param name="operation">The work to execute; its result is returned on the first success.</param>
    /// <param name="operationName">Name used in log messages and passed to the dead letter queue.</param>
    /// <param name="maxRetries">
    /// Maximum number of attempts in total (the first call counts as attempt 1).
    /// If this is less than 1 the operation is never invoked and a 500 result is returned.
    /// </param>
    /// <returns>
    /// The operation's result on success; otherwise a <c>StatusCodeResult(500)</c> after a
    /// non-retriable failure or after the last attempt fails.
    /// </returns>
    public async Task<IActionResult> HandleWithRetry(
        Func<Task<IActionResult>> operation,
        string operationName,
        int maxRetries = 3)
    {
        for (int attempt = 1; attempt <= maxRetries; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception ex) when (attempt < maxRetries && IsRetriableException(ex))
            {
                // Exponential backoff: 2s, 4s, 8s, ... for attempts 1, 2, 3, ...
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt));

                // Message template (not string interpolation) so the values are captured
                // as structured log properties.
                _logger.LogWarning(
                    "Attempt {Attempt} failed for {OperationName}: {ErrorMessage}. Retrying in {DelaySeconds}s",
                    attempt, operationName, ex.Message, delay.TotalSeconds);

                await Task.Delay(delay);
            }
            catch (Exception ex)
            {
                // Reached for non-retriable exceptions and for any failure on the last attempt.
                // The exception object is passed so the stack trace is preserved in the log.
                _logger.LogError(
                    ex,
                    "Operation {OperationName} failed after {Attempt} attempts: {ErrorMessage}",
                    operationName, attempt, ex.Message);

                // Send to dead letter queue
                await SendToDeadLetterQueue(operationName, ex);

                return new StatusCodeResult(500);
            }
        }

        // Only reached when the loop body never runs (maxRetries < 1).
        return new StatusCodeResult(500);
    }

    /// <summary>
    /// Decides whether a failure is worth retrying: HTTP request failures, task
    /// cancellations, timeouts, or any exception whose message mentions "throttle"
    /// (case-insensitive).
    /// </summary>
    private bool IsRetriableException(Exception ex)
    {
        return ex is HttpRequestException ||
               ex is TaskCanceledException ||
               ex is TimeoutException ||
               (ex.Message?.Contains("throttle", StringComparison.OrdinalIgnoreCase) == true);
    }

    /// <summary>
    /// Placeholder for dead letter delivery; currently it only logs the operation name.
    /// </summary>
    private Task SendToDeadLetterQueue(string operationName, Exception ex)
    {
        // Implementation for dead letter queue
        _logger.LogInformation("Sending failed operation {OperationName} to dead letter queue", operationName);

        // No asynchronous work yet, so return a completed task instead of an
        // 'async' method without 'await' (compiler warning CS1998).
        return Task.CompletedTask;
    }
}

Application Insights Integration

/// <summary>
/// Writes event-translation telemetry as structured log entries.
/// </summary>
public class TelemetryService
{
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the service with the logger that receives the telemetry entries.
    /// Without this constructor the readonly logger field was never assigned and
    /// every tracking call failed with a <see cref="NullReferenceException"/>.
    /// </summary>
    /// <param name="logger">Logger to write to; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public TelemetryService(ILogger logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Logs that a source event was translated, with the elapsed time in milliseconds.</summary>
    /// <param name="sourceEvent">Name of the incoming event.</param>
    /// <param name="targetEvent">Name of the event it was translated to.</param>
    /// <param name="duration">Time the translation took.</param>
    public void TrackEventTranslation(string sourceEvent, string targetEvent, TimeSpan duration)
    {
        _logger.LogInformation("Event translated: {SourceEvent} -> {TargetEvent} in {Duration}ms",
            sourceEvent, targetEvent, duration.TotalMilliseconds);
    }

    /// <summary>
    /// Logs that a business event was published, including its type and the
    /// "SourceEntityId" metadata entry (the default value is logged when the key is absent).
    /// </summary>
    /// <param name="businessEvent">The published event.</param>
    public void TrackBusinessEventPublished(BusinessEvent businessEvent)
    {
        _logger.LogInformation("Business event published: {EventType} for entity {EntityId}",
            businessEvent.EventType, businessEvent.Metadata.GetValueOrDefault("SourceEntityId"));
    }
}

Testing Strategy

Unit Testing

/// <summary>
/// Unit tests for translating Dataverse account updates into business events.
/// </summary>
[TestClass]
public class EventTranslationTests
{
    /// <summary>
    /// A credit limit change between the pre- and post-image must yield exactly one
    /// CustomerCreditLimitChanged event carrying the old and new values.
    /// </summary>
    [TestMethod]
    public async Task ProcessAccountUpdate_CreditLimitChange_GeneratesCorrectEvent()
    {
        // Arrange
        var executionContext = CreateTestContext();
        var preImage = executionContext.PreEntityImages["PreImage"];
        var postImage = executionContext.PostEntityImages["PostImage"];
        preImage.Attributes["creditlimit"] = new { Value = 10000 };
        postImage.Attributes["creditlimit"] = new { Value = 25000 };

        var translator = new DataverseEventProcessor();

        // Act
        var translated = await translator.ProcessAccountUpdate(executionContext, Mock.Of<ILogger>());

        // Assert
        Assert.AreEqual(1, translated.Count);

        var firstEvent = translated[0];
        Assert.IsInstanceOfType(firstEvent, typeof(CustomerCreditLimitChanged));

        var creditLimitChanged = (CustomerCreditLimitChanged)firstEvent;
        Assert.AreEqual(10000, creditLimitChanged.OldCreditLimit);
        Assert.AreEqual(25000, creditLimitChanged.NewCreditLimit);
    }

    /// <summary>
    /// Builds an "account" execution context with a fresh id and empty
    /// "PreImage" / "PostImage" entity images for tests to populate.
    /// </summary>
    private DataverseExecutionContext CreateTestContext()
    {
        var preImages = new Dictionary<string, DataverseEntity>
        {
            ["PreImage"] = EmptyEntity()
        };

        var postImages = new Dictionary<string, DataverseEntity>
        {
            ["PostImage"] = EmptyEntity()
        };

        return new DataverseExecutionContext
        {
            PrimaryEntityName = "account",
            PrimaryEntityId = Guid.NewGuid(),
            PreEntityImages = preImages,
            PostEntityImages = postImages
        };
    }

    // Creates an entity image with no attributes set.
    private static DataverseEntity EmptyEntity()
    {
        return new DataverseEntity
        {
            Attributes = new Dictionary<string, object>()
        };
    }
}

Integration Testing

/// <summary>
/// End-to-end tests intended to run against real Dataverse and Event Hub resources.
/// </summary>
[TestClass]
public class IntegrationTests
{
    /// <summary>
    /// Placeholder for the full pipeline test. It reports Inconclusive instead of
    /// passing, so an unimplemented test cannot be mistaken for verified behavior.
    /// </summary>
    [TestMethod]
    public Task EndToEnd_DataverseUpdate_PublishesToEventHub()
    {
        // Planned coverage:
        // Test with actual Dataverse sandbox environment
        // Verify events flow through both Event Hubs
        // Validate event schema and content
        Assert.Inconclusive("End-to-end integration test is not implemented yet.");

        // Not 'async': there is nothing to await yet (avoids compiler warning CS1998).
        return Task.CompletedTask;
    }
}

Monitoring and Observability

Key Metrics to Track

graph LR
    A[Events Received] --> B[Events Processed]
    B --> C[Business Events Generated]
    C --> D[Events Published]
    D --> E[Processing Latency]

    F[Error Rate] --> G[Retry Attempts]
    G --> H[Dead Letter Events]

    I[Field Change Detection Rate]
    J[Event Type Distribution]

Monitoring Dashboard

Track these metrics in Application Insights:

  1. Volume Metrics
     - Events received per minute
     - Business events generated per minute
     - Event processing success rate

  2. Performance Metrics
     - End-to-end processing latency
     - Event Hub publish latency
     - Field change detection time

  3. Error Metrics
     - Translation failure rate
     - Event Hub connection errors
     - Schema validation failures

  4. Business Metrics
     - Most frequently changed entity types
     - Distribution of business event types
     - Peak processing times

Alerting Rules

// Example alerting thresholds: one anonymous-type entry per rule, holding the
// metric name, an integer threshold, and a period string such as "5m".
// NOTE(review): units are not stated per rule; the trailing comment on the last
// entry implies ProcessingLatency is in milliseconds — confirm the other two.
var alertRules = new[]
{
    new { Metric = "ProcessingFailureRate", Threshold = 5, Period = "5m" },
    new { Metric = "EventHubConnectionErrors", Threshold = 3, Period = "1m" },
    new { Metric = "ProcessingLatency", Threshold = 30000, Period = "5m" } // 30 seconds
};

Best Practices

Event Design Principles

  1. Make Events Immutable: Once published, events should never change
  2. Include Sufficient Context: Events should be self-contained
  3. Use Semantic Versioning: Version events to support schema evolution
  4. Design for Eventual Consistency: Events may be processed out of order
  5. Include Correlation IDs: Enable end-to-end tracing

Performance Optimization

  1. Batch Processing: Group multiple field changes into single events when appropriate
  2. Async Processing: Use async/await throughout the pipeline
  3. Connection Pooling: Reuse Event Hub connections
  4. Caching: Cache frequently accessed reference data
  5. Filtering: Only process business-relevant changes

Security Considerations

  1. Authenticate Service Endpoints: Use proper authentication between Dynamics and Azure
  2. Encrypt in Transit: Use HTTPS/TLS for all communications
  3. Sanitize Sensitive Data: Remove or mask PII in events
  4. Access Control: Limit Event Hub access to authorized consumers
  5. Audit Trail: Log all event processing activities

Troubleshooting Guide

Common Issues

| Issue | Symptoms | Solution |
| --- | --- | --- |
| Missing Pre/Post Images | Events show no field changes | Verify image configuration in Plugin Registration Tool |
| Source Event Hub Connection Failures | Events not reaching translator | Check source Event Hub connection string and network connectivity |
| Business Event Hub Connection Failures | Translated events not reaching consumers | Check business Event Hub connection string and permissions |
| High Processing Latency | Events delayed significantly | Optimize field change detection logic |
| Schema Validation Errors | Malformed events published | Add comprehensive input validation |
| Dead Letter Queue Filling | Many failed events | Investigate root cause of processing failures |

Debugging Steps

  1. Check Application Insights Logs: Review detailed processing logs
  2. Verify Dataverse Configuration: Ensure service endpoint is properly registered and pointing to source Event Hub
  3. Test Event Schema: Validate events match expected schema in both Event Hubs
  4. Monitor Event Hub Metrics: Check both source and business Event Hub metrics and consumer lag
  5. Review Dead Letter Queue: Analyze failed events for patterns

Conclusion

This solution transforms Dataverse's generic CRUD events into meaningful business events that provide real value to downstream systems. By implementing field-level change detection with pre/post images and using Event Hub for scalable event processing, you get precise control over what constitutes a business event while maintaining high performance and reliability.

The architecture supports:

  - Scalability: Handle high event volumes with Event Hub triggers and Azure Functions
  - Reliability: Built-in retry logic, dead letter queues, and Event Hub durability
  - Observability: Comprehensive monitoring and alerting
  - Maintainability: Clean separation of concerns and testable code
  - Extensibility: Easy to add new entity types and business events

This approach enables true event-driven architecture where business processes can react to meaningful changes rather than low-level database operations, with the added benefit of Event Hub's enterprise-grade messaging capabilities.