Implementation Guide

Overview

This implementation guide provides step-by-step instructions for deploying the event-driven metrics platform. It is structured in phases to minimize risk and enable a gradual rollout, validating each component before proceeding.

Prerequisites

  • Azure subscription with Owner or Contributor access
  • Power BI Premium or Pro licenses
  • Matomo Cloud instance access
  • Sentry project access
  • Azure CLI installed and configured
  • Terraform installed (optional but recommended)
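
Before starting Phase 1, it is worth confirming the tooling is in place. A minimal sketch of a prerequisite check (adjust to your environment):

#!/bin/bash
# Quick prerequisite check (sketch)

az version > /dev/null 2>&1 || { echo "❌ Azure CLI not installed"; exit 1; }
az account show > /dev/null 2>&1 || az login
terraform version > /dev/null 2>&1 || echo "⚠️  Terraform not found (optional but recommended)"

echo "✅ Tooling prerequisites look good"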

Implementation Roadmap

Phase Overview

gantt
    title Implementation Timeline (4 weeks)
    dateFormat YYYY-MM-DD
    axisFormat %m/%d

    section Phase 1: Foundation
    Azure Resources     :active, p1-azure, 2024-01-01, 2024-01-07
    Event Hub Setup     :active, p1-eh, 2024-01-01, 2024-01-07
    Storage Config      :p1-storage, 2024-01-03, 2024-01-07

    section Phase 2: Ingestion
    Event Publishing    :p2-pub, 2024-01-08, 2024-01-14
    Capture Validation  :p2-cap, 2024-01-10, 2024-01-14
    Schema Validation   :p2-schema, 2024-01-12, 2024-01-14

    section Phase 3: Processing
    Synapse Setup       :p3-syn, 2024-01-15, 2024-01-21
    Data Factory        :p3-adf, 2024-01-17, 2024-01-21
    Processing Logic    :p3-proc, 2024-01-19, 2024-01-21

    section Phase 4: Analytics
    Power BI Config     :p4-pbi, 2024-01-22, 2024-01-28
    Dashboard Creation  :p4-dash, 2024-01-24, 2024-01-28
    External APIs       :p4-api, 2024-01-26, 2024-01-28

Phase 1: Foundation Setup (Week 1)

1.1 Azure Resource Group and Networking

#!/bin/bash
# Phase 1.1: Create foundation resources

# Set variables
SUBSCRIPTION_ID="your-subscription-id"
RESOURCE_GROUP="rg-metrics-platform"
LOCATION="westeurope"
ENVIRONMENT="prod"

# Login and set subscription
az login
az account set --subscription $SUBSCRIPTION_ID

# Create resource group
az group create \
    --name $RESOURCE_GROUP \
    --location $LOCATION \
    --tags Environment=$ENVIRONMENT Purpose=MetricsPlatform

# Create storage account for Data Lake
STORAGE_ACCOUNT="datalakemetricsplatform"
az storage account create \
    --name $STORAGE_ACCOUNT \
    --resource-group $RESOURCE_GROUP \
    --location $LOCATION \
    --sku Standard_LRS \
    --kind StorageV2 \
    --enable-hierarchical-namespace true \
    --tags Environment=$ENVIRONMENT Purpose=DataLake

# Create containers
STORAGE_KEY=$(az storage account keys list --account-name $STORAGE_ACCOUNT --resource-group $RESOURCE_GROUP --query "[0].value" -o tsv)

for container in raw-events bronze silver gold; do
    az storage container create \
        --name $container \
        --account-name $STORAGE_ACCOUNT \
        --account-key $STORAGE_KEY \
        --public-access off
done

echo "✅ Foundation resources created successfully"

1.2 Event Hub Namespaces

#!/bin/bash
# Phase 1.2: Create Event Hub namespaces and hubs

# Define domains and their event hubs
declare -A DOMAINS=(
    ["crm"]="contacts-events opportunities-events accounts-events"
    ["matching"]="matches-events iterations-events declarations-events"
    ["profile"]="profiles-events skills-events availability-events"
    ["assessment"]="assessments-events results-events feedback-events"
)

for domain in "${!DOMAINS[@]}"; do
    NAMESPACE="${domain}-events-namespace"

    # Create namespace
    az eventhubs namespace create \
        --name $NAMESPACE \
        --resource-group $RESOURCE_GROUP \
        --location $LOCATION \
        --sku Standard \
        --capacity 1 \
        --enable-auto-inflate true \
        --maximum-throughput-units 5 \
        --tags Domain=$domain Environment=$ENVIRONMENT

    # Create event hubs
    for hub in ${DOMAINS[$domain]}; do
        az eventhubs eventhub create \
            --name $hub \
            --namespace-name $NAMESPACE \
            --resource-group $RESOURCE_GROUP \
            --partition-count 4 \
            --message-retention 7

        # Enable capture
        az eventhubs eventhub update \
            --name $hub \
            --namespace-name $NAMESPACE \
            --resource-group $RESOURCE_GROUP \
            --enable-capture true \
            --capture-interval 300 \
            --capture-size-limit 104857600 \
            --destination-name EventHubArchive.AzureDataLake \
            --storage-account $STORAGE_ACCOUNT \
            --blob-container raw-events \
            --archive-name-format "{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}/{PartitionId}_{Offset}_{SequenceNumber}.avro"
    done

    echo "✅ Domain '$domain' Event Hubs created"
done

1.3 Terraform Configuration (Alternative Approach)

# main.tf
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~>3.0"
    }
  }

  backend "azurerm" {
    resource_group_name  = "rg-terraform-state"
    storage_account_name = "tfstate_storage_account"
    container_name       = "tfstate"
    key                  = "metrics-platform.tfstate"
  }
}

provider "azurerm" {
  features {}
}

# Resource Group
resource "azurerm_resource_group" "main" {
  name     = "rg-metrics-platform"
  location = "West Europe"

  tags = {
    Environment = "Production"
    Purpose     = "MetricsPlatform"
  }
}

# Storage Account
resource "azurerm_storage_account" "data_lake" {
  name                     = "datalakemetricsplatform"
  resource_group_name      = azurerm_resource_group.main.name
  location                 = azurerm_resource_group.main.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
  account_kind             = "StorageV2"
  is_hns_enabled          = true

  blob_properties {
    # Blob versioning and change feed are not supported on accounts with the
    # hierarchical namespace enabled, so only soft delete is configured here
    delete_retention_policy {
      days = 30
    }
  }

  tags = azurerm_resource_group.main.tags
}

# Event Hub Namespace
resource "azurerm_eventhub_namespace" "domains" {
  for_each = {
    "crm"        = ["contacts-events", "opportunities-events", "accounts-events"]
    "matching"   = ["matches-events", "iterations-events", "declarations-events"]
    "profile"    = ["profiles-events", "skills-events", "availability-events"]
    "assessment" = ["assessments-events", "results-events", "feedback-events"]
  }

  name                = "${each.key}-events-namespace"
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  sku                 = "Standard"
  capacity            = 1

  auto_inflate_enabled     = true
  maximum_throughput_units = 5

  tags = merge(azurerm_resource_group.main.tags, {
    Domain = each.key
  })
}

# Event Hubs
resource "azurerm_eventhub" "hubs" {
  for_each = {
    for pair in flatten([
      for domain, hubs in {
        "crm"        = ["contacts-events", "opportunities-events", "accounts-events"]
        "matching"   = ["matches-events", "iterations-events", "declarations-events"]
        "profile"    = ["profiles-events", "skills-events", "availability-events"]
        "assessment" = ["assessments-events", "results-events", "feedback-events"]
      } : [
        for hub in hubs : {
          domain = domain
          hub    = hub
          key    = "${domain}-${hub}"
        }
      ]
    ]) : pair.key => pair
  }

  name                = each.value.hub
  namespace_name      = azurerm_eventhub_namespace.domains[each.value.domain].name
  resource_group_name = azurerm_resource_group.main.name
  partition_count     = 4
  message_retention   = 7

  capture_description {
    enabled  = true
    encoding = "Avro"

    interval_in_seconds = 300
    size_limit_in_bytes = 104857600

    destination {
      name                = "EventHubArchive.AzureDataLake"
      archive_name_format = "{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}/{PartitionId}_{Offset}_{SequenceNumber}.avro"

      blob_container_name   = "raw-events"
      storage_account_name  = azurerm_storage_account.data_lake.name
    }
  }
}

# Deploy with:
# terraform init
# terraform plan
# terraform apply

1.4 Validation Checklist

  • [ ] Resource group created with correct tags
  • [ ] Storage account has hierarchical namespace enabled
  • [ ] All containers (raw-events, bronze, silver, gold) exist
  • [ ] Event Hub namespaces created for each domain
  • [ ] Event Hubs have capture enabled
  • [ ] Capture points to correct storage account and container
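
Most of this checklist can be spot-checked from the CLI. A rough sketch, reusing the variables defined in the scripts above:

#!/bin/bash
# Phase 1 validation spot checks (sketch; assumes variables from sections 1.1 and 1.2)

az group show --name $RESOURCE_GROUP --query tags
az storage account show --name $STORAGE_ACCOUNT --resource-group $RESOURCE_GROUP \
    --query isHnsEnabled
az storage container list --account-name $STORAGE_ACCOUNT --auth-mode login \
    --query "[].name" -o tsv
az eventhubs namespace list --resource-group $RESOURCE_GROUP --query "[].name" -o tsv

# Capture should report "true" for every hub, e.g. for contacts-events:
az eventhubs eventhub show --name contacts-events \
    --namespace-name crm-events-namespace --resource-group $RESOURCE_GROUP \
    --query "captureDescription.enabled"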

Phase 2: Event Ingestion (Week 2)

2.1 Event Publishing Integration

.NET Service Integration

// Install packages:
// dotnet add package Azure.Messaging.EventHubs
// dotnet add package Microsoft.Extensions.Options

// Models/BusinessEvent.cs
public abstract record BusinessEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public string Version { get; init; } = "1.0";
    public required string AggregateId { get; init; }
    public abstract string AggregateType { get; }
    public abstract string EventType { get; }
    public required EventMetadata Metadata { get; init; }
    public required object Payload { get; init; }
}

public record EventMetadata
{
    public string CorrelationId { get; init; } = Guid.NewGuid().ToString();
    public string? CausationId { get; init; }
    public string? UserId { get; init; }
    public required string Source { get; init; }
    public string? PartitionKey { get; init; } // falls back to the event's AggregateId in the publisher
    public Dictionary<string, string> Tags { get; init; } = new();
}

// Events/ContactCreatedEvent.cs
public record ContactCreatedEvent : BusinessEvent
{
    public override string AggregateType => "Contact";
    public override string EventType => "ContactCreated";

    // Payload is set to a ContactCreatedPayload instance when the event is constructed
}

public record ContactCreatedPayload
{
    public required string ContactId { get; init; }
    public required string FirstName { get; init; }
    public required string LastName { get; init; }
    public required string Email { get; init; }
    public string? Company { get; init; }
    public DateTime CreatedAt { get; init; }
}

// Services/EventPublisher.cs
using System.Text;
using System.Text.Json;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Logging;
public interface IEventPublisher
{
    Task PublishAsync<T>(T businessEvent) where T : BusinessEvent;
    Task PublishBatchAsync<T>(IEnumerable<T> businessEvents) where T : BusinessEvent;
}

public class EventHubPublisher : IEventPublisher, IAsyncDisposable
{
    private readonly EventHubProducerClient _client;
    private readonly ILogger<EventHubPublisher> _logger;

    public EventHubPublisher(string connectionString, string eventHubName, ILogger<EventHubPublisher> logger)
    {
        _client = new EventHubProducerClient(connectionString, eventHubName);
        _logger = logger;
    }

    public async Task PublishAsync<T>(T businessEvent) where T : BusinessEvent
    {
        await PublishBatchAsync(new[] { businessEvent });
    }

    public async Task PublishBatchAsync<T>(IEnumerable<T> businessEvents) where T : BusinessEvent
    {
        var events = businessEvents.ToList();
        if (!events.Any()) return;

        try
        {
            var batch = await _client.CreateBatchAsync();

            foreach (var businessEvent in events)
            {
                var json = JsonSerializer.Serialize(businessEvent, new JsonSerializerOptions
                {
                    PropertyNamingPolicy = JsonNamingPolicy.CamelCase
                });

                var eventData = new EventData(Encoding.UTF8.GetBytes(json))
                {
                    PartitionKey = businessEvent.Metadata.PartitionKey ?? businessEvent.AggregateId
                };

                // Add metadata properties
                eventData.Properties["EventType"] = businessEvent.EventType;
                eventData.Properties["AggregateType"] = businessEvent.AggregateType;
                eventData.Properties["Version"] = businessEvent.Version;
                eventData.Properties["CorrelationId"] = businessEvent.Metadata.CorrelationId;

                if (!batch.TryAdd(eventData))
                {
                    // Send current batch and create new one
                    if (batch.Count > 0)
                    {
                        await _client.SendAsync(batch);
                        batch = await _client.CreateBatchAsync();
                    }

                    if (!batch.TryAdd(eventData))
                    {
                        throw new InvalidOperationException($"Event too large: {businessEvent.EventId}");
                    }
                }
            }

            if (batch.Count > 0)
            {
                await _client.SendAsync(batch);
            }

            _logger.LogInformation("Published {EventCount} events", events.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish {EventCount} events", events.Count);
            throw;
        }
    }

    public async ValueTask DisposeAsync()
    {
        await _client.DisposeAsync();
    }
}

// Startup.cs / Program.cs registration
services.AddSingleton<IEventPublisher>(provider =>
{
    var configuration = provider.GetRequiredService<IConfiguration>();
    var logger = provider.GetRequiredService<ILogger<EventHubPublisher>>();

    var connectionString = configuration.GetConnectionString("CrmEventHub");
    var eventHubName = configuration["EventHub:CrmEventHubName"];

    return new EventHubPublisher(connectionString, eventHubName, logger);
});

Configuration

// appsettings.json
{
  "ConnectionStrings": {
    "CrmEventHub": "Endpoint=sb://crm-events-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
    "MatchingEventHub": "Endpoint=sb://matching-events-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
    "ProfileEventHub": "Endpoint=sb://profile-events-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=..."
  },
  "EventHub": {
    "CrmEventHubName": "contacts-events",
    "MatchingEventHubName": "matches-events",
    "ProfileEventHubName": "profiles-events"
  }
}
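
The connection strings above reference a SendPolicy shared-access rule that Phase 1 does not create. A sketch for creating a send-only rule at the namespace level and reading back its connection string (shown for the CRM namespace; repeat per domain):

#!/bin/bash
# Create a send-only authorization rule and fetch its connection string (sketch)

az eventhubs namespace authorization-rule create \
    --name SendPolicy \
    --namespace-name crm-events-namespace \
    --resource-group rg-metrics-platform \
    --rights Send

az eventhubs namespace authorization-rule keys list \
    --name SendPolicy \
    --namespace-name crm-events-namespace \
    --resource-group rg-metrics-platform \
    --query primaryConnectionString -o tsv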

2.2 Event Schema Validation

// Services/EventValidator.cs
public interface IEventValidator
{
    ValidationResult Validate<T>(T businessEvent) where T : BusinessEvent;
}

public class EventValidator : IEventValidator
{
    private readonly ILogger<EventValidator> _logger;

    public EventValidator(ILogger<EventValidator> logger)
    {
        _logger = logger;
    }

    public ValidationResult Validate<T>(T businessEvent) where T : BusinessEvent
    {
        var errors = new List<string>();

        // Required field validation
        if (string.IsNullOrWhiteSpace(businessEvent.EventId))
            errors.Add("EventId is required");

        if (!Guid.TryParse(businessEvent.EventId, out _))
            errors.Add("EventId must be a valid GUID");

        if (string.IsNullOrWhiteSpace(businessEvent.AggregateId))
            errors.Add("AggregateId is required");

        if (businessEvent.Timestamp == default)
            errors.Add("Timestamp is required");

        if (businessEvent.Timestamp > DateTime.UtcNow.AddMinutes(5))
            errors.Add("Timestamp cannot be in the future");

        if (string.IsNullOrWhiteSpace(businessEvent.Metadata.Source))
            errors.Add("Metadata.Source is required");

        // Payload validation
        if (businessEvent.Payload == null)
            errors.Add("Payload is required");

        var result = errors.Count == 0 
            ? ValidationResult.Success() 
            : ValidationResult.Failure(errors);

        if (!result.IsValid)
        {
            _logger.LogWarning("Event validation failed for {EventId}: {Errors}",
                businessEvent.EventId, string.Join(", ", errors));
        }

        return result;
    }
}

public record ValidationResult
{
    public bool IsValid { get; init; }
    public IReadOnlyList<string> Errors { get; init; } = Array.Empty<string>();

    public static ValidationResult Success() => new() { IsValid = true };
    public static ValidationResult Failure(IEnumerable<string> errors) => 
        new() { IsValid = false, Errors = errors.ToList() };
}

2.3 Testing and Validation

#!/bin/bash
# Phase 2.3: Test event publishing and capture

# Create test events
cat << 'EOF' > test-event.json
{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2024-01-15T14:30:00Z",
  "version": "1.0",
  "aggregateId": "contact-123",
  "aggregateType": "Contact",
  "eventType": "ContactCreated",
  "metadata": {
    "correlationId": "660e8400-e29b-41d4-a716-446655440000",
    "userId": "user-456",
    "source": "TestScript",
    "partitionKey": "contact-123"
  },
  "payload": {
    "contactId": "contact-123",
    "firstName": "John",
    "lastName": "Doe",
    "email": "john.doe@example.com",
    "company": "Test Corp",
    "createdAt": "2024-01-15T14:30:00Z"
  }
}
EOF

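# Optional sanity check before sending: mirror the EventValidator rules from
# section 2.2 with jq (an assumption -- any JSON tool works)
jq -e '.eventId and .aggregateId and .metadata.source and .payload' test-event.json \
    > /dev/null && echo "✅ Test event passes basic schema checks" \
    || { echo "❌ Test event is missing required fields"; exit 1; }
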
# Send the test event. Azure CLI has no event-publishing command, so this uses the
# Event Hubs REST API ($EVENTHUB_SAS_TOKEN is assumed to hold a full
# "SharedAccessSignature ..." token for the namespace's SendPolicy rule)
curl -X POST \
    "https://crm-events-namespace.servicebus.windows.net/contacts-events/messages" \
    -H "Authorization: $EVENTHUB_SAS_TOKEN" \
    -H "Content-Type: application/atom+xml;type=entry;charset=utf-8" \
    --data-binary @test-event.json

# Wait for capture (5 minutes max)
echo "⏳ Waiting for Event Hub capture..."
sleep 300

# Verify file was captured
az storage blob list \
    --container-name raw-events \
    --account-name datalakemetricsplatform \
    --prefix "crm-events-namespace/contacts-events" \
    --query "[].{Name:name, Size:properties.contentLength, LastModified:properties.lastModified}" \
    --output table

echo "✅ Event publishing and capture validated"

2.4 Phase 2 Validation Checklist

  • [ ] Event publishing library integrated into services
  • [ ] Test events successfully sent to Event Hubs
  • [ ] Events captured to Data Lake within 5 minutes
  • [ ] Schema validation working correctly
  • [ ] Error handling and logging implemented
  • [ ] Connection strings secured
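
For the last checklist item, one option is to keep the connection strings out of appsettings.json entirely and serve them from Azure Key Vault. A minimal sketch (the vault name is an assumption; the double-dash secret name maps to ConnectionStrings:CrmEventHub in the .NET configuration provider):

#!/bin/bash
# Store Event Hub connection strings in Key Vault (sketch)

az keyvault create \
    --name kv-metrics-platform \
    --resource-group rg-metrics-platform \
    --location westeurope

az keyvault secret set \
    --vault-name kv-metrics-platform \
    --name ConnectionStrings--CrmEventHub \
    --value "$CRM_EVENTHUB_CONNECTION_STRING"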

Phase 3: Data Processing (Week 3)

3.1 Synapse Serverless Setup

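The SQL in this phase assumes a Synapse workspace with a serverless SQL endpoint already exists. A hedged sketch for creating one (the workspace name, admin credentials, and firewall rule are placeholders to adapt):

#!/bin/bash
# Create the Synapse workspace backing the serverless SQL endpoint (sketch)

az synapse workspace create \
    --name synapse-metrics-platform \
    --resource-group rg-metrics-platform \
    --storage-account datalakemetricsplatform \
    --file-system raw-events \
    --sql-admin-login-user sqladminuser \
    --sql-admin-login-password "<strong-password>" \
    --location westeurope

# Allow your client IP to reach the workspace endpoints
az synapse workspace firewall-rule create \
    --name AllowClientIp \
    --workspace-name synapse-metrics-platform \
    --resource-group rg-metrics-platform \
    --start-ip-address <your-ip> \
    --end-ip-address <your-ip>
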
-- Create Synapse SQL Database
CREATE DATABASE metrics_platform;
GO

USE metrics_platform;
GO

-- Create master key
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'ComplexPassword123!';
GO

-- Create database scoped credential for managed identity
CREATE DATABASE SCOPED CREDENTIAL DataLakeCredential
WITH IDENTITY = 'Managed Identity';
GO

-- Create external data sources
CREATE EXTERNAL DATA SOURCE RawEventsSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/raw-events/',
    CREDENTIAL = DataLakeCredential
);

CREATE EXTERNAL DATA SOURCE BronzeSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/bronze/',
    CREDENTIAL = DataLakeCredential
);

CREATE EXTERNAL DATA SOURCE SilverSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/silver/',
    CREDENTIAL = DataLakeCredential
);

CREATE EXTERNAL DATA SOURCE GoldSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/gold/',
    CREDENTIAL = DataLakeCredential
);
GO

-- Create file formats
-- Note: serverless SQL pools have no Avro file format; the captured Avro files
-- are converted to Parquet by the Data Factory pipeline in section 3.3, so both
-- formats below target Parquet
CREATE EXTERNAL FILE FORMAT AvroFileFormat
WITH (
    FORMAT_TYPE = PARQUET
);

CREATE EXTERNAL FILE FORMAT ParquetFileFormat
WITH (
    FORMAT_TYPE = PARQUET,
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
);
GO

3.2 Create External Tables and Views

-- Create schemas for each layer
CREATE SCHEMA raw;
GO
CREATE SCHEMA bronze;
GO
CREATE SCHEMA silver;
GO
CREATE SCHEMA gold;
GO

-- Raw events external table
CREATE EXTERNAL TABLE raw.events (
    [Body] VARBINARY(MAX)
)
WITH (
    LOCATION = '**/*.avro',
    DATA_SOURCE = RawEventsSource,
    FILE_FORMAT = AvroFileFormat
);
GO

-- Bronze events view with JSON parsing
CREATE VIEW bronze.events AS
SELECT 
    filepath(1) as namespace_name,
    filepath(2) as eventhub_name,
    filepath(3) as year,
    filepath(4) as month,
    filepath(5) as day,
    filepath(6) as hour,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.eventId') AS event_id,
    CAST(JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.timestamp') AS DATETIME2) AS timestamp,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.aggregateId') AS aggregate_id,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.aggregateType') AS aggregate_type,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.eventType') AS event_type,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.version') AS version,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.metadata.correlationId') AS correlation_id,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.metadata.userId') AS user_id,
    JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.metadata.source') AS source,
    JSON_QUERY(CAST([Body] AS NVARCHAR(MAX)), '$.payload') AS payload,
    GETUTCDATE() as processed_at
FROM raw.events
WHERE ISJSON(CAST([Body] AS NVARCHAR(MAX))) = 1;
GO

-- Test the view
SELECT TOP 10 * FROM bronze.events
WHERE CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)
ORDER BY timestamp DESC;
GO

3.3 Data Factory Setup

{
  "name": "MetricsPlatformProcessing",
  "properties": {
    "activities": [
      {
        "name": "ProcessRawToBronze",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "RawEventsDataset",
            "type": "DatasetReference",
            "parameters": {
              "ProcessingDate": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')"
            }
          }
        ],
        "outputs": [
          {
            "referenceName": "BronzeEventsDataset",
            "type": "DatasetReference"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "AvroSource",
            "storeSettings": {
              "type": "AzureBlobFSReadSettings",
              "recursive": true,
              "wildcardFolderPath": "*/*/*/*/*",
              "wildcardFileName": "*.avro",
              "enablePartitionDiscovery": true
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings"
            },
            "formatSettings": {
              "type": "ParquetWriteSettings"
            }
          },
          "enableStaging": false,
          "translator": {
            "type": "TabularTranslator",
            "mappings": [
              {
                "source": {
                  "path": "$.eventId"
                },
                "sink": {
                  "name": "event_id",
                  "type": "String"
                }
              },
              {
                "source": {
                  "path": "$.timestamp"
                },
                "sink": {
                  "name": "timestamp",
                  "type": "DateTime"
                }
              },
              {
                "source": {
                  "path": "$.aggregateId"
                },
                "sink": {
                  "name": "aggregate_id",
                  "type": "String"
                }
              },
              {
                "source": {
                  "path": "$.aggregateType"
                },
                "sink": {
                  "name": "aggregate_type",
                  "type": "String"
                }
              },
              {
                "source": {
                  "path": "$.eventType"
                },
                "sink": {
                  "name": "event_type",
                  "type": "String"
                }
              },
              {
                "source": {
                  "path": "$"
                },
                "sink": {
                  "name": "payload",
                  "type": "String"
                }
              }
            ]
          }
        }
      },
      {
        "name": "ProcessBronzeToSilver",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "ProcessRawToBronze",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "[dbo].[ProcessBronzeToSilver]",
          "storedProcedureParameters": {
            "ProcessingDate": {
              "value": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')",
              "type": "String"
            }
          }
        }
      },
      {
        "name": "ProcessSilverToGold",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "ProcessBronzeToSilver",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "[dbo].[ProcessSilverToGold]",
          "storedProcedureParameters": {
            "ProcessingDate": {
              "value": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')",
              "type": "String"
            }
          }
        }
      }
    ],
    "triggers": [
      {
        "name": "HourlyProcessingTrigger",
        "type": "ScheduleTrigger",
        "typeProperties": {
          "recurrence": {
            "frequency": "Hour",
            "interval": 1,
            "startTime": "2024-01-01T00:00:00Z"
          }
        }
      }
    ]
  }
}
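
One way to deploy this definition is through the Azure CLI datafactory extension, assuming the JSON above is saved as pipeline.json and that the linked services and datasets it references are created separately:

#!/bin/bash
# Deploy the Data Factory and pipeline definition (sketch)

az datafactory create \
    --factory-name DataFactoryMetricsPlatform \
    --resource-group rg-metrics-platform \
    --location westeurope

az datafactory pipeline create \
    --factory-name DataFactoryMetricsPlatform \
    --resource-group rg-metrics-platform \
    --name MetricsPlatformProcessing \
    --pipeline @pipeline.json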

3.4 Processing Stored Procedures

-- Bronze to Silver processing
CREATE PROCEDURE [dbo].[ProcessBronzeToSilver]
    @ProcessingDate DATE
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @ProcessedRecords INT = 0;

    -- Process Contact events
    INSERT INTO OPENROWSET(
        BULK 'https://datalakemetricsplatform.dfs.core.windows.net/silver/contacts/',
        DATA_SOURCE = 'SilverSource',
        FORMAT = 'PARQUET'
    )
    SELECT 
        event_id,
        timestamp,
        aggregate_id AS contact_id,
        JSON_VALUE(payload, '$.contactId') AS contact_id_payload,
        JSON_VALUE(payload, '$.firstName') AS first_name,
        JSON_VALUE(payload, '$.lastName') AS last_name,
        JSON_VALUE(payload, '$.email') AS email,
        JSON_VALUE(payload, '$.company') AS company,
        CASE 
            WHEN JSON_VALUE(payload, '$.email') LIKE '%@gmail.com' THEN 'Gmail'
            WHEN JSON_VALUE(payload, '$.email') LIKE '%@outlook.com' THEN 'Outlook'
            ELSE 'Other'
        END AS email_provider,
        CASE 
            WHEN JSON_VALUE(payload, '$.company') IS NOT NULL THEN 'Business'
            ELSE 'Personal'
        END AS contact_type,
        correlation_id,
        user_id,
        source,
        CAST(timestamp AS DATE) as date,
        YEAR(timestamp) as year,
        MONTH(timestamp) as month,
        DAY(timestamp) as day,
        GETUTCDATE() AS processed_at
    FROM bronze.events
    WHERE aggregate_type = 'Contact'
      AND event_type = 'ContactCreated'
      AND CAST(timestamp AS DATE) = @ProcessingDate;

    SET @ProcessedRecords = @@ROWCOUNT;

    -- Log processing results
    INSERT INTO ProcessingLog (
        ProcessingDate,
        Layer,
        EventType,
        RecordsProcessed,
        ProcessedAt
    )
    VALUES (
        @ProcessingDate,
        'Silver',
        'ContactCreated',
        @ProcessedRecords,
        GETUTCDATE()
    );

    PRINT 'Processed ' + CAST(@ProcessedRecords AS VARCHAR(10)) + ' Contact events to Silver layer';
END;
GO

-- Silver to Gold aggregation
CREATE PROCEDURE [dbo].[ProcessSilverToGold]
    @ProcessingDate DATE
AS
BEGIN
    SET NOCOUNT ON;

    -- Daily contact metrics
    INSERT INTO OPENROWSET(
        BULK 'https://datalakemetricsplatform.dfs.core.windows.net/gold/daily_metrics/',
        DATA_SOURCE = 'GoldSource',
        FORMAT = 'PARQUET'
    )
    SELECT 
        'contacts_created' AS metric_name,
        COUNT(*) AS metric_value,
        'count' AS metric_type,
        JSON_OBJECT(
            'domain': 'CRM',
            'entity': 'Contact',
            'period': 'daily'
        ) AS dimensions,
        MAX(timestamp) AS timestamp,
        @ProcessingDate AS date,
        'Contact' AS aggregate_type,
        YEAR(@ProcessingDate) AS year,
        MONTH(@ProcessingDate) AS month,
        DAY(@ProcessingDate) AS day
    FROM silver.contacts
    WHERE date = @ProcessingDate
    GROUP BY date;

    PRINT 'Generated daily metrics for ' + CAST(@ProcessingDate AS VARCHAR(10));
END;
GO

3.5 Phase 3 Testing

#!/bin/bash
# Phase 3.5: Test data processing pipeline

echo "🚀 Testing data processing pipeline..."

# Trigger Data Factory pipeline and capture the run ID
PIPELINE_RUN_ID=$(az datafactory pipeline create-run \
    --factory-name "DataFactoryMetricsPlatform" \
    --resource-group rg-metrics-platform \
    --name "MetricsPlatformProcessing" \
    --query runId -o tsv)

echo "⏳ Waiting for pipeline completion..."
sleep 600  # Wait 10 minutes

# Check pipeline status

az datafactory pipeline-run show \
    --factory-name "DataFactoryMetricsPlatform" \
    --resource-group rg-metrics-platform \
    --run-id $PIPELINE_RUN_ID \
    --query "{Status:status, StartTime:runStart, EndTime:runEnd}" \
    --output table

# Verify data in each layer. Azure CLI has no command for querying the serverless
# SQL endpoint, so this uses sqlcmd with Azure AD authentication (-G); adjust to
# your tooling.
SYNAPSE_ENDPOINT="synapse-metrics-platform-ondemand.sql.azuresynapse.net"

echo "📊 Verifying data in Bronze layer..."
sqlcmd -S $SYNAPSE_ENDPOINT -d metrics_platform -G \
    -Q "SELECT COUNT(*) AS bronze_count FROM bronze.events WHERE CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)"

echo "📊 Verifying data in Silver layer..."
sqlcmd -S $SYNAPSE_ENDPOINT -d metrics_platform -G \
    -Q "SELECT COUNT(*) AS silver_count FROM silver.contacts WHERE date = CAST(GETDATE() AS DATE)"

echo "📊 Verifying data in Gold layer..."
sqlcmd -S $SYNAPSE_ENDPOINT -d metrics_platform -G \
    -Q "SELECT * FROM gold.daily_metrics WHERE date = CAST(GETDATE() AS DATE)"

echo "✅ Data processing pipeline tested successfully"

3.6 Phase 3 Validation Checklist

  • [ ] Synapse Serverless workspace created and configured
  • [ ] External data sources and file formats defined
  • [ ] External tables and views created for each layer
  • [ ] Data Factory pipeline deployed and tested
  • [ ] Stored procedures for data transformation created
  • [ ] Pipeline successfully processes data from raw to gold
  • [ ] Data quality validation implemented
  • [ ] Error handling and logging configured

Phase 4: Analytics Integration (Week 4)

4.1 Power BI Configuration

Create Power BI Workspace

  1. Navigate to the Power BI Service (app.powerbi.com)
  2. Create new workspace: "Metrics Platform Analytics"
  3. Add team members with appropriate permissions
  4. Configure workspace settings for shared datasets
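
If you prefer scripting over the portal, the workspace can also be created through the Power BI REST API. A sketch, assuming the signed-in identity has Power BI permissions:

#!/bin/bash
# Create the Power BI workspace via the REST API (sketch)

PBI_TOKEN=$(az account get-access-token \
    --resource https://analysis.windows.net/powerbi/api \
    --query accessToken -o tsv)

curl -X POST "https://api.powerbi.com/v1.0/myorg/groups?workspaceV2=True" \
    -H "Authorization: Bearer $PBI_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{"name": "Metrics Platform Analytics"}'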

Business Events Dataset

// Business Events Query
let
    Source = Sql.Database(
        "synapse-metrics-platform-ondemand.sql.azuresynapse.net",
        "metrics_platform"
    ),

    BusinessMetricsQuery = "
        SELECT 
            metric_name,
            metric_value,
            metric_type,
            JSON_VALUE(dimensions, '$.domain') AS domain,
            JSON_VALUE(dimensions, '$.entity') AS entity,
            JSON_VALUE(dimensions, '$.period') AS period,
            timestamp,
            date,
            aggregate_type,
            year,
            month,
            day
        FROM gold.daily_metrics
        WHERE date >= DATEADD(month, -13, GETDATE())
    ",

    BusinessMetrics = Sql.Database(
        "synapse-metrics-platform-ondemand.sql.azuresynapse.net",
        "metrics_platform",
        [Query = BusinessMetricsQuery]
    ),

    // Data type corrections
    TypedData = Table.TransformColumnTypes(
        BusinessMetrics,
        {
            {"metric_value", type number},
            {"timestamp", type datetimezone},
            {"date", type date},
            {"year", Int64.Type},
            {"month", Int64.Type},
            {"day", Int64.Type}
        }
    )
in
    TypedData

Date Dimension Table

// Date Dimension Query
let
    StartDate = #date(2023, 1, 1),
    EndDate = #date(2025, 12, 31),

    DateList = List.Dates(StartDate, Duration.Days(EndDate - StartDate) + 1, #duration(1, 0, 0, 0)),

    DateTable = Table.FromList(DateList, Splitter.SplitByNothing(), {"Date"}),

    AddedColumns = Table.AddColumn(
        DateTable,
        "Year",
        each Date.Year([Date]),
        Int64.Type
    ),

    AddedMonth = Table.AddColumn(
        AddedColumns,
        "Month",
        each Date.Month([Date]),
        Int64.Type
    ),

    AddedDay = Table.AddColumn(
        AddedMonth,
        "Day",
        each Date.Day([Date]),
        Int64.Type
    ),

    AddedMonthName = Table.AddColumn(
        AddedDay,
        "MonthName",
        each Date.MonthName([Date]),
        type text
    ),

    AddedDayOfWeek = Table.AddColumn(
        AddedMonthName,
        "DayOfWeek",
        each Date.DayOfWeekName([Date]),
        type text
    ),

    AddedIsWeekend = Table.AddColumn(
        AddedDayOfWeek,
        "IsWeekend",
        each Date.DayOfWeek([Date]) = 6 or Date.DayOfWeek([Date]) = 0,
        type logical
    )
in
    AddedIsWeekend

4.2 External API Integration

Matomo Integration (Using POST method)

// Matomo API Function (Secure POST method)
let
    FetchMatomoData = (method as text, optional parameters as record) =>
    let
        ApiUrl = "https://syvntyve.matomo.cloud/index.php",

        DefaultParams = [
            module = "API",
            idSite = "1",
            period = "day",
            date = "last30",
            format = "JSON",
            token_auth = Parameter_MatomoToken
        ],

        MergedParams = Record.Combine({DefaultParams, parameters ?? []}),
        UpdatedParams = Record.AddField(MergedParams, "method", method),

        // Convert parameters to form data string
        ParamPairs = List.Transform(
            Record.FieldNames(UpdatedParams),
            each _ & "=" & Uri.EscapeDataString(Text.From(Record.Field(UpdatedParams, _)))
        ),

        RequestBody = Text.ToBinary(Text.Combine(ParamPairs, "&")),

        Options = [
            Headers = [
                #"Content-Type" = "application/x-www-form-urlencoded"
            ],
            Content = RequestBody
        ],

        Source = Json.Document(Web.Contents(ApiUrl, Options))
    in
        Source,

    // Fetch visits summary
    VisitsSummary = FetchMatomoData("VisitsSummary.get", [date = "last90"]),

    // Transform to table
    VisitsTable = Table.FromRecords(
        List.Transform(
            Record.FieldNames(VisitsSummary),
            each [
                Date = Date.FromText(_),
                Visits = Record.Field(VisitsSummary, _)[nb_visits]?,
                UniqueVisitors = Record.Field(VisitsSummary, _)[nb_uniq_visitors]?,
                PageViews = Record.Field(VisitsSummary, _)[nb_pageviews]?,
                BounceRate = Record.Field(VisitsSummary, _)[bounce_rate]?,
                AvgTimeOnSite = Record.Field(VisitsSummary, _)[avg_time_on_site]?
            ]
        )
    ),

    FinalResult = Table.TransformColumnTypes(
        VisitsTable,
        {
            {"Date", type date},
            {"Visits", Int64.Type},
            {"UniqueVisitors", Int64.Type},
            {"PageViews", Int64.Type},
            {"BounceRate", type number},
            {"AvgTimeOnSite", Int64.Type}
        }
    )
in
    FinalResult

Sentry Integration

// Sentry API Integration
let
    SentryUrl = "https://sentry.io/api/0/projects/your-org/your-project/events/",

    Options = [
        Headers = [
            Authorization = "Bearer " & Parameter_SentryToken,
            #"Content-Type" = "application/json"
        ],
        Query = [statsPeriod = "30d"]
    ],

    // The project events endpoint returns a JSON array, so the parsed document is already a list
    EventsList = Json.Document(Web.Contents(SentryUrl, Options)),

    EventsTable = Table.FromList(EventsList, Splitter.SplitByNothing()),

    ExpandedEvents = Table.ExpandRecordColumn(
        EventsTable,
        "Column1",
        {"id", "title", "culprit", "dateCreated", "userCount", "level"},
        {"EventId", "Title", "Culprit", "DateCreated", "UserCount", "Level"}
    ),

    TypedEvents = Table.TransformColumnTypes(
        ExpandedEvents,
        {
            {"DateCreated", type datetimezone},
            {"UserCount", Int64.Type}
        }
    )
in
    TypedEvents

4.3 Create Power BI Measures

// Key Performance Indicators
TotalEvents = SUM(BusinessMetrics[metric_value])

EventsThisMonth = 
CALCULATE(
    [TotalEvents],
    DATESINPERIOD(DimDate[Date], TODAY(), -1, MONTH)
)

EventGrowthRate = 
VAR LastMonth = 
    CALCULATE(
        [TotalEvents],
        DATESINPERIOD(DimDate[Date], EOMONTH(TODAY(), -1), -1, MONTH)
    )
VAR ThisMonth = [EventsThisMonth]
RETURN
    DIVIDE(ThisMonth - LastMonth, LastMonth, 0)

// Domain-specific metrics
CRMEvents = 
CALCULATE(
    [TotalEvents],
    BusinessMetrics[domain] = "CRM"
)

MatchingEvents = 
CALCULATE(
    [TotalEvents],
    BusinessMetrics[domain] = "Matching"
)

ConversionRate = 
VAR ContactsCreated = 
    CALCULATE(
        [TotalEvents],
        BusinessMetrics[metric_name] = "contacts_created"
    )
VAR DeclarationsCompleted = 
    CALCULATE(
        [TotalEvents],
        BusinessMetrics[metric_name] = "declarations_completed"
    )
RETURN
    DIVIDE(DeclarationsCompleted, ContactsCreated, 0)

// Website analytics
TotalPageViews = SUM(MatomoData[PageViews])
UniqueVisitors = SUM(MatomoData[UniqueVisitors])
AverageBounceRate = AVERAGE(MatomoData[BounceRate])

// Error tracking
TotalErrors = SUM(SentryData[UserCount])
ErrorRate = 
DIVIDE([TotalErrors], [TotalEvents], 0) * 100

4.4 Build Executive Dashboard

Create the dashboard with the following visuals:

  1. KPI Cards
     • Total Events This Month
     • Event Growth Rate
     • Conversion Rate
     • Error Rate

  2. Line Chart: Event Trends
     • X-axis: Date
     • Y-axis: Total Events
     • Legend: Domain

  3. Donut Chart: Domain Breakdown
     • Values: Total Events
     • Legend: Domain

  4. Table: Top Performing Metrics
     • Columns: Metric Name, Value, Change %

  5. Bar Chart: Error Summary
     • X-axis: Error Count
     • Y-axis: Error Type

4.5 Configure Refresh Schedule

  1. Dataset Refresh Settings
     • Frequency: 4 times daily (6 AM, 12 PM, 6 PM, 12 AM)
     • Time zone: UTC
     • Notify on failure: Yes

  2. Incremental Refresh
     • Archive data: 2 years
     • Refresh data: 30 days
     • Detect data changes: Yes
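
The schedule can also be applied programmatically through the Power BI REST API. A sketch, where $WORKSPACE_ID and $DATASET_ID are assumptions you must look up in the service:

#!/bin/bash
# Apply the dataset refresh schedule via the REST API (sketch)

PBI_TOKEN=$(az account get-access-token \
    --resource https://analysis.windows.net/powerbi/api \
    --query accessToken -o tsv)

curl -X PATCH \
    "https://api.powerbi.com/v1.0/myorg/groups/$WORKSPACE_ID/datasets/$DATASET_ID/refreshSchedule" \
    -H "Authorization: Bearer $PBI_TOKEN" \
    -H "Content-Type: application/json" \
    -d '{
        "value": {
          "days": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"],
          "times": ["00:00", "06:00", "12:00", "18:00"],
          "localTimeZoneId": "UTC",
          "notifyOption": "MailOnFailure"
        }
      }'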

4.6 Phase 4 Testing

#!/bin/bash
# Phase 4.6: Test Power BI integration

echo "🎯 Testing Power BI integration..."

# Test Synapse connection
echo "Testing Synapse connection..."
pwsh -Command "
    \$connectionString = 'Server=synapse-metrics-platform-ondemand.sql.azuresynapse.net;Database=metrics_platform;Authentication=Active Directory Default'
    try {
        \$connection = New-Object System.Data.SqlClient.SqlConnection(\$connectionString)
        \$connection.Open()
        Write-Host '✅ Synapse connection successful'
        \$connection.Close()
    }
    catch {
        Write-Host '❌ Synapse connection failed: ' \$_.Exception.Message
    }
"

# Test Matomo API (using POST method)
echo "Testing Matomo API..."
curl -X POST "https://syvntyve.matomo.cloud/index.php" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "module=API&method=VisitsSummary.get&idSite=1&period=day&date=yesterday&format=JSON&token_auth=$MATOMO_TOKEN" \
  -s | jq 'keys' > /dev/null && echo "✅ Matomo API accessible" || echo "❌ Matomo API failed"

# Test Sentry API
echo "Testing Sentry API..."
curl -H "Authorization: Bearer $SENTRY_TOKEN" \
  "https://sentry.io/api/0/projects/your-org/your-project/events/?statsPeriod=1d" \
  -s | jq 'length' > /dev/null && echo "✅ Sentry API accessible" || echo "❌ Sentry API failed"

echo "✅ Power BI integration tested successfully"

4.7 Phase 4 Validation Checklist

  • [ ] Power BI workspace created and configured
  • [ ] Business events dataset connected to Synapse
  • [ ] Matomo dataset configured with secure POST method
  • [ ] Sentry dataset integrated
  • [ ] Date dimension table created
  • [ ] Key measures and calculations defined
  • [ ] Executive dashboard created with all required visuals
  • [ ] Refresh schedule configured and tested
  • [ ] Row-level security implemented (if needed)
  • [ ] Performance optimization applied

Go-Live and Monitoring

Pre-Go-Live Checklist

Infrastructure

  • [ ] All Azure resources deployed and configured
  • [ ] Network connectivity verified
  • [ ] Security roles and permissions applied
  • [ ] Monitoring and alerting configured
  • [ ] Backup and disaster recovery planned

Data Flow

  • [ ] Event publishing tested from all services
  • [ ] Data processing pipeline validated
  • [ ] Data quality checks implemented
  • [ ] Error handling and retry logic tested
  • [ ] Performance benchmarks established

Analytics

  • [ ] Power BI dashboards created and tested
  • [ ] External API integrations validated
  • [ ] User access controls configured
  • [ ] Training materials prepared
  • [ ] Support processes established

Go-Live Steps

  1. Enable Production Traffic

    # Enable event publishing in production services
    kubectl patch deployment crm-service -p '{"spec":{"template":{"metadata":{"annotations":{"metrics.platform/enabled":"true"}}}}}'
    kubectl patch deployment matching-service -p '{"spec":{"template":{"metadata":{"annotations":{"metrics.platform/enabled":"true"}}}}}'
    

  2. Monitor Initial Data Flow

    # Monitor event ingestion
    az monitor metrics list --resource /subscriptions/.../Microsoft.EventHub/namespaces/crm-events-namespace --metric IncomingMessages --interval PT5M
    

  3. Verify Data Processing

    -- Check data freshness
    SELECT 
        MAX(timestamp) as latest_event,
        COUNT(*) as event_count
    FROM gold.daily_metrics
    WHERE date = CAST(GETDATE() AS DATE);
    

  4. Validate Power BI Reports

     • Refresh all datasets
     • Verify data appears correctly
     • Test filters and slicers
     • Confirm calculated measures

Post Go-Live Monitoring

Week 1: Intensive Monitoring

  • Monitor data ingestion every hour
  • Check processing pipeline success rates
  • Validate data quality metrics
  • Monitor costs daily
  • Address any issues immediately
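
Part of the intensive monitoring can be automated with Azure Monitor metric alerts. A sketch that alerts when a namespace stops receiving events (the action group ID is an assumption):

#!/bin/bash
# Alert when no events arrive at the CRM namespace for an hour (sketch)

EH_NAMESPACE_ID=$(az eventhubs namespace show \
    --name crm-events-namespace \
    --resource-group rg-metrics-platform \
    --query id -o tsv)

az monitor metrics alert create \
    --name "crm-events-ingestion-stalled" \
    --resource-group rg-metrics-platform \
    --scopes "$EH_NAMESPACE_ID" \
    --condition "total IncomingMessages < 1" \
    --window-size 1h \
    --evaluation-frequency 15m \
    --action "$ACTION_GROUP_ID" \
    --description "No events received by crm-events-namespace in the last hour"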

Week 2-4: Regular Monitoring

  • Daily data quality checks
  • Weekly cost reviews
  • Performance monitoring
  • User feedback collection
  • Process refinements

Month 2+: Steady State

  • Weekly monitoring reports
  • Monthly cost optimization reviews
  • Quarterly architecture reviews
  • Continuous improvement initiatives

Success Criteria

| Metric             | Target    | Current | Status |
|--------------------|-----------|---------|--------|
| Availability       | >99.5%    | TBD     | 🔄     |
| Data Freshness     | <2 hours  | TBD     | 🔄     |
| Processing Success | >99%      | TBD     | 🔄     |
| Monthly Cost       | <$400     | TBD     | 🔄     |
| User Satisfaction  | >4.0/5.0  | TBD     | 🔄     |

Troubleshooting Guide

Common Issues and Solutions

| Issue | Symptoms | Investigation | Solution |
|-------|----------|---------------|----------|
| Events Not Captured | No files in raw-events container | Check Event Hub metrics, capture settings | Verify capture configuration, check permissions |
| Processing Failures | Pipeline failures in ADF | Check ADF monitoring, Synapse logs | Review error messages, check data quality |
| Power BI Refresh Errors | Dataset refresh failures | Check connection strings, permissions | Validate credentials, test connections |
| High Costs | Budget alerts | Review cost analysis dashboard | Optimize resource utilization, review pricing tier |
| Slow Query Performance | Long dashboard load times | Check Synapse query performance | Optimize queries, review partitioning strategy |

Support Contacts

| Component | Primary Contact | Secondary Contact | Escalation |
|-----------|-----------------|-------------------|------------|
| Infrastructure | Platform Team | DevOps Team | Architecture Team |
| Data Pipeline | Data Engineering | Platform Team | CTO |
| Power BI | Analytics Team | Business Intelligence | Data Engineering |
| Business Events | Domain Teams | Platform Team | Architecture Team |

Next Steps

With the metrics platform successfully implemented, consider these future enhancements:

  1. Advanced Analytics
     • Machine learning models for predictions
     • Anomaly detection for events
     • Real-time alerting on critical metrics

  2. Platform Extensions
     • Additional data sources integration
     • Self-service analytics capabilities
     • Advanced data governance features

  3. Performance Optimization
     • Query performance tuning
     • Cost optimization initiatives
     • Scaling for increased load

  4. Process Improvements
     • Automated testing pipelines
     • Infrastructure as code enhancements
     • Continuous deployment automation

The metrics platform provides a solid foundation for data-driven decision making across the organization. Regular reviews and continuous improvements will ensure it continues to deliver value as the business grows and evolves.