# Data Processing Architecture

## Overview
The data processing layer transforms raw events from Azure Data Lake into business metrics using Azure Data Factory for orchestration and Synapse Serverless SQL for transformations. This document details the processing pipelines, transformation logic, and scheduling strategies.
**Processing Goals**

- **Cost Efficiency**: Serverless processing with no idle compute costs
- **Scalability**: Handle 100K+ events/day with auto-scaling
- **Reliability**: Automated retries and error handling
- **Data Quality**: Validation and enrichment at each layer
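
To make the cost model concrete: serverless SQL is billed per data processed, so a one-off read of the bronze layer looks like the sketch below and incurs no compute cost between pipeline runs. This is a minimal sketch; the path follows the lake layout used throughout this document.

```sql
-- Minimal serverless read over the bronze layer (illustrative path).
SELECT TOP 100 *
FROM OPENROWSET(
    BULK 'https://datalakemetricsplatform.dfs.core.windows.net/bronze/events/**',
    FORMAT = 'PARQUET'
) AS bronze_events;
```
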
## Processing Architecture

### Data Processing Layers

```mermaid
graph LR
subgraph "Raw to Bronze"
RAW[Raw Events<br/>Avro Files]
PARSE[Parse & Validate<br/>Schema Validation]
BRONZE[Bronze Layer<br/>Parquet Files]
end
subgraph "Bronze to Silver"
CLEANSE[Data Cleansing<br/>Standardization]
ENRICH[Data Enrichment<br/>Lookups & Joins]
SILVER[Silver Layer<br/>Delta Tables]
end
subgraph "Silver to Gold"
AGG[Aggregations<br/>Business Metrics]
CALC[Calculations<br/>KPIs & Measures]
GOLD[Gold Layer<br/>Analytics Ready]
end
RAW --> PARSE
PARSE --> BRONZE
BRONZE --> CLEANSE
CLEANSE --> ENRICH
ENRICH --> SILVER
SILVER --> AGG
AGG --> CALC
CALC --> GOLD
style RAW fill:#FFE4E1
style BRONZE fill:#DEB887
style SILVER fill:#C0C0C0
style GOLD fill:#FFD700
```

### Processing Pipeline Overview

```mermaid
graph TD
subgraph "Azure Data Factory"
TRIGGER[Schedule Trigger<br/>Every Hour]
PIPELINE[Main Pipeline]
ACTIVITIES[Pipeline Activities]
end
subgraph "Synapse Serverless"
POOL[SQL On-Demand Pool]
VIEWS[External Tables/Views]
QUERIES[Transformation SQL]
end
subgraph "Monitoring"
MONITOR[Pipeline Monitoring]
ALERTS[Failure Alerts]
METRICS[Performance Metrics]
end
TRIGGER --> PIPELINE
PIPELINE --> ACTIVITIES
ACTIVITIES --> POOL
POOL --> VIEWS
VIEWS --> QUERIES
PIPELINE --> MONITOR
MONITOR --> ALERTS
MONITOR --> METRICS
```

## Azure Data Factory Configuration

### Pipeline Architecture

```mermaid
graph TD
subgraph "Main Processing Pipeline"
START[Pipeline Start]
CHECK[Check for New Files]
PARALLEL[Parallel Processing]
subgraph "Domain Processing"
CRM[Process CRM Events]
MATCHING[Process Matching Events]
PROFILE[Process Profile Events]
end
subgraph "Aggregation"
DAILY[Daily Aggregations]
HOURLY[Hourly Metrics]
CUSTOM[Custom KPIs]
end
CLEANUP[Cleanup & Archival]
NOTIFY[Success Notification]
ERROR[Error Handling]
end
START --> CHECK
CHECK --> PARALLEL
PARALLEL --> CRM
PARALLEL --> MATCHING
PARALLEL --> PROFILE
CRM --> DAILY
MATCHING --> HOURLY
PROFILE --> CUSTOM
DAILY --> CLEANUP
HOURLY --> CLEANUP
CUSTOM --> CLEANUP
CLEANUP --> NOTIFY
ERROR -.-> NOTIFY
```

### Pipeline JSON Configuration

```json
{
"name": "MetricsPlatformProcessing",
"properties": {
"activities": [
{
"name": "CheckNewFiles",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "SqlDWSource",
"sqlReaderQuery": "SELECT * FROM OPENROWSET(BULK 'https://datalakemetricsplatform.dfs.core.windows.net/raw-events/*/*/*/*/*.avro', FORMAT = 'DELTA') AS new_files WHERE _metadata_file_path > '@{pipeline().parameters.LastProcessedPath}'"
}
}
},
{
"name": "ProcessRawToBronze",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "CheckNewFiles",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"storedProcedureName": "ProcessRawEventsToBronze",
"storedProcedureParameters": {
"ProcessingDate": {
"value": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')",
"type": "String"
}
}
}
},
{
"name": "ProcessBronzeToSilver",
"type": "ForEach",
"dependsOn": [
{
"activity": "ProcessRawToBronze",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"items": {
"value": "@activity('CheckNewFiles').output.value",
"type": "Expression"
},
"activities": [
{
"name": "TransformEvents",
"type": "SqlServerStoredProcedure",
"typeProperties": {
"storedProcedureName": "ProcessBronzeToSilver",
"storedProcedureParameters": {
"Domain": {
"value": "@item().domain",
"type": "String"
},
"EventType": {
"value": "@item().event_type",
"type": "String"
}
}
}
}
]
}
}
],
"parameters": {
"LastProcessedPath": {
"type": "String",
"defaultValue": "1900-01-01"
}
},
"triggers": [
{
"name": "HourlyProcessing",
"type": "ScheduleTrigger",
"typeProperties": {
"recurrence": {
"frequency": "Hour",
"interval": 1,
"startTime": "2024-01-01T00:00:00Z"
}
}
}
]
}
}
```

## Synapse Serverless SQL Configuration

### External Data Sources

```sql
-- Create master key for authentication
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'YourComplexPassword!';
-- Create database scoped credential
CREATE DATABASE SCOPED CREDENTIAL DataLakeCredential
WITH IDENTITY = 'Managed Identity';
-- Create external data sources
CREATE EXTERNAL DATA SOURCE RawEventsDataSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/raw-events/',
CREDENTIAL = DataLakeCredential
);
CREATE EXTERNAL DATA SOURCE BronzeDataSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/bronze/',
CREDENTIAL = DataLakeCredential
);
CREATE EXTERNAL DATA SOURCE SilverDataSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/silver/',
CREDENTIAL = DataLakeCredential
);
CREATE EXTERNAL DATA SOURCE GoldDataSource
WITH (
LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/gold/',
CREDENTIAL = DataLakeCredential
);
```
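
With the credential and external data sources registered, ad-hoc queries can refer to them by name rather than embedding full URLs. A small usage sketch (the relative `events/` path matches the bronze layout written by the procedures later in this document):

```sql
-- Usage sketch: read Parquet files relative to the registered bronze data source.
SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'events/',
    DATA_SOURCE = 'BronzeDataSource',
    FORMAT = 'PARQUET'
) AS bronze_files;
```
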
### External Tables and Views

```sql
-- Raw events external table
CREATE EXTERNAL TABLE RawEvents (
Body VARBINARY(MAX)
)
WITH (
LOCATION = '**/*.avro',
DATA_SOURCE = RawEventsDataSource,
FILE_FORMAT = AvroFileFormat
);
-- Bronze events view with JSON parsing
CREATE VIEW BronzeEvents AS
SELECT
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.eventId') AS event_id,
CAST(JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.timestamp') AS DATETIME2) AS timestamp,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.aggregateId') AS aggregate_id,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.aggregateType') AS aggregate_type,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.eventType') AS event_type,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.version') AS version,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.metadata.correlationId') AS correlation_id,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.metadata.userId') AS user_id,
JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.metadata.source') AS source,
JSON_QUERY(CAST(Body AS NVARCHAR(MAX)), '$.payload') AS payload,
YEAR(CAST(JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.timestamp') AS DATETIME2)) AS year,
MONTH(CAST(JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.timestamp') AS DATETIME2)) AS month,
DAY(CAST(JSON_VALUE(CAST(Body AS NVARCHAR(MAX)), '$.timestamp') AS DATETIME2)) AS day
FROM RawEvents
WHERE ISJSON(CAST(Body AS NVARCHAR(MAX))) = 1;
-- External file format referenced by the Parquet tables below
CREATE EXTERNAL FILE FORMAT ParquetFileFormat
WITH (FORMAT_TYPE = PARQUET);

-- Silver events external table
CREATE EXTERNAL TABLE SilverEvents (
event_id NVARCHAR(36),
timestamp DATETIME2,
aggregate_id NVARCHAR(100),
aggregate_type NVARCHAR(50),
event_type NVARCHAR(50),
version NVARCHAR(10),
correlation_id NVARCHAR(36),
user_id NVARCHAR(50),
source NVARCHAR(50),
payload NVARCHAR(MAX),
year INT,
month INT,
day INT
)
WITH (
LOCATION = 'business_events/**/*.parquet',
DATA_SOURCE = SilverDataSource,
FILE_FORMAT = ParquetFileFormat
);
```

## Data Transformations

### Raw to Bronze Processing

```sql
CREATE PROCEDURE ProcessRawEventsToBronze
@ProcessingDate DATE
AS
BEGIN
-- Validate and parse raw events
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/bronze/events/',
DATA_SOURCE = 'BronzeDataSource',
FORMAT = 'PARQUET'
)
SELECT
event_id,
timestamp,
aggregate_id,
aggregate_type,
event_type,
version,
correlation_id,
user_id,
source,
payload,
year,
month,
day,
GETUTCDATE() AS processed_at
FROM BronzeEvents
WHERE CAST(timestamp AS DATE) = @ProcessingDate
AND event_id IS NOT NULL
AND timestamp IS NOT NULL
AND aggregate_id IS NOT NULL
AND LEN(aggregate_id) > 0
AND aggregate_type IS NOT NULL
AND event_type IS NOT NULL;
-- Log processing statistics
INSERT INTO ProcessingLog (
ProcessingDate,
Layer,
RecordsProcessed,
RecordsSkipped,
ProcessedAt
)
SELECT
@ProcessingDate,
'Bronze',
COUNT(CASE WHEN event_id IS NOT NULL THEN 1 END),
SUM(CASE WHEN event_id IS NULL THEN 1 ELSE 0 END),
GETUTCDATE()
FROM BronzeEvents
WHERE CAST(timestamp AS DATE) = @ProcessingDate;
END
```
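
One caveat: serverless SQL pools write to the lake through CREATE EXTERNAL TABLE AS SELECT (CETAS) rather than `INSERT` over `OPENROWSET`, so in practice the write step above is typically expressed along these lines. This is a minimal sketch reusing `BronzeDataSource` and the `ParquetFileFormat` assumed earlier; the table name, folder layout, and date are illustrative.

```sql
-- CETAS sketch: materialize one day of validated events as Parquet in the
-- bronze container. Reruns require dropping the external table and clearing
-- the target folder first.
CREATE EXTERNAL TABLE BronzeEvents_20240115
WITH (
    LOCATION = 'events/2024/01/15/',
    DATA_SOURCE = BronzeDataSource,
    FILE_FORMAT = ParquetFileFormat
)
AS
SELECT
    event_id, timestamp, aggregate_id, aggregate_type, event_type, version,
    correlation_id, user_id, source, payload, year, month, day,
    GETUTCDATE() AS processed_at
FROM BronzeEvents
WHERE CAST(timestamp AS DATE) = '2024-01-15'
  AND event_id IS NOT NULL
  AND aggregate_id IS NOT NULL;
```
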
### Bronze to Silver Processing

```sql
CREATE PROCEDURE ProcessBronzeToSilver
@Domain NVARCHAR(50),
@EventType NVARCHAR(50)
AS
BEGIN
-- Domain-specific transformations
IF @Domain = 'CRM' AND @EventType = 'ContactCreated'
BEGIN
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/silver/contacts/',
DATA_SOURCE = 'SilverDataSource',
FORMAT = 'DELTA'
)
SELECT
event_id,
timestamp,
aggregate_id AS contact_id,
JSON_VALUE(payload, '$.firstName') AS first_name,
JSON_VALUE(payload, '$.lastName') AS last_name,
JSON_VALUE(payload, '$.email') AS email,
JSON_VALUE(payload, '$.company') AS company,
CASE
WHEN JSON_VALUE(payload, '$.email') LIKE '%@gmail.com' THEN 'Gmail'
WHEN JSON_VALUE(payload, '$.email') LIKE '%@outlook.com' THEN 'Outlook'
ELSE 'Other'
END AS email_provider,
CASE
WHEN JSON_VALUE(payload, '$.company') IS NOT NULL THEN 'Business'
ELSE 'Personal'
END AS contact_type,
correlation_id,
user_id,
source,
year,
month,
day,
GETUTCDATE() AS processed_at
FROM BronzeEvents
WHERE aggregate_type = 'Contact'
AND event_type = 'ContactCreated'
AND CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE);
END
-- Add more domain-specific processing...
END
```
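
The `ForEach` activity in the pipeline configuration above calls this procedure once per domain/event-type pair; for the contact-creation case the call is simply:

```sql
-- Invoked by the ForEach activity with @item().domain and @item().event_type
EXEC ProcessBronzeToSilver
    @Domain = 'CRM',
    @EventType = 'ContactCreated';
```
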
### Silver to Gold Aggregations

```sql
CREATE PROCEDURE ProcessSilverToGold
AS
BEGIN
-- Daily contact metrics
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/gold/daily_metrics/',
DATA_SOURCE = 'GoldDataSource',
FORMAT = 'DELTA'
)
SELECT
'contacts_created' AS metric_name,
COUNT(*) AS metric_value,
'count' AS metric_type,
JSON_OBJECT(
'domain': 'CRM',
'entity': 'Contact',
'period': 'daily'
) AS dimensions,
MAX(timestamp) AS timestamp,
CAST(timestamp AS DATE) AS date,
'Contact' AS aggregate_type,
year,
month,
DAY(timestamp) AS day
FROM SilverEvents
WHERE aggregate_type = 'Contact'
AND event_type = 'ContactCreated'
AND CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)
GROUP BY CAST(timestamp AS DATE), year, month, DAY(timestamp);
-- Weekly aggregations
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/gold/weekly_metrics/',
DATA_SOURCE = 'GoldDataSource',
FORMAT = 'DELTA'
)
SELECT
'contacts_created_weekly' AS metric_name,
COUNT(*) AS metric_value,
'count' AS metric_type,
JSON_OBJECT(
'domain': 'CRM',
'entity': 'Contact',
'period': 'weekly'
) AS dimensions,
MAX(timestamp) AS timestamp,
CAST(MAX(timestamp) AS DATE) AS date,
'Contact' AS aggregate_type,
year,
month,
DATEPART(week, timestamp) AS week
FROM SilverEvents
WHERE aggregate_type = 'Contact'
AND event_type = 'ContactCreated'
AND timestamp >= DATEADD(day, -7, GETDATE())
GROUP BY year, month, DATEPART(week, timestamp);
-- Business KPIs
INSERT INTO OPENROWSET(
BULK 'https://datalakemetricsplatform.dfs.core.windows.net/gold/business_kpis/',
DATA_SOURCE = 'GoldDataSource',
FORMAT = 'DELTA'
)
WITH business_metrics AS (
SELECT
COUNT(DISTINCT CASE WHEN event_type = 'ContactCreated' THEN aggregate_id END) AS new_contacts,
COUNT(DISTINCT CASE WHEN event_type = 'MatchCreated' THEN aggregate_id END) AS new_matches,
COUNT(DISTINCT CASE WHEN event_type = 'DeclarationCompleted' THEN aggregate_id END) AS completed_declarations,
CAST(timestamp AS DATE) AS date
FROM SilverEvents
WHERE CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)
GROUP BY CAST(timestamp AS DATE)
)
SELECT
'conversion_rate' AS metric_name,
CASE
WHEN new_contacts > 0 THEN (completed_declarations * 100.0) / new_contacts
ELSE 0
END AS metric_value,
'percentage' AS metric_type,
JSON_OBJECT('period': 'daily', 'calculation': 'declarations/contacts') AS dimensions,
GETUTCDATE() AS timestamp,
date,
'Business' AS aggregate_type,
YEAR(date) AS year,
MONTH(date) AS month,
DAY(date) AS day
FROM business_metrics;
END
```

## Monitoring and Error Handling

### Pipeline Monitoring

```mermaid
graph TD
subgraph "Monitoring Stack"
ADF_MONITOR[ADF Monitoring]
LOG_ANALYTICS[Log Analytics]
ALERTS[Alert Rules]
DASHBOARD[Monitoring Dashboard]
end
subgraph "Metrics"
PIPELINE_RUNS[Pipeline Runs]
ACTIVITY_RUNS[Activity Runs]
TRIGGER_RUNS[Trigger Runs]
DATA_FLOW[Data Flow Metrics]
end
subgraph "Alerts"
FAILURE_ALERT[Pipeline Failure]
DURATION_ALERT[Long Duration]
DATA_QUALITY[Data Quality Issues]
COST_ALERT[High Compute Costs]
end
ADF_MONITOR --> PIPELINE_RUNS
ADF_MONITOR --> ACTIVITY_RUNS
LOG_ANALYTICS --> ALERTS
ALERTS --> DASHBOARD
PIPELINE_RUNS --> FAILURE_ALERT
ACTIVITY_RUNS --> DURATION_ALERT
DATA_FLOW --> DATA_QUALITY
DATA_FLOW --> COST_ALERT
```
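
Alongside the built-in ADF run metrics, the `ProcessingLog` and `DataQualityLog` tables populated by the stored procedures in this document provide a simple feed for data-quality alerts. A sketch of a query an alert rule could poll (the record-count threshold is illustrative):

```sql
-- Surface failed quality checks and low-volume runs from the logging tables
-- defined in this document.
SELECT q.ProcessingDate, q.Layer, q.QualityScore, p.RecordsProcessed
FROM DataQualityLog AS q
LEFT JOIN ProcessingLog AS p
    ON p.ProcessingDate = q.ProcessingDate AND p.Layer = q.Layer
WHERE q.CreatedAt >= DATEADD(hour, -24, GETUTCDATE())
  AND (q.Status = 'FAIL' OR p.RecordsProcessed < 1000)
ORDER BY q.CreatedAt DESC;
```
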
### Error Handling Logic

```json
{
"name": "ErrorHandlingActivity",
"type": "WebActivity",
"typeProperties": {
"url": "https://prod-27.westeurope.logic.azure.com/workflows/.../triggers/manual/paths/invoke",
"method": "POST",
"headers": {
"Content-Type": "application/json"
},
"body": {
"PipelineName": "@pipeline().Pipeline",
"RunId": "@pipeline().RunId",
"ErrorMessage": "@activity('ProcessRawToBronze').error.message",
"TriggerTime": "@pipeline().TriggerTime",
"FailedActivity": "ProcessRawToBronze"
}
},
"onInactiveMarkAs": "Failed"
}
```

### Data Quality Checks

```sql
CREATE PROCEDURE ValidateDataQuality
@Layer NVARCHAR(20),
@ProcessingDate DATE
AS
BEGIN
DECLARE @QualityScore DECIMAL(5,2);
DECLARE @ErrorThreshold DECIMAL(5,2) = 95.0; -- 95% quality threshold
IF @Layer = 'Bronze'
BEGIN
SELECT @QualityScore = (
COUNT(CASE WHEN event_id IS NOT NULL
AND timestamp IS NOT NULL
AND aggregate_id IS NOT NULL
AND aggregate_id != ''
THEN 1 END) * 100.0
) / NULLIF(COUNT(*), 0)
FROM BronzeEvents
WHERE CAST(timestamp AS DATE) = @ProcessingDate;
END
IF @Layer = 'Silver'
BEGIN
SELECT @QualityScore = (
COUNT(CASE WHEN event_id IS NOT NULL
AND processed_at IS NOT NULL
AND ISJSON(payload) = 1
THEN 1 END) * 100.0
) / NULLIF(COUNT(*), 0)
FROM SilverEvents
WHERE CAST(timestamp AS DATE) = @ProcessingDate;
END
-- Log quality metrics
INSERT INTO DataQualityLog (
ProcessingDate,
Layer,
QualityScore,
Threshold,
Status,
CreatedAt
)
VALUES (
@ProcessingDate,
@Layer,
@QualityScore,
@ErrorThreshold,
CASE WHEN @QualityScore >= @ErrorThreshold THEN 'PASS' ELSE 'FAIL' END,
GETUTCDATE()
);
-- Raise error if quality is below threshold
IF @QualityScore < @ErrorThreshold
BEGIN
THROW 50001, 'Data quality below threshold', 1;
END
END
```
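
Run from a stored-procedure activity after each layer completes, a failing check raises the error above, fails the activity, and routes execution to the error-handling path:

```sql
-- Quality gate after the bronze load; the THROW fails the ADF activity
-- when the score is below the threshold.
EXEC ValidateDataQuality
    @Layer = 'Bronze',
    @ProcessingDate = '2024-01-15';
```
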
## Performance Optimization

### Query Optimization

```sql
-- Optimize partition elimination
SELECT
aggregate_type,
COUNT(*) AS event_count,
AVG(CAST(JSON_VALUE(payload, '$.value') AS FLOAT)) AS avg_value
FROM SilverEvents
WHERE year = 2024
AND month = 1
AND aggregate_type IN ('Contact', 'Match')
GROUP BY aggregate_type;
-- Use column pruning
SELECT
event_id,
timestamp,
aggregate_type,
JSON_VALUE(payload, '$.status') AS status
FROM SilverEvents
WHERE year = 2024
AND month = 1
AND JSON_VALUE(payload, '$.status') = 'Active';
-- Create statistics on commonly filtered columns so the optimizer has
-- cardinality estimates (for serverless external tables, drop and re-create to refresh)
CREATE STATISTICS stats_silver_aggregate_type
ON SilverEvents (aggregate_type) WITH FULLSCAN;
```

### Parallel Processing

```python
# Example using Azure Functions for parallel processing.
# Note: reading/writing data lake paths with pandas requires the adlfs/fsspec
# packages and appropriate credentials; get_partition_paths() is a placeholder
# for however partition paths are discovered (e.g. a storage listing).
import azure.functions as func
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


def process_partition(partition_path: str) -> int:
    """Process a single bronze partition and write it to the silver layer."""
    df = pd.read_parquet(partition_path)

    # Apply transformations (assumes an 'email' column in this partition)
    df['processed_at'] = pd.Timestamp.now()
    df['email_domain'] = df['email'].str.split('@').str[1]

    # Write to the silver layer, mirroring the bronze folder structure
    output_path = partition_path.replace('bronze', 'silver')
    df.to_parquet(output_path, compression='snappy')
    return len(df)


def main(req: func.HttpRequest) -> func.HttpResponse:
    # Get partition paths
    partition_paths = get_partition_paths()

    # Process partitions in parallel; threads fit here because the work is I/O bound
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(process_partition, partition_paths))

    total_records = sum(results)
    return func.HttpResponse(
        f"Processed {total_records} records across {len(partition_paths)} partitions",
        status_code=200
    )
```

## Scheduling and Orchestration

### Processing Schedule

```mermaid
gantt
title Data Processing Schedule
dateFormat HH:mm
axisFormat %H:%M
section Raw to Bronze
Parse Events :active, parse, 00:05, 00:15
section Bronze to Silver
CRM Transform :active, crm, 00:15, 00:25
Matching Transform :active, match, 00:15, 00:25
Profile Transform :active, profile, 00:15, 00:25
section Silver to Gold
Daily Metrics :gold, 00:30, 00:40
Weekly Metrics :weekly, 00:40, 00:45
Monthly Metrics :monthly, 00:45, 00:50
section Cleanup
Archive Files :cleanup, 00:50, 00:55
```

### Pipeline Dependencies

```json
{
"name": "DependencyManagement",
"properties": {
"activities": [
{
"name": "WaitForUpstreamData",
"type": "Until",
"typeProperties": {
"expression": {
"value": "@greater(activity('CheckDataAvailability').output.rowCount, 0)",
"type": "Expression"
},
"activities": [
{
"name": "CheckDataAvailability",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "SELECT COUNT(*) as rowCount FROM OPENROWSET(BULK 'raw-events/**/*.avro') WHERE _metadata_file_last_modified > '@{addHours(pipeline().TriggerTime, -1)}'"
}
}
},
{
"name": "WaitActivity",
"type": "Wait",
"typeProperties": {
"waitTimeInSeconds": 300
}
}
],
"timeout": "0.01:00:00"
}
}
]
}
}
```

## Best Practices

- **Processing Design**
    - Use idempotent transformations (see the sketch after this list)
    - Implement incremental processing
    - Partition data appropriately
    - Use columnar formats (Parquet/Delta)
- **Error Handling**
    - Implement retry mechanisms
    - Log detailed error information
    - Use circuit breaker patterns
    - Set up alerting for failures
- **Performance**
    - Optimize partition pruning
    - Use parallel processing
    - Cache frequently used data
    - Monitor query performance
- **Data Quality**
    - Validate at ingestion
    - Implement data profiling
    - Monitor drift detection
    - Maintain data lineage
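
As a concrete illustration of the idempotent and incremental items under Processing Design, here is a minimal sketch assuming the `ProcessingLog` table from the data-quality section records successful bronze loads: each run picks up only dates newer than the watermark, and re-running a given date yields the same rows because the selection depends on the data, not on run order.

```sql
-- Incremental, idempotent selection driven by a watermark (sketch).
DECLARE @Watermark DATE = (
    SELECT MAX(ProcessingDate)
    FROM ProcessingLog
    WHERE Layer = 'Bronze'
);

SELECT *
FROM BronzeEvents
WHERE CAST(timestamp AS DATE) > @Watermark;   -- same inputs => same outputs on rerun
```
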
## Troubleshooting

### Common Issues

| Issue | Symptoms | Solution |
|---|---|---|
| Pipeline Timeouts | Long-running activities | Increase timeout, optimize queries |
| Memory Errors | OOM exceptions | Reduce batch size, add more memory |
| Authentication Failures | 401/403 errors | Check managed identity permissions |
| Data Skew | Uneven partition processing | Review partitioning strategy |
| Schema Drift | Parsing failures | Implement schema evolution handling (see the sketch below) |
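
For the schema-drift row, a common mitigation is to project only the fields the pipeline needs: `JSON_VALUE` returns NULL for missing paths instead of failing, and unknown new fields are simply ignored, so the bronze view stays the single place where the expected schema is declared. A sketch against the raw payloads:

```sql
-- Explicit field projection: new upstream fields are ignored, and missing
-- optional fields surface as NULLs rather than parse errors.
SELECT
    JSON_VALUE(doc, '$.eventId')   AS event_id,
    JSON_VALUE(doc, '$.eventType') AS event_type,
    JSON_VALUE(doc, '$.timestamp') AS event_timestamp
FROM (
    SELECT CAST(Body AS NVARCHAR(MAX)) AS doc
    FROM RawEvents
) AS parsed
WHERE ISJSON(doc) = 1;
```
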
### Debug Query Template

```sql
-- Debug processing issues
SELECT
pipeline_name,
run_id,
status,
start_time,
end_time,
DATEDIFF(minute, start_time, end_time) as duration_minutes,
error_message
FROM PipelineRunLog
WHERE start_time >= DATEADD(day, -7, GETDATE())
AND status IN ('Failed', 'Cancelled')
ORDER BY start_time DESC;
```

## Next Steps

- Configure Power BI Integration
- Implement Cost Optimization strategies
- Follow Implementation Guide