Data Storage Architecture¶
Overview¶
The data storage layer provides scalable, cost-effective storage for raw events and processed metrics using Azure Data Lake Storage Gen2. This document covers the storage architecture, file organization, lifecycle management, and optimization strategies.
Storage Goals
- Cost Efficiency: <$50/month for 100K events/day using tiered storage
- Schema Flexibility: Support various file formats and schema evolution
- Query Performance: Optimized for analytical workloads
- Data Governance: Lifecycle policies and retention management
Storage Architecture¶
Data Lake Organization¶
graph TD
subgraph "Storage Account: datalakemetricsplatform"
subgraph "Raw Events Container"
RAW[/raw-events/]
RAWDATA[Event Hub Capture Files<br/>Avro Format<br/>Retention: 7 days]
end
subgraph "Bronze Layer Container"
BRONZE[/bronze/]
VALIDATED[Validated Events<br/>Parquet Format<br/>Retention: 90 days]
end
subgraph "Silver Layer Container"
SILVER[/silver/]
ENRICHED[Cleansed & Enriched<br/>Delta Lake Format<br/>Retention: 2 years]
end
subgraph "Gold Layer Container"
GOLD[/gold/]
METRICS[Business Metrics<br/>Delta Lake Format<br/>Retention: 5 years]
end
end
RAW --> RAWDATA
BRONZE --> VALIDATED
SILVER --> ENRICHED
GOLD --> METRICS
style RAW fill:#FFE4E1
style BRONZE fill:#DEB887
style SILVER fill:#C0C0C0
style GOLD fill:#FFD700
File System Hierarchy¶
graph LR
subgraph "Partition Structure"
CONTAINER[/bronze/]
DOMAIN[/{domain}/]
ENTITY[/{entity-type}/]
YEAR[/year=2024/]
MONTH[/month=01/]
DAY[/day=15/]
HOUR[/hour=14/]
FILE[events_001.parquet]
end
CONTAINER --> DOMAIN
DOMAIN --> ENTITY
ENTITY --> YEAR
YEAR --> MONTH
MONTH --> DAY
DAY --> HOUR
HOUR --> FILE
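As a concrete illustration of this layout, the sketch below writes validated events into the Bronze hierarchy using Spark's partitionBy; the domain/entity segment (sales/contact) and the input DataFrame validated_df are illustrative assumptions rather than fixed names.
# Sketch: write validated events into the Bronze hierarchy shown above.
# Assumes `validated_df` carries the event columns plus a `timestamp` column.
from pyspark.sql import functions as F
bronze_path = (
    "abfss://bronze@datalakemetricsplatform.dfs.core.windows.net"
    "/sales/contact/"  # /{domain}/{entity-type}/ -- illustrative values
)
(validated_df
    .withColumn("year", F.year("timestamp"))
    .withColumn("month", F.month("timestamp"))
    .withColumn("day", F.dayofmonth("timestamp"))
    .withColumn("hour", F.hour("timestamp"))
    .write
    .mode("append")
    .partitionBy("year", "month", "day", "hour")  # produces year=.../month=.../day=.../hour=... folders
    .parquet(bronze_path))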
Storage Tiers and Lifecycle¶
graph TD
subgraph "Data Lifecycle"
HOT[Hot Tier<br/>0-30 days<br/>$0.0208/GB/month]
COOL[Cool Tier<br/>30-90 days<br/>$0.01/GB/month]
ARCHIVE[Archive Tier<br/>90+ days<br/>$0.00099/GB/month]
end
subgraph "Access Patterns"
FREQ[Frequent Access<br/>Daily queries]
OCCASIONAL[Occasional Access<br/>Weekly/Monthly]
RARE[Rare Access<br/>Compliance/Audit]
end
HOT --> COOL
COOL --> ARCHIVE
HOT --> FREQ
COOL --> OCCASIONAL
ARCHIVE --> RARE
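To sanity-check the <$50/month cost goal against these tiers, the rough estimate below prices 100K events/day at the hot, cool, and archive rates shown above; the ~1 KB event size and the retention split are assumptions, not measurements.
# Back-of-the-envelope check of the <$50/month goal (all inputs are assumptions).
EVENTS_PER_DAY = 100_000
EVENT_SIZE_KB = 1.0            # assumed average raw event size
KB_PER_GB = 1024 * 1024
daily_gb = EVENTS_PER_DAY * EVENT_SIZE_KB / KB_PER_GB   # ~0.1 GB/day
hot_gb = daily_gb * 30         # 0-30 days in hot
cool_gb = daily_gb * 60        # 30-90 days in cool
archive_gb = daily_gb * 365    # roughly a year resident in archive, illustrative
monthly_cost = hot_gb * 0.0208 + cool_gb * 0.01 + archive_gb * 0.00099
print(f"Estimated storage cost: ${monthly_cost:.2f}/month")  # comes out well under $50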
Storage Account Configuration¶
Terraform Configuration¶
resource "azurerm_storage_account" "metrics_data_lake" {
name = "datalakemetricsplatform"
resource_group_name = azurerm_resource_group.metrics_platform.name
location = azurerm_resource_group.metrics_platform.location
account_tier = "Standard"
account_replication_type = "LRS"
account_kind = "StorageV2"
is_hns_enabled = true # Hierarchical namespace for Data Lake Gen2
# Enable blob lifecycle management
blob_properties {
versioning_enabled = true
change_feed_enabled = true
last_access_time_enabled = true
delete_retention_policy {
days = 30
}
container_delete_retention_policy {
days = 30
}
}
# Network access rules
network_rules {
default_action = "Allow" # Restrict in production
bypass = ["AzureServices"]
}
tags = {
Environment = "Production"
Purpose = "MetricsPlatform"
}
}
# Create containers for each layer
resource "azurerm_storage_container" "raw_events" {
name = "raw-events"
storage_account_name = azurerm_storage_account.metrics_data_lake.name
container_access_type = "private"
}
resource "azurerm_storage_container" "bronze" {
name = "bronze"
storage_account_name = azurerm_storage_account.metrics_data_lake.name
container_access_type = "private"
}
resource "azurerm_storage_container" "silver" {
name = "silver"
storage_account_name = azurerm_storage_account.metrics_data_lake.name
container_access_type = "private"
}
resource "azurerm_storage_container" "gold" {
name = "gold"
storage_account_name = azurerm_storage_account.metrics_data_lake.name
container_access_type = "private"
}
Lifecycle Management Policy¶
{
"rules": [
{
"enabled": true,
"name": "raw-events-lifecycle",
"type": "Lifecycle",
"definition": {
"filters": {
"blobTypes": ["blockBlob"],
"prefixMatch": ["raw-events/"]
},
"actions": {
"baseBlob": {
"tierToCool": {
"daysAfterModificationGreaterThan": 30
},
"tierToArchive": {
"daysAfterModificationGreaterThan": 90
},
"delete": {
"daysAfterModificationGreaterThan": 2555 // 7 years
}
}
}
}
},
{
"enabled": true,
"name": "bronze-lifecycle",
"type": "Lifecycle",
"definition": {
"filters": {
"blobTypes": ["blockBlob"],
"prefixMatch": ["bronze/"]
},
"actions": {
"baseBlob": {
"tierToCool": {
"daysAfterModificationGreaterThan": 30
},
"tierToArchive": {
"daysAfterModificationGreaterThan": 180
},
"delete": {
"daysAfterModificationGreaterThan": 2555
}
}
}
}
},
{
"enabled": true,
"name": "gold-retention",
"type": "Lifecycle",
"definition": {
"filters": {
"blobTypes": ["blockBlob"],
"prefixMatch": ["gold/"]
},
"actions": {
"baseBlob": {
"tierToCool": {
"daysAfterModificationGreaterThan": 90
},
"tierToArchive": {
"daysAfterModificationGreaterThan": 365
},
"delete": {
"daysAfterModificationGreaterThan": 1825 // 5 years
}
}
}
}
}
]
}
File Formats and Optimization¶
Format Selection Strategy¶
| Layer | Format | Compression | Use Case | Read Performance | Write Performance |
|---|---|---|---|---|---|
| Raw | Avro | Snappy | Event Hub capture | Medium | High |
| Bronze | Parquet | Snappy | Validation & parsing | High | Medium |
| Silver | Delta Lake | Snappy | Analytics queries | Very High | Medium |
| Gold | Delta Lake | ZSTD | Power BI reports | Very High | Low |
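The hand-off between layers is largely a format conversion. The sketch below shows one way to turn raw Event Hub Capture Avro into Bronze Parquet; it assumes the spark-avro package is on the classpath and that the capture files expose the standard Body, EnqueuedTimeUtc, and SequenceNumber columns.
# Sketch: raw (Avro capture) -> bronze (Snappy Parquet), per the table above.
from pyspark.sql import functions as F
raw_path = "abfss://raw-events@datalakemetricsplatform.dfs.core.windows.net/"
bronze_path = "abfss://bronze@datalakemetricsplatform.dfs.core.windows.net/contacts/"
raw_df = spark.read.format("avro").load(raw_path)
bronze_df = raw_df.select(
    F.col("Body").cast("string").alias("payload"),       # capture stores the event as bytes
    F.col("EnqueuedTimeUtc").alias("enqueued_time"),
    F.col("SequenceNumber").alias("sequence_number")
)
(bronze_df.write
    .mode("append")
    .option("compression", "snappy")
    .parquet(bronze_path))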
Delta Lake Configuration¶
# Delta Lake table creation script
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("MetricsPlatform") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Create business events table in silver layer
events_schema = """
event_id STRING,
timestamp TIMESTAMP,
aggregate_id STRING,
aggregate_type STRING,
event_type STRING,
version STRING,
correlation_id STRING,
user_id STRING,
source STRING,
payload STRING,
year INT,
month INT,
day INT,
hour INT
"""
# Create partitioned Delta table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS silver.business_events (
{events_schema}
)
USING DELTA
PARTITIONED BY (aggregate_type, year, month, day)
LOCATION 'abfss://silver@datalakemetricsplatform.dfs.core.windows.net/business_events/'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true',
'delta.deletedFileRetentionDuration' = 'interval 30 days',
'delta.logRetentionDuration' = 'interval 365 days'
)
""")
# Create metrics table in gold layer
metrics_schema = """
metric_name STRING,
metric_value DOUBLE,
metric_type STRING,
dimensions MAP<STRING, STRING>,
timestamp TIMESTAMP,
date DATE,
aggregate_type STRING,
year INT,
month INT,
day INT
"""
spark.sql(f"""
CREATE TABLE IF NOT EXISTS gold.business_metrics (
{metrics_schema}
)
USING DELTA
PARTITIONED BY (aggregate_type, year, month)
LOCATION 'abfss://gold@datalakemetricsplatform.dfs.core.windows.net/business_metrics/'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
Partitioning Strategy¶
graph TD
subgraph "Partitioning Hierarchy"
ROOT[Table Root]
AGG[aggregate_type=Contact]
YEAR[year=2024]
MONTH[month=01]
DAY[day=15]
FILES[Parquet Files<br/>~64MB each]
end
subgraph "Benefits"
PRUNE[Partition Pruning<br/>Skip irrelevant data]
PARALLEL[Parallel Processing<br/>Per partition]
COMPRESS[Compression<br/>Similar data together]
end
ROOT --> AGG
AGG --> YEAR
YEAR --> MONTH
MONTH --> DAY
DAY --> FILES
FILES --> PRUNE
FILES --> PARALLEL
FILES --> COMPRESS
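Partition pruning only applies when query filters reference the partition columns. The sketch below filters on aggregate_type, year, month, and day so Spark reads just the matching folders, and prunes columns with an explicit select.
# Sketch: a query shaped to benefit from partition pruning and column pruning.
from pyspark.sql import functions as F
contacts_jan_15 = (spark.table("silver.business_events")
    .filter((F.col("aggregate_type") == "Contact") &
            (F.col("year") == 2024) & (F.col("month") == 1) & (F.col("day") == 15))
    .select("event_id", "event_type", "timestamp"))   # read only the needed columns
contacts_jan_15.explain()   # partition filters in the plan confirm pruning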
Security and Access Control¶
RBAC Configuration¶
# Create storage account access roles
az role assignment create \
--assignee <data-factory-principal-id> \
--role "Storage Blob Data Contributor" \
--scope "/subscriptions/{subscription}/resourceGroups/rg-metrics-platform/providers/Microsoft.Storage/storageAccounts/datalakemetricsplatform"
az role assignment create \
--assignee <synapse-principal-id> \
--role "Storage Blob Data Reader" \
--scope "/subscriptions/{subscription}/resourceGroups/rg-metrics-platform/providers/Microsoft.Storage/storageAccounts/datalakemetricsplatform"
az role assignment create \
--assignee <powerbi-service-principal> \
--role "Storage Blob Data Reader" \
--scope "/subscriptions/{subscription}/resourceGroups/rg-metrics-platform/providers/Microsoft.Storage/storageAccounts/datalakemetricsplatform/blobServices/default/containers/gold"
Access Control Lists (ACLs)¶
# Set ACLs for containers
az storage fs access set \
--account-name datalakemetricsplatform \
--file-system bronze \
--path "/" \
--acl "user:data-factory-id:rwx,group:analytics-team:r-x"
az storage fs access set \
--account-name datalakemetricsplatform \
--file-system gold \
--path "/" \
--acl "user:powerbi-service:r--,group:report-viewers:r--"
Monitoring and Metrics¶
Storage Metrics Dashboard¶
graph TD
subgraph "Storage Metrics"
SIZE[Storage Size<br/>Total: ~100GB<br/>Growth: 2GB/month]
COST[Monthly Cost<br/>Hot: $2<br/>Cool: $1<br/>Archive: $0.10]
TRANS[Transactions<br/>Read: 1M/month<br/>Write: 100K/month]
PERF[Performance<br/>Latency: <100ms<br/>Throughput: 100MB/s]
end
subgraph "Alerts"
A1[Size Growth Alert<br/>>10GB/month]
A2[Cost Alert<br/>>$50/month]
A3[Error Rate<br/>>1%]
A4[Latency Alert<br/>>500ms]
end
SIZE --> A1
COST --> A2
TRANS --> A3
PERF --> A4
Azure Monitor Queries¶
// Storage cost analysis
AzureMetrics
| where ResourceProvider == "MICROSOFT.STORAGE"
| where MetricName == "UsedCapacity"
| summarize
TotalGB = max(Average) / 1073741824,
EstimatedCost_Hot = (max(Average) / 1073741824) * 0.0208,
EstimatedCost_Cool = (max(Average) / 1073741824) * 0.01
by bin(TimeGenerated, 1d), Resource
| order by TimeGenerated desc
// File count and size trends
StorageBlobLogs
| where OperationName == "PutBlob"
| extend Container = extract(@"https://\w+\.blob\.core\.windows\.net/([^/]+)", 1, uri_s)
| summarize
FileCount = count(),
TotalSize_MB = sum(ResponseBodySize) / 1048576,
AvgSize_MB = avg(ResponseBodySize) / 1048576
by bin(TimeGenerated, 1h), Container
| order by TimeGenerated desc
// Data freshness check
StorageBlobLogs
| where OperationName == "PutBlob"
| where uri_s contains "raw-events"
| summarize LastWrite = max(TimeGenerated) by extract(@"/(\d{4}/\d{2}/\d{2})/", 1, uri_s)
| extend DaysOld = datetime_diff('day', now(), LastWrite)
| where DaysOld > 1 // Alert if data is more than 1 day old
Data Quality and Validation¶
Schema Validation¶
from pyspark.sql import functions as F
from pyspark.sql.types import *
def validate_business_event(df):
    """Validate business event schema and data quality"""
    # Required fields validation
    required_fields = [
        'event_id', 'timestamp', 'aggregate_id',
        'aggregate_type', 'event_type', 'version'
    ]
    for field in required_fields:
        if field not in df.columns:
            raise ValueError(f"Missing required field: {field}")
    # Data quality checks
    quality_checks = [
        # No null values in key fields
        df.filter(F.col('event_id').isNull()).count() == 0,
        df.filter(F.col('timestamp').isNull()).count() == 0,
        df.filter(F.col('aggregate_id').isNull()).count() == 0,
        # Valid UUID format for event_id
        df.filter(~F.col('event_id').rlike(
            r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
        )).count() == 0,
        # Timestamp present and within a plausible range
        df.filter(F.col('timestamp').isNull() |
                  (F.col('timestamp') < F.lit('2020-01-01')) |
                  (F.col('timestamp') > F.current_timestamp())).count() == 0,
        # Non-empty aggregate_id
        df.filter(F.col('aggregate_id') == '').count() == 0
    ]
    if not all(quality_checks):
        raise ValueError("Data quality validation failed")
    return df
# Example usage in data pipeline
bronze_events = spark.read.parquet("abfss://bronze@datalakemetricsplatform.dfs.core.windows.net/contacts/")
validated_events = validate_business_event(bronze_events)
Data Lineage Tracking¶
def add_lineage_metadata(df, source_path, transformation_type):
    """Add data lineage metadata to DataFrame"""
    return df.withColumn("_lineage", F.struct(
        F.lit(source_path).alias("source_path"),
        F.lit(transformation_type).alias("transformation"),
        F.current_timestamp().alias("processed_at"),
        F.lit("metrics-platform").alias("pipeline")
    ))
# Track transformations
bronze_df = add_lineage_metadata(
raw_df,
"raw-events/contacts/",
"raw_to_bronze"
)
silver_df = add_lineage_metadata(
bronze_df,
"bronze/contacts/",
"bronze_to_silver"
)
Performance Optimization¶
File Size Optimization¶
def optimize_file_sizes(df, target_file_size_mb=64):
    """Optimize DataFrame partitioning for target file size"""
    # Calculate optimal number of partitions
    df_size_mb = df.rdd.map(lambda x: len(str(x))).sum() / 1048576
    optimal_partitions = max(1, int(df_size_mb / target_file_size_mb))
    return df.coalesce(optimal_partitions)
# Apply before writing
optimized_df = optimize_file_sizes(processed_events)
optimized_df.write \
.mode("append") \
.format("delta") \
.option("mergeSchema", "true") \
.save("abfss://silver@datalakemetricsplatform.dfs.core.windows.net/business_events/")
Query Performance Tuning¶
-- Create Z-order optimization for frequently queried columns
OPTIMIZE gold.business_metrics
ZORDER BY (aggregate_type, timestamp, metric_name);
-- Vacuum old files to save storage
VACUUM gold.business_metrics RETAIN 168 HOURS; -- 7 days
-- Analyze table statistics
ANALYZE TABLE gold.business_metrics COMPUTE STATISTICS FOR ALL COLUMNS;
Best Practices¶
- File Organization
  - Use consistent naming conventions
  - Partition by the most frequently filtered columns
  - Keep file sizes between 64 MB and 1 GB
  - Avoid small files (<10 MB)
- Data Lifecycle
  - Implement automated tier transitions
  - Set appropriate retention policies
  - Monitor storage costs regularly
  - Archive old data promptly
- Performance
  - Use column pruning in queries
  - Leverage partition elimination
  - Optimize file sizes for the workload
  - Enable auto-optimization features
- Security
  - Use managed identities where possible (see the access-configuration sketch after this list)
  - Implement least-privilege access
  - Encrypt sensitive data
  - Audit access patterns
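The access-configuration sketch referenced in the Security item above uses the standard ABFS OAuth settings for a service principal; the tenant and client values are placeholders, and where the runtime supports it a managed identity should replace the client secret.
# Sketch: authenticate Spark to the data lake with an Azure AD service principal.
account = "datalakemetricsplatform.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{account}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account}",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{account}", "<app-client-id>")         # placeholder
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account}", "<app-client-secret>")  # placeholder
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account}",
               "https://login.microsoftonline.com/<tenant-id>/oauth2/token")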
Troubleshooting¶
Common Issues¶
| Issue | Symptoms | Solution |
|---|---|---|
| Slow Queries | High latency, timeouts | Optimize partitioning, check file sizes |
| High Costs | Unexpected charges | Review lifecycle policies, tier configuration |
| Access Denied | 403 errors | Check RBAC roles, ACLs, network rules |
| Data Quality | Missing/corrupt data | Implement validation, monitor pipelines |
| Large Files | OOM errors | Increase cluster size, optimize queries |
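For the file-size related issues above, a quick check is to compare a Delta table's file count against its total size; DESCRIBE DETAIL exposes both. A minimal sketch:
# Sketch: detect a small-file problem on the gold metrics table.
detail = spark.sql("DESCRIBE DETAIL gold.business_metrics").collect()[0]
avg_file_mb = detail["sizeInBytes"] / max(detail["numFiles"], 1) / 1048576
print(f"{detail['numFiles']} files, avg {avg_file_mb:.1f} MB per file")
if avg_file_mb < 10:
    print("Average file size under 10 MB -- consider running OPTIMIZE / auto-compaction")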
Health Check Script¶
#!/bin/bash
# Storage Health Check
STORAGE_ACCOUNT="datalakemetricsplatform"
RESOURCE_GROUP="rg-metrics-platform"
echo "Storage Account Health Check: $STORAGE_ACCOUNT"
# Check account status
az storage account show \
--name $STORAGE_ACCOUNT \
--resource-group $RESOURCE_GROUP \
--query "{Name:name, Status:statusOfPrimary, Tier:accessTier, Replication:replication}" \
--output table
# Check container sizes
for container in raw-events bronze silver gold; do
echo "Container: $container"
az storage blob list \
--account-name $STORAGE_ACCOUNT \
--container-name $container \
--query "[].{Name:name, Size:properties.contentLength}" \
--output table | head -10
done
# Check lifecycle policy
az storage account management-policy show \
--account-name $STORAGE_ACCOUNT \
--resource-group $RESOURCE_GROUP \
--query "policy.rules[].{Name:name, Enabled:enabled}" \
--output table
Next Steps¶
- Configure Data Processing pipelines
- Set up Power BI Integration
- Implement Cost Optimization strategies