Our Dataflow pipeline is inserting duplicate telemetry records into BigQuery when streaming IoT data from Cloud IoT Core through Pub/Sub. We're seeing a 15-20% duplicate rate during pipeline restarts or autoscaling events.
Current setup processes messages with:
pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
    .apply(ParDo.of(new ParseTelemetry()))
    .apply(BigQueryIO.writeTableRows().to(outputTable));
The duplicates appear to be tied to Dataflow pipeline restarts: when workers restart, some messages get redelivered and reprocessed. We're not setting any insertId for BigQuery streaming inserts, which I suspect is part of the problem. Our analytics team is complaining about inflated metrics and inaccurate aggregations.
How do you handle deduplication logic when streaming IoT data through Dataflow to BigQuery?
If your messages already carry a unique messageId attribute, extract the insertId from the Pub/Sub message attributes rather than generating it; that's more reliable. Combine the Pub/Sub messageId with the deviceId to form a deterministic key. Also make sure BigQueryIO is using the STREAMING_INSERTS method, not FILE_LOADS, since insertId only applies to streaming inserts. What's your pipeline's windowing strategy? Fixed windows can bound how much data gets reprocessed after a restart.
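A minimal sketch of the key construction in plain Java, assuming the device ID arrives as a message attribute (the attribute name "deviceId" and the class/method names here are illustrative, not from the original pipeline):

```java
import java.util.Map;

public class DedupKey {
    // Build a deterministic insertId from the Pub/Sub messageId plus a
    // deviceId attribute. The same delivery attempt always yields the
    // same key, so retried messages collapse to one BigQuery row.
    public static String insertIdFor(String messageId, Map<String, String> attributes) {
        String deviceId = attributes.getOrDefault("deviceId", "unknown-device");
        return deviceId + ":" + messageId;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = Map.of("deviceId", "sensor-42");
        System.out.println(insertIdFor("1234567890", attrs));
    }
}
```

To feed such a key into BigQuery streaming inserts from Beam you need a hook like `BigQueryIO.Write.withDeterministicRecordIdFn` (available in newer Beam Java SDK versions); without it, Beam generates its own random insertIds per bundle.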
Have you considered using Dataflow’s state API for deduplication before writing to BigQuery? You can maintain a stateful ParDo that tracks seen messageIds within a time window (say 1 hour) and filters out duplicates. This reduces load on BigQuery and gives you more control over deduplication logic, especially if you need to handle late-arriving data.
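In Beam that state would live in a per-key `ValueState` expired by a timer (and recent Beam versions also ship a ready-made `Deduplicate` transform that does roughly this). The core filtering logic, sketched in plain Java with illustrative names:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of time-bounded deduplication: remember each key for
// a TTL window and drop repeats seen within it. In a Beam stateful ParDo,
// the map below would be per-key state cleared by an expiry timer.
public class WindowedDeduper {
    private final long ttlMillis;
    private final Map<String, Long> firstSeen = new HashMap<>();

    public WindowedDeduper(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true the first time a key is seen within the TTL window;
    // false for a duplicate arriving inside the window.
    public boolean accept(String key, long nowMillis) {
        // Evict entries older than the TTL so tracked state stays bounded.
        firstSeen.values().removeIf(seenAt -> nowMillis - seenAt > ttlMillis);
        return firstSeen.putIfAbsent(key, nowMillis) == null;
    }
}
```

A one-hour TTL matches the window suggested above; the trade-off is that a duplicate arriving after expiry slips through, so this complements rather than replaces insertId-based dedup in BigQuery.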
Beyond insertId, you should also make the transforms in your pipeline idempotent: if ParseTelemetry does any enrichment or calculations, it must produce the same output for the same input. We use a SHA-256 hash of (deviceId + timestamp + payload) as the insertId; it survives pipeline restarts and worker failures. Also check that your Dataflow job is running with the exactly-once streaming guarantee (the default) rather than at-least-once mode.
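A sketch of that hashing scheme in plain Java (the field separator and class/method names are my choice, not from the answer above):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class InsertIds {
    // Deterministic insertId: SHA-256 over the fields that identify one
    // reading. Reprocessing the same message reproduces the same id, so
    // BigQuery's best-effort streaming dedup can drop the duplicate row.
    public static String insertId(String deviceId, long timestampMillis, String payload) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(
                    (deviceId + "|" + timestampMillis + "|" + payload)
                            .getBytes(StandardCharsets.UTF_8));
            // Hex-encode the 32-byte digest into a 64-character id.
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```

The explicit separator matters: without it, ("ab", "c") and ("a", "bc") would concatenate to the same string and collide.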
Makes sense. Should the insertId be generated in the ParseTelemetry transform, or should I extract it from the Pub/Sub message metadata? Our messages already have a unique messageId attribute from IoT Core.