Here’s the comprehensive solution addressing all three focus areas:
Custom Rule Expression Syntax:
Your rule has multiple issues. IoT Core Rules Engine uses a specific expression language with limitations:
- Type handling - numeric vs string:
// WRONG - fails on string values
device.telemetry.temperature > 85
// RIGHT - handle both types
(isNumber(device.telemetry.temperature) AND device.telemetry.temperature > 85) OR
(isString(device.telemetry.temperature) AND parseFloat(device.telemetry.temperature) > 85)
- Null/undefined handling:
// Add existence checks
exists(device.telemetry.temperature) AND
exists(device.telemetry.unit) AND
device.telemetry.unit == 'celsius' AND
(
(isNumber(device.telemetry.temperature) AND device.telemetry.temperature > 85) OR
(isString(device.telemetry.temperature) AND parseFloat(device.telemetry.temperature) > 85)
)
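The dual numeric/string check above can be sanity-checked outside the rules engine; a minimal Python sketch of the same logic (`exceeds_threshold` is a hypothetical helper for illustration, not a Rules Engine API):

```python
def exceeds_threshold(temperature, threshold=85):
    """Mimic the rule's dual numeric/string check for a temperature value."""
    if isinstance(temperature, (int, float)) and not isinstance(temperature, bool):
        return temperature > threshold  # numeric payloads compare directly
    if isinstance(temperature, str):
        try:
            return float(temperature) > threshold  # string payloads need parsing
        except ValueError:
            return False  # unparseable strings never match
    return False  # null/missing/other types never match

print(exceeds_threshold(90))      # True
print(exceeds_threshold("90.5"))  # True
print(exceeds_threshold("hot"))   # False
print(exceeds_threshold(None))    # False
```

A rule that uses only the bare numeric comparison silently returns no-match for every device that reports `"90.5"` as a string, which is exactly the missed-alert failure mode.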
- Complete rule definition:
{
  "name": "high-temperature-alert",
  "condition": {
    "expression": "exists(device.telemetry.temperature) AND exists(device.telemetry.unit) AND device.telemetry.unit == 'celsius' AND ((isNumber(device.telemetry.temperature) AND device.telemetry.temperature > 85) OR (isString(device.telemetry.temperature) AND parseFloat(device.telemetry.temperature) > 85))"
  },
  "actions": [
    {
      "pubsub": {
        "topic": "projects/PROJECT_ID/topics/temperature-alerts"
      }
    }
  ]
}
Telemetry Payload Structure:
Standardize your payload format to ensure consistent rule evaluation:
- Define canonical schema:
{
  "device_id": "string (required)",
  "timestamp": "integer (unix epoch, required)",
  "telemetry": {
    "temperature": "number (required)",
    "unit": "string (required, enum: celsius|fahrenheit|kelvin)",
    "sensor_id": "string (optional)",
    "location": "string (optional)"
  },
  "metadata": {
    "firmware_version": "string",
    "battery_level": "number"
  }
}
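Enforcing this schema at ingestion catches malformed payloads before they ever reach rule evaluation; a stdlib-only validator sketch (`validate_payload` is illustrative, not part of any SDK — the `jsonschema` library is a fuller alternative):

```python
VALID_UNITS = {'celsius', 'fahrenheit', 'kelvin'}

def validate_payload(payload):
    """Return a list of schema violations; an empty list means the payload is valid."""
    errors = []
    if not isinstance(payload.get('device_id'), str):
        errors.append('device_id must be a string')
    if not isinstance(payload.get('timestamp'), int):
        errors.append('timestamp must be a unix-epoch integer')
    telemetry = payload.get('telemetry')
    if not isinstance(telemetry, dict):
        errors.append('telemetry object is required')
        return errors  # no point checking fields of a missing object
    temp = telemetry.get('temperature')
    if not isinstance(temp, (int, float)) or isinstance(temp, bool):
        errors.append('telemetry.temperature must be a number')
    if telemetry.get('unit') not in VALID_UNITS:
        errors.append('telemetry.unit must be celsius, fahrenheit, or kelvin')
    return errors

sample = {'device_id': 'sensor-01', 'timestamp': 1700000000,
          'telemetry': {'temperature': 91.2, 'unit': 'celsius'}}
print(validate_payload(sample))  # []
```

Payloads that fail validation can be routed to the dead-letter topic instead of being silently evaluated (and silently missed) by the rule.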
- Implement payload normalization Cloud Function:
import base64
import json
import time

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
NORMALIZED_TOPIC = 'projects/PROJECT_ID/topics/telemetry-normalized'
ERROR_TOPIC = 'projects/PROJECT_ID/topics/telemetry-errors'

def normalize_telemetry(event, context):
    """Normalize telemetry payloads before rules engine evaluation."""
    try:
        # Decode the Pub/Sub message
        message_data = base64.b64decode(event['data']).decode('utf-8')
        payload = json.loads(message_data)
        telemetry = payload.setdefault('telemetry', {})
        # Normalize temperature to a number
        if isinstance(telemetry.get('temperature'), str):
            telemetry['temperature'] = float(telemetry['temperature'])
        # Ensure required fields exist
        if 'timestamp' not in payload:
            payload['timestamp'] = int(time.time())
        # Validate the unit field
        if telemetry.get('unit') not in ('celsius', 'fahrenheit', 'kelvin'):
            telemetry['unit'] = 'celsius'  # Default
        # Publish the normalized payload
        publisher.publish(
            NORMALIZED_TOPIC,
            json.dumps(payload).encode('utf-8'),
            device_id=payload.get('device_id', 'unknown')
        )
    except Exception as e:
        print(f"Normalization error: {e}")
        # Forward the raw message bytes to a dead-letter topic
        publisher.publish(
            ERROR_TOPIC,
            base64.b64decode(event['data']),
            error=str(e)
        )
- Deploy normalization function:
gcloud functions deploy normalize_telemetry \
--runtime python39 \
--trigger-topic telemetry-raw \
--entry-point normalize_telemetry \
--memory 256MB \
--timeout 60s
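Before deploying, the decode-and-coerce path can be exercised locally by hand-building an event in the shape Pub/Sub background functions receive (`make_event` is a hypothetical test helper):

```python
import base64
import json

def make_event(payload):
    """Wrap a payload the way Pub/Sub delivers it to a background function."""
    data = base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
    return {'data': data}

# A raw payload with a string temperature and no timestamp
event = make_event({'device_id': 'sensor-01',
                    'telemetry': {'temperature': '91.2', 'unit': 'celsius'}})

# Reproduce the function's decode + coercion steps
decoded = json.loads(base64.b64decode(event['data']).decode('utf-8'))
decoded['telemetry']['temperature'] = float(decoded['telemetry']['temperature'])
print(decoded['telemetry']['temperature'])  # 91.2
```

Running the normalization logic against a few captured production payloads this way is much faster than a deploy-and-observe loop.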
Rule Evaluation Logs:
Implement comprehensive logging and monitoring:
- Enable detailed rule evaluation logging:
gcloud iot registries update temp-sensors \
  --region=us-central1 \
  --enable-mqtt-config \
  --log-level=debug
- Create log-based metrics:
# Metric for rule evaluation failures
gcloud logging metrics create rule_evaluation_no_match \
--description="Count of rule evaluations with no match" \
--log-filter='resource.type="cloudiot_device"
logName="projects/PROJECT_ID/logs/cloudiot.googleapis.com%2Frules_engine"
jsonPayload.result="no_match"'
# Metric for successful rule triggers
gcloud logging metrics create rule_evaluation_match \
--description="Count of successful rule evaluations" \
--log-filter='resource.type="cloudiot_device"
logName="projects/PROJECT_ID/logs/cloudiot.googleapis.com%2Frules_engine"
jsonPayload.result="match"'
- Query evaluation logs:
# View recent rule evaluations
gcloud logging read \
'resource.type="cloudiot_device"
AND logName="projects/PROJECT_ID/logs/cloudiot.googleapis.com%2Frules_engine"
AND jsonPayload.rule_name="high-temperature-alert"' \
--limit=50 \
--format=json > rule_evaluations.json
- Analyze evaluation patterns:
import json

with open('rule_evaluations.json') as f:
    logs = json.load(f)

no_match_count = match_count = error_count = 0
for entry in logs:
    result = entry.get('jsonPayload', {}).get('result')
    if result == 'no_match':
        no_match_count += 1
        # Log the payload that didn't match
        print(f"No match: {entry.get('jsonPayload', {}).get('payload')}")
    elif result == 'match':
        match_count += 1
    elif result == 'error':
        error_count += 1
        print(f"Error: {entry.get('jsonPayload', {}).get('error')}")

if logs:  # guard against an empty export
    print(f"Match rate: {match_count / len(logs) * 100:.1f}%")
    print(f"No-match rate: {no_match_count / len(logs) * 100:.1f}%")
    print(f"Error rate: {error_count / len(logs) * 100:.1f}%")
- Set up alerting for missed alerts:
# alert-policy.yaml
displayName: "Low Rule Match Rate"
conditions:
  - displayName: "Rule match rate below 90%"
    conditionThreshold:
      filter: |
        metric.type="logging.googleapis.com/user/rule_evaluation_match"
        resource.type="cloudiot_device"
      comparison: COMPARISON_LT
      thresholdValue: 0.9
      duration: 300s
notificationChannels:
  - projects/PROJECT_ID/notificationChannels/CHANNEL_ID
Testing Strategy:
Before deploying rules to production:
- Test with actual production payloads:
# Capture sample payloads as JSON
gcloud pubsub subscriptions pull telemetry-sub --limit=100 --format=json > sample_payloads.json
# Test the rule against each sample (pseudocode -- substitute the
# Rules Engine test console or API as the harness; message.data is base64-encoded)
jq -r '.[].message.data' sample_payloads.json | while read -r payload; do
  test_rule_evaluation "$RULE" "$payload"
done
- Implement shadow mode testing:
# Evaluate the rule without triggering actions
def shadow_rule_test(payload, rule_expression):
    result = evaluate_rule(rule_expression, payload)  # evaluator of your choice
    log_metric('shadow_rule_evaluation', {
        'rule': rule_expression,
        'result': result,
        'payload': payload,
    })
    # Don't trigger actual alerts in shadow mode
    return result
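To make shadow testing runnable end to end, `evaluate_rule` can be stubbed with plain Python predicates instead of the real expression language; a hypothetical self-contained sketch:

```python
def evaluate_rule(predicate, payload):
    """Shadow-evaluate a rule expressed as a Python predicate; never raises."""
    try:
        return bool(predicate(payload))
    except (KeyError, TypeError, ValueError):
        return False  # treat evaluation errors as no-match, but log them in practice

# Predicate mirroring the high-temperature-alert condition
high_temp = lambda p: (p['telemetry']['unit'] == 'celsius'
                       and float(p['telemetry']['temperature']) > 85)

print(evaluate_rule(high_temp, {'telemetry': {'temperature': '91.2', 'unit': 'celsius'}}))  # True
print(evaluate_rule(high_temp, {'telemetry': {'temperature': 70, 'unit': 'celsius'}}))      # False
print(evaluate_rule(high_temp, {}))  # False (missing telemetry)
```

Replaying the captured sample payloads through this harness surfaces the same match/no-match split the production rule would produce, without sending a single alert.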
This comprehensive approach ensures your rules handle all payload variations, provides visibility into evaluation behavior, and prevents missed critical alerts.