Data Storage Architecture

Overview

The data storage layer provides scalable, cost-effective storage for raw events and processed metrics using Azure Data Lake Storage Gen2. This document covers the storage architecture, file organization, lifecycle management, and optimization strategies.

Storage Goals

  • Cost Efficiency: <$50/month for 100K events/day using tiered storage (a sizing sketch follows this list)
  • Schema Flexibility: Support various file formats and schema evolution
  • Query Performance: Optimized for analytical workloads
  • Data Governance: Lifecycle policies and retention management
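
For context, a rough sizing sketch shows why the cost target is realistic. The average event size below is an assumption, chosen to line up with the ~2 GB/month growth figure quoted in the monitoring section later in this document:

# Back-of-the-envelope ingest volume. The per-event size is an assumption
# for illustration only.
EVENTS_PER_DAY = 100_000
AVG_EVENT_KB = 0.7          # assumed average serialized event size

monthly_gb = EVENTS_PER_DAY * 30 * AVG_EVENT_KB / 1_048_576   # KB -> GB
print(f"Estimated ingest: {monthly_gb:.1f} GB/month")          # ~2 GB/month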

Storage Architecture

Data Lake Organization

graph TD
    subgraph "Storage Account: datalakemetricsplatform"
        subgraph "Raw Events Container"
            RAW[/raw-events/]
            RAWDATA[Event Hub Capture Files<br/>Avro Format<br/>Retention: 7 days]
        end

        subgraph "Bronze Layer Container"
            BRONZE[/bronze/]
            VALIDATED[Validated Events<br/>Parquet Format<br/>Retention: 90 days]
        end

        subgraph "Silver Layer Container"
            SILVER[/silver/]
            ENRICHED[Cleansed & Enriched<br/>Delta Lake Format<br/>Retention: 2 years]
        end

        subgraph "Gold Layer Container"
            GOLD[/gold/]
            METRICS[Business Metrics<br/>Delta Lake Format<br/>Retention: 5 years]
        end
    end

    RAW --> RAWDATA
    BRONZE --> VALIDATED
    SILVER --> ENRICHED
    GOLD --> METRICS

    style RAW fill:#FFE4E1
    style BRONZE fill:#DEB887
    style SILVER fill:#C0C0C0
    style GOLD fill:#FFD700

File System Hierarchy

graph LR
    subgraph "Partition Structure"
        CONTAINER[/bronze/]
        DOMAIN[/{domain}/]
        ENTITY[/{entity-type}/]
        YEAR[/year=2024/]
        MONTH[/month=01/]
        DAY[/day=15/]
        HOUR[/hour=14/]
        FILE[events_001.parquet]
    end

    CONTAINER --> DOMAIN
    DOMAIN --> ENTITY
    ENTITY --> YEAR
    YEAR --> MONTH
    MONTH --> DAY
    DAY --> HOUR
    HOUR --> FILE
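
As a quick illustration of this layout, the helper below builds a bronze-layer path for an event; the "crm"/"contacts" values in the example are placeholders, not fixed names from the platform:

from datetime import datetime, timezone

def partition_path(domain: str, entity_type: str, ts: datetime, file_name: str) -> str:
    """Build a path following the /{domain}/{entity-type}/year=/month=/day=/hour= convention."""
    return (
        f"bronze/{domain}/{entity_type}/"
        f"year={ts.year}/month={ts.month:02d}/day={ts.day:02d}/hour={ts.hour:02d}/"
        f"{file_name}"
    )

# Example: bronze/crm/contacts/year=2024/month=01/day=15/hour=14/events_001.parquet
print(partition_path("crm", "contacts", datetime(2024, 1, 15, 14, tzinfo=timezone.utc), "events_001.parquet"))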

Storage Tiers and Lifecycle

graph TD
    subgraph "Data Lifecycle"
        HOT[Hot Tier<br/>0-30 days<br/>$0.0208/GB/month]
        COOL[Cool Tier<br/>30-90 days<br/>$0.01/GB/month]
        ARCHIVE[Archive Tier<br/>90+ days<br/>$0.00099/GB/month]
    end

    subgraph "Access Patterns"
        FREQ[Frequent Access<br/>Daily queries]
        OCCASIONAL[Occasional Access<br/>Weekly/Monthly]
        RARE[Rare Access<br/>Compliance/Audit]
    end

    HOT --> COOL
    COOL --> ARCHIVE

    HOT --> FREQ
    COOL --> OCCASIONAL
    ARCHIVE --> RARE
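
Using the per-GB prices above and the ~100 GB footprint quoted in the monitoring section, a rough capacity-cost estimate looks like this (the split across tiers is assumed for illustration, and transaction/retrieval charges are excluded):

# Rough monthly storage cost estimate; tier split is an assumption, prices
# are the per-GB figures from the lifecycle diagram above.
PRICE_PER_GB = {"hot": 0.0208, "cool": 0.01, "archive": 0.00099}
gb_per_tier = {"hot": 10, "cool": 30, "archive": 60}   # assumed split of ~100 GB

total = sum(gb_per_tier[t] * PRICE_PER_GB[t] for t in PRICE_PER_GB)
print(f"Estimated capacity cost: ${total:.2f}/month")   # well under the $50 target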

Storage Account Configuration

Terraform Configuration

resource "azurerm_storage_account" "metrics_data_lake" {
  name                     = "datalakemetricsplatform"
  resource_group_name      = azurerm_resource_group.metrics_platform.name
  location                 = azurerm_resource_group.metrics_platform.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
  account_kind             = "StorageV2"
  is_hns_enabled          = true  # Hierarchical namespace for Data Lake Gen2

  # Blob service settings. Versioning and change feed are not supported on
  # accounts with hierarchical namespace enabled, so only last-access tracking
  # and soft-delete retention are configured here.
  blob_properties {
    last_access_time_enabled = true

    delete_retention_policy {
      days = 30
    }

    container_delete_retention_policy {
      days = 30
    }
  }

  # Network access rules
  network_rules {
    default_action = "Allow"  # Restrict in production
    bypass         = ["AzureServices"]
  }

  tags = {
    Environment = "Production"
    Purpose     = "MetricsPlatform"
  }
}

# Create containers for each layer
resource "azurerm_storage_container" "raw_events" {
  name                  = "raw-events"
  storage_account_name  = azurerm_storage_account.metrics_data_lake.name
  container_access_type = "private"
}

resource "azurerm_storage_container" "bronze" {
  name                  = "bronze"
  storage_account_name  = azurerm_storage_account.metrics_data_lake.name
  container_access_type = "private"
}

resource "azurerm_storage_container" "silver" {
  name                  = "silver"
  storage_account_name  = azurerm_storage_account.metrics_data_lake.name
  container_access_type = "private"
}

resource "azurerm_storage_container" "gold" {
  name                  = "gold"
  storage_account_name  = azurerm_storage_account.metrics_data_lake.name
  container_access_type = "private"
}

Lifecycle Management Policy

{
  "rules": [
    {
      "enabled": true,
      "name": "raw-events-lifecycle",
      "type": "Lifecycle",
      "definition": {
        "filters": {
          "blobTypes": ["blockBlob"],
          "prefixMatch": ["raw-events/"]
        },
        "actions": {
          "baseBlob": {
            "tierToCool": {
              "daysAfterModificationGreaterThan": 30
            },
            "tierToArchive": {
              "daysAfterModificationGreaterThan": 90
            },
            "delete": {
              "daysAfterModificationGreaterThan": 2555  // 7 years
            }
          }
        }
      }
    },
    {
      "enabled": true,
      "name": "bronze-lifecycle",
      "type": "Lifecycle",
      "definition": {
        "filters": {
          "blobTypes": ["blockBlob"],
          "prefixMatch": ["bronze/"]
        },
        "actions": {
          "baseBlob": {
            "tierToCool": {
              "daysAfterModificationGreaterThan": 30
            },
            "tierToArchive": {
              "daysAfterModificationGreaterThan": 180
            },
            "delete": {
              "daysAfterModificationGreaterThan": 2555
            }
          }
        }
      }
    },
    {
      "enabled": true,
      "name": "gold-retention",
      "type": "Lifecycle",
      "definition": {
        "filters": {
          "blobTypes": ["blockBlob"],
          "prefixMatch": ["gold/"]
        },
        "actions": {
          "baseBlob": {
            "tierToCool": {
              "daysAfterModificationGreaterThan": 90
            },
            "tierToArchive": {
              "daysAfterModificationGreaterThan": 365
            },
            "delete": {
              "daysAfterModificationGreaterThan": 1825  // 5 years
            }
          }
        }
      }
    }
  ]
}

File Formats and Optimization

Format Selection Strategy

Layer  | Format     | Compression | Use Case             | Read Performance | Write Performance
Raw    | Avro       | Snappy      | Event Hub capture    | Medium           | High
Bronze | Parquet    | Snappy      | Validation & parsing | High             | Medium
Silver | Delta Lake | Snappy      | Analytics queries    | Very High        | Medium
Gold   | Delta Lake | ZSTD        | Power BI reports     | Very High        | Low
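
At write time, the format and codec choices in the table translate into Spark write options. A minimal sketch, assuming the SparkSession configured below; validated_df and metrics_df stand in for the DataFrames produced by the pipeline:

# Bronze: plain Parquet with Snappy (Spark's default Parquet codec).
validated_df.write \
    .mode("append") \
    .option("compression", "snappy") \
    .parquet("abfss://bronze@datalakemetricsplatform.dfs.core.windows.net/contacts/")

# Gold: Delta Lake with ZSTD, set via the session-level Parquet codec
# (Delta stores its data as Parquet files underneath).
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
metrics_df.write \
    .mode("append") \
    .format("delta") \
    .save("abfss://gold@datalakemetricsplatform.dfs.core.windows.net/business_metrics/")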

Delta Lake Configuration

# Delta Lake table creation script
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("MetricsPlatform") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create business events table in silver layer
events_schema = """
    event_id STRING,
    timestamp TIMESTAMP,
    aggregate_id STRING,
    aggregate_type STRING,
    event_type STRING,
    version STRING,
    correlation_id STRING,
    user_id STRING,
    source STRING,
    payload STRING,
    year INT,
    month INT,
    day INT,
    hour INT
"""

# Create partitioned Delta table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.business_events (
        {events_schema}
    )
    USING DELTA
    PARTITIONED BY (aggregate_type, year, month, day)
    LOCATION 'abfss://silver@datalakemetricsplatform.dfs.core.windows.net/business_events/'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true',
        'delta.deletedFileRetentionDuration' = 'interval 30 days',
        'delta.logRetentionDuration' = 'interval 365 days'
    )
""")

# Create metrics table in gold layer
metrics_schema = """
    metric_name STRING,
    metric_value DOUBLE,
    metric_type STRING,
    dimensions MAP<STRING, STRING>,
    timestamp TIMESTAMP,
    date DATE,
    aggregate_type STRING,
    year INT,
    month INT,
    day INT
"""

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS gold.business_metrics (
        {metrics_schema}
    )
    USING DELTA
    PARTITIONED BY (aggregate_type, year, month)
    LOCATION 'abfss://gold@datalakemetricsplatform.dfs.core.windows.net/business_metrics/'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

Partitioning Strategy

graph TD
    subgraph "Partitioning Hierarchy"
        ROOT[Table Root]
        AGG[aggregate_type=Contact]
        YEAR[year=2024]
        MONTH[month=01]
        DAY[day=15]
        FILES[Parquet Files<br/>~64MB each]
    end

    subgraph "Benefits"
        PRUNE[Partition Pruning<br/>Skip irrelevant data]
        PARALLEL[Parallel Processing<br/>Per partition]
        COMPRESS[Compression<br/>Similar data together]
    end

    ROOT --> AGG
    AGG --> YEAR
    YEAR --> MONTH
    MONTH --> DAY
    DAY --> FILES

    FILES --> PRUNE
    FILES --> PARALLEL
    FILES --> COMPRESS
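
Partition pruning only kicks in when queries filter on the partition columns. A sketch of a pruned read against the silver table defined earlier:

# Filters on partition columns (aggregate_type, year, month, day) let Delta
# skip whole directories; remaining predicates run only on surviving files.
contacts_jan = (
    spark.table("silver.business_events")
         .where("aggregate_type = 'Contact' AND year = 2024 AND month = 1")
         .select("event_id", "event_type", "timestamp")   # column pruning
)
contacts_jan.explain()   # plan shows partition filters on the partition columns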

Security and Access Control

RBAC Configuration

# Create storage account access roles
az role assignment create \
  --assignee <data-factory-principal-id> \
  --role "Storage Blob Data Contributor" \
  --scope "/subscriptions/{subscription}/resourceGroups/rg-metrics-platform/providers/Microsoft.Storage/storageAccounts/datalakemetricsplatform"

az role assignment create \
  --assignee <synapse-principal-id> \
  --role "Storage Blob Data Reader" \
  --scope "/subscriptions/{subscription}/resourceGroups/rg-metrics-platform/providers/Microsoft.Storage/storageAccounts/datalakemetricsplatform"

az role assignment create \
  --assignee <powerbi-service-principal> \
  --role "Storage Blob Data Reader" \
  --scope "/subscriptions/{subscription}/resourceGroups/rg-metrics-platform/providers/Microsoft.Storage/storageAccounts/datalakemetricsplatform/blobServices/default/containers/gold"

Access Control Lists (ACLs)

# Set ACLs at the container (file system) root. Note: --acl and --permissions
# are mutually exclusive, and root ACLs apply to the root directory only;
# use `az storage fs access update-recursive` to propagate to existing paths.
az storage fs access set \
  --account-name datalakemetricsplatform \
  --file-system bronze \
  --path "/" \
  --acl "user:data-factory-id:rwx,group:analytics-team:r-x"

az storage fs access set \
  --account-name datalakemetricsplatform \
  --file-system gold \
  --path "/" \
  --acl "user:powerbi-service:r--,group:report-viewers:r--"

Monitoring and Metrics

Storage Metrics Dashboard

graph TD
    subgraph "Storage Metrics"
        SIZE[Storage Size<br/>Total: ~100GB<br/>Growth: 2GB/month]
        COST[Monthly Cost<br/>Hot: $2<br/>Cool: $1<br/>Archive: $0.10]
        TRANS[Transactions<br/>Read: 1M/month<br/>Write: 100K/month]
        PERF[Performance<br/>Latency: <100ms<br/>Throughput: 100MB/s]
    end

    subgraph "Alerts"
        A1[Size Growth Alert<br/>>10GB/month]
        A2[Cost Alert<br/>>$50/month]
        A3[Error Rate<br/>>1%]
        A4[Latency Alert<br/>>500ms]
    end

    SIZE --> A1
    COST --> A2
    TRANS --> A3
    PERF --> A4

Azure Monitor Queries

// Storage cost analysis
AzureMetrics
| where ResourceProvider == "MICROSOFT.STORAGE"
| where MetricName == "UsedCapacity"
| summarize 
    TotalGB = max(Average) / 1073741824,
    EstimatedCost_Hot = (max(Average) / 1073741824) * 0.0208,
    EstimatedCost_Cool = (max(Average) / 1073741824) * 0.01
    by bin(TimeGenerated, 1d), Resource
| order by TimeGenerated desc

// File count and size trends
StorageBlobLogs
| where OperationName == "PutBlob"
| extend Container = extract(@"https://\w+\.blob\.core\.windows\.net/([^/]+)", 1, uri_s)
| summarize 
    FileCount = count(),
    TotalSize_MB = sum(RequestBodySize) / 1048576,
    AvgSize_MB = avg(RequestBodySize) / 1048576
    by bin(TimeGenerated, 1h), Container
| order by TimeGenerated desc

// Data freshness check
StorageBlobLogs
| where OperationName == "PutBlob"
| where uri_s contains "raw-events"
| summarize LastWrite = max(TimeGenerated) by CaptureDate = extract(@"/(\d{4}/\d{2}/\d{2})/", 1, uri_s)
| extend DaysOld = datetime_diff('day', now(), LastWrite)
| where DaysOld > 1  // Alert if data is more than 1 day old
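
These queries can also run from a scheduled job. A minimal sketch of the freshness check using the azure-monitor-query and azure-identity packages; the workspace id is a placeholder:

from datetime import timedelta

from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

WORKSPACE_ID = "<log-analytics-workspace-id>"   # placeholder

client = LogsQueryClient(DefaultAzureCredential())

# Re-run the data freshness check and fail the job if raw-events is stale.
freshness_query = """
StorageBlobLogs
| where OperationName == "PutBlob" and uri_s contains "raw-events"
| summarize LastWrite = max(TimeGenerated)
| extend DaysOld = datetime_diff('day', now(), LastWrite)
"""

response = client.query_workspace(WORKSPACE_ID, freshness_query, timespan=timedelta(days=7))
for table in response.tables:
    for row in table.rows:
        if row["DaysOld"] and row["DaysOld"] > 1:
            raise RuntimeError(f"raw-events has not been written to for {row['DaysOld']} days")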

Data Quality and Validation

Schema Validation

from pyspark.sql import functions as F
from pyspark.sql.types import *

def validate_business_event(df):
    """Validate business event schema and data quality"""

    # Required fields validation
    required_fields = [
        'event_id', 'timestamp', 'aggregate_id', 
        'aggregate_type', 'event_type', 'version'
    ]

    for field in required_fields:
        if field not in df.columns:
            raise ValueError(f"Missing required field: {field}")

    # Data quality checks
    quality_checks = [
        # No null values in key fields
        df.filter(F.col('event_id').isNull()).count() == 0,
        df.filter(F.col('timestamp').isNull()).count() == 0,
        df.filter(F.col('aggregate_id').isNull()).count() == 0,

        # Valid UUID format for event_id
        df.filter(~F.col('event_id').rlike(
            r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
        )).count() == 0,

        # Valid timestamp format
        df.filter(F.col('timestamp').isNull() | 
                 (F.col('timestamp') < F.lit('2020-01-01')) |
                 (F.col('timestamp') > F.current_timestamp())).count() == 0,

        # Non-empty aggregate_id
        df.filter(F.col('aggregate_id') == '').count() == 0
    ]

    if not all(quality_checks):
        raise ValueError("Data quality validation failed")

    return df

# Example usage in data pipeline
raw_events = spark.read.parquet("abfss://bronze@datalakemetricsplatform.dfs.core.windows.net/contacts/")
validated_events = validate_business_event(raw_events)

Data Lineage Tracking

def add_lineage_metadata(df, source_path, transformation_type):
    """Add data lineage metadata to DataFrame"""

    return df.withColumn("_lineage", F.struct(
        F.lit(source_path).alias("source_path"),
        F.lit(transformation_type).alias("transformation"),
        F.current_timestamp().alias("processed_at"),
        F.lit("metrics-platform").alias("pipeline")
    ))

# Track transformations
bronze_df = add_lineage_metadata(
    raw_df, 
    "raw-events/contacts/", 
    "raw_to_bronze"
)

silver_df = add_lineage_metadata(
    bronze_df,
    "bronze/contacts/",
    "bronze_to_silver"
)

Performance Optimization

File Size Optimization

def optimize_file_sizes(df, target_file_size_mb=64):
    """Repartition a DataFrame so output files land near the target size."""

    # Rough size estimate: string length of every row. This triggers a full
    # pass over the data; for large inputs prefer sampling or known input sizes.
    df_size_mb = df.rdd.map(lambda x: len(str(x))).sum() / 1048576
    optimal_partitions = max(1, int(df_size_mb / target_file_size_mb))

    # repartition() can raise or lower the partition count; coalesce() can only lower it.
    return df.repartition(optimal_partitions)

# Apply before writing
optimized_df = optimize_file_sizes(processed_events)
optimized_df.write \
    .mode("append") \
    .format("delta") \
    .option("mergeSchema", "true") \
    .save("abfss://silver@datalakemetricsplatform.dfs.core.windows.net/business_events/")

Query Performance Tuning

-- Create Z-order optimization for frequently queried columns
OPTIMIZE gold.business_metrics
ZORDER BY (aggregate_type, timestamp, metric_name);

-- Vacuum old files to save storage
VACUUM gold.business_metrics RETAIN 168 HOURS;  -- 7 days

-- Analyze table statistics
ANALYZE TABLE gold.business_metrics COMPUTE STATISTICS FOR ALL COLUMNS;
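
The same maintenance can be driven from Python when the pipeline runs on Delta Lake 2.0+; a sketch using the DeltaTable API:

from delta.tables import DeltaTable

metrics = DeltaTable.forName(spark, "gold.business_metrics")

# Z-order compaction on the most frequently filtered columns.
metrics.optimize().executeZOrderBy("aggregate_type", "timestamp", "metric_name")

# Remove files outside the 7-day retention window (168 hours).
metrics.vacuum(168)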

Best Practices

  1. File Organization
     • Use consistent naming conventions
     • Partition by the most frequently filtered columns
     • Keep file sizes between 64 MB and 1 GB
     • Avoid small files (<10 MB); see the sketch after this list for a simple detector

  2. Data Lifecycle
     • Implement automated tier transitions
     • Set appropriate retention policies
     • Monitor storage costs regularly
     • Archive old data promptly

  3. Performance
     • Use column pruning in queries
     • Leverage partition elimination
     • Optimize file sizes for the workload
     • Enable auto-optimization features

  4. Security
     • Use managed identities
     • Implement least-privilege access
     • Encrypt sensitive data
     • Audit access patterns
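
As referenced above, a small-file check is easy to script. A sketch assuming the azure-identity and azure-storage-file-datalake packages and an identity with read access to the account:

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient

SMALL_FILE_MB = 10

service = DataLakeServiceClient(
    account_url="https://datalakemetricsplatform.dfs.core.windows.net",
    credential=DefaultAzureCredential(),
)

# Flag files below the 10 MB threshold in the bronze container.
bronze = service.get_file_system_client("bronze")
small_files = [
    p.name for p in bronze.get_paths(recursive=True)
    if not p.is_directory and (p.content_length or 0) < SMALL_FILE_MB * 1_048_576
]
print(f"{len(small_files)} files under {SMALL_FILE_MB} MB; consider compaction (OPTIMIZE)")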

Troubleshooting

Common Issues

Issue         | Symptoms               | Solution
Slow Queries  | High latency, timeouts | Optimize partitioning, check file sizes
High Costs    | Unexpected charges     | Review lifecycle policies, tier configuration
Access Denied | 403 errors             | Check RBAC roles, ACLs, network rules
Data Quality  | Missing/corrupt data   | Implement validation, monitor pipelines
Large Files   | OOM errors             | Increase cluster size, optimize queries

Health Check Script

#!/bin/bash
# Storage Health Check

STORAGE_ACCOUNT="datalakemetricsplatform"
RESOURCE_GROUP="rg-metrics-platform"

echo "Storage Account Health Check: $STORAGE_ACCOUNT"

# Check account status
az storage account show \
  --name $STORAGE_ACCOUNT \
  --resource-group $RESOURCE_GROUP \
  --query "{Name:name, Status:statusOfPrimary, Tier:accessTier, Replication:replication}" \
  --output table

# Check container sizes
for container in raw-events bronze silver gold; do
  echo "Container: $container"
  az storage blob list \
    --account-name $STORAGE_ACCOUNT \
    --container-name $container \
    --query "[].{Name:name, Size:properties.contentLength}" \
    --output table | head -10
done

# Check lifecycle policy
az storage account management-policy show \
  --account-name $STORAGE_ACCOUNT \
  --resource-group $RESOURCE_GROUP \
  --query "policy.rules[].{Name:name, Enabled:enabled}" \
  --output table

Next Steps