Implementation Guide¶
Overview¶
This comprehensive implementation guide provides step-by-step instructions for deploying the event-driven metrics platform. The guide is structured in phases to minimize risk and enable gradual rollout while validating each component before proceeding.
Prerequisites¶
- Azure subscription with Owner or Contributor access
- Power BI Premium or Pro licenses
- Matomo Cloud instance access
- Sentry project access
- Azure CLI installed and configured
- Terraform installed (optional but recommended)
Implementation Roadmap¶
Phase Overview¶
gantt
title Implementation Timeline (8 weeks)
dateFormat YYYY-MM-DD
axisFormat %m/%d
section Phase 1: Foundation
Azure Resources :active, p1-azure, 2024-01-01, 2024-01-07
Event Hub Setup :active, p1-eh, 2024-01-01, 2024-01-07
Storage Config :p1-storage, 2024-01-03, 2024-01-07
section Phase 2: Ingestion
Event Publishing :p2-pub, 2024-01-08, 2024-01-14
Capture Validation :p2-cap, 2024-01-10, 2024-01-14
Schema Validation :p2-schema, 2024-01-12, 2024-01-14
section Phase 3: Processing
Synapse Setup :p3-syn, 2024-01-15, 2024-01-21
Data Factory :p3-adf, 2024-01-17, 2024-01-21
Processing Logic :p3-proc, 2024-01-19, 2024-01-21
section Phase 4: Analytics
Power BI Config :p4-pbi, 2024-01-22, 2024-01-28
Dashboard Creation :p4-dash, 2024-01-24, 2024-01-28
External APIs :p4-api, 2024-01-26, 2024-01-28
Phase 1: Foundation Setup (Week 1)¶
1.1 Azure Resource Group and Networking¶
#!/bin/bash
# Phase 1.1: Create foundation resources
# Set variables
SUBSCRIPTION_ID="your-subscription-id"
RESOURCE_GROUP="rg-metrics-platform"
LOCATION="westeurope"
ENVIRONMENT="prod"
# Login and set subscription
az login
az account set --subscription $SUBSCRIPTION_ID
# Create resource group
az group create \
--name $RESOURCE_GROUP \
--location $LOCATION \
--tags Environment=$ENVIRONMENT Purpose=MetricsPlatform
# Create storage account for Data Lake
STORAGE_ACCOUNT="datalakemetricsplatform"
az storage account create \
--name $STORAGE_ACCOUNT \
--resource-group $RESOURCE_GROUP \
--location $LOCATION \
--sku Standard_LRS \
--kind StorageV2 \
--enable-hierarchical-namespace true \
--tags Environment=$ENVIRONMENT Purpose=DataLake
# Create containers
STORAGE_KEY=$(az storage account keys list --account-name $STORAGE_ACCOUNT --resource-group $RESOURCE_GROUP --query "[0].value" -o tsv)
for container in raw-events bronze silver gold; do
az storage container create \
--name $container \
--account-name $STORAGE_ACCOUNT \
--account-key $STORAGE_KEY \
--public-access off
done
echo "✅ Foundation resources created successfully"
1.2 Event Hub Namespaces¶
#!/bin/bash
# Phase 1.2: Create Event Hub namespaces and hubs
# Define domains and their event hubs
declare -A DOMAINS=(
["crm"]="contacts-events opportunities-events accounts-events"
["matching"]="matches-events iterations-events declarations-events"
["profile"]="profiles-events skills-events availability-events"
["assessment"]="assessments-events results-events feedback-events"
)
for domain in "${!DOMAINS[@]}"; do
NAMESPACE="${domain}-events-namespace"
# Create namespace
az eventhubs namespace create \
--name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--location $LOCATION \
--sku Standard \
--capacity 1 \
--enable-auto-inflate true \
--maximum-throughput-units 5 \
--tags Domain=$domain Environment=$ENVIRONMENT
# Create event hubs
for hub in ${DOMAINS[$domain]}; do
az eventhubs eventhub create \
--name $hub \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--partition-count 4 \
--message-retention 7
# Enable capture
az eventhubs eventhub update \
--name $hub \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--enable-capture true \
--capture-interval 300 \
--capture-size-limit 104857600 \
--destination-name EventHubArchive.AzureBlockBlob \
--storage-account $STORAGE_ACCOUNT \
--blob-container raw-events \
--archive-name-format "{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}/{PartitionId}_{Offset}_{SequenceNumber}.avro"
done
echo "✅ Domain '$domain' Event Hubs created"
done
1.3 Terraform Configuration (Alternative Approach)¶
# main.tf
terraform {
required_providers {
azurerm = {
source = "hashicorp/azurerm"
version = "~>3.0"
}
}
backend "azurerm" {
resource_group_name = "rg-terraform-state"
storage_account_name = "sttfstatemetrics" # placeholder; storage account names allow only 3-24 lowercase letters and digits
container_name = "tfstate"
key = "metrics-platform.tfstate"
}
}
provider "azurerm" {
features {}
}
# Resource Group
resource "azurerm_resource_group" "main" {
name = "rg-metrics-platform"
location = "West Europe"
tags = {
Environment = "Production"
Purpose = "MetricsPlatform"
}
}
# Storage Account
resource "azurerm_storage_account" "data_lake" {
name = "datalakemetricsplatform"
resource_group_name = azurerm_resource_group.main.name
location = azurerm_resource_group.main.location
account_tier = "Standard"
account_replication_type = "LRS"
account_kind = "StorageV2"
is_hns_enabled = true
blob_properties {
versioning_enabled = true
change_feed_enabled = true
last_access_time_enabled = true
delete_retention_policy {
days = 30
}
}
tags = azurerm_resource_group.main.tags
}
# Event Hub Namespace
resource "azurerm_eventhub_namespace" "domains" {
for_each = {
"crm" = ["contacts-events", "opportunities-events", "accounts-events"]
"matching" = ["matches-events", "iterations-events", "declarations-events"]
"profile" = ["profiles-events", "skills-events", "availability-events"]
"assessment" = ["assessments-events", "results-events", "feedback-events"]
}
name = "${each.key}-events-namespace"
location = azurerm_resource_group.main.location
resource_group_name = azurerm_resource_group.main.name
sku = "Standard"
capacity = 1
auto_inflate_enabled = true
maximum_throughput_units = 5
tags = merge(azurerm_resource_group.main.tags, {
Domain = each.key
})
}
# Event Hubs
resource "azurerm_eventhub" "hubs" {
for_each = {
for pair in flatten([
for domain, hubs in {
"crm" = ["contacts-events", "opportunities-events", "accounts-events"]
"matching" = ["matches-events", "iterations-events", "declarations-events"]
"profile" = ["profiles-events", "skills-events", "availability-events"]
"assessment" = ["assessments-events", "results-events", "feedback-events"]
} : [
for hub in hubs : {
domain = domain
hub = hub
key = "${domain}-${hub}"
}
]
]) : pair.key => pair
}
name = each.value.hub
namespace_name = azurerm_eventhub_namespace.domains[each.value.domain].name
resource_group_name = azurerm_resource_group.main.name
partition_count = 4
message_retention = 7
capture_description {
enabled = true
encoding = "Avro"
interval_in_seconds = 300
size_limit_in_bytes = 104857600
destination {
name = "EventHubArchive.AzureBlockBlob"
archive_name_format = "{Namespace}/{EventHub}/{Year}/{Month}/{Day}/{Hour}/{PartitionId}_{Offset}_{SequenceNumber}.avro"
blob_container_name = "raw-events"
storage_account_id = azurerm_storage_account.data_lake.id
}
}
}
# Deploy with:
# terraform init
# terraform plan
# terraform apply
1.4 Validation Checklist¶
- [ ] Resource group created with correct tags
- [ ] Storage account has hierarchical namespace enabled
- [ ] All containers (raw-events, bronze, silver, gold) exist
- [ ] Event Hub namespaces created for each domain
- [ ] Event Hubs have capture enabled
- [ ] Capture points to correct storage account and container
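As an optional cross-check of the items above before starting Phase 2, a short .NET snippet can confirm that an Event Hub is reachable with the credentials you plan to use. This is a minimal sketch, not part of the platform code; the CRM_EVENTHUB_CONNECTION environment variable and the CheckEventHub.cs file name are placeholders for this guide.

// CheckEventHub.cs — optional connectivity smoke test (sketch)
// dotnet add package Azure.Messaging.EventHubs
using Azure.Messaging.EventHubs.Producer;

// Placeholder: a connection string with at least Listen or Send rights on the hub.
var connectionString = Environment.GetEnvironmentVariable("CRM_EVENTHUB_CONNECTION")
    ?? throw new InvalidOperationException("Set CRM_EVENTHUB_CONNECTION first.");

await using var producer = new EventHubProducerClient(connectionString, "contacts-events");

// Fails fast if the namespace, hub name, or credentials are wrong.
var properties = await producer.GetEventHubPropertiesAsync();
Console.WriteLine($"Hub '{properties.Name}' reachable with {properties.PartitionIds.Length} partitions.");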
Phase 2: Event Ingestion (Week 2)¶
2.1 Event Publishing Integration¶
.NET Service Integration¶
// Install packages:
// dotnet add package Azure.Messaging.EventHubs
// dotnet add package Microsoft.Extensions.Options
// Models/BusinessEvent.cs
public abstract record BusinessEvent
{
public string EventId { get; init; } = Guid.NewGuid().ToString();
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public string Version { get; init; } = "1.0";
public required string AggregateId { get; init; }
public abstract string AggregateType { get; }
public abstract string EventType { get; }
public required EventMetadata Metadata { get; init; }
public abstract object Payload { get; }
}
public record EventMetadata
{
public string CorrelationId { get; init; } = Guid.NewGuid().ToString();
public string? CausationId { get; init; }
public string? UserId { get; init; }
public required string Source { get; init; }
// Optional partition key; EventHubPublisher falls back to the event's AggregateId when this is not set.
public string? PartitionKey { get; init; }
public Dictionary<string, string> Tags { get; init; } = new();
}
// Events/ContactCreatedEvent.cs
using System.Text.Json.Serialization;
public record ContactCreatedEvent : BusinessEvent
{
public override string AggregateType => "Contact";
public override string EventType => "ContactCreated";
// An override of the read-only abstract Payload cannot add an init accessor,
// so the typed payload lives in Data and is exposed through Payload for serialization.
[JsonIgnore]
public ContactCreatedPayload Data { get; init; } = null!;
public override object Payload => Data;
}
public record ContactCreatedPayload
{
public required string ContactId { get; init; }
public required string FirstName { get; init; }
public required string LastName { get; init; }
public required string Email { get; init; }
public string? Company { get; init; }
public DateTime CreatedAt { get; init; }
}
// Services/EventPublisher.cs
using System.Text;
using System.Text.Json;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Logging;
public interface IEventPublisher
{
Task PublishAsync<T>(T businessEvent) where T : BusinessEvent;
Task PublishBatchAsync<T>(IEnumerable<T> businessEvents) where T : BusinessEvent;
}
public class EventHubPublisher : IEventPublisher, IAsyncDisposable
{
private readonly EventHubProducerClient _client;
private readonly ILogger<EventHubPublisher> _logger;
public EventHubPublisher(string connectionString, string eventHubName, ILogger<EventHubPublisher> logger)
{
_client = new EventHubProducerClient(connectionString, eventHubName);
_logger = logger;
}
public async Task PublishAsync<T>(T businessEvent) where T : BusinessEvent
{
await PublishBatchAsync(new[] { businessEvent });
}
public async Task PublishBatchAsync<T>(IEnumerable<T> businessEvents) where T : BusinessEvent
{
var events = businessEvents.ToList();
if (!events.Any()) return;
try
{
var batch = await _client.CreateBatchAsync();
foreach (var businessEvent in events)
{
var json = JsonSerializer.Serialize(businessEvent, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
var eventData = new EventData(Encoding.UTF8.GetBytes(json))
{
PartitionKey = businessEvent.Metadata.PartitionKey ?? businessEvent.AggregateId
};
// Add metadata properties
eventData.Properties["EventType"] = businessEvent.EventType;
eventData.Properties["AggregateType"] = businessEvent.AggregateType;
eventData.Properties["Version"] = businessEvent.Version;
eventData.Properties["CorrelationId"] = businessEvent.Metadata.CorrelationId;
if (!batch.TryAdd(eventData))
{
// Send current batch and create new one
if (batch.Count > 0)
{
await _client.SendAsync(batch);
batch = await _client.CreateBatchAsync();
}
if (!batch.TryAdd(eventData))
{
throw new InvalidOperationException($"Event too large: {businessEvent.EventId}");
}
}
}
if (batch.Count > 0)
{
await _client.SendAsync(batch);
}
_logger.LogInformation("Published {EventCount} events", events.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish {EventCount} events", events.Count);
throw;
}
}
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync();
}
}
// Startup.cs / Program.cs registration
services.AddSingleton<IEventPublisher>(provider =>
{
var configuration = provider.GetRequiredService<IConfiguration>();
var logger = provider.GetRequiredService<ILogger<EventHubPublisher>>();
var connectionString = configuration.GetConnectionString("CrmEventHub");
var eventHubName = configuration["EventHub:CrmEventHubName"];
return new EventHubPublisher(connectionString, eventHubName, logger);
});
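For context, here is a hedged sketch of how a domain service might raise ContactCreatedEvent through the registered publisher. The ContactService class is illustrative only (persistence is omitted) and assumes the ContactCreatedEvent shape shown above, where the typed payload is set via Data.

// Services/ContactService.cs — illustrative usage of IEventPublisher (sketch)
using Microsoft.Extensions.Logging;

public class ContactService
{
    private readonly IEventPublisher _eventPublisher;
    private readonly ILogger<ContactService> _logger;

    public ContactService(IEventPublisher eventPublisher, ILogger<ContactService> logger)
    {
        _eventPublisher = eventPublisher;
        _logger = logger;
    }

    public async Task<string> CreateContactAsync(string firstName, string lastName, string email, string? company)
    {
        var contactId = Guid.NewGuid().ToString();

        // Persist the contact in the service's own store here (omitted), then publish
        // the business event so it flows into the metrics platform.
        var contactCreated = new ContactCreatedEvent
        {
            AggregateId = contactId,
            Metadata = new EventMetadata { Source = "CrmService" },
            Data = new ContactCreatedPayload
            {
                ContactId = contactId,
                FirstName = firstName,
                LastName = lastName,
                Email = email,
                Company = company,
                CreatedAt = DateTime.UtcNow
            }
        };

        await _eventPublisher.PublishAsync(contactCreated);
        _logger.LogInformation("Contact {ContactId} created and event published", contactId);
        return contactId;
    }
}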
Configuration¶
// appsettings.json
{
"ConnectionStrings": {
"CrmEventHub": "Endpoint=sb://crm-events-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
"MatchingEventHub": "Endpoint=sb://matching-events-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=...",
"ProfileEventHub": "Endpoint=sb://profile-events-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=..."
},
"EventHub": {
"CrmEventHubName": "contacts-events",
"MatchingEventHubName": "matches-events",
"ProfileEventHubName": "profiles-events"
}
}
2.2 Event Schema Validation¶
// Services/EventValidator.cs
public interface IEventValidator
{
ValidationResult Validate<T>(T businessEvent) where T : BusinessEvent;
}
public class EventValidator : IEventValidator
{
private readonly ILogger<EventValidator> _logger;
public EventValidator(ILogger<EventValidator> logger)
{
_logger = logger;
}
public ValidationResult Validate<T>(T businessEvent) where T : BusinessEvent
{
var errors = new List<string>();
// Required field validation
if (string.IsNullOrWhiteSpace(businessEvent.EventId))
errors.Add("EventId is required");
if (!Guid.TryParse(businessEvent.EventId, out _))
errors.Add("EventId must be a valid GUID");
if (string.IsNullOrWhiteSpace(businessEvent.AggregateId))
errors.Add("AggregateId is required");
if (businessEvent.Timestamp == default)
errors.Add("Timestamp is required");
if (businessEvent.Timestamp > DateTime.UtcNow.AddMinutes(5))
errors.Add("Timestamp cannot be in the future");
if (string.IsNullOrWhiteSpace(businessEvent.Metadata.Source))
errors.Add("Metadata.Source is required");
// Payload validation
if (businessEvent.Payload == null)
errors.Add("Payload is required");
var result = errors.Count == 0
? ValidationResult.Success()
: ValidationResult.Failure(errors);
if (!result.IsValid)
{
_logger.LogWarning("Event validation failed for {EventId}: {Errors}",
businessEvent.EventId, string.Join(", ", errors));
}
return result;
}
}
public record ValidationResult
{
public bool IsValid { get; init; }
public IReadOnlyList<string> Errors { get; init; } = Array.Empty<string>();
public static ValidationResult Success() => new() { IsValid = true };
public static ValidationResult Failure(IEnumerable<string> errors) =>
new() { IsValid = false, Errors = errors.ToList() };
}
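One way to enforce validation before anything reaches the Event Hub is a small decorator around IEventPublisher. This is a sketch under the assumption that you register it in place of the bare publisher; the ValidatingEventPublisher class is not part of the guide's codebase.

// Services/ValidatingEventPublisher.cs — illustrative decorator (sketch)
public class ValidatingEventPublisher : IEventPublisher
{
    private readonly IEventPublisher _inner;
    private readonly IEventValidator _validator;

    public ValidatingEventPublisher(IEventPublisher inner, IEventValidator validator)
    {
        _inner = inner;
        _validator = validator;
    }

    public Task PublishAsync<T>(T businessEvent) where T : BusinessEvent
        => PublishBatchAsync(new[] { businessEvent });

    public async Task PublishBatchAsync<T>(IEnumerable<T> businessEvents) where T : BusinessEvent
    {
        var events = businessEvents.ToList();

        // Validate every event and reject the whole batch if any fail, so malformed
        // payloads never reach the Event Hub.
        var failures = events
            .Select(e => (Event: e, Result: _validator.Validate(e)))
            .Where(x => !x.Result.IsValid)
            .ToList();

        if (failures.Count > 0)
        {
            var details = string.Join("; ", failures.Select(f =>
                $"{f.Event.EventId}: {string.Join(", ", f.Result.Errors)}"));
            throw new InvalidOperationException($"Event validation failed: {details}");
        }

        await _inner.PublishBatchAsync(events);
    }
}

// Registration (illustrative): wrap the EventHubPublisher created in section 2.1.
// services.AddSingleton<IEventValidator, EventValidator>();
// services.AddSingleton<IEventPublisher>(sp => new ValidatingEventPublisher(
//     new EventHubPublisher(connectionString, eventHubName, sp.GetRequiredService<ILogger<EventHubPublisher>>()),
//     sp.GetRequiredService<IEventValidator>()));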
2.3 Testing and Validation¶
#!/bin/bash
# Phase 2.3: Test event publishing and capture
# Create test events
cat << 'EOF' > test-event.json
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"timestamp": "2024-01-15T14:30:00Z",
"version": "1.0",
"aggregateId": "contact-123",
"aggregateType": "Contact",
"eventType": "ContactCreated",
"metadata": {
"correlationId": "660e8400-e29b-41d4-a716-446655440000",
"userId": "user-456",
"source": "TestScript",
"partitionKey": "contact-123"
},
"payload": {
"contactId": "contact-123",
"firstName": "John",
"lastName": "Doe",
"email": "john.doe@example.com",
"company": "Test Corp",
"createdAt": "2024-01-15T14:30:00Z"
}
}
EOF
# Send the test event. The Azure CLI has no built-in command for publishing to
# Event Hubs, so publish test-event.json to the contacts-events hub in the
# crm-events-namespace namespace using the Event Hubs SDK (see the .NET sender
# sketch after this script) or another client of your choice.
# Wait for capture (5 minutes max)
echo "⏳ Waiting for Event Hub capture..."
sleep 300
# Verify file was captured
az storage blob list \
--container-name raw-events \
--account-name datalakemetricsplatform \
--prefix "crm-events-namespace/contacts-events" \
--query "[].{Name:name, Size:properties.contentLength, LastModified:properties.lastModified}" \
--output table
echo "✅ Event publishing and capture validated"
2.4 Phase 2 Validation Checklist¶
- [ ] Event publishing library integrated into services
- [ ] Test events successfully sent to Event Hubs
- [ ] Events captured to Data Lake within 5 minutes
- [ ] Schema validation working correctly
- [ ] Error handling and logging implemented
- [ ] Connection strings secured
Phase 3: Data Processing (Week 3)¶
3.1 Synapse Serverless Setup¶
-- Create Synapse SQL Database
CREATE DATABASE metrics_platform;
GO
USE metrics_platform;
GO
-- Create master key
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'ComplexPassword123!';
GO
-- Create database scoped credential for managed identity
CREATE DATABASE SCOPED CREDENTIAL DataLakeCredential
WITH IDENTITY = 'Managed Identity';
GO
-- Create external data sources
CREATE EXTERNAL DATA SOURCE RawEventsSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/raw-events/',
CREDENTIAL = DataLakeCredential
);
CREATE EXTERNAL DATA SOURCE BronzeSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/bronze/',
CREDENTIAL = DataLakeCredential
);
CREATE EXTERNAL DATA SOURCE SilverSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/silver/',
CREDENTIAL = DataLakeCredential
);
CREATE EXTERNAL DATA SOURCE GoldSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/gold/',
CREDENTIAL = DataLakeCredential
);
GO
-- Create file formats
CREATE EXTERNAL FILE FORMAT AvroFileFormat
WITH (
FORMAT_TYPE = PARQUET -- Serverless SQL pools cannot query Avro directly; the Raw-to-Bronze copy in section 3.3 converts capture files to Parquet first
);
CREATE EXTERNAL FILE FORMAT ParquetFileFormat
WITH (
FORMAT_TYPE = PARQUET,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
);
GO
3.2 Create External Tables and Views¶
-- Create schemas for the lakehouse layers
CREATE SCHEMA raw;
GO
CREATE SCHEMA bronze;
GO
CREATE SCHEMA silver;
GO
CREATE SCHEMA gold;
GO
-- Raw events external table
CREATE EXTERNAL TABLE raw.events (
[Body] VARBINARY(MAX)
)
WITH (
LOCATION = '**/*.avro',
DATA_SOURCE = RawEventsSource,
FILE_FORMAT = AvroFileFormat
);
GO
-- Bronze events view with JSON parsing
CREATE VIEW bronze.events AS
SELECT
filepath(1) as namespace_name,
filepath(2) as eventhub_name,
filepath(3) as year,
filepath(4) as month,
filepath(5) as day,
filepath(6) as hour,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.eventId') AS event_id,
CAST(JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.timestamp') AS DATETIME2) AS timestamp,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.aggregateId') AS aggregate_id,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.aggregateType') AS aggregate_type,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.eventType') AS event_type,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.version') AS version,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.metadata.correlationId') AS correlation_id,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.metadata.userId') AS user_id,
JSON_VALUE(CAST([Body] AS NVARCHAR(MAX)), '$.metadata.source') AS source,
JSON_QUERY(CAST([Body] AS NVARCHAR(MAX)), '$.payload') AS payload,
GETUTCDATE() as processed_at
FROM raw.events
WHERE ISJSON(CAST([Body] AS NVARCHAR(MAX))) = 1;
GO
-- Test the view
SELECT TOP 10 * FROM bronze.events
WHERE CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)
ORDER BY timestamp DESC;
GO
3.3 Data Factory Setup¶
{
"name": "MetricsPlatformProcessing",
"properties": {
"activities": [
{
"name": "ProcessRawToBronze",
"type": "Copy",
"inputs": [
{
"referenceName": "RawEventsDataset",
"type": "DatasetReference",
"parameters": {
"ProcessingDate": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')"
}
}
],
"outputs": [
{
"referenceName": "BronzeEventsDataset",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "AvroSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"wildcardFolderPath": "*/*/*/*/*",
"wildcardFileName": "*.avro",
"enablePartitionDiscovery": true
}
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "ParquetWriteSettings"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"mappings": [
{
"source": {
"path": "$.eventId"
},
"sink": {
"name": "event_id",
"type": "String"
}
},
{
"source": {
"path": "$.timestamp"
},
"sink": {
"name": "timestamp",
"type": "DateTime"
}
},
{
"source": {
"path": "$.aggregateId"
},
"sink": {
"name": "aggregate_id",
"type": "String"
}
},
{
"source": {
"path": "$.aggregateType"
},
"sink": {
"name": "aggregate_type",
"type": "String"
}
},
{
"source": {
"path": "$.eventType"
},
"sink": {
"name": "event_type",
"type": "String"
}
},
{
"source": {
"path": "$"
},
"sink": {
"name": "payload",
"type": "String"
}
}
]
}
}
},
{
"name": "ProcessBronzeToSilver",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "ProcessRawToBronze",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"storedProcedureName": "[dbo].[ProcessBronzeToSilver]",
"storedProcedureParameters": {
"ProcessingDate": {
"value": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')",
"type": "String"
}
}
}
},
{
"name": "ProcessSilverToGold",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "ProcessBronzeToSilver",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"storedProcedureName": "[dbo].[ProcessSilverToGold]",
"storedProcedureParameters": {
"ProcessingDate": {
"value": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')",
"type": "String"
}
}
}
}
],
"triggers": [
{
"name": "HourlyProcessingTrigger",
"type": "ScheduleTrigger",
"typeProperties": {
"recurrence": {
"frequency": "Hour",
"interval": 1,
"startTime": "2024-01-01T00:00:00Z"
}
}
}
]
}
}
3.4 Processing Stored Procedures¶
-- Bronze to Silver processing
-- Note: serverless SQL pools are read-only (no INSERT support); run these procedures
-- against a dedicated SQL pool, or rewrite the writes as CETAS (CREATE EXTERNAL TABLE AS SELECT).
CREATE PROCEDURE [dbo].[ProcessBronzeToSilver]
@ProcessingDate DATE
AS
BEGIN
SET NOCOUNT ON;
DECLARE @ProcessedRecords INT = 0;
-- Process Contact events
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/silver/contacts/',
DATA_SOURCE = 'SilverSource',
FORMAT = 'PARQUET'
)
SELECT
event_id,
timestamp,
aggregate_id AS contact_id,
JSON_VALUE(payload, '$.contactId') AS contact_id_payload,
JSON_VALUE(payload, '$.firstName') AS first_name,
JSON_VALUE(payload, '$.lastName') AS last_name,
JSON_VALUE(payload, '$.email') AS email,
JSON_VALUE(payload, '$.company') AS company,
CASE
WHEN JSON_VALUE(payload, '$.email') LIKE '%@gmail.com' THEN 'Gmail'
WHEN JSON_VALUE(payload, '$.email') LIKE '%@outlook.com' THEN 'Outlook'
ELSE 'Other'
END AS email_provider,
CASE
WHEN JSON_VALUE(payload, '$.company') IS NOT NULL THEN 'Business'
ELSE 'Personal'
END AS contact_type,
correlation_id,
user_id,
source,
CAST(timestamp AS DATE) as date,
YEAR(timestamp) as year,
MONTH(timestamp) as month,
DAY(timestamp) as day,
GETUTCDATE() AS processed_at
FROM bronze.events
WHERE aggregate_type = 'Contact'
AND event_type = 'ContactCreated'
AND CAST(timestamp AS DATE) = @ProcessingDate;
SET @ProcessedRecords = @@ROWCOUNT;
-- Log processing results
INSERT INTO ProcessingLog (
ProcessingDate,
Layer,
EventType,
RecordsProcessed,
ProcessedAt
)
VALUES (
@ProcessingDate,
'Silver',
'ContactCreated',
@ProcessedRecords,
GETUTCDATE()
);
PRINT 'Processed ' + CAST(@ProcessedRecords AS VARCHAR(10)) + ' Contact events to Silver layer';
END;
GO
-- Silver to Gold aggregation
CREATE PROCEDURE [dbo].[ProcessSilverToGold]
@ProcessingDate DATE
AS
BEGIN
SET NOCOUNT ON;
-- Daily contact metrics
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/gold/daily_metrics/',
DATA_SOURCE = 'GoldSource',
FORMAT = 'PARQUET'
)
SELECT
'contacts_created' AS metric_name,
COUNT(*) AS metric_value,
'count' AS metric_type,
JSON_OBJECT(
'domain': 'CRM',
'entity': 'Contact',
'period': 'daily'
) AS dimensions,
MAX(timestamp) AS timestamp,
@ProcessingDate AS date,
'Contact' AS aggregate_type,
YEAR(@ProcessingDate) AS year,
MONTH(@ProcessingDate) AS month,
DAY(@ProcessingDate) AS day
FROM silver.contacts
WHERE date = @ProcessingDate
GROUP BY date;
PRINT 'Generated daily metrics for ' + CAST(@ProcessingDate AS VARCHAR(10));
END;
GO
3.5 Phase 3 Testing¶
#!/bin/bash
# Phase 3.5: Test data processing pipeline
echo "🚀 Testing data processing pipeline..."
# Trigger Data Factory pipeline
az datafactory pipeline create-run \
--factory-name "DataFactoryMetricsPlatform" \
--resource-group rg-metrics-platform \
--name "MetricsPlatformProcessing"
echo "⏳ Waiting for pipeline completion..."
sleep 600 # Wait 10 minutes
# Check pipeline status
PIPELINE_RUN_ID=$(az datafactory pipeline-run query-by-factory \
--factory-name "DataFactoryMetricsPlatform" \
--resource-group rg-metrics-platform \
--last-updated-after $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ) \
--last-updated-before $(date -u +%Y-%m-%dT%H:%M:%SZ) \
--query "value[0].runId" -o tsv)
az datafactory pipeline-run show \
--factory-name "DataFactoryMetricsPlatform" \
--resource-group rg-metrics-platform \
--run-id $PIPELINE_RUN_ID \
--query "{Status:status, StartTime:runStart, EndTime:runEnd}" \
--output table
# Verify data in each layer (requires sqlcmd with Azure AD authentication; the
# Azure CLI has no command for running ad-hoc SQL against Synapse serverless)
SYNAPSE_ENDPOINT="synapse-metrics-platform-ondemand.sql.azuresynapse.net"
echo "📊 Verifying data in Bronze layer..."
sqlcmd -S $SYNAPSE_ENDPOINT -d metrics_platform -G \
-Q "SELECT COUNT(*) AS bronze_count FROM bronze.events WHERE CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)"
echo "📊 Verifying data in Silver layer..."
sqlcmd -S $SYNAPSE_ENDPOINT -d metrics_platform -G \
-Q "SELECT COUNT(*) AS silver_count FROM silver.contacts WHERE date = CAST(GETDATE() AS DATE)"
echo "📊 Verifying data in Gold layer..."
sqlcmd -S $SYNAPSE_ENDPOINT -d metrics_platform -G \
-Q "SELECT * FROM gold.daily_metrics WHERE date = CAST(GETDATE() AS DATE)"
echo "✅ Data processing pipeline tested successfully"
3.6 Phase 3 Validation Checklist¶
- [ ] Synapse Serverless workspace created and configured
- [ ] External data sources and file formats defined
- [ ] External tables and views created for each layer
- [ ] Data Factory pipeline deployed and tested
- [ ] Stored procedures for data transformation created
- [ ] Pipeline successfully processes data from raw to gold
- [ ] Data quality validation implemented
- [ ] Error handling and logging configured
Phase 4: Analytics Integration (Week 4)¶
4.1 Power BI Configuration¶
Create Power BI Workspace¶
- Navigate to Power BI Service (powerbi.microsoft.com)
- Create new workspace: "Metrics Platform Analytics"
- Add team members with appropriate permissions
- Configure workspace settings for shared datasets
Business Events Dataset¶
// Business Events Query
let
Source = Sql.Database(
"synapse-metrics-platform-ondemand.sql.azuresynapse.net",
"metrics_platform"
),
BusinessMetricsQuery = "
SELECT
metric_name,
metric_value,
metric_type,
JSON_VALUE(dimensions, '$.domain') AS domain,
JSON_VALUE(dimensions, '$.entity') AS entity,
JSON_VALUE(dimensions, '$.period') AS period,
timestamp,
date,
aggregate_type,
year,
month,
day
FROM gold.daily_metrics
WHERE date >= DATEADD(month, -13, GETDATE())
",
BusinessMetrics = Sql.Database(
"synapse-metrics-platform-ondemand.sql.azuresynapse.net",
"metrics_platform",
[Query = BusinessMetricsQuery]
),
// Data type corrections
TypedData = Table.TransformColumnTypes(
BusinessMetrics,
{
{"metric_value", type number},
{"timestamp", type datetimezone},
{"date", type date},
{"year", Int64.Type},
{"month", Int64.Type},
{"day", Int64.Type}
}
)
in
TypedData
Date Dimension Table¶
// Date Dimension Query
let
StartDate = #date(2023, 1, 1),
EndDate = #date(2025, 12, 31),
DateList = List.Dates(StartDate, Duration.Days(EndDate - StartDate) + 1, #duration(1, 0, 0, 0)),
DateTable = Table.FromList(DateList, Splitter.SplitByNothing(), {"Date"}),
AddedColumns = Table.AddColumn(
DateTable,
"Year",
each Date.Year([Date]),
Int64.Type
),
AddedMonth = Table.AddColumn(
AddedColumns,
"Month",
each Date.Month([Date]),
Int64.Type
),
AddedDay = Table.AddColumn(
AddedMonth,
"Day",
each Date.Day([Date]),
Int64.Type
),
AddedMonthName = Table.AddColumn(
AddedDay,
"MonthName",
each Date.MonthName([Date]),
type text
),
AddedDayOfWeek = Table.AddColumn(
AddedMonthName,
"DayOfWeek",
each Date.DayOfWeekName([Date]),
type text
),
AddedIsWeekend = Table.AddColumn(
AddedDayOfWeek,
"IsWeekend",
each Date.DayOfWeek([Date]) = 6 or Date.DayOfWeek([Date]) = 0,
type logical
)
in
AddedIsWeekend
4.2 External API Integration¶
Matomo Integration (Using POST method)¶
// Matomo API Function (Secure POST method)
let
FetchMatomoData = (method as text, optional parameters as record) =>
let
ApiUrl = "https://syvntyve.matomo.cloud/index.php",
DefaultParams = [
module = "API",
idSite = "1",
period = "day",
date = "last30",
format = "JSON",
token_auth = Parameter_MatomoToken
],
MergedParams = Record.Combine({DefaultParams, parameters ?? []}),
UpdatedParams = Record.AddField(MergedParams, "method", method),
// Convert parameters to form data string
ParamPairs = List.Transform(
Record.FieldNames(UpdatedParams),
each _ & "=" & Uri.EscapeDataString(Text.From(Record.Field(UpdatedParams, _)))
),
RequestBody = Text.ToBinary(Text.Combine(ParamPairs, "&")),
Options = [
Headers = [
#"Content-Type" = "application/x-www-form-urlencoded"
],
Content = RequestBody
],
Source = Json.Document(Web.Contents(ApiUrl, Options))
in
Source,
// Fetch visits summary
VisitsSummary = FetchMatomoData("VisitsSummary.get", [date = "last90"]),
// Transform to table
VisitsTable = Table.FromRecords(
List.Transform(
Record.FieldNames(VisitsSummary),
each [
Date = Date.FromText(_),
Visits = Record.Field(VisitsSummary, _)[nb_visits]?,
UniqueVisitors = Record.Field(VisitsSummary, _)[nb_uniq_visitors]?,
PageViews = Record.Field(VisitsSummary, _)[nb_pageviews]?,
BounceRate = Record.Field(VisitsSummary, _)[bounce_rate]?,
AvgTimeOnSite = Record.Field(VisitsSummary, _)[avg_time_on_site]?
]
)
),
FinalResult = Table.TransformColumnTypes(
VisitsTable,
{
{"Date", type date},
{"Visits", Int64.Type},
{"UniqueVisitors", Int64.Type},
{"PageViews", Int64.Type},
{"BounceRate", type number},
{"AvgTimeOnSite", Int64.Type}
}
)
in
FinalResult
Sentry Integration¶
// Sentry API Integration
let
SentryUrl = "https://sentry.io/api/0/projects/your-org/your-project/events/",
Options = [
Headers = [
Authorization = "Bearer " & Parameter_SentryToken,
#"Content-Type" = "application/json"
],
Query = [statsPeriod = "30d"]
],
Source = Json.Document(Web.Contents(SentryUrl, Options)),
EventsList = Source[data],
EventsTable = Table.FromList(EventsList, Splitter.SplitByNothing()),
ExpandedEvents = Table.ExpandRecordColumn(
EventsTable,
"Column1",
{"id", "title", "culprit", "dateCreated", "userCount", "level"},
{"EventId", "Title", "Culprit", "DateCreated", "UserCount", "Level"}
),
TypedEvents = Table.TransformColumnTypes(
ExpandedEvents,
{
{"DateCreated", type datetimezone},
{"UserCount", Int64.Type}
}
)
in
TypedEvents
4.3 Create Power BI Measures¶
// Key Performance Indicators
TotalEvents = SUM(BusinessMetrics[metric_value])
EventsThisMonth =
CALCULATE(
[TotalEvents],
DATESINPERIOD(DimDate[Date], TODAY(), -1, MONTH)
)
EventGrowthRate =
VAR LastMonth =
CALCULATE(
[TotalEvents],
DATESINPERIOD(DimDate[Date], EOMONTH(TODAY(), -1), -1, MONTH)
)
VAR ThisMonth = [EventsThisMonth]
RETURN
DIVIDE(ThisMonth - LastMonth, LastMonth, 0)
// Domain-specific metrics
CRMEvents =
CALCULATE(
[TotalEvents],
BusinessMetrics[domain] = "CRM"
)
MatchingEvents =
CALCULATE(
[TotalEvents],
BusinessMetrics[domain] = "Matching"
)
ConversionRate =
VAR ContactsCreated =
CALCULATE(
[TotalEvents],
BusinessMetrics[metric_name] = "contacts_created"
)
VAR DeclarationsCompleted =
CALCULATE(
[TotalEvents],
BusinessMetrics[metric_name] = "declarations_completed"
)
RETURN
DIVIDE(DeclarationsCompleted, ContactsCreated, 0)
// Website analytics
TotalPageViews = SUM(MatomoData[PageViews])
UniqueVisitors = SUM(MatomoData[UniqueVisitors])
AverageBounceRate = AVERAGE(MatomoData[BounceRate])
// Error tracking
TotalErrors = SUM(SentryData[UserCount])
ErrorRate =
DIVIDE([TotalErrors], [TotalEvents], 0) * 100
4.4 Build Executive Dashboard¶
Create the dashboard with the following visuals:
- KPI Cards
  - Total Events This Month
  - Event Growth Rate
  - Conversion Rate
  - Error Rate
- Line Chart: Event Trends
  - X-axis: Date
  - Y-axis: Total Events
  - Legend: Domain
- Donut Chart: Domain Breakdown
  - Values: Total Events
  - Legend: Domain
- Table: Top Performing Metrics
  - Columns: Metric Name, Value, Change %
- Bar Chart: Error Summary
  - X-axis: Error Count
  - Y-axis: Error Type
4.5 Configure Refresh Schedule¶
- Dataset Refresh Settings:
  - Frequency: 4 times daily (6 AM, 12 PM, 6 PM, 12 AM)
  - Time zone: UTC
  - Notify on failure: Yes
- Incremental Refresh:
  - Archive data: 2 years
  - Refresh data: 30 days
  - Detect data changes: Yes
4.6 Phase 4 Testing¶
#!/bin/bash
# Phase 4.6: Test Power BI integration
echo "🎯 Testing Power BI integration..."
# Test Synapse connection using sqlcmd with Azure AD authentication (-G)
echo "Testing Synapse connection..."
if sqlcmd -S synapse-metrics-platform-ondemand.sql.azuresynapse.net -d metrics_platform -G -Q "SELECT 1" > /dev/null; then
echo "✅ Synapse connection successful"
else
echo "❌ Synapse connection failed"
fi
# Test Matomo API (using POST method)
echo "Testing Matomo API..."
curl -X POST "https://syvntyve.matomo.cloud/index.php" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "module=API&method=VisitsSummary.get&idSite=1&period=day&date=yesterday&format=JSON&token_auth=$MATOMO_TOKEN" \
-s | jq -e 'keys' > /dev/null && echo "✅ Matomo API accessible" || echo "❌ Matomo API failed"
# Test Sentry API
echo "Testing Sentry API..."
curl -H "Authorization: Bearer $SENTRY_TOKEN" \
"https://sentry.io/api/0/projects/your-org/your-project/events/?statsPeriod=1d" \
-s | jq '.data | length' > /dev/null && echo "✅ Sentry API accessible" || echo "❌ Sentry API failed"
echo "✅ Power BI integration tested successfully"
4.7 Phase 4 Validation Checklist¶
- [ ] Power BI workspace created and configured
- [ ] Business events dataset connected to Synapse
- [ ] Matomo dataset configured with secure POST method
- [ ] Sentry dataset integrated
- [ ] Date dimension table created
- [ ] Key measures and calculations defined
- [ ] Executive dashboard created with all required visuals
- [ ] Refresh schedule configured and tested
- [ ] Row-level security implemented (if needed)
- [ ] Performance optimization applied
Go-Live and Monitoring¶
Pre-Go-Live Checklist¶
Infrastructure¶
- [ ] All Azure resources deployed and configured
- [ ] Network connectivity verified
- [ ] Security roles and permissions applied
- [ ] Monitoring and alerting configured
- [ ] Backup and disaster recovery planned
Data Flow¶
- [ ] Event publishing tested from all services
- [ ] Data processing pipeline validated
- [ ] Data quality checks implemented
- [ ] Error handling and retry logic tested
- [ ] Performance benchmarks established
Analytics¶
- [ ] Power BI dashboards created and tested
- [ ] External API integrations validated
- [ ] User access controls configured
- [ ] Training materials prepared
- [ ] Support processes established
Go-Live Steps¶
1. Enable Production Traffic
# Enable event publishing in production services
kubectl patch deployment crm-service -p '{"spec":{"template":{"metadata":{"annotations":{"metrics.platform/enabled":"true"}}}}}'
kubectl patch deployment matching-service -p '{"spec":{"template":{"metadata":{"annotations":{"metrics.platform/enabled":"true"}}}}}'
2. Monitor Initial Data Flow
# Monitor event ingestion
az monitor metrics list --resource /subscriptions/.../Microsoft.EventHub/namespaces/crm-events-namespace --metric IncomingMessages --interval PT5M
3. Verify Data Processing
-- Check data freshness
SELECT MAX(timestamp) as latest_event, COUNT(*) as event_count
FROM gold.daily_metrics
WHERE date = CAST(GETDATE() AS DATE);
4. Validate Power BI Reports
- Refresh all datasets
- Verify data appears correctly
- Test filters and slicers
- Confirm calculated measures
Post Go-Live Monitoring¶
Week 1: Intensive Monitoring¶
- Monitor data ingestion every hour
- Check processing pipeline success rates
- Validate data quality metrics
- Monitor costs daily
- Address any issues immediately
Week 2-4: Regular Monitoring¶
- Daily data quality checks
- Weekly cost reviews
- Performance monitoring
- User feedback collection
- Process refinements
Month 2+: Steady State¶
- Weekly monitoring reports
- Monthly cost optimization reviews
- Quarterly architecture reviews
- Continuous improvement initiatives
Success Criteria¶
| Metric | Target | Current | Status |
|---|---|---|---|
| Availability | >99.5% | TBD | 🔄 |
| Data Freshness | <2 hours | TBD | 🔄 |
| Processing Success | >99% | TBD | 🔄 |
| Monthly Cost | <$400 | TBD | 🔄 |
| User Satisfaction | >4.0/5.0 | TBD | 🔄 |
Troubleshooting Guide¶
Common Issues and Solutions¶
| Issue | Symptoms | Investigation | Solution |
|---|---|---|---|
| Events Not Captured | No files in raw-events container | Check Event Hub metrics, capture settings | Verify capture configuration, check permissions |
| Processing Failures | Pipeline failures in ADF | Check ADF monitoring, Synapse logs | Review error messages, check data quality |
| Power BI Refresh Errors | Dataset refresh failures | Check connection strings, permissions | Validate credentials, test connections |
| High Costs | Budget alerts | Review cost analysis dashboard | Optimize resource utilization, review pricing tier |
| Slow Query Performance | Long dashboard load times | Check Synapse query performance | Optimize queries, review partitioning strategy |
Support Contacts¶
| Component | Primary Contact | Secondary Contact | Escalation |
|---|---|---|---|
| Infrastructure | Platform Team | DevOps Team | Architecture Team |
| Data Pipeline | Data Engineering | Platform Team | CTO |
| Power BI | Analytics Team | Business Intelligence | Data Engineering |
| Business Events | Domain Teams | Platform Team | Architecture Team |
Next Steps¶
With the metrics platform successfully implemented, consider these future enhancements:
- Advanced Analytics
  - Machine learning models for predictions
  - Anomaly detection for events
  - Real-time alerting on critical metrics
- Platform Extensions
  - Additional data sources integration
  - Self-service analytics capabilities
  - Advanced data governance features
- Performance Optimization
  - Query performance tuning
  - Cost optimization initiatives
  - Scaling for increased load
- Process Improvements
  - Automated testing pipelines
  - Infrastructure as code enhancements
  - Continuous deployment automation
The metrics platform provides a solid foundation for data-driven decision making across the organization. Regular reviews and continuous improvements will ensure it continues to deliver value as the business grows and evolves.