Device alerts not triggering in rules engine for custom conditions on telemetry data

Our custom alert rules in IoT Core Rules Engine aren’t firing for specific telemetry conditions. We have temperature sensors that should trigger alerts when readings exceed 85°C, but alerts are inconsistent: sometimes they fire, sometimes not, even when we can see the threshold-exceeding values in our telemetry logs.

Rule expression:


device.telemetry.temperature > 85 AND device.telemetry.unit == 'celsius'

Sample telemetry payload:

{
  "device_id": "temp-sensor-042",
  "telemetry": {
    "temperature": 87.3,
    "unit": "celsius",
    "timestamp": 1723456789
  }
}

I’ve checked the rule evaluation logs and they show “no_match” even for payloads that clearly exceed 85°C. The rule works perfectly in the Rules Engine test console, but fails in production. We’re missing critical temperature alerts that could prevent equipment damage.

Check your telemetry payload structure carefully. IoT Core Rules Engine is very strict about JSON path matching. If your actual production payloads have different nesting or field names than your test data, rules won’t match. Enable detailed logging and compare the actual payloads hitting the rules engine versus your test data.
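
When the payloads do differ, one quick way to see how is to diff the dotted field paths of a known-good test payload against a captured production one. A minimal local sketch (the drifted production shape below is hypothetical):

```python
def field_paths(obj, prefix=""):
    """Return the set of dotted key paths present in a nested dict."""
    paths = set()
    for key, value in obj.items():
        path = f"{prefix}{key}"
        paths.add(path)
        if isinstance(value, dict):
            paths |= field_paths(value, path + ".")
    return paths

test_payload = {"device_id": "t1", "telemetry": {"temperature": 87.3, "unit": "celsius"}}
prod_payload = {"device_id": "t1", "data": {"temp": "87.3"}}  # hypothetical drifted shape

print("missing in prod:", sorted(field_paths(test_payload) - field_paths(prod_payload)))
print("extra in prod:  ", sorted(field_paths(prod_payload) - field_paths(test_payload)))
```

A rename like `telemetry` → `data` shows up immediately, which is exactly the kind of mismatch that makes the engine report no_match.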

Here’s a comprehensive approach covering rule expression syntax, payload structure, evaluation logging, and testing:

Custom Rule Expression Syntax: Your rule has multiple issues. IoT Core Rules Engine uses a specific expression language with limitations:

  1. Type handling - numeric vs string:
// WRONG - fails on string values
device.telemetry.temperature > 85

// RIGHT - handle both types
(isNumber(device.telemetry.temperature) AND device.telemetry.temperature > 85) OR
(isString(device.telemetry.temperature) AND parseFloat(device.telemetry.temperature) > 85)
  2. Null/undefined handling:
// Add existence checks
exists(device.telemetry.temperature) AND
exists(device.telemetry.unit) AND
device.telemetry.unit == 'celsius' AND
(
  (isNumber(device.telemetry.temperature) AND device.telemetry.temperature > 85) OR
  (isString(device.telemetry.temperature) AND parseFloat(device.telemetry.temperature) > 85)
)
  3. Complete rule definition:
{
  "name": "high-temperature-alert",
  "condition": {
    "expression": "exists(device.telemetry.temperature) AND exists(device.telemetry.unit) AND device.telemetry.unit == 'celsius' AND ((isNumber(device.telemetry.temperature) AND device.telemetry.temperature > 85) OR (isString(device.telemetry.temperature) AND parseFloat(device.telemetry.temperature) > 85))"
  },
  "actions": [
    {
      "pubsub": {
        "topic": "projects/PROJECT_ID/topics/temperature-alerts"
      }
    }
  ]
}

Telemetry Payload Structure: Standardize your payload format to ensure consistent rule evaluation:

  1. Define canonical schema:
{
  "device_id": "string (required)",
  "timestamp": "integer (unix epoch, required)",
  "telemetry": {
    "temperature": "number (required)",
    "unit": "string (required, enum: celsius|fahrenheit|kelvin)",
    "sensor_id": "string (optional)",
    "location": "string (optional)"
  },
  "metadata": {
    "firmware_version": "string",
    "battery_level": "number"
  }
}
  2. Implement payload normalization Cloud Function:
import base64
import json
import time

from google.cloud import pubsub_v1

def normalize_telemetry(event, context):
    """Normalize telemetry payloads before rules engine evaluation."""
    publisher = pubsub_v1.PublisherClient()
    normalized_topic = 'projects/PROJECT_ID/topics/telemetry-normalized'

    try:
        # Decode Pub/Sub message
        message_data = base64.b64decode(event['data']).decode('utf-8')
        payload = json.loads(message_data)

        # Normalize temperature to number
        if 'telemetry' in payload and 'temperature' in payload['telemetry']:
            temp = payload['telemetry']['temperature']
            if isinstance(temp, str):
                payload['telemetry']['temperature'] = float(temp)

        # Ensure required fields exist
        if 'timestamp' not in payload:
            payload['timestamp'] = int(time.time())

        # Validate unit field
        if payload.get('telemetry', {}).get('unit') not in ['celsius', 'fahrenheit', 'kelvin']:
            payload['telemetry']['unit'] = 'celsius'  # Default

        # Publish normalized payload
        publisher.publish(
            normalized_topic,
            json.dumps(payload).encode('utf-8'),
            device_id=payload.get('device_id', 'unknown')
        )

    except Exception as e:
        print(f"Normalization error: {e}")
        # Publish to dead-letter topic
        publisher.publish(
            'projects/PROJECT_ID/topics/telemetry-errors',
            event['data'],
            error=str(e)
        )
  3. Deploy normalization function:
gcloud functions deploy normalize_telemetry \
  --runtime python39 \
  --trigger-topic telemetry-raw \
  --entry-point normalize_telemetry \
  --memory 256MB \
  --timeout 60s
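
With the canonical schema above, the normalization step can also reject malformed payloads outright instead of patching them. A pure-Python validation sketch (a schema library such as jsonschema could replace it):

```python
ALLOWED_UNITS = {"celsius", "fahrenheit", "kelvin"}

def validate_payload(payload):
    """Return a list of schema violations; an empty list means the payload is valid."""
    errors = []
    if not isinstance(payload.get("device_id"), str):
        errors.append("device_id must be a string")
    if not isinstance(payload.get("timestamp"), int):
        errors.append("timestamp must be a unix epoch integer")
    telemetry = payload.get("telemetry")
    if not isinstance(telemetry, dict):
        errors.append("telemetry object is required")
        return errors
    temperature = telemetry.get("temperature")
    if isinstance(temperature, bool) or not isinstance(temperature, (int, float)):
        errors.append("telemetry.temperature must be a number")
    if telemetry.get("unit") not in ALLOWED_UNITS:
        errors.append("telemetry.unit must be one of celsius, fahrenheit, kelvin")
    return errors
```

Payloads that fail validation can go straight to the dead-letter topic, instead of silently producing no_match results downstream.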

Rule Evaluation Logs: Implement comprehensive logging and monitoring:

  1. Enable detailed rule evaluation logging:
gcloud iot registries update temp-sensors \
  --region=us-central1 \
  --enable-mqtt-config \
  --log-level=debug
  2. Create log-based metrics:
# Metric for rule evaluation failures
gcloud logging metrics create rule_evaluation_no_match \
  --description="Count of rule evaluations with no match" \
  --log-filter='resource.type="cloudiot_device"
logName="projects/PROJECT_ID/logs/cloudiot.googleapis.com%2Frules_engine"
jsonPayload.result="no_match"'

# Metric for successful rule triggers
gcloud logging metrics create rule_evaluation_match \
  --description="Count of successful rule evaluations" \
  --log-filter='resource.type="cloudiot_device"
logName="projects/PROJECT_ID/logs/cloudiot.googleapis.com%2Frules_engine"
jsonPayload.result="match"'
  3. Query evaluation logs:
# View recent rule evaluations
gcloud logging read \
  'resource.type="cloudiot_device"
   AND logName="projects/PROJECT_ID/logs/cloudiot.googleapis.com%2Frules_engine"
   AND jsonPayload.rule_name="high-temperature-alert"' \
  --limit=50 \
  --format=json > rule_evaluations.json
  4. Analyze evaluation patterns:
import json

with open('rule_evaluations.json') as f:
    logs = json.load(f)

no_match_count = 0
match_count = 0
error_count = 0

for entry in logs:
    result = entry.get('jsonPayload', {}).get('result')
    if result == 'no_match':
        no_match_count += 1
        # Log the payload that didn't match
        print(f"No match: {entry.get('jsonPayload', {}).get('payload')}")
    elif result == 'match':
        match_count += 1
    elif result == 'error':
        error_count += 1
        print(f"Error: {entry.get('jsonPayload', {}).get('error')}")

total = max(len(logs), 1)  # avoid division by zero on an empty export
print(f"Match rate: {match_count / total * 100:.1f}%")
print(f"No-match rate: {no_match_count / total * 100:.1f}%")
  5. Set up alerting for missed alerts:
# alert-policy.yaml
displayName: "Low Rule Match Rate"
conditions:
  - displayName: "Rule match rate below 90%"
    conditionThreshold:
      filter: |
        metric.type="logging.googleapis.com/user/rule_evaluation_match"
        resource.type="cloudiot_device"
      comparison: COMPARISON_LT
      thresholdValue: 0.9
      duration: 300s
notificationChannels:
  - projects/PROJECT_ID/notificationChannels/CHANNEL_ID

Testing Strategy: Before deploying rules to production:

  1. Test with actual production payloads:
# Capture sample payloads as JSON
gcloud pubsub subscriptions pull telemetry-sub --limit=100 --format=json > sample_payloads.json

# Replay each captured payload through the Rules Engine test console or API.
# `your_rule_test_harness` is a placeholder for whatever test harness you use;
# Pub/Sub message data arrives base64-encoded in the JSON output.
jq -r '.[].message.data' sample_payloads.json | while read -r payload; do
  echo "$payload" | base64 --decode | your_rule_test_harness
done
  2. Implement shadow mode testing:
# Evaluate the rule without triggering actions. `evaluate_rule` and
# `log_metric` are placeholders for your rule-evaluation API and metrics client.
def shadow_rule_test(payload, rule_expression):
    result = evaluate_rule(rule_expression, payload)
    log_metric('shadow_rule_evaluation', {
        'rule': rule_expression,
        'result': result,
        'payload': payload
    })
    # Don't trigger actual alerts in shadow mode
    return result
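
If calling the engine from a shadow test is impractical, the rule's logic can also be mirrored locally. This sketch reimplements the threshold condition in plain Python; it is not the engine's own evaluator, so any divergence between the two is itself a finding worth investigating:

```python
def evaluate_temperature_rule(payload, threshold=85.0):
    """Local mirror of the rule: unit must be celsius and temperature > threshold."""
    telemetry = payload.get("telemetry", {})
    temperature = telemetry.get("temperature")
    # Accept stringified numbers, matching the parseFloat branch of the rule
    if isinstance(temperature, str):
        try:
            temperature = float(temperature)
        except ValueError:
            return False
    if isinstance(temperature, bool) or not isinstance(temperature, (int, float)):
        return False
    return telemetry.get("unit") == "celsius" and temperature > threshold
```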

This comprehensive approach ensures your rules handle all payload variations, provides visibility into evaluation behavior, and prevents missed critical alerts.

Also verify that your rule is actually receiving all telemetry messages. If devices are publishing faster than the rules engine can process, some messages might be dropped or delayed. Check the rule execution metrics to see if processing rate matches your expected message rate. Add a counter metric to track rule evaluations vs actual device publishes.
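
As a starting point for that comparison, per-device counts from two exported log files can be diffed offline. A sketch, assuming each log entry carries a `jsonPayload.device_id` field (adjust the key to your actual log schema):

```python
from collections import Counter

def per_device_counts(entries, key="device_id"):
    """Count log entries per device."""
    return Counter(e.get("jsonPayload", {}).get(key, "unknown") for e in entries)

def find_gaps(publish_entries, evaluation_entries):
    """Return devices whose publish count exceeds their rule-evaluation count."""
    published = per_device_counts(publish_entries)
    evaluated = per_device_counts(evaluation_entries)
    return {device: published[device] - evaluated.get(device, 0)
            for device in published
            if published[device] > evaluated.get(device, 0)}
```

A nonzero gap for a device points at dropped or delayed messages rather than a faulty rule expression.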