
Data Processing Architecture

Overview

The data processing layer transforms raw events from Azure Data Lake into business metrics using Azure Data Factory for orchestration and Synapse Serverless SQL for transformations. This document details the processing pipelines, transformation logic, and scheduling strategies.

Processing Goals

  • Cost Efficiency: Serverless processing with no idle compute costs
  • Scalability: Handle 100K+ events/day with auto-scaling
  • Reliability: Automated retries and error handling
  • Data Quality: Validation and enrichment at each layer

Processing Architecture

Data Processing Layers

graph LR
    subgraph "Raw to Bronze"
        RAW[Raw Events<br/>Avro Files]
        PARSE[Parse & Validate<br/>Schema Validation]
        BRONZE[Bronze Layer<br/>Parquet Files]
    end

    subgraph "Bronze to Silver"
        CLEANSE[Data Cleansing<br/>Standardization]
        ENRICH[Data Enrichment<br/>Lookups & Joins]
        SILVER[Silver Layer<br/>Delta Tables]
    end

    subgraph "Silver to Gold"
        AGG[Aggregations<br/>Business Metrics]
        CALC[Calculations<br/>KPIs & Measures]
        GOLD[Gold Layer<br/>Analytics Ready]
    end

    RAW --> PARSE
    PARSE --> BRONZE
    BRONZE --> CLEANSE
    CLEANSE --> ENRICH
    ENRICH --> SILVER
    SILVER --> AGG
    AGG --> CALC
    CALC --> GOLD

    style RAW fill:#FFE4E1
    style BRONZE fill:#DEB887
    style SILVER fill:#C0C0C0
    style GOLD fill:#FFD700

Processing Pipeline Overview

graph TD
    subgraph "Azure Data Factory"
        TRIGGER[Schedule Trigger<br/>Every Hour]
        PIPELINE[Main Pipeline]
        ACTIVITIES[Pipeline Activities]
    end

    subgraph "Synapse Serverless"
        POOL[SQL On-Demand Pool]
        VIEWS[External Tables/Views]
        QUERIES[Transformation SQL]
    end

    subgraph "Monitoring"
        MONITOR[Pipeline Monitoring]
        ALERTS[Failure Alerts]
        METRICS[Performance Metrics]
    end

    TRIGGER --> PIPELINE
    PIPELINE --> ACTIVITIES
    ACTIVITIES --> POOL
    POOL --> VIEWS
    VIEWS --> QUERIES

    PIPELINE --> MONITOR
    MONITOR --> ALERTS
    MONITOR --> METRICS

Azure Data Factory Configuration

Pipeline Architecture

graph TD
    subgraph "Main Processing Pipeline"
        START[Pipeline Start]
        CHECK[Check for New Files]
        PARALLEL[Parallel Processing]

        subgraph "Domain Processing"
            CRM[Process CRM Events]
            MATCHING[Process Matching Events]
            PROFILE[Process Profile Events]
        end

        subgraph "Aggregation"
            DAILY[Daily Aggregations]
            HOURLY[Hourly Metrics]
            CUSTOM[Custom KPIs]
        end

        CLEANUP[Cleanup & Archival]
        NOTIFY[Success Notification]
        ERROR[Error Handling]
    end

    START --> CHECK
    CHECK --> PARALLEL
    PARALLEL --> CRM
    PARALLEL --> MATCHING
    PARALLEL --> PROFILE

    CRM --> DAILY
    MATCHING --> HOURLY
    PROFILE --> CUSTOM

    DAILY --> CLEANUP
    HOURLY --> CLEANUP
    CUSTOM --> CLEANUP

    CLEANUP --> NOTIFY
    ERROR -.-> NOTIFY

Pipeline JSON Configuration

{
  "name": "MetricsPlatformProcessing",
  "properties": {
    "activities": [
      {
        "name": "CheckNewFiles",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "SqlDWSource",
            "sqlReaderQuery": "SELECT * FROM OPENROWSET(BULK 'https://datalakemetricsplatform.dfs.core.windows.net/raw-events/*/*/*/*/*.avro', FORMAT = 'DELTA') AS new_files WHERE _metadata_file_path > '@{pipeline().parameters.LastProcessedPath}'"
          }
        }
      },
      {
        "name": "ProcessRawToBronze",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "CheckNewFiles",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "ProcessRawEventsToBronze",
          "storedProcedureParameters": {
            "ProcessingDate": {
              "value": "@formatDateTime(pipeline().TriggerTime, 'yyyy-MM-dd')",
              "type": "String"
            }
          }
        }
      },
      {
        "name": "ProcessBronzeToSilver",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "ProcessRawToBronze",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('CheckNewFiles').output.value",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "TransformEvents",
              "type": "SqlServerStoredProcedure",
              "typeProperties": {
                "storedProcedureName": "ProcessBronzeToSilver",
                "storedProcedureParameters": {
                  "Domain": {
                    "value": "@item().domain",
                    "type": "String"
                  },
                  "EventType": {
                    "value": "@item().event_type",
                    "type": "String"
                  }
                }
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      "LastProcessedPath": {
        "type": "String",
        "defaultValue": "1900-01-01"
      }
    },
    "triggers": [
      {
        "name": "HourlyProcessing",
        "type": "ScheduleTrigger",
        "typeProperties": {
          "recurrence": {
            "frequency": "Hour",
            "interval": 1,
            "startTime": "2024-01-01T00:00:00Z"
          }
        }
      }
    ]
  }
}

Synapse Serverless SQL Configuration

External Data Sources

-- Create master key for authentication
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'YourComplexPassword!';

-- Create database scoped credential
CREATE DATABASE SCOPED CREDENTIAL DataLakeCredential
WITH IDENTITY = 'Managed Identity';

-- Create external data sources
CREATE EXTERNAL DATA SOURCE RawEventsDataSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/raw-events/',
    CREDENTIAL = DataLakeCredential
);

CREATE EXTERNAL DATA SOURCE BronzeDataSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/bronze/',
    CREDENTIAL = DataLakeCredential
);

CREATE EXTERNAL DATA SOURCE SilverDataSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/silver/',
    CREDENTIAL = DataLakeCredential
);

CREATE EXTERNAL DATA SOURCE GoldDataSource
WITH (
    LOCATION = 'https://datalakemetricsplatform.dfs.core.windows.net/gold/',
    CREDENTIAL = DataLakeCredential
);

External Tables and Views
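
External tables reference named file formats, which must exist before the tables below can be created. A minimal sketch for the Parquet format follows; the AvroFileFormat referenced later is an assumption here, since Synapse SQL external file formats natively cover delimited text, Parquet, and Delta, and raw Avro files are in practice often converted to Parquet upstream (for example by a Spark job).

-- Parquet file format referenced by the external tables below.
-- AvroFileFormat is assumed to exist; Synapse SQL pools define
-- DELIMITEDTEXT, PARQUET, and DELTA formats natively, so raw Avro
-- is typically converted to Parquet upstream.
CREATE EXTERNAL FILE FORMAT ParquetFileFormat
WITH (
    FORMAT_TYPE = PARQUET,
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
);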

-- Raw events external table
CREATE EXTERNAL TABLE RawEvents (
    Body VARBINARY(MAX)
)
WITH (
    LOCATION = '**/*.avro',
    DATA_SOURCE = RawEventsDataSource,
    FILE_FORMAT = AvroFileFormat
);

-- Bronze events view with JSON parsing
CREATE VIEW BronzeEvents AS
SELECT
    JSON_VALUE(j.body_json, '$.eventId') AS event_id,
    CAST(JSON_VALUE(j.body_json, '$.timestamp') AS DATETIME2) AS timestamp,
    JSON_VALUE(j.body_json, '$.aggregateId') AS aggregate_id,
    JSON_VALUE(j.body_json, '$.aggregateType') AS aggregate_type,
    JSON_VALUE(j.body_json, '$.eventType') AS event_type,
    JSON_VALUE(j.body_json, '$.version') AS version,
    JSON_VALUE(j.body_json, '$.metadata.correlationId') AS correlation_id,
    JSON_VALUE(j.body_json, '$.metadata.userId') AS user_id,
    JSON_VALUE(j.body_json, '$.metadata.source') AS source,
    JSON_QUERY(j.body_json, '$.payload') AS payload,
    YEAR(CAST(JSON_VALUE(j.body_json, '$.timestamp') AS DATETIME2)) AS year,
    MONTH(CAST(JSON_VALUE(j.body_json, '$.timestamp') AS DATETIME2)) AS month,
    DAY(CAST(JSON_VALUE(j.body_json, '$.timestamp') AS DATETIME2)) AS day
FROM RawEvents
CROSS APPLY (SELECT CAST(Body AS NVARCHAR(MAX)) AS body_json) AS j
WHERE ISJSON(j.body_json) = 1;

-- Silver events external table
CREATE EXTERNAL TABLE SilverEvents (
    event_id NVARCHAR(36),
    timestamp DATETIME2,
    aggregate_id NVARCHAR(100),
    aggregate_type NVARCHAR(50),
    event_type NVARCHAR(50),
    version NVARCHAR(10),
    correlation_id NVARCHAR(36),
    user_id NVARCHAR(50),
    source NVARCHAR(50),
    payload NVARCHAR(MAX),
    year INT,
    month INT,
    day INT
)
WITH (
    LOCATION = 'business_events/**/*.parquet',
    DATA_SOURCE = SilverDataSource,
    FILE_FORMAT = ParquetFileFormat
);
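
With the objects in place, a quick sanity check confirms that the external table resolves files and that the partition columns line up with the folder layout:

-- Sanity check: newest events visible through the silver external table
SELECT TOP 10 event_id, event_type, year, month, day
FROM SilverEvents
ORDER BY timestamp DESC;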

Data Transformations

Raw to Bronze Processing

CREATE PROCEDURE ProcessRawEventsToBronze
    @ProcessingDate DATE
AS
BEGIN
    -- Serverless SQL pools cannot INSERT into OPENROWSET; CETAS
    -- (CREATE EXTERNAL TABLE AS SELECT) is the supported write path.
    -- CETAS fails if the target location already holds data, so
    -- production runs should point LOCATION at a per-run folder.
    IF OBJECT_ID('BronzeEventsBatch') IS NOT NULL
        DROP EXTERNAL TABLE BronzeEventsBatch; -- drops metadata only, not files

    -- Validate and parse raw events
    CREATE EXTERNAL TABLE BronzeEventsBatch
    WITH (
        LOCATION = 'events/',
        DATA_SOURCE = BronzeDataSource,
        FILE_FORMAT = ParquetFileFormat
    )
    AS
    SELECT
        event_id,
        timestamp,
        aggregate_id,
        aggregate_type,
        event_type,
        version,
        correlation_id,
        user_id,
        source,
        payload,
        year,
        month,
        day,
        GETUTCDATE() AS processed_at
    FROM BronzeEvents
    WHERE CAST(timestamp AS DATE) = @ProcessingDate
      AND event_id IS NOT NULL
      AND timestamp IS NOT NULL
      AND aggregate_id IS NOT NULL
      AND LEN(aggregate_id) > 0
      AND aggregate_type IS NOT NULL
      AND event_type IS NOT NULL;

    -- Log processing statistics
    INSERT INTO ProcessingLog (
        ProcessingDate,
        Layer,
        RecordsProcessed,
        RecordsSkipped,
        ProcessedAt
    )
    SELECT
        @ProcessingDate,
        'Bronze',
        SUM(CASE WHEN event_id IS NOT NULL THEN 1 ELSE 0 END),
        SUM(CASE WHEN event_id IS NULL THEN 1 ELSE 0 END),
        GETUTCDATE()
    FROM BronzeEvents
    WHERE CAST(timestamp AS DATE) = @ProcessingDate;
END
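
The procedure above logs into a ProcessingLog table that is not defined in this document. Because serverless pools do not host regular tables, a small metadata database (for example, Azure SQL) is assumed; a minimal sketch matching the columns used above:

-- Hypothetical log table, hosted in a metadata database outside the pool
CREATE TABLE ProcessingLog (
    ProcessingDate   DATE          NOT NULL,
    Layer            NVARCHAR(20)  NOT NULL,
    RecordsProcessed INT           NOT NULL,
    RecordsSkipped   INT           NOT NULL,
    ProcessedAt      DATETIME2     NOT NULL
);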

Bronze to Silver Processing

CREATE PROCEDURE ProcessBronzeToSilver
    @Domain NVARCHAR(50),
    @EventType NVARCHAR(50)
AS
BEGIN
    -- Domain-specific transformations
    IF @Domain = 'CRM' AND @EventType = 'ContactCreated'
    BEGIN
        -- CETAS emits Parquet; writing true Delta tables to the silver
        -- layer would require a Spark pool instead
        IF OBJECT_ID('SilverContactsBatch') IS NOT NULL
            DROP EXTERNAL TABLE SilverContactsBatch;

        CREATE EXTERNAL TABLE SilverContactsBatch
        WITH (
            LOCATION = 'contacts/',
            DATA_SOURCE = SilverDataSource,
            FILE_FORMAT = ParquetFileFormat
        )
        AS
        SELECT
            event_id,
            timestamp,
            aggregate_id AS contact_id,
            JSON_VALUE(payload, '$.firstName') AS first_name,
            JSON_VALUE(payload, '$.lastName') AS last_name,
            JSON_VALUE(payload, '$.email') AS email,
            JSON_VALUE(payload, '$.company') AS company,
            CASE
                WHEN JSON_VALUE(payload, '$.email') LIKE '%@gmail.com' THEN 'Gmail'
                WHEN JSON_VALUE(payload, '$.email') LIKE '%@outlook.com' THEN 'Outlook'
                ELSE 'Other'
            END AS email_provider,
            CASE
                WHEN JSON_VALUE(payload, '$.company') IS NOT NULL THEN 'Business'
                ELSE 'Personal'
            END AS contact_type,
            correlation_id,
            user_id,
            source,
            year,
            month,
            day,
            GETUTCDATE() AS processed_at
        FROM BronzeEvents
        WHERE aggregate_type = 'Contact'
          AND event_type = 'ContactCreated'
          AND CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE);
    END

    -- Add more domain-specific processing...
END
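
The ForEach activity in the pipeline configuration invokes this procedure once per domain/event-type pair, for example:

-- Example invocation for a single domain/event-type pair
EXEC ProcessBronzeToSilver @Domain = 'CRM', @EventType = 'ContactCreated';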

Silver to Gold Aggregations

CREATE PROCEDURE ProcessSilverToGold
AS
BEGIN
    -- Daily contact metrics (each metric set lands in its own gold folder)
    IF OBJECT_ID('GoldDailyMetricsBatch') IS NOT NULL
        DROP EXTERNAL TABLE GoldDailyMetricsBatch;

    CREATE EXTERNAL TABLE GoldDailyMetricsBatch
    WITH (
        LOCATION = 'daily_metrics/',
        DATA_SOURCE = GoldDataSource,
        FILE_FORMAT = ParquetFileFormat
    )
    AS
    SELECT
        'contacts_created' AS metric_name,
        COUNT(*) AS metric_value,
        'count' AS metric_type,
        JSON_OBJECT(
            'domain': 'CRM',
            'entity': 'Contact',
            'period': 'daily'
        ) AS dimensions,
        MAX(timestamp) AS timestamp,
        CAST(timestamp AS DATE) AS date,
        'Contact' AS aggregate_type,
        year,
        month,
        DAY(timestamp) AS day
    FROM SilverEvents
    WHERE aggregate_type = 'Contact'
      AND event_type = 'ContactCreated'
      AND CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)
    GROUP BY CAST(timestamp AS DATE), year, month, DAY(timestamp);

    -- Weekly aggregations
    IF OBJECT_ID('GoldWeeklyMetricsBatch') IS NOT NULL
        DROP EXTERNAL TABLE GoldWeeklyMetricsBatch;

    CREATE EXTERNAL TABLE GoldWeeklyMetricsBatch
    WITH (
        LOCATION = 'weekly_metrics/',
        DATA_SOURCE = GoldDataSource,
        FILE_FORMAT = ParquetFileFormat
    )
    AS
    SELECT
        'contacts_created_weekly' AS metric_name,
        COUNT(*) AS metric_value,
        'count' AS metric_type,
        JSON_OBJECT(
            'domain': 'CRM',
            'entity': 'Contact',
            'period': 'weekly'
        ) AS dimensions,
        MAX(timestamp) AS timestamp,
        CAST(MAX(timestamp) AS DATE) AS date,
        'Contact' AS aggregate_type,
        year,
        month,
        DATEPART(week, timestamp) AS week
    FROM SilverEvents
    WHERE aggregate_type = 'Contact'
      AND event_type = 'ContactCreated'
      AND timestamp >= DATEADD(day, -7, GETDATE())
    GROUP BY year, month, DATEPART(week, timestamp);

    -- Business KPIs
    IF OBJECT_ID('GoldBusinessKpisBatch') IS NOT NULL
        DROP EXTERNAL TABLE GoldBusinessKpisBatch;

    CREATE EXTERNAL TABLE GoldBusinessKpisBatch
    WITH (
        LOCATION = 'business_kpis/',
        DATA_SOURCE = GoldDataSource,
        FILE_FORMAT = ParquetFileFormat
    )
    AS
    SELECT
        'conversion_rate' AS metric_name,
        CASE
            WHEN new_contacts > 0 THEN (completed_declarations * 100.0) / new_contacts
            ELSE 0
        END AS metric_value,
        'percentage' AS metric_type,
        JSON_OBJECT('period': 'daily', 'calculation': 'declarations/contacts') AS dimensions,
        GETUTCDATE() AS timestamp,
        date,
        'Business' AS aggregate_type,
        YEAR(date) AS year,
        MONTH(date) AS month,
        DAY(date) AS day
    FROM (
        SELECT
            COUNT(DISTINCT CASE WHEN event_type = 'ContactCreated' THEN aggregate_id END) AS new_contacts,
            COUNT(DISTINCT CASE WHEN event_type = 'MatchCreated' THEN aggregate_id END) AS new_matches,
            COUNT(DISTINCT CASE WHEN event_type = 'DeclarationCompleted' THEN aggregate_id END) AS completed_declarations,
            CAST(timestamp AS DATE) AS date
        FROM SilverEvents
        WHERE CAST(timestamp AS DATE) = CAST(GETDATE() AS DATE)
        GROUP BY CAST(timestamp AS DATE)
    ) AS business_metrics;
END
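
Downstream consumers (reports, dashboards) can read the gold folders directly through OPENROWSET. A sketch assuming the daily_metrics layout written above:

-- Read the gold layer directly; columns are inferred from the Parquet files
SELECT metric_name, date, metric_value
FROM OPENROWSET(
    BULK 'daily_metrics/**/*.parquet',
    DATA_SOURCE = 'GoldDataSource',
    FORMAT = 'PARQUET'
) AS daily_metrics
WHERE metric_name = 'contacts_created'
ORDER BY date DESC;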

Monitoring and Error Handling

Pipeline Monitoring

graph TD
    subgraph "Monitoring Stack"
        ADF_MONITOR[ADF Monitoring]
        LOG_ANALYTICS[Log Analytics]
        ALERTS[Alert Rules]
        DASHBOARD[Monitoring Dashboard]
    end

    subgraph "Metrics"
        PIPELINE_RUNS[Pipeline Runs]
        ACTIVITY_RUNS[Activity Runs]
        TRIGGER_RUNS[Trigger Runs]
        DATA_FLOW[Data Flow Metrics]
    end

    subgraph "Alerts"
        FAILURE_ALERT[Pipeline Failure]
        DURATION_ALERT[Long Duration]
        DATA_QUALITY[Data Quality Issues]
        COST_ALERT[High Compute Costs]
    end

    ADF_MONITOR --> PIPELINE_RUNS
    ADF_MONITOR --> ACTIVITY_RUNS
    LOG_ANALYTICS --> ALERTS
    ALERTS --> DASHBOARD

    PIPELINE_RUNS --> FAILURE_ALERT
    ACTIVITY_RUNS --> DURATION_ALERT
    DATA_FLOW --> DATA_QUALITY
    DATA_FLOW --> COST_ALERT

Error Handling Logic

{
  "name": "ErrorHandlingActivity",
  "type": "WebActivity",
  "typeProperties": {
    "url": "https://prod-27.westeurope.logic.azure.com/workflows/.../triggers/manual/paths/invoke",
    "method": "POST",
    "headers": {
      "Content-Type": "application/json"
    },
    "body": {
      "PipelineName": "@pipeline().Pipeline",
      "RunId": "@pipeline().RunId",
      "ErrorMessage": "@activity('ProcessRawToBronze').error.message",
      "TriggerTime": "@pipeline().TriggerTime",
      "FailedActivity": "ProcessRawToBronze"
    }
  },
  "onInactiveMarkAs": "Failed"
}

Data Quality Checks

CREATE PROCEDURE ValidateDataQuality
    @Layer NVARCHAR(20),
    @ProcessingDate DATE
AS
BEGIN
    DECLARE @QualityScore DECIMAL(5,2);
    DECLARE @ErrorThreshold DECIMAL(5,2) = 95.0; -- 95% quality threshold

    IF @Layer = 'Bronze'
    BEGIN
        SELECT @QualityScore = (
            COUNT(CASE WHEN event_id IS NOT NULL 
                       AND timestamp IS NOT NULL 
                       AND aggregate_id IS NOT NULL 
                       AND aggregate_id != '' 
                       THEN 1 END) * 100.0
        ) / NULLIF(COUNT(*), 0)
        FROM BronzeEvents
        WHERE CAST(timestamp AS DATE) = @ProcessingDate;
    END

    IF @Layer = 'Silver'
    BEGIN
        SELECT @QualityScore = (
            COUNT(CASE WHEN event_id IS NOT NULL 
                       AND processed_at IS NOT NULL
                       AND ISJSON(payload) = 1
                       THEN 1 END) * 100.0
        ) / NULLIF(COUNT(*), 0)
        FROM SilverEvents
        WHERE CAST(timestamp AS DATE) = @ProcessingDate;
    END

    -- Log quality metrics
    INSERT INTO DataQualityLog (
        ProcessingDate,
        Layer,
        QualityScore,
        Threshold,
        Status,
        CreatedAt
    )
    VALUES (
        @ProcessingDate,
        @Layer,
        @QualityScore,
        @ErrorThreshold,
        CASE WHEN @QualityScore >= @ErrorThreshold THEN 'PASS' ELSE 'FAIL' END,
        GETUTCDATE()
    );

    -- Raise error if quality is below threshold
    IF @QualityScore < @ErrorThreshold
    BEGIN
        THROW 50001, 'Data quality below threshold', 1;
    END
END
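
As with ProcessingLog, the DataQualityLog table is assumed to live in a metadata database. A minimal sketch matching the INSERT above, plus an example invocation:

-- Hypothetical quality log table matching the columns inserted above
CREATE TABLE DataQualityLog (
    ProcessingDate DATE          NOT NULL,
    Layer          NVARCHAR(20)  NOT NULL,
    QualityScore   DECIMAL(5,2)  NULL,
    Threshold      DECIMAL(5,2)  NOT NULL,
    Status         NVARCHAR(10)  NOT NULL,
    CreatedAt      DATETIME2     NOT NULL
);

-- Example: validate a day's bronze load
EXEC ValidateDataQuality @Layer = 'Bronze', @ProcessingDate = '2024-01-01';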

Performance Optimization

Query Optimization

-- Optimize partition elimination
SELECT 
    aggregate_type,
    COUNT(*) AS event_count,
    AVG(CAST(JSON_VALUE(payload, '$.value') AS FLOAT)) AS avg_value
FROM SilverEvents
WHERE year = 2024
  AND month = 1
  AND aggregate_type IN ('Contact', 'Match')
GROUP BY aggregate_type;

-- Use column pruning
SELECT 
    event_id,
    timestamp,
    aggregate_type,
    JSON_VALUE(payload, '$.status') AS status
FROM SilverEvents
WHERE year = 2024
  AND month = 1
  AND JSON_VALUE(payload, '$.status') = 'Active';

-- Leverage statistics for better query plans (statistics on external
-- tables are created explicitly and refreshed by dropping and re-creating)
CREATE STATISTICS silver_aggregate_type_stats
ON SilverEvents (aggregate_type)
WITH FULLSCAN, NORECOMPUTE;

Parallel Processing

# Example using Azure Functions for parallel processing
import azure.functions as func
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def process_partition(partition_path):
    """Process a single partition"""
    df = pd.read_parquet(partition_path)

    # Apply transformations
    df['processed_at'] = pd.Timestamp.now()
    df['email_domain'] = df['email'].str.split('@').str[1]

    # Write to silver layer
    output_path = partition_path.replace('bronze', 'silver')
    df.to_parquet(output_path, compression='snappy')

    return len(df)

def main(req: func.HttpRequest) -> func.HttpResponse:
    # Enumerate partition folders to process (helper assumed to be defined elsewhere)
    partition_paths = get_partition_paths()

    # Process partitions in parallel
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(process_partition, partition_paths))

    total_records = sum(results)

    return func.HttpResponse(
        f"Processed {total_records} records across {len(partition_paths)} partitions",
        status_code=200
    )

Scheduling and Orchestration

Processing Schedule

gantt
    title Data Processing Schedule
    dateFormat HH:mm
    axisFormat %H:%M

    section Raw to Bronze
    Parse Events    :active, parse, 00:05, 00:15

    section Bronze to Silver
    CRM Transform   :active, crm, 00:15, 00:25
    Matching Transform :active, match, 00:15, 00:25
    Profile Transform :active, profile, 00:15, 00:25

    section Silver to Gold
    Daily Metrics   :gold, 00:30, 00:40
    Weekly Metrics  :weekly, 00:40, 00:45
    Monthly Metrics :monthly, 00:45, 00:50

    section Cleanup
    Archive Files   :cleanup, 00:50, 00:55

Pipeline Dependencies

{
  "name": "DependencyManagement",
  "properties": {
    "activities": [
      {
        "name": "WaitForUpstreamData",
        "type": "Until",
        "typeProperties": {
          "expression": {
            "value": "@greater(activity('CheckDataAvailability').output.rowCount, 0)",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "CheckDataAvailability",
              "type": "Lookup",
              "typeProperties": {
                "source": {
                  "type": "SqlSource",
                  "sqlReaderQuery": "SELECT COUNT(*) as rowCount FROM OPENROWSET(BULK 'raw-events/**/*.avro') WHERE _metadata_file_last_modified > '@{addHours(pipeline().TriggerTime, -1)}'"
                }
              }
            },
            {
              "name": "WaitActivity",
              "type": "Wait",
              "typeProperties": {
                "waitTimeInSeconds": 300
              }
            }
          ],
          "timeout": "0.01:00:00"
        }
      }
    ]
  }
}

Best Practices

  1. Processing Design
     • Use idempotent transformations
     • Implement incremental processing (see the sketch after this list)
     • Partition data appropriately
     • Use columnar formats (Parquet/Delta)

  2. Error Handling
     • Implement retry mechanisms
     • Log detailed error information
     • Use circuit breaker patterns
     • Set up alerting for failures

  3. Performance
     • Optimize partition pruning
     • Use parallel processing
     • Cache frequently used data
     • Monitor query performance

  4. Data Quality
     • Validate at ingestion
     • Implement data profiling
     • Monitor drift detection
     • Maintain data lineage
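
A minimal sketch of the idempotent, incremental pattern from the first group, assuming a hypothetical ProcessingWatermark table in the metadata database: each run reads only events newer than the stored watermark and advances it only after a successful write, so a failed window can be replayed safely.

-- Hypothetical watermark-driven incremental load
DECLARE @Watermark DATETIME2;

SELECT @Watermark = LastProcessedTime
FROM ProcessingWatermark              -- assumed metadata table
WHERE PipelineName = 'BronzeToSilver';

-- Incremental read: only events newer than the watermark
SELECT event_id, event_type, timestamp, payload
FROM BronzeEvents
WHERE timestamp > @Watermark;

-- Advance the watermark only after the write succeeds, so a
-- failed run can be replayed without skipping or duplicating data
UPDATE ProcessingWatermark
SET LastProcessedTime = GETUTCDATE()
WHERE PipelineName = 'BronzeToSilver';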

Troubleshooting

Common Issues

Issue                   | Symptoms                    | Solution
Pipeline Timeouts       | Long-running activities     | Increase timeout, optimize queries
Memory Errors           | OOM exceptions              | Reduce batch size, add more memory
Authentication Failures | 401/403 errors              | Check managed identity permissions
Data Skew               | Uneven partition processing | Review partitioning strategy
Schema Drift            | Parsing failures            | Implement schema evolution handling

Debug Query Template

-- Debug processing issues
SELECT 
    pipeline_name,
    run_id,
    status,
    start_time,
    end_time,
    DATEDIFF(minute, start_time, end_time) as duration_minutes,
    error_message
FROM PipelineRunLog
WHERE start_time >= DATEADD(day, -7, GETDATE())
  AND status IN ('Failed', 'Cancelled')
ORDER BY start_time DESC;
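
The debug query assumes a PipelineRunLog table populated from ADF run history (for example, via diagnostic settings routed into the metadata database). A sketch consistent with the columns referenced:

-- Hypothetical run-history table behind the debug query
CREATE TABLE PipelineRunLog (
    pipeline_name NVARCHAR(100) NOT NULL,
    run_id        NVARCHAR(36)  NOT NULL,
    status        NVARCHAR(20)  NOT NULL,
    start_time    DATETIME2     NOT NULL,
    end_time      DATETIME2     NULL,
    error_message NVARCHAR(MAX) NULL
);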

Next Steps