Automated real-time sensor data pipeline from IoT devices to dashboards

We successfully implemented an end-to-end real-time sensor data pipeline that processes IoT telemetry and displays it in operational dashboards with sub-minute latency. Sharing our architecture and lessons learned.

Use Case: 300+ environmental sensors (temperature, humidity, air quality) across 15 facilities need real-time monitoring with anomaly detection and alerting.

Architecture: Sensors → Cloud IoT Core → Pub/Sub → Cloud Functions → BigQuery → Data Studio

Key requirements: message ordering for accurate time-series analysis, streaming inserts to BigQuery, automated anomaly detection, and real-time dashboard updates. The pipeline processes 50K+ messages/hour with end-to-end latency under 45 seconds.

Happy to share implementation details and code patterns.

Here’s our complete implementation with detailed architecture and code patterns:

Architecture Overview:

Sensors (300+)
  → MQTT to Cloud IoT Core
  → Pub/Sub (with ordering keys)
  → Cloud Functions (processing & batching)
  → BigQuery (streaming inserts)
  → Data Studio (real-time dashboards)

Pub/Sub Message Ordering: Configured ordering at publish time from IoT devices:

import json
from google.cloud import pubsub_v1

# Assumes `project` holds the GCP project ID.
# Ordering keys are ignored unless this option is set on the publisher;
# the subscription must also be created with message ordering enabled.
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=True
    )
)
topic_path = publisher.topic_path(project, 'sensor-telemetry')

def publish_sensor_reading(device_id, sensor_data):
    message = {
        'sensor_id': sensor_data['sensor_id'],
        'timestamp': sensor_data['timestamp'],
        'temperature': sensor_data['temperature'],
        'humidity': sensor_data['humidity'],
        'air_quality_index': sensor_data['aqi']
    }

    # Publish with ordering key = sensor_id so each sensor's readings
    # are delivered in order (critical for time-series analysis)
    future = publisher.publish(
        topic_path,
        data=json.dumps(message).encode('utf-8'),
        ordering_key=sensor_data['sensor_id']
    )
    future.result()  # block until acked so publish errors surface here

Cloud Functions Processing: Implemented batching and anomaly detection:

import base64
import json
from google.cloud import bigquery
from datetime import datetime, timedelta
import numpy as np

# Module-level batching state: survives across invocations on a warm
# instance, but buffered rows are lost if the instance is recycled
# before a flush
batch_buffer = []
LAST_FLUSH = datetime.now()
BATCH_SIZE = 100
BATCH_TIMEOUT = 10  # seconds

def process_sensor_message(event, context):
    global batch_buffer, LAST_FLUSH

    # Decode Pub/Sub message
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    sensor_data = json.loads(pubsub_message)

    # Timestamp validation: discard messages older than 5 minutes
    message_time = datetime.fromisoformat(sensor_data['timestamp'])
    current_time = datetime.now(message_time.tzinfo)  # aware if the device sent a timezone
    if (current_time - message_time) > timedelta(minutes=5):
        print(f"Discarding stale message: {sensor_data['sensor_id']}")
        return

    # Anomaly detection (Z-score)
    anomaly_detected = detect_anomaly(sensor_data)
    sensor_data['is_anomaly'] = anomaly_detected

    if anomaly_detected:
        trigger_alert(sensor_data)  # publishes to the alert Pub/Sub topic (helper defined elsewhere)

    # Add to batch buffer
    batch_buffer.append(sensor_data)

    # Flush conditions: size or timeout
    if len(batch_buffer) >= BATCH_SIZE or \
       (datetime.now() - LAST_FLUSH).total_seconds() >= BATCH_TIMEOUT:
        flush_to_bigquery(batch_buffer)
        batch_buffer = []
        LAST_FLUSH = datetime.now()

def detect_anomaly(sensor_data):
    # Fetch recent readings for this sensor (helper defined elsewhere;
    # reads from an in-memory cache, falling back to BigQuery)
    recent_readings = get_recent_readings(sensor_data['sensor_id'])

    if len(recent_readings) < 10:
        return False  # Not enough data

    # Calculate Z-score
    values = [r['temperature'] for r in recent_readings]
    mean = np.mean(values)
    std = np.std(values)

    if std == 0:
        return False

    z_score = abs((sensor_data['temperature'] - mean) / std)

    # Threshold: 3 standard deviations
    return z_score > 3.0
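
The `get_recent_readings` helper isn't shown above; a minimal sketch of how it could work, keeping a small in-memory ring buffer per sensor on the warm instance (the BigQuery fallback for cold starts is elided, and the function names here are ours, not from the production code):

```python
from collections import defaultdict, deque

# Per-sensor ring buffers of the most recent readings; module-level so
# they survive across invocations on a warm Cloud Functions instance
RECENT_CACHE = defaultdict(lambda: deque(maxlen=50))

def record_reading(sensor_data):
    """Append a reading to the sensor's rolling window."""
    RECENT_CACHE[sensor_data['sensor_id']].append(sensor_data)

def get_recent_readings(sensor_id):
    """Return the cached rolling window for a sensor.

    On a cold start the cache is empty; a production version would
    fall back to querying the last N rows from BigQuery here.
    """
    return list(RECENT_CACHE[sensor_id])
```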

BigQuery Streaming: Batch insert implementation:

def flush_to_bigquery(batch):
    # A module-level client would avoid re-creating this on every flush
    client = bigquery.Client()
    table_id = f"{project}.sensor_data.telemetry"

    # Prepare rows
    rows_to_insert = []
    for record in batch:
        rows_to_insert.append({
            'sensor_id': record['sensor_id'],
            'timestamp': record['timestamp'],
            'temperature': record['temperature'],
            'humidity': record['humidity'],
            'air_quality_index': record['air_quality_index'],
            'is_anomaly': record['is_anomaly'],
            'facility_id': record.get('facility_id', 'unknown'),
            'ingestion_time': datetime.utcnow().isoformat()  # UTC, to match BigQuery TIMESTAMP
        })

    # Streaming insert with error handling
    errors = client.insert_rows_json(table_id, rows_to_insert)

    if errors:
        print(f"BigQuery insert errors: {errors}")
        # Log to Cloud Logging for alerting
    else:
        print(f"Inserted {len(rows_to_insert)} rows to BigQuery")
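
`insert_rows_json` reports per-row failures as a list of `{'index': ..., 'errors': [...]}` mappings. A small helper (our sketch, not from the production code) to split a batch so that only the failed rows get retried or dead-lettered:

```python
def split_failed_rows(batch, errors):
    """Partition a batch into (succeeded, failed) using the error list
    returned by BigQuery's insert_rows_json, where each entry looks
    like {'index': <row index>, 'errors': [...]}."""
    failed_indexes = {e['index'] for e in errors}
    succeeded = [row for i, row in enumerate(batch) if i not in failed_indexes]
    failed = [row for i, row in enumerate(batch) if i in failed_indexes]
    return succeeded, failed
```

With this, a flush can re-queue only the failed slice instead of re-inserting the whole batch (which would duplicate the rows that already landed).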

Data Studio Visualization: Optimized dashboard configuration:

  • Refresh interval: 1 minute (balance between real-time and query costs)
  • Used a materialized view for aggregated metrics (note: BigQuery
    materialized views disallow non-deterministic functions such as
    CURRENT_TIMESTAMP(), so the view buckets by hour and the dashboard
    query applies the trailing 1-hour window):
CREATE MATERIALIZED VIEW sensor_data.realtime_summary
AS
SELECT
  facility_id,
  sensor_id,
  TIMESTAMP_TRUNC(timestamp, HOUR) as reading_hour,
  MAX(timestamp) as last_reading_time,
  AVG(temperature) as avg_temperature,
  AVG(humidity) as avg_humidity,
  AVG(air_quality_index) as avg_aqi,
  SUM(CASE WHEN is_anomaly THEN 1 ELSE 0 END) as anomaly_count
FROM sensor_data.telemetry
GROUP BY facility_id, sensor_id, reading_hour
  • Data Studio connects to materialized view instead of raw table
  • Queries run in <2 seconds even with 15 facilities and 300 sensors

Anomaly Detection Logic: Implemented statistical and rule-based detection:

# Pseudocode - Complete anomaly detection:
1. Fetch last 50 readings for sensor (from cache or BigQuery)
2. Calculate rolling statistics (mean, std dev)
3. Check Z-score threshold (>3.0 = anomaly)
4. Apply domain-specific rules:
   - Temperature: flag if >30°C or <10°C (absolute thresholds)
   - Humidity: flag if >80% or <20%
   - AQI: flag if >150 (unhealthy levels)
5. Rate of change detection: flag if change >5°C in 5 minutes
6. If anomaly detected, publish to alert topic
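
The rule-based checks in steps 4 and 5 are simple enough to sketch in plain Python (thresholds taken from the list above; function names are ours):

```python
def check_domain_rules(reading):
    """Step 4: absolute-threshold rules.
    Returns the list of rule names that fired (empty list = no anomaly)."""
    flags = []
    if reading['temperature'] > 30 or reading['temperature'] < 10:
        flags.append('temperature_out_of_range')
    if reading['humidity'] > 80 or reading['humidity'] < 20:
        flags.append('humidity_out_of_range')
    if reading['air_quality_index'] > 150:
        flags.append('aqi_unhealthy')
    return flags

def check_rate_of_change(current, previous, max_delta_c=5.0):
    """Step 5: flag if temperature moved more than max_delta_c since
    `previous`, the oldest reading inside the 5-minute window."""
    if previous is None:
        return False  # no history yet (e.g. cold start)
    return abs(current['temperature'] - previous['temperature']) > max_delta_c
```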

Monitoring & Alerting: Comprehensive observability:

  • Cloud Monitoring alerts on:
    • Cloud Functions error rate >1%
    • BigQuery streaming insert failures
    • Pub/Sub unacked message age >5 minutes
    • Dashboard query latency >5 seconds
  • Custom metrics: anomaly detection rate per facility
  • Cloud Logging exports to BigQuery for analysis
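
The per-facility anomaly-rate metric can be driven by structured logs: Cloud Logging parses JSON lines written to stdout in Cloud Functions, and a log-based metric (or the BigQuery export above) aggregates them. A sketch, with field names of our choosing:

```python
import json

def log_anomaly_event(sensor_data):
    """Emit one structured log line per anomaly. Cloud Logging ingests
    the JSON payload, and a log-based metric can count occurrences
    grouped by facility_id (field names here are illustrative)."""
    entry = {
        'severity': 'WARNING',
        'message': 'sensor_anomaly',
        'facility_id': sensor_data.get('facility_id', 'unknown'),
        'sensor_id': sensor_data['sensor_id'],
        'temperature': sensor_data['temperature'],
    }
    print(json.dumps(entry))  # stdout JSON becomes a structured log entry
    return entry
```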

Cost Optimization: For 50K messages/hour (1.2M/day):

  • Cloud Functions: $45/month (batching reduced from $220/month)
  • BigQuery streaming: $50/month (batching reduced from $250/month)
  • Pub/Sub: $15/month
  • Data Studio: Free
  • Total: ~$110/month
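
The batching savings follow from simple arithmetic: at 50K messages/hour, per-message inserts would mean ~1.2M insert calls a day, while 100-row batches cut that to ~12K. A quick sanity check:

```python
MESSAGES_PER_HOUR = 50_000
BATCH_SIZE = 100

messages_per_day = MESSAGES_PER_HOUR * 24       # 1,200,000
unbatched_calls = messages_per_day              # one insert call per message
batched_calls = messages_per_day // BATCH_SIZE  # 12,000 (ignoring timeout flushes)

reduction = 1 - batched_calls / unbatched_calls
print(f"{batched_calls} insert calls/day vs {unbatched_calls} ({reduction:.0%} fewer)")
```

(Streaming-insert pricing is billed per GB ingested, so the call-count reduction mainly saves per-request overhead and Cloud Functions execution time rather than ingestion volume.)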

Performance Metrics:

  • End-to-end latency: 35-45 seconds (p95)
  • Anomaly detection latency: <500ms
  • Dashboard refresh: 1-minute intervals
  • BigQuery insert success rate: 99.97%
  • Message ordering accuracy: 100%

Lessons Learned:

  1. Batching is critical for cost optimization at scale
  2. Pub/Sub ordering keys essential for time-series accuracy
  3. Materialized views dramatically improve dashboard performance
  4. Simple statistical anomaly detection outperformed ML for real-time use
  5. Timestamp validation prevents stale data corruption
  6. Monitor unacked message age to catch processing bottlenecks early

Future Enhancements:

  • Implement predictive anomaly detection using BigQuery ML
  • Add data quality checks (missing readings, sensor failures)
  • Expand to multi-region deployment for disaster recovery
  • Implement automated sensor calibration based on historical patterns

This architecture has been running in production for 6 months with 99.9% uptime and successfully detected 247 genuine anomalies that required intervention.

We used Pub/Sub ordering keys set to sensor_id. Each sensor’s messages arrive in order, which is critical for time-series analysis. In Cloud Functions, we also implemented timestamp-based validation to discard any messages that arrive more than 5 minutes late (rare but happens during network issues). This prevents stale data from corrupting real-time calculations.

We batch inserts in Cloud Functions - collect up to 100 messages or wait 10 seconds, whichever comes first. This reduces BigQuery streaming insert costs by 80% while keeping latency acceptable. Anomaly detection runs in Cloud Functions using simple statistical methods (Z-score for outliers). We tried BigQuery ML but the prediction latency was too high for real-time alerts. Cloud Functions gives us sub-second anomaly detection.

Data Studio performance with real-time data can be tricky. How often does your dashboard refresh? Are you using Data Studio’s native BigQuery connection or a cached data source? We found that direct BigQuery queries on large tables caused dashboard timeouts.

How are you handling BigQuery streaming inserts at that volume? 50K messages/hour is manageable, but are you batching or doing individual inserts? Also, what’s your approach to anomaly detection - running in Cloud Functions or using BigQuery ML?

This is great! How did you handle message ordering in Pub/Sub? We’re struggling with out-of-order sensor readings causing incorrect time-series calculations. Did you use ordering keys or handle it in Cloud Functions?

What’s your monitoring and alerting strategy? When Cloud Functions fail or BigQuery inserts fail, how quickly do you detect and recover? Also curious about your cost optimization - 50K messages/hour can add up in Cloud Functions execution costs.