
DataVerse Business Events Architecture Strategy

Overview

This document defines the architecture for transforming generic DataVerse CRUD events into business-focused events within the 7N20 AgentHub platform. Given the extensive use of custom entities (rqm_* entities) and custom optionsets, we recommend a unified event stream approach using a Business Event Shim to serve both integration and metrics use cases.

Problem Statement

The challenge involves transforming generic DataVerse CRUD operations into meaningful business events for two distinct use cases:

Use Case 1: Event-Driven System Integration

  • Real-time notifications between microservices
  • Business workflow triggers (e.g., candidate shortlisted → notify matching service)
  • Cross-domain coordination (e.g., CRM updates → update profiles)
  • Low latency requirements (seconds)

Use Case 2: Event Collection for Metrics and Analytics

  • Business intelligence and reporting
  • Performance metrics tracking
  • Audit trails and compliance
  • Batch processing acceptable (minutes to hours)

Key Requirements

  • Custom Entity Support: Handle rqm_requestshortlist, rqm_request, and other custom entities
  • Dual Use Case Optimization: Same events serve integration and analytics needs
  • Business Context: Events with clear business intent
  • Scalability: Support high-volume partitioned Event Hub distribution
  • Ordering: Handle out-of-order event delivery
  • References: Manage related entity data efficiently
  • Configuration-Driven: Easy addition of new custom entities

Given the extensive custom entities in 7N20's DataVerse implementation, a Business Event Shim is essential infrastructure, not optional complexity. Microsoft's native business events don't support custom entities like rqm_requestshortlist, making a translation layer necessary.

The Unified Approach

We recommend a single event stream serving both integration and metrics use cases through different consumer group strategies:

graph TB
    subgraph "DataVerse Platform"
        CE[Custom Entities<br/>rqm_requestshortlist<br/>rqm_request, etc.]
        SE[Standard Entities<br/>contact, account]
    end

    subgraph "Event Processing Layer"
        EH1[Event Hub<br/>Raw CRUD Events]
        SHIM[Business Event Shim<br/>with Rules Engine]
        CONFIG[Event Configuration<br/>Registry]
    end

    subgraph "Business Events Hub - Single Source of Truth"
        EH2[Event Hub<br/>32 Partitions<br/>7-day retention]
        CG1[Consumer Group:<br/>Integration Services]
        CG2[Consumer Group:<br/>Metrics Pipeline]
        CG3[Consumer Group:<br/>Audit & Compliance]
    end

    subgraph "Integration Consumers"
        MS1[Matching Service]
        MS2[Notification Service] 
        MS3[Profile Service]
        MS4[Email Service]
    end

    subgraph "Analytics Consumers"
        DL[Data Lake<br/>Capture]
        METRICS[Metrics<br/>Aggregation]
        BI[Business<br/>Intelligence]
    end

    CE --> EH1
    SE --> EH1
    CONFIG -.->|Event Rules| SHIM
    EH1 --> SHIM
    SHIM -->|Enriched Business Events| EH2

    EH2 --> CG1
    EH2 --> CG2
    EH2 --> CG3

    CG1 --> MS1
    CG1 --> MS2
    CG1 --> MS3
    CG1 --> MS4

    CG2 --> DL
    CG2 --> METRICS
    CG3 --> BI

    style SHIM fill:#9f9,stroke:#333,stroke-width:3px
    style EH2 fill:#ff9,stroke:#333,stroke-width:2px

Why the Unified Approach Works

Single Source of Truth:

  • All consumers receive the same business-contextualized events
  • Consistent event semantics across integration and analytics
  • Simplified governance and schema management

Consumer Group Flexibility:

  • Integration services use real-time processing with immediate checkpointing
  • Metrics pipeline uses batch processing with periodic checkpointing
  • Audit services maintain separate retention and replay capabilities

Cost Optimization:

  • Shared infrastructure for both use cases
  • Single shim service to maintain
  • Leverages Event Hub's natural partitioning and consumer group isolation
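
The consumer-group split can be illustrated with a small domain filter: each group declares which event domains it cares about and skips the rest. `ShouldProcess` and its empty-filter-means-everything behaviour are illustrative assumptions, not an existing API:

```csharp
using System;
using System.Linq;

public static class ConsumerGroupFilter
{
    // Returns true when a consumer group configured with filterDomains
    // should process an event tagged with eventDomains. An empty filter
    // is treated here as "process everything" (illustrative assumption).
    public static bool ShouldProcess(string[] filterDomains, string[] eventDomains)
    {
        if (filterDomains == null || filterDomains.Length == 0)
            return true;

        return eventDomains.Any(d => filterDomains.Contains(d, StringComparer.OrdinalIgnoreCase));
    }
}
```

Under this rule, an integration group filtering on ["matching", "notifications", "crm"] would process a status-change event tagged ["matching", "metrics"] but skip a metrics-only catch-all event.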

Configuration-Driven Event Mapping

The Business Event Shim uses a configuration registry to define how DataVerse entity changes map to business events. This approach is essential for supporting 7N20's extensive custom entities (rqm_*) and provides the flexibility to adapt business event generation without code changes.

Configuration Registry Architecture

The configuration system operates as a runtime registry that transforms raw DataVerse CRUD events into structured business events based on declarative rules:

graph TB
    subgraph "Configuration System"
        CONFIG[Configuration Files<br/>YAML/JSON]
        CACHE[In-Memory Cache<br/>Hot Reload]
        VALIDATOR[Schema Validator]
        FACTORY[Event Factory]
    end

    subgraph "Runtime Processing"
        RAW[Raw DataVerse Event]
        MATCHER[Configuration Matcher]
        GENERATOR[Event Generator]
        BIZ[Business Events]
    end

    CONFIG --> VALIDATOR
    VALIDATOR --> CACHE
    RAW --> MATCHER
    MATCHER -.->|Lookup Rules| CACHE
    MATCHER --> GENERATOR
    GENERATOR -.->|Apply Rules| FACTORY
    GENERATOR --> BIZ

    style CONFIG fill:#e1f5fe,stroke:#333,stroke-width:2px
    style CACHE fill:#fff3e0,stroke:#333,stroke-width:2px

Core Configuration Principles

1. Entity-Centric Configuration

Each DataVerse entity (standard or custom) has its own configuration section defining:

  • Which operations (create/update/delete) generate events
  • Field-level change detection rules
  • Event payload structure and content
  • Routing and processing metadata

2. Event Mode Strategies

Different events require different payload structures:

Mode          Use Case                        Payload Size            Performance Impact
slim          Field-specific changes          Minimal                 Low latency
fat           Entity creation/major changes   Complete entity         Higher latency
changes_only  Analytics/audit                 Changed fields only     Medium latency
tombstone     Entity deletion                 Pre-deletion snapshot   Medium latency

3. Dynamic Rule Evaluation

The configuration supports complex field matching:

  • Exact field names: ["rqm_salesstatus"]
  • Wildcards: ["rqm_*"] matches all fields starting with rqm_
  • Field groups: ["contact_info": ["emailaddress1", "telephone1", "mobilephone"]]
  • Computed triggers: custom logic for complex field interactions
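
These matching rules can be sketched as a single predicate. The field-group dictionary and the `Matches` helper below are illustrative, not the shim's actual API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TriggerFieldMatcher
{
    // Evaluates one trigger entry against the set of changed field names:
    // "*" matches any change, "rqm_*" is a prefix wildcard, a key present in
    // fieldGroups resolves to its member fields, anything else is an exact name.
    public static bool Matches(
        string trigger,
        IReadOnlyCollection<string> changedFields,
        IReadOnlyDictionary<string, string[]> fieldGroups)
    {
        if (trigger == "*")
            return changedFields.Count > 0;

        if (trigger.EndsWith("*"))
        {
            var prefix = trigger.TrimEnd('*');
            return changedFields.Any(f => f.StartsWith(prefix, StringComparison.OrdinalIgnoreCase));
        }

        if (fieldGroups != null && fieldGroups.TryGetValue(trigger, out var members))
            return members.Any(m => changedFields.Contains(m));

        return changedFields.Contains(trigger);
    }
}
```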

Detailed Configuration Schema

# Complete event-mappings.yaml structure
schema_version: "1.0"
metadata:
  created_by: "architecture-team"
  created_at: "2024-01-15T10:00:00Z"
  description: "Business event mappings for 7N20 custom entities"

# Global configuration
global:
  event_hub:
    business_events:
      connection_string: "${EVENT_HUB_BUSINESS_EVENTS_CONNECTION}"
      hub_name: "business-events"
      partitions: 32
      retention_days: 7
      batch_size_limit: 256000  # 256KB max payload

  # Default entity reference resolution
  default_references:
    cache_ttl_minutes: 30
    max_batch_size: 50
    required_fields: ["name", "createdon", "modifiedon"]

# Entity-specific mappings
entities:
  rqm_requestshortlist:
    display_name: "Request Shortlist"
    primary_key: "rqm_requestshortlistid"

    # Global entity settings
    settings:
      always_include_references: ["rqm_candidateid", "rqm_requestid"]
      default_domains: ["matching", "crm"]
      enable_soft_delete_detection: true

    events:
      create:
        event_type: "RequestShortlist.Created"
        mode: "fat"  # Include complete entity
        domains: ["matching", "crm", "metrics"]
        business_critical: true

        # Reference resolution configuration
        references:
          rqm_candidateid:
            target_entity: "contact"
            required_fields: ["fullname", "emailaddress1", "telephone1", "jobtitle"]
            cache_ttl_minutes: 60  # Override global default

          rqm_requestid:
            target_entity: "rqm_request" 
            required_fields: ["rqm_name", "rqm_clientid", "rqm_startdate"]
            include_nested_references: true
            nested_references:
              rqm_clientid:
                target_entity: "account"
                required_fields: ["name", "websiteurl"]

        # Payload customization
        payload:
          include_fields: "*"  # All fields
          exclude_fields: ["rqm_internal_notes", "rqm_debug_info"]
          computed_fields:
            age_in_hours: "DATEDIFF(hour, createdon, GETUTCDATE())"
            candidate_display_name: "CONCAT(candidate.firstname, ' ', candidate.lastname)"

      update:
        # Status change events
        - trigger_fields: ["rqm_salesstatus"]
          event_type: "RequestShortlist.StatusChanged"
          mode: "slim"
          domains: ["matching", "notifications", "metrics"]
          business_critical: true

          # Conditional event generation
          conditions:
            - field: "rqm_salesstatus"
              operator: "changed_from_to"
              from_values: [112020000, 112020001]  # Under Review, Shortlisted
              to_values: [112020002, 112020003]    # Accepted, Rejected

          payload:
            include_fields: ["rqm_salesstatus", "rqm_requestshortlistid", "modifiedon", "modifiedby"]
            include_previous_values: true
            custom_fields:
              status_transition: "CONCAT(previous.rqm_salesstatus, ' -> ', current.rqm_salesstatus)"
              notification_required: true

        # Financial updates  
        - trigger_fields: ["rqm_saleperhour", "rqm_costperhour"]
          event_type: "RequestShortlist.RatesUpdated"
          mode: "slim"
          domains: ["finance", "crm", "metrics"]
          business_critical: false

          # Only generate event if change is significant
          conditions:
            - field: "rqm_saleperhour"
              operator: "percentage_change_greater_than"
              threshold: 5.0  # 5% change threshold
            - field: "rqm_costperhour" 
              operator: "percentage_change_greater_than"
              threshold: 5.0

        # Composite event - multiple field changes
        - trigger_fields: ["rqm_candidateid", "rqm_requestid", "rqm_priority"]
          event_type: "RequestShortlist.AssignmentChanged"
          mode: "changes_only"
          domains: ["matching", "workflow"]
          business_critical: true

          conditions:
            - operator: "any_changed"  # Trigger if ANY field changes
            - field: "statecode"
              operator: "equals"
              value: 0  # Only for active records

        # Catch-all for analytics
        - trigger_fields: ["*"]  # Any field change
          event_type: "RequestShortlist.Updated"
          mode: "changes_only"
          domains: ["metrics", "audit"]
          business_critical: false

          # Exclusions to prevent noise
          conditions:
            - field: "modifiedon"
              operator: "ignore"  # Don't trigger on modifiedon changes alone
            - operator: "not_in_other_events"  # Only if no other event was generated

      delete:
        event_type: "RequestShortlist.Deleted"
        mode: "tombstone"
        domains: ["all"]
        business_critical: true

        payload:
          include_pre_image: true  # Full entity state before deletion
          include_deletion_context:
            deleted_by_user: true
            deletion_reason: "user_initiated"  # Could be "cascade", "bulk_delete", etc.
            related_entities_affected: true

  # Standard entity example with simpler configuration
  contact:
    display_name: "Contact"
    primary_key: "contactid"

    events:
      update:
        - trigger_fields: ["emailaddress1"]
          event_type: "Contact.EmailChanged"
          mode: "slim"
          domains: ["crm", "notifications", "verification", "metrics"]
          business_critical: true

          conditions:
            - field: "emailaddress1"
              operator: "not_null"  # Only if new email is not empty

          payload:
            include_fields: ["contactid", "emailaddress1", "fullname"]
            include_previous_values: true
            custom_fields:
              email_verification_required: true
              notification_type: "email_change"

        - trigger_fields: ["statecode"]
          event_type: "Contact.StatusChanged"
          mode: "slim"
          domains: ["crm", "matching", "metrics"]
          business_critical: true

# Consumer group configurations  
consumer_groups:
  integration-services:
    description: "Real-time processing for microservice integration"
    processing_pattern: "immediate"
    checkpoint_frequency: "per_event"
    max_batch_size: 1
    prefetch_count: 1
    filter_domains: ["matching", "notifications", "crm"]  # Only process these domains

  metrics-pipeline:
    description: "Batch processing for analytics and metrics"
    processing_pattern: "batch"
    checkpoint_frequency: "per_batch"
    max_batch_size: 100
    prefetch_count: 100
    batch_interval_minutes: 5
    filter_domains: ["metrics", "audit"]  # Catch-all events are tagged with these domains

  audit-compliance:
    description: "Long-term storage and compliance"
    processing_pattern: "batch"
    checkpoint_frequency: "per_batch"
    max_batch_size: 50
    retention_days: 2555  # 7 years
    filter_business_critical: true  # Only business-critical events

# Configuration validation rules
validation:
  required_fields: ["event_type", "mode", "domains"]
  allowed_modes: ["slim", "fat", "changes_only", "tombstone"]
  max_domains_per_event: 10
  max_trigger_fields: 50

  # Custom validation rules
  rules:
    - name: "business_critical_must_have_create"
      condition: "business_critical == true"
      requirement: "must have create event defined"

    - name: "fat_mode_size_limit"
      condition: "mode == 'fat'"
      requirement: "entity must have less than 100 fields"

Configuration Processing Implementation

The configuration system is implemented through several key C# classes:

// Configuration model classes
public class EventMappingConfiguration
{
    public string SchemaVersion { get; set; }
    public ConfigurationMetadata Metadata { get; set; }
    public GlobalConfiguration Global { get; set; }
    public Dictionary<string, EntityConfiguration> Entities { get; set; }
    public Dictionary<string, ConsumerGroupConfiguration> ConsumerGroups { get; set; }  // Keyed by group name, matching the YAML mapping
    public ValidationConfiguration Validation { get; set; }
}

public class EntityConfiguration
{
    public string DisplayName { get; set; }
    public string PrimaryKey { get; set; }
    public EntitySettings Settings { get; set; }
    public Dictionary<string, List<EventConfiguration>> Events { get; set; }
}

public class EventConfiguration
{
    public string[] TriggerFields { get; set; }
    public string EventType { get; set; }
    public string Mode { get; set; }  // slim, fat, changes_only, tombstone
    public string[] Domains { get; set; }
    public bool BusinessCritical { get; set; }
    public List<EventCondition> Conditions { get; set; }
    public PayloadConfiguration Payload { get; set; }
    public Dictionary<string, ReferenceConfiguration> References { get; set; }
}

public class EventCondition
{
    public string Field { get; set; }
    public string Operator { get; set; }  // equals, changed_from_to, percentage_change_greater_than, etc.
    public object Value { get; set; }
    public object[] FromValues { get; set; }
    public object[] ToValues { get; set; }
    public double Threshold { get; set; }
}

// Configuration registry service
public class EventConfigurationRegistry : IEventConfigurationRegistry
{
    private readonly IConfiguration _configuration;
    private readonly ILogger<EventConfigurationRegistry> _logger;
    private readonly IMemoryCache _cache;
    private EventMappingConfiguration _eventMappings;
    private FileSystemWatcher _configWatcher;

    public EventConfigurationRegistry(IConfiguration configuration, ILogger<EventConfigurationRegistry> logger, IMemoryCache cache)
    {
        _configuration = configuration;
        _logger = logger;
        _cache = cache;

        LoadConfiguration();
        SetupHotReload();
    }

    public async Task<List<EventConfiguration>> GetEventConfigurationsAsync(
        string entityName, 
        string operation,
        Dictionary<string, FieldChange> changedFields)
    {
        var cacheKey = $"events:{entityName}:{operation}";

        if (_cache.TryGetValue(cacheKey, out List<EventConfiguration> cached))
        {
            return FilterByChangedFields(cached, changedFields);
        }

        if (!_eventMappings.Entities.TryGetValue(entityName, out var entityConfig))
        {
            _logger.LogWarning("No configuration found for entity {EntityName}", entityName);
            return new List<EventConfiguration>();
        }

        if (!entityConfig.Events.TryGetValue(operation.ToLower(), out var eventConfigs))
        {
            return new List<EventConfiguration>();
        }

        // Cache the base configurations
        _cache.Set(cacheKey, eventConfigs, TimeSpan.FromMinutes(30));

        return FilterByChangedFields(eventConfigs, changedFields);
    }

    private List<EventConfiguration> FilterByChangedFields(
        List<EventConfiguration> eventConfigs, 
        Dictionary<string, FieldChange> changedFields)
    {
        var matchingConfigs = new List<EventConfiguration>();

        foreach (var config in eventConfigs)
        {
            if (DoesConfigurationMatch(config, changedFields))
            {
                matchingConfigs.Add(config);
            }
        }

        return matchingConfigs;
    }

    private bool DoesConfigurationMatch(EventConfiguration config, Dictionary<string, FieldChange> changedFields)
    {
        // Check if any trigger field matches the changed fields
        foreach (var triggerField in config.TriggerFields)
        {
            if (triggerField == "*")
                return true; // Wildcard matches any change

            if (triggerField.EndsWith("*"))
            {
                var prefix = triggerField.TrimEnd('*');
                if (changedFields.Keys.Any(field => field.StartsWith(prefix)))
                    return true;
            }

            if (changedFields.ContainsKey(triggerField))
                return true;
        }

        return false;
    }

    private void LoadConfiguration()
    {
        try
        {
            var configPath = _configuration["EventMapping:ConfigurationPath"] ?? "event-mappings.yaml";
            var yamlContent = File.ReadAllText(configPath);

            var deserializer = new DeserializerBuilder()
                .WithNamingConvention(UnderscoredNamingConvention.Instance)
                .Build();

            _eventMappings = deserializer.Deserialize<EventMappingConfiguration>(yamlContent);

            ValidateConfiguration(_eventMappings);

            _logger.LogInformation("Event mapping configuration loaded successfully from {ConfigPath}", configPath);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to load event mapping configuration");
            throw;
        }
    }

    private void SetupHotReload()
    {
        var configPath = _configuration["EventMapping:ConfigurationPath"] ?? "event-mappings.yaml";
        var fullPath = Path.GetFullPath(configPath);  // Relative paths would yield an empty directory below
        var directory = Path.GetDirectoryName(fullPath);
        var fileName = Path.GetFileName(fullPath);

        _configWatcher = new FileSystemWatcher(directory, fileName)
        {
            EnableRaisingEvents = true,
            NotifyFilter = NotifyFilters.LastWrite
        };

        _configWatcher.Changed += async (sender, e) =>
        {
            _logger.LogInformation("Configuration file changed, reloading...");

            // Debounce multiple file change events
            await Task.Delay(1000);

            try
            {
                LoadConfiguration();
                (_cache as MemoryCache)?.Clear(); // IMemoryCache has no Clear(); MemoryCache.Clear() requires .NET 7+
                _logger.LogInformation("Configuration reloaded successfully");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to reload configuration");
            }
        };
    }

    private void ValidateConfiguration(EventMappingConfiguration config)
    {
        var validator = new EventConfigurationValidator();
        var results = validator.Validate(config);

        if (results.Any(r => r.Severity == ValidationSeverity.Error))
        {
            throw new InvalidOperationException($"Configuration validation failed: {string.Join(", ", results.Select(r => r.Message))}");
        }

        if (results.Any(r => r.Severity == ValidationSeverity.Warning))
        {
            _logger.LogWarning("Configuration validation warnings: {Warnings}", 
                string.Join(", ", results.Where(r => r.Severity == ValidationSeverity.Warning).Select(r => r.Message)));
        }
    }
}
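
The `EventConfigurationValidator` used by `ValidateConfiguration` is not shown above; a minimal sketch covering the `required_fields` and `allowed_modes` rules from the validation section might look like this (the type and severity strings are assumptions):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record ValidationResult(string Severity, string Message);

// Illustrative sketch of the validator referenced in ValidateConfiguration;
// a real implementation would also enforce the custom rules
// (business_critical_must_have_create, fat_mode_size_limit, etc.).
public class EventConfigurationValidatorSketch
{
    private static readonly string[] AllowedModes = { "slim", "fat", "changes_only", "tombstone" };

    public List<ValidationResult> Validate(string eventType, string mode, string[] domains)
    {
        var results = new List<ValidationResult>();

        if (string.IsNullOrWhiteSpace(eventType))
            results.Add(new ValidationResult("Error", "EventType is required"));

        if (!AllowedModes.Contains(mode))
            results.Add(new ValidationResult("Error", $"Mode '{mode}' must be one of: {string.Join(", ", AllowedModes)}"));

        if (domains == null || domains.Length == 0)
            results.Add(new ValidationResult("Warning", "No domains specified; event will not be routed"));

        return results;
    }
}
```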

Advanced Configuration Patterns

1. Conditional Event Generation

# Only generate event if specific conditions are met
- trigger_fields: ["rqm_salesstatus"]
  event_type: "RequestShortlist.FinalStatusReached"
  conditions:
    - field: "rqm_salesstatus"
      operator: "in_values"
      values: [112020002, 112020003, 112020004]  # Accepted, Rejected, Withdrawn
    - field: "rqm_priority"
      operator: "greater_than"
      value: 5
    - operator: "time_based"
      field: "createdon"
      condition: "older_than_hours"
      value: 24  # Only for requests older than 24 hours
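
A sketch of how the rules engine might evaluate two of the operators used in these conditions. The operator names follow the configuration; the evaluator itself, and the trimmed-down `FieldChange` record standing in for the full class, are illustrative assumptions:

```csharp
using System;
using System.Linq;

public record FieldChange(string FieldName, object OldValue, object NewValue);

public static class ConditionEvaluator
{
    // "changed_from_to": old value must be in fromValues AND new value in toValues.
    public static bool ChangedFromTo(FieldChange change, int[] fromValues, int[] toValues) =>
        change.OldValue is int oldV && change.NewValue is int newV &&
        fromValues.Contains(oldV) && toValues.Contains(newV);

    // "percentage_change_greater_than": relative change exceeds the threshold (percent).
    public static bool PercentageChangeGreaterThan(FieldChange change, double thresholdPercent)
    {
        if (change.OldValue is not double oldV || change.NewValue is not double newV || oldV == 0)
            return false;

        var changePercent = Math.Abs(newV - oldV) / Math.Abs(oldV) * 100.0;
        return changePercent > thresholdPercent;
    }
}
```

Under the 5% threshold from the rates example, a change from 100 to 104 produces no RatesUpdated event, while 100 to 110 does.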

2. Computed Field Generation

payload:
  computed_fields:
    # Simple field concatenation
    candidate_full_name: "CONCAT(candidate.firstname, ' ', candidate.lastname)"

    # Complex business logic
    urgency_score: |
      CASE 
        WHEN rqm_priority > 8 AND rqm_startdate < DATEADD(day, 7, GETDATE()) THEN 'CRITICAL'
        WHEN rqm_priority > 5 AND rqm_startdate < DATEADD(day, 14, GETDATE()) THEN 'HIGH'
        ELSE 'NORMAL'
      END

    # External service calls
    candidate_score:
      service: "MatchingService"
      method: "GetCandidateScore" 
      parameters: ["rqm_candidateid"]
      cache_minutes: 60

3. Event Composition and Aggregation

# Composite events that combine multiple entity changes
composite_events:
  - event_type: "RequestWorkflow.CandidateShortlisted"
    trigger_entities:
      rqm_requestshortlist:
        conditions:
          - field: "rqm_salesstatus"
            operator: "equals"
            value: 112020001  # Shortlisted
      contact:
        conditions:
          - field: "statecode"
            operator: "equals" 
            value: 0  # Active
        correlation: "rqm_requestshortlist.rqm_candidateid = contact.contactid"

    domains: ["matching", "workflow", "notifications"]
    business_critical: true

    payload:
      include_entities: ["rqm_requestshortlist", "contact"]
      computed_fields:
        workflow_step: "candidate_shortlisted"
        next_actions: ["schedule_interview", "notify_client"]
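
The correlation rule above amounts to a join condition between the two entity states. Modeling entity snapshots as dictionaries is an illustrative simplification; option-set values are from the earlier examples:

```csharp
using System;
using System.Collections.Generic;

public static class CompositeEventCorrelator
{
    // Checks the per-entity conditions plus the correlation
    // "rqm_requestshortlist.rqm_candidateid = contact.contactid".
    public static bool IsCandidateShortlisted(
        IReadOnlyDictionary<string, object> shortlist,
        IReadOnlyDictionary<string, object> contact)
    {
        return Equals(shortlist["rqm_salesstatus"], 112020001)     // Shortlisted
            && Equals(contact["statecode"], 0)                     // Active
            && Equals(shortlist["rqm_candidateid"], contact["contactid"]);
    }
}
```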

Configuration Management Strategies

1. Environment-Specific Overrides

# base-config.yaml
entities:
  contact:
    events:
      update:
        - trigger_fields: ["emailaddress1"]
          event_type: "Contact.EmailChanged"
          domains: ["crm", "notifications"]

---
# dev-overrides.yaml  
entities:
  contact:
    events:
      update:
        - trigger_fields: ["emailaddress1"]
          event_type: "Contact.EmailChanged"
          domains: ["crm", "notifications", "debug"]  # Add debug domain in dev
          payload:
            include_debug_info: true

---
# prod-overrides.yaml
entities:
  contact:
    events:
      update:
        - trigger_fields: ["emailaddress1"]
          event_type: "Contact.EmailChanged"
          domains: ["crm", "notifications", "metrics", "audit"]  # Full domains in prod
          business_critical: true
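
One way to apply these layers is last-writer-wins per configuration entry, keyed for example by "entity/operation/event_type"; a per-property merge would be a natural refinement. The key scheme here is an assumption:

```csharp
using System;
using System.Collections.Generic;

public static class ConfigLayering
{
    // Later layers win: an entry in the override layer replaces the
    // base entry with the same key wholesale (illustrative strategy).
    public static Dictionary<string, TValue> Merge<TValue>(
        IReadOnlyDictionary<string, TValue> baseLayer,
        IReadOnlyDictionary<string, TValue> overrideLayer)
    {
        var merged = new Dictionary<string, TValue>(baseLayer);
        foreach (var (key, value) in overrideLayer)
            merged[key] = value;
        return merged;
    }
}
```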

2. Configuration Versioning and Migration

public class ConfigurationMigrationService
{
    public async Task<EventMappingConfiguration> MigrateConfigurationAsync(
        EventMappingConfiguration oldConfig, 
        string targetVersion)
    {
        var migrations = new Dictionary<string, Func<EventMappingConfiguration, EventMappingConfiguration>>
        {
            ["1.0-to-1.1"] = MigrateV1ToV11,
            ["1.1-to-1.2"] = MigrateV11ToV12
        };

        var currentVersion = oldConfig.SchemaVersion;
        var config = oldConfig;

        while (currentVersion != targetVersion)
        {
            var migrationKey = $"{currentVersion}-to-{GetNextVersion(currentVersion)}";

            if (migrations.TryGetValue(migrationKey, out var migration))
            {
                config = migration(config);
                currentVersion = config.SchemaVersion;
            }
            else
            {
                throw new InvalidOperationException($"No migration path from {currentVersion} to {targetVersion}");
            }
        }

        return config;
    }

    private static string GetNextVersion(string version) => version switch
    {
        "1.0" => "1.1",
        "1.1" => "1.2",
        _ => throw new InvalidOperationException($"No known version after {version}")
    };

    private EventMappingConfiguration MigrateV1ToV11(EventMappingConfiguration config)
    {
        // Example: Add new required fields introduced in v1.1
        foreach (var entity in config.Entities.Values)
        {
            foreach (var events in entity.Events.Values)
            {
                foreach (var eventConfig in events)
                {
                    // Add default domain if not specified
                    if (eventConfig.Domains == null || !eventConfig.Domains.Any())
                    {
                        eventConfig.Domains = new[] { "default" };
                    }

                    // Add default business criticality
                    if (!eventConfig.BusinessCritical)
                    {
                        eventConfig.BusinessCritical = eventConfig.EventType.Contains("Created") || 
                                                      eventConfig.EventType.Contains("Deleted");
                    }
                }
            }
        }

        config.SchemaVersion = "1.1";
        return config;
    }
}

3. Configuration Testing and Validation

[Fact]
public async Task Configuration_Should_Generate_Correct_Events_For_RequestShortlist_StatusChange()
{
    // Arrange
    var config = LoadTestConfiguration();
    var registry = new EventConfigurationRegistry(config, Mock.Of<ILogger<EventConfigurationRegistry>>(), new MemoryCache(new MemoryCacheOptions()));

    var changedFields = new Dictionary<string, FieldChange>
    {
        ["rqm_salesstatus"] = new FieldChange
        {
            FieldName = "rqm_salesstatus",
            OldValue = 112020000, // Under Review
            NewValue = 112020001, // Shortlisted
            DataType = "OptionSetValue"
        }
    };

    // Act
    var eventConfigs = await registry.GetEventConfigurationsAsync("rqm_requestshortlist", "update", changedFields);

    // Assert
    Assert.Single(eventConfigs);
    Assert.Equal("RequestShortlist.StatusChanged", eventConfigs[0].EventType);
    Assert.Equal("slim", eventConfigs[0].Mode);
    Assert.Contains("matching", eventConfigs[0].Domains);
    Assert.Contains("notifications", eventConfigs[0].Domains);
    Assert.True(eventConfigs[0].BusinessCritical);
}

[Fact]
public void Configuration_Should_Validate_Required_Fields()
{
    // Arrange
    var invalidConfig = new EventMappingConfiguration
    {
        Entities = new Dictionary<string, EntityConfiguration>
        {
            ["test_entity"] = new EntityConfiguration
            {
                Events = new Dictionary<string, List<EventConfiguration>>
                {
                    ["create"] = new List<EventConfiguration>
                    {
                        new EventConfiguration
                        {
                            // Missing required EventType
                            Mode = "fat",
                            Domains = new[] { "test" }
                        }
                    }
                }
            }
        }
    };

    // Act & Assert
    var validator = new EventConfigurationValidator();
    var results = validator.Validate(invalidConfig);

    Assert.True(results.Any(r => r.Severity == ValidationSeverity.Error));
    Assert.Contains(results, r => r.Message.Contains("EventType is required"));
}

This configuration-driven approach provides the flexibility to handle 7N20's complex custom entity requirements while maintaining clean separation between business rules and implementation code.


Business Event Schema Design

Events follow a consistent schema with flexible payload design:

public class BusinessEvent
{
    // Standard fields (always present)
    public Guid EventId { get; set; } = Guid.NewGuid();
    public string EventType { get; set; }  // "RequestShortlist.StatusChanged"
    public Guid EntityId { get; set; }
    public string EntityType { get; set; }  // "rqm_requestshortlist"
    public DateTime Timestamp { get; set; }
    public Guid CorrelationId { get; set; }
    public Guid RequestId { get; set; }  // From DataVerse context
    public string Source { get; set; } = "DataVerse.BusinessEventShim";

    // Event routing and processing hints
    public EventMetadata Metadata { get; set; }

    // Flexible payload based on event mode
    public JObject Data { get; set; }

    // Change tracking (for update events)
    public List<FieldChange> Changes { get; set; }

    // Entity references (resolved based on configuration)  
    public Dictionary<string, EntityReference> References { get; set; }
}

public class EventMetadata
{
    public string[] Domains { get; set; }  // ["matching", "crm"] 
    public bool IsBusinessCritical { get; set; }
    public bool IncludeInMetrics { get; set; } = true;
    public string Mode { get; set; }  // "slim", "fat", "changes_only", "tombstone"
    public int SchemaVersion { get; set; } = 1;
    public DateTime CreatedAt { get; set; }
    public string CreatedBy { get; set; }  // DataVerse user ID
}

public class FieldChange
{
    public string FieldName { get; set; }
    public object OldValue { get; set; }
    public object NewValue { get; set; }
    public string DataType { get; set; }  // From DataVerse metadata
    public string DisplayName { get; set; }  // User-friendly field name
}

public class EntityReference
{
    public Guid Id { get; set; }
    public string LogicalName { get; set; }
    public string DisplayName { get; set; }  // Resolved name
    public Dictionary<string, object> Attributes { get; set; }  // Key fields only
}
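Consumers can cheaply validate the envelope before acting on it. A minimal sketch (Python, illustrative; the field names follow the contract above, but the specific rules enforced are assumptions about what a consumer would check):

```python
REQUIRED_FIELDS = {"eventId", "eventType", "entityId", "entityType",
                   "timestamp", "metadata"}
KNOWN_MODES = {"slim", "fat", "changes_only", "tombstone"}

def validate_business_event(event: dict) -> list:
    """Return a list of problems; an empty list means the envelope is usable."""
    problems = [f"missing field: {name}"
                for name in sorted(REQUIRED_FIELDS - event.keys())]
    # EventType follows the "<Entity>.<Action>" convention,
    # e.g. "RequestShortlist.StatusChanged"
    event_type = event.get("eventType", "")
    if event_type and event_type.count(".") != 1:
        problems.append(f"eventType not in Entity.Action form: {event_type!r}")
    mode = (event.get("metadata") or {}).get("mode")
    if mode is not None and mode not in KNOWN_MODES:
        problems.append(f"unknown payload mode: {mode!r}")
    return problems

slim = {"eventId": "123e4567-e89b-12d3-a456-426614174000",
        "eventType": "RequestShortlist.StatusChanged",
        "entityId": "ab89c994-065d-f011-bec2-000d3ac241ab",
        "entityType": "rqm_requestshortlist",
        "timestamp": "2024-01-15T10:30:00.123Z",
        "metadata": {"mode": "slim"}}
assert validate_business_event(slim) == []
assert validate_business_event({}) != []
```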

Event Payload Examples

Slim Event (Status Change):

{
  "eventId": "123e4567-e89b-12d3-a456-426614174000",
  "eventType": "RequestShortlist.StatusChanged",
  "entityId": "ab89c994-065d-f011-bec2-000d3ac241ab",
  "entityType": "rqm_requestshortlist", 
  "timestamp": "2024-01-15T10:30:00.123Z",
  "correlationId": "fc65709c-7b89-460a-bafc-7193dd5f7c5b",
  "requestId": "b41ea0f1-81da-4218-9558-c1bcd3cce484",

  "metadata": {
    "domains": ["matching", "notifications"],
    "isBusinessCritical": true,
    "mode": "slim",
    "schemaVersion": 1,
    "createdBy": "f88d0d38-940d-ed11-82e4-000d3a2fc6ad"
  },

  "data": {
    "newStatus": {
      "value": 112020001,
      "displayValue": "Shortlisted"
    },
    "previousStatus": {
      "value": 112020000, 
      "displayValue": "Under Review"
    }
  },

  "changes": [{
    "fieldName": "rqm_salesstatus",
    "oldValue": 112020000,
    "newValue": 112020001,
    "dataType": "OptionSetValue",
    "displayName": "Sales Status"
  }],

  "references": {
    "rqm_candidateid": {
      "id": "12ae6e67-6f3b-45ad-819e-dde77feb6d5a",
      "logicalName": "contact", 
      "displayName": "Andrzej Numerek",
      "attributes": {
        "emailaddress1": "andrzej@example.com",
        "telephone1": "+48123456789"
      }
    }
  }
}

Fat Event (Entity Created):

{
  "eventId": "789e4567-e89b-12d3-a456-426614174001",
  "eventType": "RequestShortlist.Created",
  "entityId": "ab89c994-065d-f011-bec2-000d3ac241ab",
  "entityType": "rqm_requestshortlist",
  "timestamp": "2024-01-15T10:25:00.456Z",

  "metadata": {
    "domains": ["matching", "crm", "metrics"],
    "isBusinessCritical": true,
    "mode": "fat",
    "schemaVersion": 1
  },

  "data": {
    "rqm_requestshortlistid": "ab89c994-065d-f011-bec2-000d3ac241ab",
    "rqm_title": "Andrzej Numerek is considered for request SR Type for client Nordea Poland",
    "rqm_saleperhour": {
      "value": 8.0000,
      "currency": "PLN"
    },
    "rqm_costperhour": {
      "value": 6.0000, 
      "currency": "PLN"
    },
    "rqm_salesstatus": {
      "value": 112020000,
      "displayValue": "Under Review" 
    },
    "statecode": {
      "value": 0,
      "displayValue": "Active"
    },
    "createdon": "2024-01-15T10:25:00.456Z"
  },

  "references": {
    "rqm_candidateid": {
      "id": "12ae6e67-6f3b-45ad-819e-dde77feb6d5a",
      "logicalName": "contact",
      "displayName": "Andrzej Numerek" 
    },
    "rqm_requestid": {
      "id": "98765432-1234-5678-9abc-def012345678",
      "logicalName": "rqm_request",
      "displayName": "SR Type - Nordea Poland"
    }
  }
}
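Which of these payload modes the shim emits for a given DataVerse message is configuration-driven rather than hard-coded. A hedged sketch of the dispatch (Python; the mapping shown is illustrative, not the real registry schema):

```python
# Illustrative mapping; the real shim reads this from its configuration
# registry rather than from code.
ENTITY_EVENT_MODES = {
    "rqm_requestshortlist": {
        "Create": "fat",        # full snapshot for new records
        "Update": "slim",       # key changes plus business context
        "Delete": "tombstone",  # last known state captured pre-delete
    },
}

def select_mode(entity_type: str, message: str, default: str = "slim") -> str:
    """Pick the payload mode for a DataVerse message on a given entity."""
    return ENTITY_EVENT_MODES.get(entity_type, {}).get(message, default)

assert select_mode("rqm_requestshortlist", "Create") == "fat"
assert select_mode("rqm_requestshortlist", "Delete") == "tombstone"
assert select_mode("rqm_request", "Update") == "slim"  # falls back to default
```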

Consumer Group Strategies

The unified event stream serves different consumption patterns through Event Hub consumer groups:

Integration Services Consumer Group

Characteristics:

- Real-time processing (latency < 5 seconds)
- Immediate checkpointing after successful processing
- Small batch sizes (typically 1 event)
- Focus on business-critical events

// Integration service consumer pattern
public class IntegrationEventConsumer : BackgroundService
{
    private readonly EventHubConsumerClient _consumerClient;
    private readonly ILogger<IntegrationEventConsumer> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (PartitionEvent partitionEvent in 
            _consumerClient.ReadEventsAsync(stoppingToken))
        {
            var businessEvent = JsonSerializer.Deserialize<BusinessEvent>(
                partitionEvent.Data.EventBody);

            // Only process events for this service's domains
            if (!IsRelevantEvent(businessEvent))
                continue;

            // Process immediately
            await ProcessBusinessEventAsync(businessEvent);

            // Immediate checkpoint for real-time processing
            await UpdateCheckpointAsync(partitionEvent);
        }
    }

    private bool IsRelevantEvent(BusinessEvent eventData)
    {
        var serviceDomains = new[] { "matching", "notifications" };
        return eventData.Metadata.Domains.Any(d => serviceDomains.Contains(d));
    }
}

Metrics Pipeline Consumer Group

Characteristics:

- Batch processing (5-minute intervals)
- Deferred checkpointing for efficiency
- Large batch sizes (100+ events)
- Processes all events for completeness

// Metrics pipeline consumer pattern  
public class MetricsEventConsumer : BackgroundService
{
    private readonly EventHubConsumerClient _consumerClient;
    private readonly List<BusinessEvent> _eventBuffer = new();
    private Timer _batchTimer;  // assigned in ExecuteAsync, so it cannot be readonly

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Setup batch processing timer
        _batchTimer = new Timer(ProcessBatch, null, 
            TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));

        await foreach (PartitionEvent partitionEvent in 
            _consumerClient.ReadEventsAsync(stoppingToken))
        {
            var businessEvent = JsonSerializer.Deserialize<BusinessEvent>(
                partitionEvent.Data.EventBody);

            // Add all events to buffer for batch processing
            lock (_eventBuffer)
            {
                _eventBuffer.Add(businessEvent);
            }
        }
    }

    // Timer callbacks must be async void here, so exceptions have to be
    // caught: an unhandled exception in an async void method crashes the process.
    private async void ProcessBatch(object state)
    {
        List<BusinessEvent> eventsToProcess;
        lock (_eventBuffer)
        {
            eventsToProcess = new List<BusinessEvent>(_eventBuffer);
            _eventBuffer.Clear();
        }

        if (eventsToProcess.Any())
        {
            try
            {
                await ProcessEventBatch(eventsToProcess);

                // Update checkpoint only after the batch succeeds
                await UpdateBatchCheckpoint();
            }
            catch (Exception)
            {
                // Log and optionally re-buffer the batch for retry
            }
        }
    }
}

Implementation Guidance for Custom Entities

Handling Referenced Entities

Challenge: Custom entities often reference other entities (e.g., rqm_candidateid pointing to a contact). How do we efficiently include related data?

Solution: Multi-tier caching strategy with lazy loading

public class EntityReferenceService
{
    private readonly IMemoryCache _cache;
    private readonly IDataverseClient _dataverseClient;

    public async Task<EntityReference> ResolveReferenceAsync(
        string logicalName, Guid entityId, string[] requiredFields = null)
    {
        var cacheKey = $"entity:{logicalName}:{entityId}";

        if (_cache.TryGetValue(cacheKey, out EntityReference cached))
            return cached;

        // Fetch from DataVerse with only required fields
        var entity = await _dataverseClient.GetEntityAsync(
            logicalName, entityId, requiredFields ?? new[] { "name" });

        var reference = new EntityReference
        {
            Id = entityId,
            LogicalName = logicalName,
            DisplayName = entity.GetAttributeValue<string>("name"),
            Attributes = entity.Attributes.ToDictionary(
                kvp => kvp.Key, kvp => kvp.Value)
        };

        // Cache for 30 minutes (balance freshness vs performance)
        _cache.Set(cacheKey, reference, TimeSpan.FromMinutes(30));

        return reference;
    }
}

// Usage in business event creation
var candidateRef = await _entityReferenceService.ResolveReferenceAsync(
    "contact", 
    candidateId, 
    new[] { "fullname", "emailaddress1", "telephone1" });
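The same resolve-once-then-cache behavior, stripped to its essentials for clarity (Python sketch; `fetch` stands in for the DataVerse query, and the injectable clock exists only to make the 30-minute TTL testable):

```python
import time

class ReferenceCache:
    """Minimal TTL cache: fetch a reference once, reuse it until it expires."""

    def __init__(self, fetch, ttl_seconds=30 * 60, clock=time.monotonic):
        self._fetch = fetch       # callable: (logical_name, entity_id) -> dict
        self._ttl = ttl_seconds   # 30 minutes, mirroring the C# above
        self._clock = clock
        self._entries = {}        # (logical_name, id) -> (expires_at, value)

    def resolve(self, logical_name, entity_id):
        key = (logical_name, entity_id)
        now = self._clock()
        entry = self._entries.get(key)
        if entry and entry[0] > now:
            return entry[1]       # fresh hit: no DataVerse round-trip
        value = self._fetch(logical_name, entity_id)
        self._entries[key] = (now + self._ttl, value)
        return value

calls = []
def fake_fetch(logical_name, entity_id):
    calls.append(entity_id)
    return {"displayName": "Andrzej Numerek"}

now = [0.0]
cache = ReferenceCache(fake_fetch, ttl_seconds=1800, clock=lambda: now[0])
cache.resolve("contact", "c-1")
cache.resolve("contact", "c-1")   # served from cache
assert len(calls) == 1
now[0] = 2000.0                   # past the TTL
cache.resolve("contact", "c-1")   # stale entry: fetched again
assert len(calls) == 2
```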

Event Ordering and Consistency

Challenge: DataVerse plugin execution can result in out-of-order events, especially with "Full Remote" features.

Solution: Version vectors with event buffering

public class EventOrderingService
{
    private readonly Dictionary<Guid, long> _entityVersions = new();
    private readonly Dictionary<Guid, Queue<BusinessEvent>> _eventBuffer = new();

    public async Task<IEnumerable<BusinessEvent>> ProcessEventAsync(BusinessEvent incomingEvent)
    {
        var entityId = incomingEvent.EntityId;
        var eventVersion = ExtractVersionFromEvent(incomingEvent);

        if (!_entityVersions.ContainsKey(entityId))
        {
            // First event for this entity
            _entityVersions[entityId] = eventVersion;
            return new[] { incomingEvent };
        }

        var lastVersion = _entityVersions[entityId];

        if (eventVersion == lastVersion + 1)
        {
            // In order - process immediately
            _entityVersions[entityId] = eventVersion;
            var eventsToEmit = new List<BusinessEvent> { incomingEvent };

            // Check if buffered events can now be processed
            eventsToEmit.AddRange(ProcessBufferedEvents(entityId));

            return eventsToEmit;
        }
        else if (eventVersion > lastVersion + 1)
        {
            // Out of order - buffer for later
            if (!_eventBuffer.ContainsKey(entityId))
                _eventBuffer[entityId] = new Queue<BusinessEvent>();

            _eventBuffer[entityId].Enqueue(incomingEvent);
            return Enumerable.Empty<BusinessEvent>();
        }
        else
        {
            // Duplicate or very old event - ignore
            return Enumerable.Empty<BusinessEvent>();
        }
    }

    private long ExtractVersionFromEvent(BusinessEvent businessEvent)
    {
        // Use the DataVerse RowVersion when present; otherwise fall back to a
        // timestamp-based sequence. Note: Value<long?> (nullable) is required
        // for the null-coalescing fallback to compile.
        return businessEvent.Data?.Value<long?>("rowversion") ??
               DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    }
}
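The buffering logic is easiest to verify on a concrete sequence. A compact re-statement of the same algorithm (Python sketch) shows what happens when versions arrive as 1, 3, 2: the gap holds version 3 back until version 2 fills it, then both are released in order.

```python
from collections import defaultdict

class OrderingBuffer:
    """Release events strictly in version order; buffer gaps; drop replays."""

    def __init__(self):
        self._last = {}                    # entity_id -> last emitted version
        self._pending = defaultdict(dict)  # entity_id -> {version: event}

    def process(self, entity_id, version, event):
        if entity_id not in self._last:
            self._last[entity_id] = version  # first event seen for this entity
            return [event]
        last = self._last[entity_id]
        if version <= last:
            return []                        # duplicate or stale: ignore
        if version > last + 1:
            self._pending[entity_id][version] = event
            return []                        # gap: hold until it is filled
        # version == last + 1: emit, then drain any now-contiguous buffer
        out = [event]
        self._last[entity_id] = version
        pending = self._pending[entity_id]
        while self._last[entity_id] + 1 in pending:
            nxt = self._last[entity_id] + 1
            out.append(pending.pop(nxt))
            self._last[entity_id] = nxt
        return out

buf = OrderingBuffer()
assert buf.process("e1", 1, "v1") == ["v1"]
assert buf.process("e1", 3, "v3") == []         # buffered: gap at version 2
assert buf.process("e1", 2, "v2") == ["v2", "v3"]
assert buf.process("e1", 2, "v2-replay") == []  # duplicate dropped
```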

Deleted Events Handling

Challenge: Capture full entity state when entities are deleted.

Solution: Tombstone events with pre-operation images

# DataVerse plugin configuration for delete events
plugin_steps:
  - message: "Delete"
    entity: "rqm_requestshortlist"  
    stage: "PreOperation"  # Critical: Before deletion
    mode: "Asynchronous"
    images:
      - name: "PreImage"
        type: "PreImage" 
        attributes: "all"  # Capture complete entity state

// Tombstone event creation
public BusinessEvent CreateTombstoneEvent(DataVerseEvent deleteEvent)
{
    var preImage = deleteEvent.PreEntityImages?.FirstOrDefault();

    return new BusinessEvent
    {
        EventType = $"{GetEntityDisplayName(deleteEvent.PrimaryEntityName)}.Deleted",
        EntityId = deleteEvent.PrimaryEntityId,
        EntityType = deleteEvent.PrimaryEntityName,

        Metadata = new EventMetadata
        {
            Domains = new[] { "all" },
            IsBusinessCritical = true,
            Mode = "tombstone"
        },

        Data = JObject.FromObject(new
        {
            deletedAt = DateTime.UtcNow,
            deletedBy = deleteEvent.UserId,
            deletionReason = "user_initiated", // Could be extracted from context
            lastKnownState = preImage?.Attributes // Full entity snapshot
        })
    };
}

Bidirectional Flow (AH → DataVerse)

Challenge: When AgentHub updates data that affects DataVerse records, how do we notify users?

Solution: Command/Event separation with correlation tracking

sequenceDiagram
    participant AH as AgentHub Service
    participant CMD as Command Handler
    participant DV as DataVerse
    participant EH as Event Hub
    participant SHIM as Business Event Shim
    participant USER as User Notification
    participant UI as User Interface

    AH->>CMD: Update Contact Command
    CMD->>DV: Execute Update via API
    DV->>EH: Contact Updated (Raw Event)
    EH->>SHIM: Process Event
    SHIM->>EH: Contact.Updated (Business Event)
    EH->>USER: Notification Event
    USER->>UI: Real-time Update

    Note over AH,UI: Correlation ID tracks entire flow

public class BidirectionalEventCorrelation
{
    public async Task<Guid> ExecuteCommandAsync<TCommand>(TCommand command) 
        where TCommand : IDataVerseCommand
    {
        var correlationId = Guid.NewGuid();

        // Set correlation context
        using var correlation = new CorrelationContext(correlationId);

        // Execute command in DataVerse
        await _dataverseClient.ExecuteAsync(command);

        // The resulting DataVerse event will include this correlation ID
        // Business event shim will preserve it for downstream processing

        return correlationId;
    }

    public async Task WaitForEventConfirmation(Guid correlationId, TimeSpan timeout)
    {
        // Wait for business event with matching correlation ID
        // Useful for critical operations requiring confirmation
    }
}
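The WaitForEventConfirmation half of the pattern can be sketched as a registry of pending correlations (Python/asyncio; the class and method names here are illustrative, not the actual AgentHub API):

```python
import asyncio

class CorrelationRegistry:
    """Match inbound business events to the commands that triggered them."""

    def __init__(self):
        self._pending = {}  # correlation_id -> Future resolved by the event

    def expect(self, correlation_id):
        # Register before executing the command so the confirmation cannot race it
        self._pending[correlation_id] = asyncio.get_running_loop().create_future()

    def on_business_event(self, correlation_id, event):
        fut = self._pending.get(correlation_id)
        if fut and not fut.done():
            fut.set_result(event)

    async def wait(self, correlation_id, timeout):
        try:
            return await asyncio.wait_for(self._pending[correlation_id], timeout)
        finally:
            self._pending.pop(correlation_id, None)

async def _demo():
    registry = CorrelationRegistry()
    registry.expect("corr-1")
    # ...command executes; later the shim delivers the correlated event:
    registry.on_business_event("corr-1", {"eventType": "Contact.Updated"})
    return await registry.wait("corr-1", timeout=1.0)

confirmed = asyncio.run(_demo())
assert confirmed["eventType"] == "Contact.Updated"
```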

CRM Service Improvements

Current Issues:

- Monolithic service handling multiple concerns
- Circuit breaker doesn't immediately detect admin mode ending
- No clear domain separation

Recommended Improvements:

  1. Domain-Based Service Splitting
// Before: Monolithic CRM Service
public class CrmService
{
    // Handles contacts, accounts, leads, opportunities, etc.
}

// After: Domain-specific services
public class ContactService { /* Contact operations only */ }
public class AccountService { /* Account operations only */ }
public class OpportunityService { /* Opportunity operations only */ }
public class RequestShortlistService { /* Custom rqm_* entities */ }
  1. Enhanced Admin Mode Detection
public class DataVerseHealthService : BackgroundService
{
    private readonly IDataverseClient _client;
    private readonly ICircuitBreakerService _circuitBreaker;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var healthStatus = await CheckDataVerseHealthAsync();

                if (healthStatus.IsMaintenanceMode)
                {
                    await _circuitBreaker.OpenAsync("DataVerse maintenance mode detected");
                }
                else if (healthStatus.IsHealthy && _circuitBreaker.IsOpen)
                {
                    await _circuitBreaker.CloseAsync("DataVerse healthy, maintenance ended");
                }

                await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;  // host is shutting down
            }
            catch (Exception)
            {
                // Log and swallow monitoring errors, but still delay so a
                // failing endpoint is not probed in a tight loop
                await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
            }
        }
    }

    private async Task<DataVerseHealthStatus> CheckDataVerseHealthAsync()
    {
        // Multiple health indicators, with response time measured explicitly
        // (Stopwatch is System.Diagnostics.Stopwatch)
        var stopwatch = Stopwatch.StartNew();
        var response = await _client.ExecuteAsync(new WhoAmIRequest());
        var metadata = await _client.GetEntityMetadataAsync("systemuser");
        stopwatch.Stop();

        return new DataVerseHealthStatus
        {
            IsHealthy = response != null,
            IsMaintenanceMode = IsMaintenanceIndicator(response, metadata),
            ResponseTime = stopwatch.Elapsed,
            LastChecked = DateTime.UtcNow
        };
    }
}

Alternative Approaches Analysis

For comparison, here's why other approaches are insufficient for 7N20's custom entity requirements:

1. Microsoft's Native Business Events Catalog

Limitation: Only supports standard DataVerse entities (contact, account, opportunity, etc.)

graph LR
    DV[DataVerse<br/>Standard Entities] -->|Native Events| PA[Power Automate]
    CE[Custom Entities<br/>rqm_requestshortlist] -.->|❌ Not Supported| PA
    PA --> SERVICES[Integration Services]

    style CE fill:#faa,stroke:#333,stroke-width:2px

Verdict: ❌ Insufficient - Cannot handle rqm_* custom entities that are central to 7N20's business logic.

2. Azure Event Grid Only

Limitation: Provides routing but no business context transformation

graph LR
    DV[DataVerse<br/>Raw CRUD Events] --> EG[Event Grid]
    EG -->|Still Raw Events| S1[Service 1]
    EG -->|Still Raw Events| S2[Service 2]

    style EG fill:#ffa,stroke:#333,stroke-width:2px

Verdict: ❌ Insufficient - Services still need to understand DataVerse-specific formats and custom field structures.

3. Batch Processing Only (Data Lake)

Limitation: Cannot serve real-time integration use cases

graph LR
    DV[DataVerse] --> DL[Data Lake<br/>1-hour latency]
    DL --> BI[Business Intelligence]
    DL -.->|❌ Too Slow| INTEGRATION[Real-time<br/>Integration]

    style INTEGRATION fill:#faa,stroke:#333,stroke-width:2px

Verdict: ❌ Insufficient - Acceptable for metrics but fails integration requirements.

Final Recommendations

Based on the analysis, the unified approach with Business Event Shim is the optimal solution for 7N20:

graph TB
    subgraph "Why This Approach"
        R1[✅ Supports Custom Entities<br/>rqm_requestshortlist, etc.]
        R2[✅ Serves Both Use Cases<br/>Integration + Metrics]
        R3[✅ Single Infrastructure<br/>Reduced complexity]
        R4[✅ Business Context<br/>Meaningful events]
        R5[✅ Configurable<br/>Easy to extend]
    end

    DV[DataVerse<br/>All Entities] --> SHIM[Business Event Shim<br/>Configuration-Driven]
    SHIM --> EH[Event Hub<br/>Business Events]
    EH --> CG1[Integration Services<br/>Real-time]
    EH --> CG2[Metrics Pipeline<br/>Batch]

    style SHIM fill:#9f9,stroke:#333,stroke-width:3px

Implementation Phases

Phase 1: Foundation (0-2 months)

  1. Build Business Event Shim with configuration registry
  2. Implement basic event mapping for rqm_requestshortlist
  3. Set up Event Hub with consumer groups
  4. Create integration service consumer pattern

Phase 2: Integration (2-4 months)

  1. Migrate existing services to consume business events
  2. Implement entity reference caching
  3. Add event ordering and consistency handling
  4. Split CRM service into domain-specific services

Phase 3: Optimization (4-6 months)

  1. Add comprehensive event mappings for all custom entities
  2. Implement bidirectional flow with correlation tracking
  3. Enhance admin mode detection and circuit breakers
  4. Optimize performance and monitoring

Success Metrics

Technical Metrics:

- Event processing latency < 5 seconds for integration use cases
- Event completeness > 99.9% for metrics use cases
- System availability > 99.5%
- Custom entity coverage: 100% of rqm_* entities

Business Metrics:

- Integration services decoupled from DataVerse schema changes
- Metrics and analytics using the same events as real-time systems
- Reduced time to add new custom entity support (< 1 day of configuration)
- Single source of truth for all business events

Risk Mitigation

Operational Risks:

- Shim Service Failure: Implement high availability with multiple replicas
- Event Hub Saturation: Auto-scaling with partition expansion
- Schema Evolution: Version-aware event processing

Business Risks:

- Event Loss: At-least-once delivery with idempotency checks
- Integration Delays: Circuit breakers and fallback mechanisms
- Data Consistency: Event ordering and correlation tracking

Conclusion

The unified Business Event Shim approach is essential infrastructure for 7N20's DataVerse integration, not optional enhancement. Given the extensive custom entities (rqm_*), Microsoft's native solutions are insufficient.

Key Benefits:

- Single investment serves both integration and metrics needs
- Future-proof architecture supports business growth
- Reduced complexity compared to multiple integration patterns
- Business-focused events improve development velocity

The recommended approach balances functionality, maintainability, and cost while providing a foundation for 7N20's event-driven architecture evolution.


Related Documentation:

- ADR-0002: Event-Driven Metrics Platform Architecture
- DataVerse Event Structure
- CRM Domain Business Events

Appendix: Extended Alternatives Analysis (Earlier Draft)

2. Event Grid Routing

graph TB
subgraph "DataVerse"
    DV[DataVerse Entities]
    SP[Service Endpoints]
end

subgraph "Azure Event Grid"
    EG[Event Grid<br/>Custom Topics]
    F1[Filter: Email Changes]
    F2[Filter: Status Changes]
    F3[Filter: Create Events]
end

subgraph "Business Services"
    ES[Email Service]
    NS[Notification Service]
    AS[Analytics Service]
    MS[Metrics Service]
end

DV --> SP
SP --> EG
EG --> F1
EG --> F2
EG --> F3
F1 --> ES
F2 --> NS
F3 --> AS
EG --> MS

How It Works:

- DataVerse publishes to Event Grid custom topics
- Event Grid provides advanced filtering on event attributes
- Different services subscribe to specific event types
- Built-in retry, dead lettering, and delivery guarantees

Advantages:

- Advanced Filtering: Content-based routing without custom code
- Scalability: Handles millions of events with automatic scaling
- Reliability: Built-in retry mechanisms and error handling
- Cost-Effective: Pay per operation, no idle costs
- CloudEvents Standard: Supports cross-platform interoperability

Disadvantages:

- Limited Transformation: Cannot modify event payloads significantly
- Azure Lock-in: Specific to Azure ecosystem
- Complexity: Additional service to configure and monitor

3. Hybrid Approach: Native + Custom Enhancement

Combine Microsoft's native business events with selective custom processing for complex scenarios.

graph TB
    subgraph "DataVerse Events"
        DV[DataVerse]
        NBE[Native Business Events]
        RE[Raw CRUD Events]
    end

    subgraph "Event Processing"
        PA[Power Automate]
        CS[Custom Shim<br/>Complex Events Only]
    end

    subgraph "Event Distribution"
        EG[Event Grid]
        EH[Event Hub]
    end

    subgraph "Consumers"
        S1[Simple Subscriptions]
        S2[Analytics Pipeline]
        S3[Complex Business Logic]
    end

    NBE --> PA
    RE --> CS
    PA --> EG
    CS --> EH
    EG --> S1
    EH --> S2
    EH --> S3

Strategy:

- Use native business events for standard scenarios (80% of use cases)
- Use the custom shim only for complex, multi-entity, or computed events (20% of use cases)
- Route through Event Grid for intelligent distribution

4. Batch Processing with Data Lake (Existing ADR Pattern)

Leverage the existing metrics platform architecture for business events.

graph LR
    subgraph "Event Sources"
        DV[DataVerse<br/>Service Endpoints]
    end

    subgraph "Ingestion"
        EH[Event Hub<br/>with Capture]
    end

    subgraph "Processing (Existing)"
        DL[Data Lake<br/>Raw Events]
        ADF[Data Factory<br/>Transformations]
        SYN[Synapse<br/>Business Event Views]
    end

    subgraph "Consumers"
        PBI[Power BI<br/>Dashboards]
        API[REST APIs<br/>Business Events]
    end

    DV --> EH
    EH --> DL
    DL --> ADF
    ADF --> SYN
    SYN --> PBI
    SYN --> API

Benefits:

- Leverages Existing Infrastructure: Uses the established metrics platform
- Cost-Effective: Already approved and implemented
- Batch Efficiency: Optimized for analytical workloads
- SQL Familiarity: Business analysts can create custom event views

Architecture Decision Matrix

| Approach | Development Cost | Operational Cost | Complexity | Flexibility | Time to Market |
|---|---|---|---|---|---|
| Custom Shim | High | Medium | High | High | Long |
| Native Events + Power Automate | Low | Low | Low | Medium | Short |
| Event Grid Routing | Medium | Low | Medium | Medium | Medium |
| Hybrid Approach | Medium | Medium | Medium | High | Medium |
| Batch with Data Lake | Low | Very Low | Low | Medium | Very Short |

Recommendations by Use Case

1. Metrics and Analytics

Recommendation: Use existing Data Lake architecture from ADR-0002

flowchart TD
    A[Business Event Need] --> B{Real-time?}
    B -->|No| C[Data Lake Pattern<br/>Cost: $400/month<br/>Latency: 1 hour]
    B -->|Yes| D{Complex Logic?}
    D -->|No| E[Native Events<br/>+ Power Automate]
    D -->|Yes| F[Custom Shim<br/>High maintenance]

    style C fill:#9f9,stroke:#333,stroke-width:2px
    style E fill:#9f9,stroke:#333,stroke-width:2px
    style F fill:#f99,stroke:#333,stroke-width:2px

Rationale:

- Metrics don't require real-time processing
- Existing infrastructure handles the event volume efficiently
- Cost-effective scaling to 100K+ events/day
- SQL transformations for business event creation

2. Real-time Business Notifications

Recommendation: Native Business Events + Event Grid

graph LR
    DV[DataVerse<br/>Event Catalog] --> PA[Power Automate<br/>Business Events]
    PA --> EG[Event Grid<br/>Smart Routing]
    EG --> EMAIL[Email Service]
    EG --> PUSH[Push Notifications]
    EG --> SLACK[Slack Integration]

Benefits:

- Low latency (seconds)
- Microsoft-supported reliability
- Built-in filtering and routing
- Minimal development overhead

3. Complex Business Rules

Recommendation: Hybrid approach with selective custom processing

Use Custom Shim For:

- Multi-entity change detection
- Computed business metrics
- Complex validation logic
- Legacy system integration

Use Native Events For:

- Simple field changes
- Status transitions
- Standard CRUD notifications
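That split can be expressed as a single routing predicate. A hedged sketch (Python; the function name and flags are illustrative, not part of any real API):

```python
def needs_custom_shim(entity_type: str, multi_entity: bool = False,
                      computed: bool = False) -> bool:
    """Decide whether an event must go through the custom shim.

    Mirrors the guidance above: custom rqm_* entities, multi-entity change
    detection, and computed metrics need the shim; simple field changes and
    status transitions on standard entities can stay on native events.
    """
    if entity_type.startswith("rqm_"):
        return True   # the native event catalog does not cover custom entities
    return multi_entity or computed

assert needs_custom_shim("rqm_requestshortlist")
assert needs_custom_shim("contact", multi_entity=True)
assert not needs_custom_shim("contact")  # simple status change: native events
```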

Implementation Guidance

Handling Referenced Entities

Problem: How to include related entity data in business events efficiently?

Solutions by Approach:

  1. Native Events: Limited reference expansion
  2. Custom Shim:
    // Lazy loading with caching
    var contact = await _dataverseClient.GetContactAsync(eventData.ContactId);
    var account = await _entityCache.GetOrFetchAsync<Account>(contact.ParentCustomerId);
    
  3. Data Lake: Join operations in SQL transformations

Event Ordering and Consistency

Problem: DataVerse plugin pipeline can create out-of-order events.

Solutions:

  1. Version Vectors: Add monotonic sequence numbers

    {
      "eventId": "123e4567-...",
      "entityId": "contact-456",
      "version": 142,
      "timestamp": "2024-01-15T10:30:00Z"
    }
    

  2. Event Buffering: Temporary storage for reordering

  3. Eventual Consistency: Design consumers to handle ordering issues

Deleted Events Handling

Problem: Capture entity state before deletion.

Solutions:

  1. Pre-Operation Images: Configure DataVerse to capture full entity before delete
  2. Tombstone Events:
    {
      "eventType": "ContactDeleted",
      "entityId": "contact-123",
      "deletedAt": "2024-01-15T10:30:00Z",
      "lastKnownState": { /* full entity data */ },
      "deletedBy": "user-456"
    }
    

Bidirectional Flow (AH → DataVerse)

Problem: Notify users when AgentHub changes affect their DataVerse records.

Recommended Pattern: Command/Event Separation

sequenceDiagram
    participant AH as AgentHub
    participant CMD as Command Bus
    participant DV as DataVerse
    participant EVT as Event Hub
    participant USER as User Service

    AH->>CMD: Update Contact Command
    CMD->>DV: Execute Update
    DV->>EVT: Contact Updated Event
    EVT->>USER: Send Notification
    USER->>AH: Update UI

CRM Service Improvements

Current Issues:

- Monolithic service handling multiple concerns
- Circuit breaker doesn't immediately detect admin mode ending
- No clear domain separation

Recommended Improvements:

  1. Split into Domain Services:

    CRMService → ContactService + AccountService + LeadService
    

  2. Admin Mode Detection:

    // Poll DataVerse metadata API
    public async Task<bool> IsInMaintenanceModeAsync()
    {
        var response = await _dataverseClient.GetMetadataAsync();
        return response.Headers.Contains("X-Maintenance-Mode");
    }
    

  3. Health Checks:

    services.AddHealthChecks()
        .AddCheck<DataverseHealthCheck>("dataverse")
        .AddCheck<AdminModeHealthCheck>("admin-mode");
    

Final Recommendations

Immediate Actions (0-3 months)

  1. Leverage Existing: Use Data Lake pattern for analytics and metrics
  2. Prototype Native Events: Test DataVerse Event Catalog for simple notifications
  3. Evaluate Event Grid: Pilot for intelligent event routing

Medium Term (3-6 months)

  1. Implement Hybrid: Native events for 80% of use cases, custom processing for complex scenarios
  2. Refactor CRM Service: Split into domain-specific microservices
  3. Add Event Versioning: Prepare for schema evolution

Long Term (6+ months)

  1. Custom Shim Decision: Only implement if native + hybrid approaches insufficient
  2. Event Sourcing: Consider full event sourcing pattern if audit requirements grow
  3. Multi-Cloud: Evaluate alternatives as platform matures

Conclusion

The custom shim approach, while powerful, introduces significant complexity and maintenance overhead. Microsoft's native business events capabilities, combined with Azure Event Grid for intelligent routing, provide a more sustainable and cost-effective solution for most use cases.

Key Principle: Start with the simplest approach that meets requirements, then add complexity only when necessary.

The hybrid strategy of leveraging native capabilities for standard scenarios while reserving custom processing for truly complex business logic provides the best balance of functionality, maintainability, and cost.


Related Documentation:

- ADR-0002: Event-Driven Metrics Platform Architecture
- DataVerse Event Structure
- Business Events Implementation Guide