You need a multi-layered approach to handle network outages properly:
Buffering Configuration: First, right-size your buffer based on actual message volume. With 200 msg/s at 2KB each:
{
  "properties": {
    "desired": {
      "bufferSizeInMB": 250,
      "messageTTL": 7200,
      "maxRetryAttempts": 5,
      "retryIntervalSeconds": 10,
      "persistentStoragePath": "/var/lib/iotedge/storage/datastream"
    }
  }
}
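This gives you ~10 minutes of buffering capacity (250 MB / 0.4 MB per second ≈ 625 seconds). The 2-hour TTL (7200 seconds) keeps buffered messages from expiring during longer outages, but it only helps for messages that still fit in the buffer. Holding a full 2 hours at this rate would take roughly 2.9 GB (0.4 MB/s × 7200 s), so size the buffer for the longest outage you need to ride out.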
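If you want to rerun that arithmetic for your own message rate, a small helper like the one below works. It is only a sketch: the function names are made up for illustration, and it uses 1 MB = 1000 KB to match the estimate above.

```python
def buffer_seconds(buffer_mb: float, msgs_per_s: float, msg_kb: float) -> float:
    """Seconds of outage a buffer of buffer_mb can absorb (1 MB = 1000 KB)."""
    throughput_mb_per_s = msgs_per_s * msg_kb / 1000
    return buffer_mb / throughput_mb_per_s


def required_buffer_mb(outage_s: float, msgs_per_s: float, msg_kb: float) -> float:
    """Buffer size in MB needed to hold every message produced in outage_s seconds."""
    return outage_s * msgs_per_s * msg_kb / 1000


if __name__ == "__main__":
    # 250 MB buffer at 200 msg/s of 2 KB each
    print(f"{buffer_seconds(250, 200, 2) / 60:.1f} min of buffering")
    # Buffer needed to cover the full 2-hour TTL at the same rate
    print(f"{required_buffer_mb(7200, 200, 2):.0f} MB for a 2-hour outage")
```

Plug in your peak rate rather than the average if your traffic is bursty, since the buffer fills fastest exactly when it matters.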
Persistent Storage for Messages: Ensure the data stream module uses disk-backed storage, not just memory:
{
  "HostConfig": {
    "Binds": [
      "/var/lib/iotedge/storage/datastream:/app/storage"
    ]
  },
  "Env": [
    "STORAGE_MODE=persistent",
    "STORAGE_PATH=/app/storage"
  ]
}
Verify storage is working:
ls -lah /var/lib/iotedge/storage/datastream/
# Should show .db files during buffering
Network Outage Handling: Integrate with edgeHub’s store-and-forward for better resilience. Route messages through edgeHub instead of direct IoT Hub connections:
{
  "routes": {
    "dataStreamToHub": "FROM /messages/modules/datastream/* INTO $upstream"
  }
}
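Configure edgeHub’s store-and-forward in the $edgeHub module’s desired properties. timeToLiveSecs is in seconds; 7200 (2 hours) is also the default, so raise it if your outages can last longer: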
{
  "properties.desired": {
    "storeAndForwardConfiguration": {
      "timeToLiveSecs": 7200
    },
    "schemaVersion": "1.2",
    "routes": {
      "dataStreamToHub": "FROM /messages/modules/datastream/* INTO $upstream"
    }
  }
}
Modify your data stream module to send messages to edgeHub:
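import asyncio
from azure.iot.device.aio import IoTHubModuleClient  # async client, so its calls can be awaited

async def send_telemetry(message):
    # Picks up the edgeHub connection settings injected by the IoT Edge runtime
    client = IoTHubModuleClient.create_from_edge_environment()
    await client.connect()
    # Send to an edgeHub output instead of direct to IoT Hub
    await client.send_message_to_output(
        message,
        output_name="telemetryOutput"
    )
    await client.shutdown()
# Run it with: asyncio.run(send_telemetry(message))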
This leverages edgeHub’s built-in buffering and retry logic, which is more robust than custom module buffering.
Additional optimizations:
- Message Compression: Reduce storage requirements:
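import gzip
import json
from azure.iot.device import Message

# Serialize the telemetry and gzip it before wrapping it in a Message
data = json.dumps(telemetry_data)
compressed = gzip.compress(data.encode("utf-8"))
message = Message(compressed)
# Flag the payload so the consumer knows to decompress it
message.custom_properties["compressed"] = "true"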
- Priority Queuing: Implement message priorities:
if message.priority == "high":
    buffer.high_priority_queue.append(message)
else:
    buffer.normal_queue.append(message)
- Storage Monitoring: Add alerts for buffer usage:
buffer_usage = get_buffer_size() / max_buffer_size
if buffer_usage > 0.8:
    send_alert("Buffer 80% full - network issues?")
- Graceful Degradation: During extended outages, implement sampling:
if buffer_usage > 0.9:
    # Keep only every 10th message while the buffer is nearly full
    if message_count % 10 == 0:
        buffer.append(message)
else:
    # Normal operation: keep every message
    buffer.append(message)
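The priority, alerting, and sampling snippets above are fragments, so here is one way they could fit together. Treat it as a rough sketch: OutageBuffer and everything in it are hypothetical names, it counts messages rather than bytes, and it keeps everything in memory, so a real module would still need to persist to the mounted storage path.

```python
from collections import deque


class OutageBuffer:
    """Bounded in-memory buffer with two priority levels and load shedding."""

    def __init__(self, max_messages, alert_at=0.8, sample_at=0.9, sample_every=10):
        self.max_messages = max_messages
        self.alert_at = alert_at          # usage fraction that triggers an alert
        self.sample_at = sample_at        # usage fraction where sampling starts
        self.sample_every = sample_every  # keep every Nth normal message when sampling
        self.high = deque()
        self.normal = deque()
        self.seen_normal = 0              # normal-priority messages offered so far
        self.alerts = []                  # stand-in for a real alerting hook

    @property
    def usage(self):
        return (len(self.high) + len(self.normal)) / self.max_messages

    def add(self, message, priority="normal"):
        """Return True if the message was stored, False if it was shed."""
        if self.usage >= 1.0:
            return False  # completely full: nothing more fits
        if self.usage > self.alert_at and not self.alerts:
            self.alerts.append(f"Buffer over {self.alert_at:.0%} full - network issues?")
        if priority == "high":
            self.high.append(message)
            return True
        self.seen_normal += 1
        if self.usage > self.sample_at and self.seen_normal % self.sample_every != 0:
            return False  # degraded mode: keep only every Nth normal message
        self.normal.append(message)
        return True

    def pop(self):
        """Drain high-priority messages first, then normal ones, oldest first."""
        if self.high:
            return self.high.popleft()
        if self.normal:
            return self.normal.popleft()
        return None
```

High-priority messages skip the sampling step on purpose, so alarms still get through while routine telemetry is thinned out. When connectivity returns, call pop() in a loop and forward each message until it returns None.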
Monitor the solution:
# Watch buffer usage
watch -n 5 'du -sh /var/lib/iotedge/storage/datastream/'
# Monitor edgeHub queue
iotedge logs edgeHub --tail 50 | grep -i queue
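If you would rather check buffer usage from inside the module than from a shell, something like this could back the usage check. It is a sketch under a few assumptions: the function names are illustrative, the path and the 250 MB limit are the values used earlier in this answer, and it sums apparent file sizes, so the result can differ slightly from what du reports.

```python
import os


def directory_size_bytes(path):
    """Total apparent size of regular files under path (0 if path is missing)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            file_path = os.path.join(root, name)
            # Skip symlinks so linked files are not counted twice
            if os.path.isfile(file_path) and not os.path.islink(file_path):
                total += os.path.getsize(file_path)
    return total


def buffer_usage(path, max_buffer_mb):
    """Fraction of the configured buffer size currently used on disk."""
    return directory_size_bytes(path) / (max_buffer_mb * 1_000_000)


if __name__ == "__main__":
    usage = buffer_usage("/var/lib/iotedge/storage/datastream", max_buffer_mb=250)
    if usage > 0.8:
        print(f"Buffer {usage:.0%} full - network issues?")
```

Inside the container you would point it at the mounted path (/app/storage in the bind above) instead of the host path.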
The root cause is insufficient buffer capacity combined with direct IoT Hub connections that bypass edgeHub’s store-and-forward mechanism. By routing through edgeHub and properly configuring both module and edgeHub buffering, you’ll achieve reliable message delivery during network interruptions.