Out-of-order events in data stream causing inaccurate analytics results

Our IoT data stream from 1,500 environmental sensors flows through Dataflow to BigQuery for analytics. We’re seeing events arrive out of order, causing incorrect time-series analysis. For example, a temperature reading timestamped 10:15:00 arrives after readings from 10:16:00 and 10:17:00.

Dataflow pipeline:

PCollection<SensorEvent> events = pipeline
  .apply(PubsubIO.readStrings().fromTopic(topic))
  .apply(ParDo.of(new ParseEventFn()))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));

BigQuery queries show gaps and anomalies in time-series data. We need proper event timestamp handling, appropriate Dataflow windowing for out-of-order events, and BigQuery time-series analytics that account for late arrivals. What’s the best approach to handle event-time vs processing-time correctly?

Out-of-order events are inherent in distributed IoT systems due to network latency, device buffering, and asynchronous processing. Let me provide a comprehensive solution addressing all three aspects.

Event Timestamp Handling: The fundamental issue is distinguishing event time (when the sensor reading occurred) from processing time (when Dataflow receives the event). By default, PubsubIO stamps each element with its Pub/Sub publish time — effectively processing time — which causes exactly the problems you’re seeing.

Modify your pipeline to extract and use event timestamps. (If your publishers can attach the event time as a Pub/Sub message attribute, PubsubIO’s withTimestampAttribute is an even cleaner option, because it also informs the watermark at the source.)

PCollection<SensorEvent> events = pipeline
  .apply(PubsubIO.readStrings().fromTopic(topic))
  .apply(ParDo.of(new ParseEventFn()))
  .apply(WithTimestamps.<SensorEvent>of((SensorEvent e) ->
      new Instant(e.getTimestampMillis()))
    // Event timestamps are earlier than the Pub/Sub publish time, so an
    // explicit skew allowance is required to move timestamps backward.
    .withAllowedTimestampSkew(Duration.standardDays(1)))
  .setCoder(SerializableCoder.of(SensorEvent.class));

In your ParseEventFn, extract the timestamp from the JSON payload:

public class ParseEventFn extends DoFn<String, SensorEvent> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    JsonObject json = JsonParser.parseString(c.element()).getAsJsonObject();
    long timestampMillis = json.get("timestamp").getAsLong();
    SensorEvent event = new SensorEvent(
      json.get("sensorId").getAsString(),
      json.get("temperature").getAsDouble(),
      timestampMillis
    );
    c.output(event);
  }
}

Critically, validate timestamps to handle clock skew or misconfigured sensors. Reject events with timestamps more than 1 hour in the future or more than 24 hours in the past:

long now = Instant.now().getMillis();
if (timestampMillis > now + 3600000L || timestampMillis < now - 86400000L) {
  LOG.warn("Invalid timestamp for sensor {}: {}", sensorId, timestampMillis);
  // Route to a dead-letter side output. This requires applying the DoFn with
  // ParDo.of(...).withOutputTags(mainTag, TupleTagList.of(deadLetterTag)).
  c.output(deadLetterTag, c.element());
  return;
}
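The bounds check itself is pure logic, so it can be isolated into a small helper that is easy to unit-test outside the pipeline. This is a sketch; the class and method names are mine, not part of any Beam API:

```java
// Illustrative helper isolating the timestamp plausibility check.
class TimestampValidator {
  static final long MAX_FUTURE_MILLIS = 3_600_000L;  // 1 hour ahead
  static final long MAX_PAST_MILLIS = 86_400_000L;   // 24 hours behind

  /** True if the event timestamp falls within the accepted skew window. */
  static boolean isPlausible(long timestampMillis, long nowMillis) {
    return timestampMillis <= nowMillis + MAX_FUTURE_MILLIS
        && timestampMillis >= nowMillis - MAX_PAST_MILLIS;
  }
}
```

Keeping the thresholds in named constants also makes it trivial to tune them per deployment.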

Dataflow Windowing Configuration: Implement event-time windowing with allowed lateness to handle out-of-order events:

PCollection<SensorEvent> windowedEvents = events
  .apply(Window.<SensorEvent>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(
      AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(1)))
        .withLateFirings(AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(2))))
    .withAllowedLateness(Duration.standardMinutes(15))
    .accumulatingFiredPanes());

This configuration:

  • Uses 5-minute fixed windows based on event time
  • Fires an early result one minute after the first element in a pane arrives, for real-time visibility
  • Fires late updates (at most every 2 minutes) as late data arrives
  • Allows events up to 15 minutes late
  • Accumulates late data into existing panes rather than discarding it

One caveat: with accumulatingFiredPanes(), each firing re-emits the full pane contents, so if you stream windowed aggregates to an append-only sink, downstream queries must keep only the latest pane per window (otherwise use discardingFiredPanes()).

The watermark (Dataflow’s estimate of event time progress) advances based on observed timestamps. Events arriving before the watermark passes the window end time are considered on-time. Events arriving after are considered late.
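Mechanically, the fixed window an event belongs to depends only on its event timestamp, never on when it arrives. A plain-Java illustration of the assignment rule (the class is mine, not a Beam API — Beam’s FixedWindows does this internally):

```java
// Illustrative model of 5-minute fixed-window assignment by event time.
class FixedWindowAssigner {
  static final long WINDOW_MILLIS = 5 * 60 * 1000L;

  /** Start of the 5-minute event-time window containing timestampMillis. */
  static long windowStart(long timestampMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
  }
}
```

This is why your 10:15:00 reading lands in the 10:15–10:20 window even if it arrives after readings from 10:16 and 10:17 — arrival order is irrelevant once windows are keyed by event time.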

For highly variable latency, consider session windows instead of fixed windows. Note that sessions are computed per key, so key your elements by sensor ID first (e.g., with WithKeys):

Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))

This groups each sensor’s events into sessions separated by 10-minute activity gaps, which can be more robust to irregular event arrival patterns.
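To make the session semantics concrete, here is a plain-Java model of how a gap duration splits one sensor’s (sorted) timestamps into sessions. It is illustrative only — Beam’s Sessions transform does this per key, with merging, automatically:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of Sessions.withGapDuration: a gap larger than
// gapMillis between consecutive events starts a new session.
class SessionSplitter {
  static List<List<Long>> split(List<Long> sortedTimestamps, long gapMillis) {
    List<List<Long>> sessions = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    for (long ts : sortedTimestamps) {
      if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
        sessions.add(current);          // gap exceeded: close the session
        current = new ArrayList<>();
      }
      current.add(ts);
    }
    if (!current.isEmpty()) sessions.add(current);
    return sessions;
  }
}
```

With a 10-minute (600,000 ms) gap, two readings a minute apart share a session, while a reading more than ten minutes later opens a new one.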

BigQuery Time-Series Analytics: Structure your BigQuery integration to handle late-arriving data efficiently. Use the Storage Write API with exactly-once semantics:

.apply(BigQueryIO.<SensorEvent>write()
  .to(tableSpec)
  // A format function maps the custom type to table columns
  // (getter names here are illustrative).
  .withFormatFunction(e -> new TableRow()
    .set("sensor_id", e.getSensorId())
    .set("temperature", e.getTemperature())
    .set("event_timestamp",
      java.time.Instant.ofEpochMilli(e.getTimestampMillis()).toString()))
  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

Partition your BigQuery table by event timestamp (not ingestion time) for efficient querying:

CREATE TABLE sensor_readings (
  sensor_id STRING,
  temperature FLOAT64,
  event_timestamp TIMESTAMP,
  processing_timestamp TIMESTAMP,
  window_start TIMESTAMP,
  window_end TIMESTAMP
)
PARTITION BY DATE(event_timestamp)
CLUSTER BY sensor_id;

Store both event_timestamp and processing_timestamp to enable analysis of latency patterns. Include window_start and window_end to identify which Dataflow window produced each record.
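With both timestamps stored, arrival lag per sensor can be profiled directly. A sketch against the schema above (the p95 choice is arbitrary — pick the percentile you alert on):

```sql
-- Per-sensor 95th-percentile lag between event time and arrival in BigQuery.
SELECT
  sensor_id,
  APPROX_QUANTILES(
    TIMESTAMP_DIFF(processing_timestamp, event_timestamp, SECOND), 100
  )[OFFSET(95)] AS p95_lag_seconds
FROM sensor_readings
WHERE DATE(event_timestamp) = CURRENT_DATE()
GROUP BY sensor_id;
```

Sensors with consistently high lag are candidates for buffering or clock-sync investigation, and the result also tells you whether your 15-minute allowed lateness is actually sufficient.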

For time-series analytics that handle late arrivals, use window functions with ORDER BY event_timestamp:

SELECT
  sensor_id,
  event_timestamp,
  temperature,
  AVG(temperature) OVER (
    PARTITION BY sensor_id
    ORDER BY event_timestamp
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) as moving_avg
FROM sensor_readings
WHERE DATE(event_timestamp) = CURRENT_DATE()
ORDER BY sensor_id, event_timestamp;

Implement a materialized view pattern for frequently queried aggregations:

CREATE MATERIALIZED VIEW sensor_hourly_stats AS
SELECT
  sensor_id,
  TIMESTAMP_TRUNC(event_timestamp, HOUR) as hour,
  AVG(temperature) as avg_temp,
  MIN(temperature) as min_temp,
  MAX(temperature) as max_temp,
  COUNT(*) as reading_count
FROM sensor_readings
GROUP BY sensor_id, hour;

BigQuery automatically refreshes materialized views, incorporating late-arriving data. For critical analytics, implement a scheduled query that runs hourly to recompute recent windows (last 24 hours) incorporating all late arrivals, then update a curated analytics table.
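The reconciliation step can be a single MERGE in the scheduled query. A sketch, assuming a curated table named curated_hourly_stats (that name and its columns are hypothetical):

```sql
-- Hourly scheduled query: recompute the last 24 hours of aggregates,
-- picking up all late arrivals, and upsert into the curated table.
MERGE curated_hourly_stats T
USING (
  SELECT
    sensor_id,
    TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
    AVG(temperature) AS avg_temp,
    COUNT(*) AS reading_count
  FROM sensor_readings
  WHERE event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
  GROUP BY sensor_id, hour
) S
ON T.sensor_id = S.sensor_id AND T.hour = S.hour
WHEN MATCHED THEN UPDATE SET
  avg_temp = S.avg_temp,
  reading_count = S.reading_count
WHEN NOT MATCHED THEN INSERT (sensor_id, hour, avg_temp, reading_count)
  VALUES (S.sensor_id, S.hour, S.avg_temp, S.reading_count);
```

Because the MERGE is idempotent over the 24-hour lookback, rerunning it after any delay simply converges the curated table to the complete event set.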

Monitor data quality by tracking latency metrics. Create a Dataflow counter that measures the difference between event time and processing time, then export to Cloud Monitoring for alerting on excessive latency.

In short: yes, extract and apply the sensor’s own timestamp, and configure allowed lateness on your windows with .withAllowedLateness() — something on the order of 15 minutes is a reasonable starting point. Events arriving within the allowed lateness still trigger pane updates; events beyond it are dropped by default, but you can capture them in a side output for separate handling.

Consider implementing a two-tier approach: real-time stream for recent data with eventual consistency, and a batch reconciliation job that runs hourly to correct historical data using the complete event set. This balances real-time needs with accuracy requirements for time-series analytics.