Real-time Streaming Integration¶
Overview¶
The real-time streaming integration provides near-instantaneous data flow from Matomo to Power BI using Azure's streaming infrastructure. This approach is ideal for live dashboards, real-time monitoring, and immediate alerting on key metrics.
Architecture¶
graph TB
subgraph "Matomo Cloud"
MA[Matomo Analytics]
MAPI[Reporting API]
MW[Webhooks<br/>If Available]
end
subgraph "Ingestion Layer"
AF[Azure Function<br/>Event Triggered]
AKV[Azure Key Vault]
APIM[API Management<br/>Optional]
end
subgraph "Streaming Layer"
EH[Event Hubs<br/>Partitioned]
ASA[Stream Analytics<br/>Jobs]
ASAQ[Stream Analytics<br/>Queries]
end
subgraph "Storage Layer"
ADLS[Data Lake<br/>Archive]
COS[Cosmos DB<br/>Hot Path]
end
subgraph "Power BI"
PDS[Push Dataset<br/>5M rows]
SDS[Streaming Dataset]
DASH[Live Dashboard]
ALERT[Alerts]
end
MA --> MAPI
MAPI -->|Poll/Push| AF
MW -.->|Webhook| AF
AKV -->|Secrets| AF
APIM --> AF
AF -->|Events| EH
EH --> ASA
ASA --> ASAQ
ASAQ -->|Archive| ADLS
ASAQ -->|Hot Data| COS
ASAQ -->|Stream| PDS
ASAQ -->|Stream| SDS
PDS --> DASH
SDS --> DASH
DASH --> ALERT
style MA fill:#f9f,stroke:#333,stroke-width:2px
style DASH fill:#9f9,stroke:#333,stroke-width:2px
style EH fill:#ffd,stroke:#333,stroke-width:2px
style ASA fill:#dfd,stroke:#333,stroke-width:2px
Data Flow Patterns¶
sequenceDiagram
participant M as Matomo
participant AF as Azure Function
participant EH as Event Hub
participant ASA as Stream Analytics
participant PBI as Power BI
participant User as Dashboard User
loop Every 60 seconds
AF->>M: Fetch latest metrics
M-->>AF: Return JSON data
AF->>AF: Transform to events
AF->>EH: Send events
end
EH->>ASA: Stream events
ASA->>ASA: Window aggregation
ASA->>PBI: Push to dataset
PBI->>PBI: Update visuals
User->>PBI: View dashboard
PBI-->>User: Real-time data
Implementation Steps¶
Step 1: Create Event Hub Namespace¶
# Variables
RESOURCE_GROUP="rg-matomo-streaming"
LOCATION="westeurope"
NAMESPACE="eh-matomo-stream"
EVENT_HUB="matomo-events"
# Create Event Hub Namespace
az eventhubs namespace create \
--name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--location $LOCATION \
--sku Standard \
--capacity 1
# Create Event Hub with partitions
az eventhubs eventhub create \
--name $EVENT_HUB \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--partition-count 4 \
--message-retention 1
# Create consumer group for Stream Analytics
az eventhubs eventhub consumer-group create \
--name streamanalytics \
--eventhub-name $EVENT_HUB \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP
# Get connection string
az eventhubs namespace authorization-rule keys list \
--name RootManageSharedAccessKey \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--query primaryConnectionString -o tsv
Step 2: Create Azure Function for Real-time Data Ingestion¶
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Net.Http;
using Azure.Security.KeyVault.Secrets;
using Azure.Identity;
public static class MatomoStreamingIngestion
{
private static readonly HttpClient httpClient = new HttpClient();
private static EventHubClient eventHubClient;
[FunctionName("StreamMatomoData")]
public static async Task Run(
[TimerTrigger("*/60 * * * * *")] TimerInfo myTimer,
ILogger log)
{
log.LogInformation($"Streaming Matomo data at: {DateTime.Now}");
try
{
// Initialize Event Hub client if needed
if (eventHubClient == null)
{
var connectionString = Environment.GetEnvironmentVariable("EventHubConnectionString");
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);
}
// Get Matomo configuration
var matomoUrl = Environment.GetEnvironmentVariable("MatomoUrl");
var siteId = Environment.GetEnvironmentVariable("MatomoSiteId");
var apiToken = await GetApiToken();
// Fetch real-time data
var realtimeData = await FetchRealtimeMetrics(matomoUrl, siteId, apiToken);
// Transform to events
var events = TransformToEvents(realtimeData);
// Send to Event Hub
var batch = eventHubClient.CreateBatch();
foreach (var evt in events)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evt)));
eventData.Properties["EventType"] = evt.EventType;
eventData.Properties["Timestamp"] = evt.Timestamp.ToString("o");
if (!batch.TryAdd(eventData))
{
// Send current batch and start new one
await eventHubClient.SendAsync(batch);
batch = eventHubClient.CreateBatch();
batch.TryAdd(eventData);
}
}
// Send remaining events
if (batch.Count > 0)
{
await eventHubClient.SendAsync(batch);
}
log.LogInformation($"Sent {events.Count} events to Event Hub");
}
catch (Exception ex)
{
log.LogError($"Error streaming data: {ex.Message}");
throw;
}
}
private static async Task<string> GetApiToken()
{
var keyVaultUrl = Environment.GetEnvironmentVariable("KeyVaultUrl");
var client = new SecretClient(new Uri(keyVaultUrl), new DefaultAzureCredential());
var secret = await client.GetSecretAsync("MatomoApiToken");
return secret.Value.Value;
}
private static async Task<MatomoRealtimeData> FetchRealtimeMetrics(
string baseUrl, string siteId, string apiToken)
{
var apiUrl = $"{baseUrl}/index.php" +
$"?module=API" +
$"&method=Live.getLastVisitsDetails" +
$"&idSite={siteId}" +
$"&period=day" +
$"&date=today" +
$"&format=JSON" +
$"&token_auth={apiToken}" +
$"&filter_limit=100";
var response = await httpClient.GetStringAsync(apiUrl);
return JsonConvert.DeserializeObject<MatomoRealtimeData>(response);
}
private static List<StreamingEvent> TransformToEvents(MatomoRealtimeData data)
{
var events = new List<StreamingEvent>();
// Transform visits to events
foreach (var visit in data.Visits)
{
events.Add(new StreamingEvent
{
EventType = "PageView",
Timestamp = DateTime.UtcNow,
VisitorId = visit.VisitorId,
SessionId = visit.IdVisit,
PageUrl = visit.LastActionUrl,
PageTitle = visit.LastActionTitle,
TimeOnPage = visit.TimeOnPage,
Country = visit.Country,
Device = visit.DeviceType,
Browser = visit.Browser,
Properties = new Dictionary<string, object>
{
["referrer"] = visit.Referrer,
["campaign"] = visit.Campaign,
["isNewVisitor"] = visit.IsNewVisitor
}
});
}
return events;
}
}
public class StreamingEvent
{
public string EventType { get; set; }
public DateTime Timestamp { get; set; }
public string VisitorId { get; set; }
public string SessionId { get; set; }
public string PageUrl { get; set; }
public string PageTitle { get; set; }
public int TimeOnPage { get; set; }
public string Country { get; set; }
public string Device { get; set; }
public string Browser { get; set; }
public Dictionary<string, object> Properties { get; set; }
}
Step 3: Configure Stream Analytics Job¶
-- Input configuration
CREATE INPUT EventHubInput
FROM EventHub
WITH (
EventHubName = 'matomo-events',
ConsumerGroupName = 'streamanalytics'
);
-- Output to Power BI
CREATE OUTPUT PowerBIOutput
TO PowerBI
WITH (
Dataset = 'MatomoRealtime',
Table = 'Metrics',
AuthenticationMode = 'UserToken'
);
-- Output to Data Lake for archival
CREATE OUTPUT DataLakeOutput
TO DataLake
WITH (
AccountName = 'stmatomoanalytics',
Container = 'streaming-archive',
PathPattern = '{date}/{time}',
DateFormat = 'yyyy/MM/dd',
TimeFormat = 'HH'
);
-- Real-time aggregation query
WITH RealtimeMetrics AS (
SELECT
System.Timestamp() as WindowEnd,
COUNT(*) as PageViews,
COUNT(DISTINCT VisitorId) as UniqueVisitors,
COUNT(DISTINCT SessionId) as Sessions,
AVG(TimeOnPage) as AvgTimeOnPage,
PageUrl,
Country,
Device
FROM EventHubInput TIMESTAMP BY Timestamp
GROUP BY
PageUrl,
Country,
Device,
TumblingWindow(minute, 1)
),
SessionMetrics AS (
SELECT
System.Timestamp() as WindowEnd,
SessionId,
VisitorId,
COUNT(*) as PageViewsPerSession,
SUM(TimeOnPage) as SessionDuration,
COLLECT() as PagesVisited
FROM EventHubInput TIMESTAMP BY Timestamp
GROUP BY
SessionId,
VisitorId,
SessionWindow(minute, 30, 5)
)
-- Output to Power BI
SELECT
WindowEnd,
PageViews,
UniqueVisitors,
Sessions,
AvgTimeOnPage,
PageUrl,
Country,
Device
INTO PowerBIOutput
FROM RealtimeMetrics;
-- Archive raw events
SELECT
*
INTO DataLakeOutput
FROM EventHubInput TIMESTAMP BY Timestamp;
Step 4: Create Power BI Streaming Dataset¶
{
"name": "MatomoRealtime",
"defaultMode": "pushStreaming",
"tables": [
{
"name": "Metrics",
"columns": [
{
"name": "WindowEnd",
"dataType": "DateTime"
},
{
"name": "PageViews",
"dataType": "Int64"
},
{
"name": "UniqueVisitors",
"dataType": "Int64"
},
{
"name": "Sessions",
"dataType": "Int64"
},
{
"name": "AvgTimeOnPage",
"dataType": "Double"
},
{
"name": "PageUrl",
"dataType": "String"
},
{
"name": "Country",
"dataType": "String"
},
{
"name": "Device",
"dataType": "String"
}
]
}
]
}
PowerShell script to create dataset:
# Install Power BI PowerShell module
Install-Module -Name MicrosoftPowerBIMgmt -Force
# Connect to Power BI
Connect-PowerBIServiceAccount
# Create streaming dataset
$datasetSchema = @{
name = "MatomoRealtime"
defaultMode = "pushStreaming"
tables = @(
@{
name = "Metrics"
columns = @(
@{name = "WindowEnd"; dataType = "DateTime"},
@{name = "PageViews"; dataType = "Int64"},
@{name = "UniqueVisitors"; dataType = "Int64"},
@{name = "Sessions"; dataType = "Int64"},
@{name = "AvgTimeOnPage"; dataType = "Double"},
@{name = "PageUrl"; dataType = "String"},
@{name = "Country"; dataType = "String"},
@{name = "Device"; dataType = "String"}
)
}
)
}
$workspace = Get-PowerBIWorkspace -Name "Matomo Analytics"
$dataset = New-PowerBIDataset -WorkspaceId $workspace.Id -Dataset $datasetSchema
Step 5: Build Real-time Dashboard¶
graph TB
subgraph "Dashboard Components"
subgraph "KPI Tiles"
KPI1[Current Visitors<br/>Card Visual]
KPI2[Page Views/Min<br/>Card Visual]
KPI3[Avg Session Time<br/>Gauge Visual]
KPI4[Bounce Rate<br/>KPI Visual]
end
subgraph "Time Series"
TS1[Visitors Over Time<br/>Line Chart]
TS2[Page Views Trend<br/>Area Chart]
end
subgraph "Breakdowns"
BD1[Top Pages<br/>Bar Chart]
BD2[Device Distribution<br/>Pie Chart]
BD3[Geographic Map<br/>Map Visual]
end
subgraph "Detailed Views"
DV1[Session Details<br/>Table]
DV2[Real-time Feed<br/>Stream Visual]
end
end
style KPI1 fill:#ffd,stroke:#333,stroke-width:2px
style TS1 fill:#dfd,stroke:#333,stroke-width:2px
style BD3 fill:#dff,stroke:#333,stroke-width:2px
Advanced Streaming Patterns¶
Pattern 1: Anomaly Detection¶
-- Stream Analytics anomaly detection
WITH AnomalyDetection AS (
SELECT
System.Timestamp() as Time,
PageViews,
ANOMALYDETECTION(PageViews) OVER (
LIMIT DURATION(hour, 2)
) as AnomalyScore
FROM EventHubInput TIMESTAMP BY Timestamp
GROUP BY TumblingWindow(minute, 5)
)
SELECT
Time,
PageViews,
AnomalyScore,
CASE
WHEN AnomalyScore > 0.95 THEN 'High'
WHEN AnomalyScore > 0.75 THEN 'Medium'
ELSE 'Low'
END as AnomalyLevel
INTO AnomalyOutput
FROM AnomalyDetection
WHERE AnomalyScore > 0.75;
Pattern 2: Complex Event Processing¶
[FunctionName("ProcessComplexEvents")]
public static async Task ProcessUserJourney(
[EventHubTrigger("matomo-events", Connection = "EventHubConnectionString")]
EventData[] events,
ILogger log)
{
var userJourneys = new Dictionary<string, UserJourney>();
foreach (var eventData in events)
{
var evt = JsonConvert.DeserializeObject<StreamingEvent>(
Encoding.UTF8.GetString(eventData.Body)
);
if (!userJourneys.ContainsKey(evt.VisitorId))
{
userJourneys[evt.VisitorId] = new UserJourney
{
VisitorId = evt.VisitorId,
StartTime = evt.Timestamp,
Pages = new List<PageVisit>()
};
}
var journey = userJourneys[evt.VisitorId];
journey.Pages.Add(new PageVisit
{
PageUrl = evt.PageUrl,
Timestamp = evt.Timestamp,
TimeOnPage = evt.TimeOnPage
});
// Detect conversion patterns
if (IsConversionPath(journey))
{
await SendConversionAlert(journey);
}
// Detect abandonment patterns
if (IsAbandonmentPattern(journey))
{
await SendAbandonmentAlert(journey);
}
}
}
Pattern 3: Real-time Personalization¶
sequenceDiagram
participant User as Website User
participant Web as Website
participant Stream as Stream Processing
participant ML as ML Model
participant PBI as Power BI
User->>Web: Visit page
Web->>Stream: Send event
Stream->>ML: Get predictions
ML-->>Stream: Personalization data
Stream->>PBI: Update dashboard
Stream->>Web: Send recommendations
Web->>User: Show personalized content
Performance Optimization¶
Event Hub Partitioning¶
// Partition by visitor ID for better distribution
public static string GetPartitionKey(StreamingEvent evt)
{
// Use consistent hashing for even distribution
var hash = evt.VisitorId.GetHashCode();
var partitionCount = 4; // Match Event Hub partition count
var partition = Math.Abs(hash) % partitionCount;
return partition.ToString();
}
// Send with partition key
var eventData = new EventData(Encoding.UTF8.GetBytes(json));
await eventHubClient.SendAsync(eventData, GetPartitionKey(evt));
Stream Analytics Optimization¶
-- Use partitioning for parallel processing
WITH PartitionedInput AS (
SELECT
*,
PartitionId
FROM EventHubInput
PARTITION BY PartitionId
)
SELECT
System.Timestamp() as Time,
PartitionId,
COUNT(*) as EventCount
INTO Output
FROM PartitionedInput TIMESTAMP BY Timestamp
GROUP BY
PartitionId,
TumblingWindow(minute, 1);
Monitoring and Alerting¶
Azure Monitor Alerts¶
{
"type": "Microsoft.Insights/metricAlerts",
"name": "HighEventLatency",
"properties": {
"severity": 2,
"enabled": true,
"scopes": [
"/subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.EventHub/namespaces/{namespace}"
],
"evaluationFrequency": "PT5M",
"windowSize": "PT15M",
"criteria": {
"odata.type": "Microsoft.Azure.Monitor.SingleResourceMultipleMetricCriteria",
"allOf": [
{
"name": "EventProcessingLatency",
"metricName": "IncomingMessages",
"operator": "LessThan",
"threshold": 100,
"timeAggregation": "Average"
}
]
},
"actions": [
{
"actionGroupId": "/subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.Insights/actionGroups/{actionGroup}"
}
]
}
}
Power BI Alerts¶
// Configure data-driven alerts in Power BI
public class PowerBIAlertConfiguration
{
public string AlertName { get; set; } = "High Traffic Alert";
public string Condition { get; set; } = "PageViews > 1000";
public string Frequency { get; set; } = "Hourly";
public List<string> Recipients { get; set; } = new List<string>
{
"analytics-team@company.com"
};
}
Scaling Considerations¶
graph LR
subgraph "Scaling Strategy"
subgraph "Event Hubs"
EH1[Throughput Units<br/>1-20 TU]
EH2[Partitions<br/>2-32]
EH3[Auto-inflate<br/>Enabled]
end
subgraph "Stream Analytics"
SA1[Streaming Units<br/>1-192 SU]
SA2[Parallel Jobs]
SA3[Query Optimization]
end
subgraph "Power BI"
PB1[Premium Capacity]
PB2[Dedicated Pool]
PB3[Incremental Refresh]
end
end
EH1 --> SA1
EH2 --> SA2
SA1 --> PB1
style EH1 fill:#ffd,stroke:#333,stroke-width:2px
style SA1 fill:#dfd,stroke:#333,stroke-width:2px
style PB1 fill:#dff,stroke:#333,stroke-width:2px
Auto-scaling Configuration¶
# Enable auto-inflate for Event Hubs
az eventhubs namespace update \
--name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--enable-auto-inflate true \
--maximum-throughput-units 10
# Configure Stream Analytics auto-scale
az stream-analytics job scale \
--name matomo-streaming \
--resource-group $RESOURCE_GROUP \
--streaming-units 6
Cost Analysis¶
Detailed Cost Breakdown¶
| Component | Configuration | Unit Price | Est. Monthly Cost |
|---|---|---|---|
| Event Hubs | Standard, 2 TU, 4 partitions | $0.03/hour/TU | $45 |
| Stream Analytics | 3 SU | $0.11/hour/SU | $240 |
| Azure Functions | Consumption, 2.6M executions | $0.20/million | $10 |
| Data Lake Archive | 100GB/month | $0.02/GB | $2 |
| Cosmos DB | 400 RU/s | $0.008/hour/100RU | $23 |
| Power BI Premium | Per User (5 users) | $20/user | $100 |
| Total | ~$420/month |
Cost Optimization Strategies¶
- Use Event Hubs Basic tier if advanced features not needed (-50% cost)
- Optimize Stream Analytics queries to reduce SU requirements
- Implement data sampling for high-volume scenarios
- Use Power BI Pro + Paginated Reports instead of Premium if possible
- Archive old streaming data to cold storage
Limitations and Considerations¶
Power BI Push Dataset Limitations¶
- Maximum 5 million rows per dataset
- Maximum 1 million rows added per hour
- Maximum 120 POST requests per minute
- Maximum 16KB per row
Event Hubs Limitations¶
- Maximum message size: 1MB
- Maximum retention: 7 days (Standard tier)
- Maximum throughput: 20 TU (Standard tier)
Stream Analytics Limitations¶
- Maximum SU: 192 per job
- Query complexity affects SU requirements
- State size limitations for windowed aggregations
Disaster Recovery¶
Multi-region Setup¶
# Create secondary Event Hub in different region
az eventhubs namespace create \
--name $NAMESPACE-dr \
--resource-group $RESOURCE_GROUP \
--location northeurope \
--sku Standard
# Configure geo-disaster recovery
az eventhubs namespace authorization-rule create \
--name GeoRecovery \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--rights Listen Send Manage
# Pair namespaces
az eventhubs georecovery-alias set \
--alias matomo-stream-alias \
--namespace-name $NAMESPACE \
--resource-group $RESOURCE_GROUP \
--partner-namespace /subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.EventHub/namespaces/{namespace-dr}
Next Steps¶
- Evaluate streaming requirements and data volumes
- Set up Event Hubs and Stream Analytics
- Create Power BI streaming datasets
- Implement monitoring and alerting
- Test failover and disaster recovery procedures
- Optimize based on actual usage patterns