
Real-time Streaming Integration

Overview

The real-time streaming integration provides low-latency data flow from Matomo to Power BI using Azure's streaming infrastructure; with the 60-second polling loop shown below, dashboards lag the site by roughly a minute. This approach is ideal for live dashboards, real-time monitoring, and immediate alerting on key metrics.

Architecture

graph TB
    subgraph "Matomo Cloud"
        MA[Matomo Analytics]
        MAPI[Reporting API]
        MW[Webhooks<br/>If Available]
    end

    subgraph "Ingestion Layer"
        AF[Azure Function<br/>Event Triggered]
        AKV[Azure Key Vault]
        APIM[API Management<br/>Optional]
    end

    subgraph "Streaming Layer"
        EH[Event Hubs<br/>Partitioned]
        ASA[Stream Analytics<br/>Jobs]
        ASAQ[Stream Analytics<br/>Queries]
    end

    subgraph "Storage Layer"
        ADLS[Data Lake<br/>Archive]
        COS[Cosmos DB<br/>Hot Path]
    end

    subgraph "Power BI"
        PDS[Push Dataset<br/>5M rows]
        SDS[Streaming Dataset]
        DASH[Live Dashboard]
        ALERT[Alerts]
    end

    MA --> MAPI
    MAPI -->|Poll/Push| AF
    MW -.->|Webhook| AF
    AKV -->|Secrets| AF
    APIM --> AF
    AF -->|Events| EH
    EH --> ASA
    ASA --> ASAQ
    ASAQ -->|Archive| ADLS
    ASAQ -->|Hot Data| COS
    ASAQ -->|Stream| PDS
    ASAQ -->|Stream| SDS
    PDS --> DASH
    SDS --> DASH
    DASH --> ALERT

    style MA fill:#f9f,stroke:#333,stroke-width:2px
    style DASH fill:#9f9,stroke:#333,stroke-width:2px
    style EH fill:#ffd,stroke:#333,stroke-width:2px
    style ASA fill:#dfd,stroke:#333,stroke-width:2px

Data Flow Patterns

sequenceDiagram
    participant M as Matomo
    participant AF as Azure Function
    participant EH as Event Hub
    participant ASA as Stream Analytics
    participant PBI as Power BI
    participant User as Dashboard User

    loop Every 60 seconds
        AF->>M: Fetch latest metrics
        M-->>AF: Return JSON data
        AF->>AF: Transform to events
        AF->>EH: Send events
    end

    EH->>ASA: Stream events
    ASA->>ASA: Window aggregation
    ASA->>PBI: Push to dataset
    PBI->>PBI: Update visuals
    User->>PBI: View dashboard
    PBI-->>User: Real-time data

Implementation Steps

Step 1: Create Event Hub Namespace

# Variables
RESOURCE_GROUP="rg-matomo-streaming"
LOCATION="westeurope"
NAMESPACE="eh-matomo-stream"
EVENT_HUB="matomo-events"

# Create Event Hub Namespace
az eventhubs namespace create \
    --name $NAMESPACE \
    --resource-group $RESOURCE_GROUP \
    --location $LOCATION \
    --sku Standard \
    --capacity 1

# Create Event Hub with partitions
az eventhubs eventhub create \
    --name $EVENT_HUB \
    --namespace-name $NAMESPACE \
    --resource-group $RESOURCE_GROUP \
    --partition-count 4 \
    --message-retention 1

# Create consumer group for Stream Analytics
az eventhubs eventhub consumer-group create \
    --name streamanalytics \
    --eventhub-name $EVENT_HUB \
    --namespace-name $NAMESPACE \
    --resource-group $RESOURCE_GROUP

# Get connection string
az eventhubs namespace authorization-rule keys list \
    --name RootManageSharedAccessKey \
    --namespace-name $NAMESPACE \
    --resource-group $RESOURCE_GROUP \
    --query primaryConnectionString -o tsv
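
With the namespace and hub in place, it can help to smoke-test ingestion with a single event before deploying the function in Step 2. A minimal sketch using the same Microsoft.Azure.EventHubs client as Step 2; it assumes the EventHubConnectionString environment variable holds the namespace-level connection string printed above, so the entity path is appended here:

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

class SendTestEvent
{
    static async Task Main()
    {
        // A namespace-level connection string needs the entity path appended;
        // skip this if your string already contains EntityPath.
        var connectionString = Environment.GetEnvironmentVariable("EventHubConnectionString")
            + ";EntityPath=matomo-events";

        var client = EventHubClient.CreateFromConnectionString(connectionString);

        // Send one JSON test event and confirm it shows up in the portal's
        // Event Hubs metrics (IncomingMessages).
        var payload = "{\"EventType\":\"Test\",\"Timestamp\":\"" + DateTime.UtcNow.ToString("o") + "\"}";
        await client.SendAsync(new EventData(Encoding.UTF8.GetBytes(payload)));
        await client.CloseAsync();
    }
}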

Step 2: Create Azure Function for Real-time Data Ingestion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Net.Http;
using Azure.Security.KeyVault.Secrets;
using Azure.Identity;

public static class MatomoStreamingIngestion
{
    private static readonly HttpClient httpClient = new HttpClient();
    private static EventHubClient eventHubClient;

    [FunctionName("StreamMatomoData")]
    public static async Task Run(
        [TimerTrigger("0 * * * * *")] TimerInfo myTimer, // every minute, on the minute
        ILogger log)
    {
        log.LogInformation($"Streaming Matomo data at: {DateTime.Now}");

        try
        {
            // Initialize Event Hub client if needed. The connection string must
            // include EntityPath=matomo-events when it comes from a
            // namespace-level shared access key (as in Step 1).
            if (eventHubClient == null)
            {
                var connectionString = Environment.GetEnvironmentVariable("EventHubConnectionString");
                eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);
            }

            // Get Matomo configuration
            var matomoUrl = Environment.GetEnvironmentVariable("MatomoUrl");
            var siteId = Environment.GetEnvironmentVariable("MatomoSiteId");
            var apiToken = await GetApiToken();

            // Fetch real-time data
            var realtimeData = await FetchRealtimeMetrics(matomoUrl, siteId, apiToken);

            // Transform to events
            var events = TransformToEvents(realtimeData);

            // Send to Event Hub
            var batch = eventHubClient.CreateBatch();
            foreach (var evt in events)
            {
                var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evt)));
                eventData.Properties["EventType"] = evt.EventType;
                eventData.Properties["Timestamp"] = evt.Timestamp.ToString("o");

                if (!batch.TryAdd(eventData))
                {
                    // Send current batch and start new one
                    await eventHubClient.SendAsync(batch);
                    batch = eventHubClient.CreateBatch();
                    batch.TryAdd(eventData);
                }
            }

            // Send remaining events
            if (batch.Count > 0)
            {
                await eventHubClient.SendAsync(batch);
            }

            log.LogInformation($"Sent {events.Count} events to Event Hub");
        }
        catch (Exception ex)
        {
            log.LogError($"Error streaming data: {ex.Message}");
            throw;
        }
    }

    private static async Task<string> GetApiToken()
    {
        var keyVaultUrl = Environment.GetEnvironmentVariable("KeyVaultUrl");
        var client = new SecretClient(new Uri(keyVaultUrl), new DefaultAzureCredential());
        var secret = await client.GetSecretAsync("MatomoApiToken");
        return secret.Value.Value;
    }

    private static async Task<MatomoRealtimeData> FetchRealtimeMetrics(
        string baseUrl, string siteId, string apiToken)
    {
        // Matomo recommends sending token_auth as a POST parameter so the token
        // does not end up in access logs; a GET query string is used here for brevity.
        var apiUrl = $"{baseUrl}/index.php" +
            "?module=API" +
            "&method=Live.getLastVisitsDetails" +
            $"&idSite={siteId}" +
            "&period=day" +
            "&date=today" +
            "&format=JSON" +
            $"&token_auth={apiToken}" +
            "&filter_limit=100";

        // Live.getLastVisitsDetails returns a bare JSON array of visit objects,
        // so deserialize the array and wrap it.
        var response = await httpClient.GetStringAsync(apiUrl);
        return new MatomoRealtimeData
        {
            Visits = JsonConvert.DeserializeObject<List<MatomoVisit>>(response)
        };
    }

    private static List<StreamingEvent> TransformToEvents(MatomoRealtimeData data)
    {
        var events = new List<StreamingEvent>();

        // Transform visits to events
        foreach (var visit in data.Visits)
        {
            events.Add(new StreamingEvent
            {
                EventType = "PageView",
                Timestamp = DateTime.UtcNow,
                VisitorId = visit.VisitorId,
                SessionId = visit.IdVisit,
                PageUrl = visit.LastActionUrl,
                PageTitle = visit.LastActionTitle,
                TimeOnPage = visit.TimeOnPage,
                Country = visit.Country,
                Device = visit.DeviceType,
                Browser = visit.Browser,
                Properties = new Dictionary<string, object>
                {
                    ["referrer"] = visit.Referrer,
                    ["campaign"] = visit.Campaign,
                    ["isNewVisitor"] = visit.IsNewVisitor
                }
            });
        }

        return events;
    }
}

public class StreamingEvent
{
    public string EventType { get; set; }
    public DateTime Timestamp { get; set; }
    public string VisitorId { get; set; }
    public string SessionId { get; set; }
    public string PageUrl { get; set; }
    public string PageTitle { get; set; }
    public int TimeOnPage { get; set; }
    public string Country { get; set; }
    public string Device { get; set; }
    public string Browser { get; set; }
    public Dictionary<string, object> Properties { get; set; }
}
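
The MatomoRealtimeData and visit types the function deserializes into are not part of any SDK and need to be defined. A minimal sketch; the [JsonProperty] names are assumptions about the Live.getLastVisitsDetails JSON and should be checked against what your Matomo version actually returns:

public class MatomoRealtimeData
{
    // Live.getLastVisitsDetails returns a bare JSON array of visits;
    // FetchRealtimeMetrics deserializes the array and wraps it here.
    public List<MatomoVisit> Visits { get; set; }
}

public class MatomoVisit
{
    // JsonProperty mappings are assumptions — verify against your instance's JSON.
    [JsonProperty("visitorId")] public string VisitorId { get; set; }
    [JsonProperty("idVisit")] public string IdVisit { get; set; }
    [JsonProperty("lastActionUrl")] public string LastActionUrl { get; set; }
    [JsonProperty("lastActionTitle")] public string LastActionTitle { get; set; }
    [JsonProperty("timeOnPage")] public int TimeOnPage { get; set; }
    [JsonProperty("country")] public string Country { get; set; }
    [JsonProperty("deviceType")] public string DeviceType { get; set; }
    [JsonProperty("browserName")] public string Browser { get; set; }
    [JsonProperty("referrerName")] public string Referrer { get; set; }
    [JsonProperty("campaignName")] public string Campaign { get; set; }
    [JsonProperty("visitorType")] public string VisitorType { get; set; }

    // TransformToEvents expects a boolean; Matomo reports "new"/"returning".
    [JsonIgnore] public bool IsNewVisitor => VisitorType == "new";
}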

Step 3: Configure Stream Analytics Job

-- Stream Analytics inputs and outputs are defined on the job itself (portal,
-- ARM template, or CLI) rather than in the query language; the pseudo-DDL
-- below documents the intended configuration.

-- Input configuration
CREATE INPUT EventHubInput
FROM EventHub
WITH (
    EventHubName = 'matomo-events',
    ConsumerGroupName = 'streamanalytics'
);

-- Output to Power BI
CREATE OUTPUT PowerBIOutput
TO PowerBI
WITH (
    Dataset = 'MatomoRealtime',
    Table = 'Metrics',
    AuthenticationMode = 'UserToken'
);

-- Output to Cosmos DB for the hot path
CREATE OUTPUT CosmosOutput
TO CosmosDB
WITH (
    Database = 'matomo-analytics',
    Container = 'sessions'
);

-- Output to Data Lake for archival
CREATE OUTPUT DataLakeOutput
TO DataLake
WITH (
    AccountName = 'stmatomoanalytics',
    Container = 'streaming-archive',
    PathPattern = '{date}/{time}',
    DateFormat = 'yyyy/MM/dd',
    TimeFormat = 'HH'
);

-- Real-time aggregation query
WITH RealtimeMetrics AS (
    SELECT
        System.Timestamp() as WindowEnd,
        COUNT(*) as PageViews,
        COUNT(DISTINCT VisitorId) as UniqueVisitors,
        COUNT(DISTINCT SessionId) as Sessions,
        AVG(TimeOnPage) as AvgTimeOnPage,
        PageUrl,
        Country,
        Device
    FROM EventHubInput TIMESTAMP BY Timestamp
    GROUP BY
        PageUrl,
        Country,
        Device,
        TumblingWindow(minute, 1)
),
SessionMetrics AS (
    SELECT
        System.Timestamp() as WindowEnd,
        SessionId,
        VisitorId,
        COUNT(*) as PageViewsPerSession,
        SUM(TimeOnPage) as SessionDuration,
        COLLECT() as PagesVisited
    FROM EventHubInput TIMESTAMP BY Timestamp
    GROUP BY
        SessionId,
        VisitorId,
        SessionWindow(minute, 5, 30)  -- 5-minute inactivity timeout, 30-minute max duration
)

-- Aggregates to Power BI
SELECT
    WindowEnd,
    PageViews,
    UniqueVisitors,
    Sessions,
    AvgTimeOnPage,
    PageUrl,
    Country,
    Device
INTO PowerBIOutput
FROM RealtimeMetrics;

-- Session roll-ups to the Cosmos DB hot path
SELECT
    WindowEnd,
    SessionId,
    VisitorId,
    PageViewsPerSession,
    SessionDuration,
    PagesVisited
INTO CosmosOutput
FROM SessionMetrics;

-- Archive raw events
SELECT
    *
INTO DataLakeOutput
FROM EventHubInput TIMESTAMP BY Timestamp;

Step 4: Create Power BI Streaming Dataset

{
    "name": "MatomoRealtime",
    "defaultMode": "pushStreaming",
    "tables": [
        {
            "name": "Metrics",
            "columns": [
                {
                    "name": "WindowEnd",
                    "dataType": "DateTime"
                },
                {
                    "name": "PageViews",
                    "dataType": "Int64"
                },
                {
                    "name": "UniqueVisitors",
                    "dataType": "Int64"
                },
                {
                    "name": "Sessions",
                    "dataType": "Int64"
                },
                {
                    "name": "AvgTimeOnPage",
                    "dataType": "Double"
                },
                {
                    "name": "PageUrl",
                    "dataType": "String"
                },
                {
                    "name": "Country",
                    "dataType": "String"
                },
                {
                    "name": "Device",
                    "dataType": "String"
                }
            ]
        }
    ]
}

PowerShell script to create dataset:

# Install Power BI PowerShell module
Install-Module -Name MicrosoftPowerBIMgmt -Force

# Connect to Power BI
Connect-PowerBIServiceAccount

# Create streaming dataset
$datasetSchema = @{
    name = "MatomoRealtime"
    defaultMode = "pushStreaming"
    tables = @(
        @{
            name = "Metrics"
            columns = @(
                @{name = "WindowEnd"; dataType = "DateTime"},
                @{name = "PageViews"; dataType = "Int64"},
                @{name = "UniqueVisitors"; dataType = "Int64"},
                @{name = "Sessions"; dataType = "Int64"},
                @{name = "AvgTimeOnPage"; dataType = "Double"},
                @{name = "PageUrl"; dataType = "String"},
                @{name = "Country"; dataType = "String"},
                @{name = "Device"; dataType = "String"}
            )
        }
    )
}

$workspace = Get-PowerBIWorkspace -Name "Matomo Analytics"

# New-PowerBIDataset builds schemas from New-PowerBITable/New-PowerBIColumn objects
# rather than a raw hashtable, so post the schema through the REST API instead
Invoke-PowerBIRestMethod -Method Post `
    -Url "groups/$($workspace.Id)/datasets" `
    -Body ($datasetSchema | ConvertTo-Json -Depth 5)
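
Once the dataset exists, rows are pushed over the Power BI REST API (Stream Analytics does this for you; calling it directly is useful for testing or custom writers). A minimal sketch, assuming the workspace and dataset GUIDs from the step above and an Azure AD access token with the Dataset.ReadWrite.All scope in PBI_ACCESS_TOKEN (token acquisition not shown):

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

class PushRowsExample
{
    static async Task Main()
    {
        var workspaceId = "<workspace-guid>";   // $workspace.Id from above
        var datasetId = "<dataset-guid>";       // returned when the dataset is created
        var accessToken = Environment.GetEnvironmentVariable("PBI_ACCESS_TOKEN");

        var url = $"https://api.powerbi.com/v1.0/myorg/groups/{workspaceId}" +
                  $"/datasets/{datasetId}/tables/Metrics/rows";

        // One row matching the Metrics schema defined above.
        var body = "{\"rows\":[{" +
            "\"WindowEnd\":\"" + DateTime.UtcNow.ToString("o") + "\"," +
            "\"PageViews\":42,\"UniqueVisitors\":17,\"Sessions\":20," +
            "\"AvgTimeOnPage\":35.5,\"PageUrl\":\"/home\"," +
            "\"Country\":\"NL\",\"Device\":\"Desktop\"}]}";

        using var http = new HttpClient();
        http.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Bearer", accessToken);

        var response = await http.PostAsync(url,
            new StringContent(body, Encoding.UTF8, "application/json"));
        response.EnsureSuccessStatusCode();
    }
}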

Step 5: Build Real-time Dashboard

graph TB
    subgraph "Dashboard Components"
        subgraph "KPI Tiles"
            KPI1[Current Visitors<br/>Card Visual]
            KPI2[Page Views/Min<br/>Card Visual]
            KPI3[Avg Session Time<br/>Gauge Visual]
            KPI4[Bounce Rate<br/>KPI Visual]
        end

        subgraph "Time Series"
            TS1[Visitors Over Time<br/>Line Chart]
            TS2[Page Views Trend<br/>Area Chart]
        end

        subgraph "Breakdowns"
            BD1[Top Pages<br/>Bar Chart]
            BD2[Device Distribution<br/>Pie Chart]
            BD3[Geographic Map<br/>Map Visual]
        end

        subgraph "Detailed Views"
            DV1[Session Details<br/>Table]
            DV2[Real-time Feed<br/>Stream Visual]
        end
    end

    style KPI1 fill:#ffd,stroke:#333,stroke-width:2px
    style TS1 fill:#dfd,stroke:#333,stroke-width:2px
    style BD3 fill:#dff,stroke:#333,stroke-width:2px

Advanced Streaming Patterns

Pattern 1: Anomaly Detection

-- Stream Analytics anomaly detection. The original ANOMALYDETECTION operator
-- has been retired; AnomalyDetection_SpikeAndDip is its current replacement.
WITH EventCounts AS (
    SELECT
        System.Timestamp() as Time,
        COUNT(*) as PageViews
    FROM EventHubInput TIMESTAMP BY Timestamp
    GROUP BY TumblingWindow(minute, 5)
),
AnomalyDetection AS (
    SELECT
        Time,
        PageViews,
        -- 95% confidence over a 24-point (2-hour) sliding history
        AnomalyDetection_SpikeAndDip(PageViews, 95, 24, 'spikesanddips')
            OVER (LIMIT DURATION(hour, 2)) as SpikeAndDipResult
    FROM EventCounts
)

SELECT
    Time,
    PageViews,
    CAST(GetRecordPropertyValue(SpikeAndDipResult, 'Score') AS FLOAT) as AnomalyScore,
    CAST(GetRecordPropertyValue(SpikeAndDipResult, 'IsAnomaly') AS BIGINT) as IsAnomaly
INTO AnomalyOutput
FROM AnomalyDetection
WHERE CAST(GetRecordPropertyValue(SpikeAndDipResult, 'IsAnomaly') AS BIGINT) = 1;

Pattern 2: Complex Event Processing

[FunctionName("ProcessComplexEvents")]
public static async Task ProcessUserJourney(
    [EventHubTrigger("matomo-events", Connection = "EventHubConnectionString")] 
    EventData[] events,
    ILogger log)
{
    // This dictionary only spans the current trigger batch; journeys that cross
    // batches need durable state (e.g. Durable Entities or Cosmos DB).
    var userJourneys = new Dictionary<string, UserJourney>();

    foreach (var eventData in events)
    {
        // Body is an ArraySegment<byte> in Microsoft.Azure.EventHubs
        var evt = JsonConvert.DeserializeObject<StreamingEvent>(
            Encoding.UTF8.GetString(
                eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count)
        );

        if (!userJourneys.ContainsKey(evt.VisitorId))
        {
            userJourneys[evt.VisitorId] = new UserJourney
            {
                VisitorId = evt.VisitorId,
                StartTime = evt.Timestamp,
                Pages = new List<PageVisit>()
            };
        }

        var journey = userJourneys[evt.VisitorId];
        journey.Pages.Add(new PageVisit
        {
            PageUrl = evt.PageUrl,
            Timestamp = evt.Timestamp,
            TimeOnPage = evt.TimeOnPage
        });

        // Detect conversion patterns
        if (IsConversionPath(journey))
        {
            await SendConversionAlert(journey);
        }

        // Detect abandonment patterns
        if (IsAbandonmentPattern(journey))
        {
            await SendAbandonmentAlert(journey);
        }
    }
}
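
The journey types and pattern predicates referenced above are application-specific, not part of any SDK; SendConversionAlert and SendAbandonmentAlert would likewise post to a webhook or action group of your choosing. A minimal sketch (requires System.Linq; the /checkout/complete and /cart paths are hypothetical examples):

public class UserJourney
{
    public string VisitorId { get; set; }
    public DateTime StartTime { get; set; }
    public List<PageVisit> Pages { get; set; }
}

public class PageVisit
{
    public string PageUrl { get; set; }
    public DateTime Timestamp { get; set; }
    public int TimeOnPage { get; set; }
}

// Example predicate: the visitor reached a (hypothetical) order-confirmation page.
private static bool IsConversionPath(UserJourney journey) =>
    journey.Pages.Any(p => p.PageUrl != null && p.PageUrl.Contains("/checkout/complete"));

// Example predicate: a multi-page visit that ends on the (hypothetical) cart page
// without converting within the batch.
private static bool IsAbandonmentPattern(UserJourney journey) =>
    journey.Pages.Count >= 3 &&
    journey.Pages[journey.Pages.Count - 1].PageUrl?.Contains("/cart") == true &&
    !IsConversionPath(journey);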

Pattern 3: Real-time Personalization

sequenceDiagram
    participant User as Website User
    participant Web as Website
    participant Stream as Stream Processing
    participant ML as ML Model
    participant PBI as Power BI

    User->>Web: Visit page
    Web->>Stream: Send event
    Stream->>ML: Get predictions
    ML-->>Stream: Personalization data
    Stream->>PBI: Update dashboard
    Stream->>Web: Send recommendations
    Web->>User: Show personalized content
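
The Stream → ML hop in this pattern is just an HTTP scoring call. A minimal sketch, assuming a hypothetical online scoring endpoint exposed through SCORING_URL / SCORING_KEY settings; the request and response shapes depend entirely on your model:

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

static class RecommendationClient
{
    private static readonly HttpClient http = new HttpClient();

    // Sends the visitor's recent events to the model and returns its
    // recommendations payload as raw JSON.
    public static async Task<string> GetRecommendationsAsync(string visitorJson)
    {
        var url = Environment.GetEnvironmentVariable("SCORING_URL");
        var key = Environment.GetEnvironmentVariable("SCORING_KEY");

        using var request = new HttpRequestMessage(HttpMethod.Post, url)
        {
            Content = new StringContent(visitorJson, Encoding.UTF8, "application/json")
        };
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", key);

        var response = await http.SendAsync(request);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
}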

Performance Optimization

Event Hub Partitioning

// Partition by visitor ID so all events for a visitor land on the same partition
public static string GetPartitionKey(StreamingEvent evt)
{
    // Event Hubs hashes the partition key itself, so pass a stable business key.
    // (string.GetHashCode is randomized per process in .NET Core, so hashing it
    // here would not give consistent routing across restarts.)
    return evt.VisitorId;
}

// Send with partition key
var eventData = new EventData(Encoding.UTF8.GetBytes(json));
await eventHubClient.SendAsync(eventData, GetPartitionKey(evt));

Stream Analytics Optimization

-- Use partitioning for parallel processing. TIMESTAMP BY is only valid on the
-- input itself, so it belongs inside the step; with compatibility level 1.2+
-- input partitioning is applied implicitly.
WITH PartitionedInput AS (
    SELECT
        *,
        PartitionId
    FROM EventHubInput TIMESTAMP BY Timestamp
    PARTITION BY PartitionId
)

SELECT
    System.Timestamp() as Time,
    PartitionId,
    COUNT(*) as EventCount
INTO Output
FROM PartitionedInput
GROUP BY
    PartitionId,
    TumblingWindow(minute, 1);

Monitoring and Alerting

Azure Monitor Alerts

{
    "type": "Microsoft.Insights/metricAlerts",
    "name": "LowEventIngestion",
    "properties": {
        "description": "Fires when Event Hubs ingestion drops below the expected rate",
        "severity": 2,
        "enabled": true,
        "scopes": [
            "/subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.EventHub/namespaces/{namespace}"
        ],
        "evaluationFrequency": "PT5M",
        "windowSize": "PT15M",
        "criteria": {
            "odata.type": "Microsoft.Azure.Monitor.SingleResourceMultipleMetricCriteria",
            "allOf": [
                {
                    "name": "LowIncomingMessages",
                    "metricName": "IncomingMessages",
                    "operator": "LessThan",
                    "threshold": 100,
                    "timeAggregation": "Average"
                }
            ]
        },
        "actions": [
            {
                "actionGroupId": "/subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.Insights/actionGroups/{actionGroup}"
            }
        ]
    }
}
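
The same metric the alert rule watches can also be polled from code, for example from a health-check function. A minimal sketch with the Azure.Monitor.Query client library; the resource ID placeholders match the alert definition above:

using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Monitor.Query;

class CheckIngestMetrics
{
    static async Task Main()
    {
        // Resource ID of the Event Hubs namespace (placeholder values).
        var resourceId = "/subscriptions/{subscription}/resourceGroups/{rg}" +
                         "/providers/Microsoft.EventHub/namespaces/{namespace}";

        var client = new MetricsQueryClient(new DefaultAzureCredential());

        // Pull IncomingMessages for the last 15 minutes at 1-minute grain,
        // mirroring the alert rule above.
        var result = await client.QueryResourceAsync(
            resourceId,
            new[] { "IncomingMessages" },
            new MetricsQueryOptions
            {
                TimeRange = new QueryTimeRange(TimeSpan.FromMinutes(15)),
                Granularity = TimeSpan.FromMinutes(1)
            });

        foreach (var metric in result.Value.Metrics)
            foreach (var series in metric.TimeSeries)
                foreach (var point in series.Values)
                    Console.WriteLine($"{point.TimeStamp:o}: {point.Total}");
    }
}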

Power BI Alerts

// Data-driven alerts are configured in the Power BI service on dashboard tiles
// (card, gauge, and KPI visuals); this class only documents the intended settings.
public class PowerBIAlertConfiguration
{
    public string AlertName { get; set; } = "High Traffic Alert";
    public string Condition { get; set; } = "PageViews > 1000";
    public string Frequency { get; set; } = "Hourly";
    public List<string> Recipients { get; set; } = new List<string>
    {
        "analytics-team@company.com"
    };
}

Scaling Considerations

graph LR
    subgraph "Scaling Strategy"
        subgraph "Event Hubs"
            EH1[Throughput Units<br/>1-20 TU]
            EH2[Partitions<br/>2-32]
            EH3[Auto-inflate<br/>Enabled]
        end

        subgraph "Stream Analytics"
            SA1[Streaming Units<br/>1-192 SU]
            SA2[Parallel Jobs]
            SA3[Query Optimization]
        end

        subgraph "Power BI"
            PB1[Premium Capacity]
            PB2[Dedicated Pool]
            PB3[Incremental Refresh]
        end
    end

    EH1 --> SA1
    EH2 --> SA2
    SA1 --> PB1

    style EH1 fill:#ffd,stroke:#333,stroke-width:2px
    style SA1 fill:#dfd,stroke:#333,stroke-width:2px
    style PB1 fill:#dff,stroke:#333,stroke-width:2px

Auto-scaling Configuration

# Enable auto-inflate for Event Hubs
az eventhubs namespace update \
    --name $NAMESPACE \
    --resource-group $RESOURCE_GROUP \
    --enable-auto-inflate true \
    --maximum-throughput-units 10

# Scale Stream Analytics streaming units (one-time scale via the
# stream-analytics CLI extension; autoscale rules are configured separately)
az stream-analytics job scale \
    --job-name matomo-streaming \
    --resource-group $RESOURCE_GROUP \
    --streaming-units 6

Cost Analysis

Detailed Cost Breakdown

| Component | Configuration | Unit Price | Est. Monthly Cost |
|---|---|---|---|
| Event Hubs | Standard, 2 TU, 4 partitions | $0.03/hour/TU | $45 |
| Stream Analytics | 3 SU | $0.11/hour/SU | $240 |
| Azure Functions | Consumption, 2.6M executions | $0.20/million | $10 |
| Data Lake Archive | 100 GB/month | $0.02/GB | $2 |
| Cosmos DB | 400 RU/s | $0.008/hour/100 RU | $23 |
| Power BI | Premium Per User (5 users) | $20/user | $100 |
| Total | | | ~$420/month |

Cost Optimization Strategies

  1. Use Event Hubs Basic tier where its limits fit (roughly half the cost) — note it only supports the default consumer group and 1-day retention, so the dedicated streamanalytics consumer group from Step 1 requires Standard
  2. Optimize Stream Analytics queries to reduce SU requirements
  3. Implement data sampling for high-volume scenarios (see the sketch after this list)
  4. Use Power BI Pro + Paginated Reports instead of Premium if per-user Premium features are not needed
  5. Archive old streaming data to cold storage
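
For strategy 3, sampling works best when it is deterministic per visitor, so that every event of a sampled visitor is kept and session-level metrics stay coherent. A minimal sketch; the helper and the 10% rate are illustrative, and ShouldKeep would be called in the ingestion function before an event is added to the Event Hub batch:

using System;
using System.Security.Cryptography;
using System.Text;

static class VisitorSampler
{
    // Keep roughly `rate` of all visitors. Hashing the visitor ID (instead of
    // sampling per event) keeps or drops a visitor's events as a unit.
    public static bool ShouldKeep(string visitorId, double rate = 0.10)
    {
        using var md5 = MD5.Create();
        var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(visitorId));

        // Map the first 4 bytes of the hash to [0, 1) and compare to the rate.
        var bucket = BitConverter.ToUInt32(hash, 0) / 4294967296.0; // 2^32
        return bucket < rate;
    }
}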

Limitations and Considerations

Power BI Push Dataset Limitations

  • Maximum 5 million rows per dataset
  • Maximum 1 million rows added per hour
  • Maximum 120 POST requests per minute (a pacing sketch follows this list)
  • Maximum 16KB per row
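
These push limits mean high-volume writers should chunk their rows and pace their requests. A minimal pacing sketch; the 10,000-rows-per-request chunk size follows the Power BI REST documentation, and postChunkAsync stands in for whatever POST helper is used (such as the example in Step 4):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class PushPacing
{
    // Splits rows into request-sized chunks and spaces the POSTs ~500 ms apart,
    // keeping a sustained stream under the 120 requests/minute budget.
    public static async Task PushInBatchesAsync<T>(
        IReadOnlyList<T> rows,
        Func<IReadOnlyList<T>, Task> postChunkAsync,
        int maxRowsPerRequest = 10000)
    {
        for (var i = 0; i < rows.Count; i += maxRowsPerRequest)
        {
            var chunk = rows.Skip(i).Take(maxRowsPerRequest).ToList();
            await postChunkAsync(chunk);
            await Task.Delay(TimeSpan.FromMilliseconds(500));
        }
    }
}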

Event Hubs Limitations

  • Maximum message size: 1MB
  • Maximum retention: 7 days (Standard tier)
  • Maximum throughput: 20 TU (Standard tier)

Stream Analytics Limitations

  • Maximum SU: 192 per job
  • Query complexity affects SU requirements
  • State size limitations for windowed aggregations

Disaster Recovery

Multi-region Setup

# Create a secondary Event Hubs namespace in a different region
az eventhubs namespace create \
    --name $NAMESPACE-dr \
    --resource-group $RESOURCE_GROUP \
    --location northeurope \
    --sku Standard

# Create a shared access policy on the primary
# (the geo-DR pairing replicates policies to the secondary)
az eventhubs namespace authorization-rule create \
    --name GeoRecovery \
    --namespace-name $NAMESPACE \
    --resource-group $RESOURCE_GROUP \
    --rights Listen Send Manage

# Pair the namespaces under a geo-DR alias; clients connect via the alias
az eventhubs georecovery-alias set \
    --alias matomo-stream-alias \
    --namespace-name $NAMESPACE \
    --resource-group $RESOURCE_GROUP \
    --partner-namespace /subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.EventHub/namespaces/{namespace-dr}

# To fail over, break the pairing from the secondary namespace
az eventhubs georecovery-alias fail-over \
    --alias matomo-stream-alias \
    --namespace-name $NAMESPACE-dr \
    --resource-group $RESOURCE_GROUP

Next Steps

  1. Evaluate streaming requirements and data volumes
  2. Set up Event Hubs and Stream Analytics
  3. Create Power BI streaming datasets
  4. Implement monitoring and alerting
  5. Test failover and disaster recovery procedures
  6. Optimize based on actual usage patterns