Here’s a comprehensive solution addressing all three focus areas:
Pub/Sub Message Deduplication:
Implement database-level deduplication with unique constraints:
CREATE TABLE billing_events (
    message_id   VARCHAR(255) PRIMARY KEY,
    device_id    VARCHAR(100) NOT NULL,
    usage_amount DECIMAL(10,2),
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
In your application:
try:
    cursor.execute(
        "INSERT INTO billing_events (message_id, device_id, usage_amount) VALUES (%s, %s, %s)",
        (message.message_id, device_id, amount)
    )
    connection.commit()
    message.ack()
except IntegrityError:  # duplicate message_id
    connection.rollback()  # the failed insert aborts the transaction
    message.ack()  # already processed, safe to ack
Ack/Nack Handling:
Your current code acks after DB insert, which is correct, but you need proper error handling:
def process_billing_event(message):
    try:
        # Parse and validate
        billing_data = parse_message(message.data)
        # Insert with deduplication
        insert_billing_record(message.message_id, billing_data)
        message.ack()
    except ValidationError as e:
        # Invalid data - don't retry
        log_error(f"Invalid message: {e}")
        message.ack()  # ack to prevent infinite redelivery
    except DatabaseError as e:
        # Transient DB error - retry
        log_error(f"DB error: {e}")
        message.nack()  # explicit nack for redelivery
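This ack/nack routing can be exercised without a live Pub/Sub subscription by stubbing the message object. A minimal sketch — `ValidationError`, `DatabaseError`, the payload strings, and `handle_event` are all illustrative stand-ins, not real Pub/Sub APIs:

```python
class ValidationError(Exception): pass
class DatabaseError(Exception): pass

class FakeMessage:
    """Records what a real Pub/Sub message would have done."""
    def __init__(self, data):
        self.data = data
        self.outcome = None
    def ack(self):
        self.outcome = "ack"
    def nack(self):
        self.outcome = "nack"

def handle_event(message):
    try:
        # Stand-in for parse/insert: raise based on the payload
        if message.data == "bad-json":
            raise ValidationError("unparseable payload")
        if message.data == "db-down":
            raise DatabaseError("connection refused")
        message.ack()      # happy path: processed and acked
    except ValidationError:
        message.ack()      # poison message: ack so it isn't retried forever
    except DatabaseError:
        message.nack()     # transient failure: nack so Pub/Sub redelivers

for payload in ("ok", "bad-json", "db-down"):
    msg = FakeMessage(payload)
    handle_event(msg)
    print(payload, "->", msg.outcome)
# ok -> ack, bad-json -> ack, db-down -> nack
```

The key design point the stub makes visible: only *transient* errors get a nack; permanently bad messages are acked (ideally after logging or dead-lettering) so they can't loop.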
Exactly-Once Delivery:
Pub/Sub’s standard delivery guarantee is at-least-once, so duplicates are expected; you can achieve effectively exactly-once processing at the application level:
- Use transactions for atomic deduplication:
with connection:  # psycopg2-style: commit on success, rollback on error
    # Check + insert in a single atomic statement
    cursor.execute(
        "INSERT INTO billing_events ... ON CONFLICT (message_id) DO NOTHING"
    )
    if cursor.rowcount > 0:
        # First time processing this message
        cursor.execute("INSERT INTO customer_charges ...")
- Implement distributed locking for critical sections using Cloud Memorystore (Redis):
import redis

redis_client = redis.Redis(host="your-memorystore-host", port=6379)
lock = redis_client.lock(f"billing_lock:{message.message_id}", timeout=30)
if lock.acquire(blocking=False):
    try:
        process_billing(message)
    finally:
        lock.release()
else:
    message.nack()  # another worker holds the lock; let Pub/Sub redeliver
- Monitor duplicate rates (shown here with the prometheus_client library, whose Counter takes a name and a description):
from prometheus_client import Counter

total_messages = Counter('billing_messages_total', 'Billing messages received')
duplicate_messages = Counter('billing_duplicates_total', 'Duplicate billing messages')

if is_duplicate:
    duplicate_messages.inc()
total_messages.inc()
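The transactional check-and-insert pattern from the first bullet can be demonstrated end-to-end with sqlite3 standing in for the production database — sqlite’s INSERT OR IGNORE plays the role of Postgres’s ON CONFLICT DO NOTHING, and table/message names are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    "CREATE TABLE billing_events (message_id TEXT PRIMARY KEY, amount REAL);"
    "CREATE TABLE customer_charges (message_id TEXT, amount REAL);"
)

def charge_once(message_id, amount):
    with conn:  # both statements commit or roll back together
        cur = conn.execute(
            "INSERT OR IGNORE INTO billing_events VALUES (?, ?)",
            (message_id, amount),
        )
        if cur.rowcount > 0:  # first delivery: apply the downstream charge
            conn.execute(
                "INSERT INTO customer_charges VALUES (?, ?)",
                (message_id, amount),
            )

charge_once("msg-7", 9.99)
charge_once("msg-7", 9.99)  # duplicate delivery: no second charge
count = conn.execute("SELECT COUNT(*) FROM customer_charges").fetchone()[0]
print(count)  # 1
```

Because the dedup insert and the customer charge share one transaction, a crash between them can never leave a charge without its dedup record.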
Set up alerts when the duplicate rate exceeds 1%. Your 3-8% rate suggests ack-handling issues or worker instability. Check Cloud Monitoring for:
- subscription/num_undelivered_messages
- subscription/oldest_unacked_message_age
- High worker restart rates
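For the alert threshold itself, the duplicate rate is just the ratio of the two counters above. A minimal sketch of the check in plain Python (the counter values are illustrative):

```python
def duplicate_rate(duplicates, total):
    """Return the duplicate percentage, guarding against an empty window."""
    return 100.0 * duplicates / total if total else 0.0

rate = duplicate_rate(duplicates=45, total=1000)
print(f"{rate:.1f}%")  # 4.5%
print(rate > 1.0)      # True -> would fire the >1% alert
```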
Finally, increase your ack deadline to 120 seconds (the default is 10; the maximum is 600) to provide more buffer for processing variations. This reduces redelivery due to timeouts.
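Assuming the subscription already exists, the deadline can be raised with the gcloud CLI (the subscription name below is a placeholder):

```shell
# Raise the ack deadline on an existing subscription to 120 seconds
gcloud pubsub subscriptions update my-billing-subscription --ack-deadline=120
```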