Duplicate Pub/Sub messages from billing engine causing unexpected cost spikes

Our billing engine is processing duplicate messages from Pub/Sub, resulting in customers being charged multiple times for the same IoT device usage. We’re seeing approximately 3-8% duplicate rate during high-traffic periods.

The flow is: IoT devices → IoT Core → Pub/Sub → Billing Engine → Cloud SQL. Our billing subscriber uses this acknowledgment pattern:

subscriber.subscribe(subscription_path, callback=process_billing_event)
# In callback:
message.ack()  # Called after DB insert

We’ve confirmed duplicates by checking message_id values - the same message_id appears multiple times in our billing database. This suggests Pub/Sub is redelivering messages we’ve already acknowledged. Our ack deadline is 60 seconds and processing typically takes 5-10 seconds.

Is there a way to enforce exactly-once delivery, or do we need to implement application-level deduplication? The financial impact is significant.

Here’s a comprehensive solution addressing all three focus areas:

Pub/Sub Message Deduplication: Implement database-level deduplication with unique constraints:

CREATE TABLE billing_events (
  message_id VARCHAR(255) PRIMARY KEY,
  device_id VARCHAR(100) NOT NULL,
  usage_amount DECIMAL(10,2),
  processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

In your application:

# Assumes psycopg2; other drivers raise an equivalent IntegrityError
from psycopg2 import IntegrityError

try:
    cursor.execute(
        "INSERT INTO billing_events (message_id, device_id, usage_amount) VALUES (%s, %s, %s)",
        (message.message_id, device_id, amount)
    )
    connection.commit()
    message.ack()
except IntegrityError:  # Duplicate message_id rejected by the primary key
    connection.rollback()  # Clear the aborted transaction before continuing
    message.ack()  # Already processed, safe to ack
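To try this pattern end to end without a Cloud SQL instance, here is a self-contained sketch using Python's built-in sqlite3 (the table and message values are illustrative; with psycopg2 the same flow raises its own IntegrityError):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE billing_events (
        message_id   TEXT PRIMARY KEY,
        device_id    TEXT NOT NULL,
        usage_amount REAL
    )
""")

def record_event(message_id, device_id, amount):
    """Insert once; return False if this message_id was already processed."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO billing_events (message_id, device_id, usage_amount) "
                "VALUES (?, ?, ?)",
                (message_id, device_id, amount),
            )
        return True  # first delivery: apply charges, then ack
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: skip charges, still ack

print(record_event("msg-001", "device-42", 1.25))  # True
print(record_event("msg-001", "device-42", 1.25))  # False (primary key rejects it)
```

Either way the message gets acked; the constraint only decides whether the charge is applied.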

Ack/Nack Handling: Your current code acks after the DB insert, which is the right order, but you also need to distinguish permanent from transient failures so each message is acked or nacked deliberately rather than timing out:

def process_billing_event(message):
    # ValidationError / DatabaseError stand in for your parser's and
    # DB driver's actual exception types
    try:
        # Parse and validate
        billing_data = parse_message(message.data)
        # Insert with deduplication
        insert_billing_record(message.message_id, billing_data)
        message.ack()
    except ValidationError as e:
        # Invalid data - a retry can never succeed
        log_error(f"Invalid message: {e}")
        message.ack()  # Ack to prevent infinite redelivery
    except DatabaseError as e:
        # Transient DB error - retry
        log_error(f"DB error: {e}")
        message.nack()  # Explicit nack for prompt redelivery

Exactly-Once Delivery: standard Pub/Sub subscriptions are at-least-once (a subscription-level exactly-once delivery option now exists for pull subscriptions, but application-level idempotency remains the robust approach for billing), so enforce it in your application:

  1. Use transactions for atomic deduplication:
# psycopg2-style: the connection context manager wraps the block in one transaction
with connection:
    cursor.execute(
        "INSERT INTO billing_events ... ON CONFLICT (message_id) DO NOTHING"
    )
    if cursor.rowcount > 0:
        # First time processing this message
        cursor.execute("INSERT INTO customer_charges ...")
  2. Implement distributed locking for critical sections using Cloud Memorystore (Redis):
import redis

redis_client = redis.Redis(host="10.0.0.3", port=6379)  # your Memorystore endpoint
lock = redis_client.lock(f"billing_lock:{message.message_id}", timeout=30)
if lock.acquire(blocking=False):
    try:
        process_billing(message)
    finally:
        lock.release()
# else: another worker holds the lock and is already processing this message
  3. Monitor duplicate rates (prometheus_client shown for illustration):
from prometheus_client import Counter

total_messages = Counter('billing_messages_total', 'Billing messages received')
duplicate_messages = Counter('billing_duplicates_total', 'Duplicate billing messages')

if is_duplicate:
    duplicate_messages.inc()
total_messages.inc()
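The transactional dedup from step 1 can also be exercised locally: sqlite3 supports the same ON CONFLICT ... DO NOTHING clause, and gating the charge on rowcount makes both inserts atomic. The customer_charges schema and amounts below are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE billing_events   (message_id TEXT PRIMARY KEY, amount REAL);
    CREATE TABLE customer_charges (message_id TEXT, amount REAL);
""")

def charge_once(message_id, amount):
    """Atomically record the event and the charge; a no-op on redelivery."""
    with conn:  # single transaction: both inserts commit together, or neither
        cur = conn.execute(
            "INSERT INTO billing_events (message_id, amount) VALUES (?, ?) "
            "ON CONFLICT (message_id) DO NOTHING",
            (message_id, amount),
        )
        if cur.rowcount > 0:  # first time processing this message
            conn.execute(
                "INSERT INTO customer_charges (message_id, amount) VALUES (?, ?)",
                (message_id, amount),
            )

charge_once("msg-7", 0.50)
charge_once("msg-7", 0.50)  # duplicate delivery: conflict, no second charge
print(conn.execute("SELECT COUNT(*) FROM customer_charges").fetchone()[0])  # 1
```

Because the dedup insert and the charge share one transaction, a crash between them rolls both back and the redelivered message starts clean.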

Set up alerts when the duplicate rate exceeds 1%. Your 3-8% rate suggests ack handling issues or worker instability. Check Cloud Monitoring for:

  • subscription/num_undelivered_messages (unacked backlog)
  • subscription/oldest_unacked_message_age
  • High worker restart rates
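The alert condition itself is just a ratio against the threshold; a minimal helper using the 1% figure suggested above:

```python
def duplicate_rate(total, duplicates):
    """Fraction of deliveries that were duplicates (0.0 when no traffic)."""
    return duplicates / total if total else 0.0

def should_alert(total, duplicates, threshold=0.01):
    """True when the duplicate rate exceeds the alerting threshold."""
    return duplicate_rate(total, duplicates) > threshold

print(should_alert(10_000, 50))   # 0.5% -> False
print(should_alert(10_000, 400))  # 4% -> True, inside the 3-8% range reported
```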

Finally, increase your ack deadline to 120 seconds to provide more buffer for processing variations. This reduces redelivery due to timeout.
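Note that the modern Python client library automatically extends ack deadlines while your callback runs, so deadline timeouts usually point to crashed or stalled workers rather than merely slow processing. Still, to see why headroom matters, here is a toy model of timeout-driven redelivery; the processing-time samples (occasional stalls from GC pauses, DB failover, or restarts) are invented for illustration:

```python
def count_timeout_redeliveries(processing_times, ack_deadline):
    """A message whose processing outlasts the deadline gets redelivered."""
    return sum(1 for t in processing_times if t > ack_deadline)

# Typical 5-10 s processing with occasional multi-minute stalls (seconds)
samples = [5, 7, 9, 6, 8, 75, 10, 95, 7, 130]

print(count_timeout_redeliveries(samples, 60))   # 3 redeliveries
print(count_timeout_redeliveries(samples, 120))  # 1
```

Raising the deadline shrinks the timeout-redelivery window, but only idempotent processing eliminates duplicate charges.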

We tried adding a message_id check, but we’re hitting race conditions where two concurrent workers process the same duplicate message before either completes the DB insert. We’re using Cloud SQL with default isolation levels. Should we be using database-level locking or is there a better pattern?

I’d add that you should also investigate why you’re getting 3-8% duplicates - that seems high. Are you properly handling ack/nack? If your processing fails partway through, you should nack the message, not let it timeout and get redelivered. Also check if your workers are being terminated mid-processing, which would cause redelivery of unacked messages.

Enable Pub/Sub message ordering if you need guaranteed ordering within a key (like device_id). This can reduce duplicate processing in some scenarios. Also monitor your ack latency - if you’re close to the 60-second deadline, network hiccups could cause late acks that lead to redelivery.

Pub/Sub provides at-least-once delivery by design, not exactly-once. You must implement idempotency in your application. Check if the message_id already exists in your database before processing. This is standard practice for financial systems.