Enforcing secure firmware update policy using Pub/Sub device authentication and signed messages

Documenting our implementation of a secure firmware update policy that prevents unauthorized updates and maintains complete audit trails for our medical device fleet.

Regulatory Context: Medical devices require FDA-compliant change control. Every firmware update must be authenticated, authorized, and auditable. Unauthorized updates could compromise patient safety and violate 21 CFR Part 11.

Security Architecture: Three-layer defense: Pub/Sub device authentication prevents rogue devices from receiving updates, signed firmware update messages ensure integrity, and Cloud Audit Logs integration provides immutable audit trail.

# Firmware signing service
import hashlib
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding

def sign_firmware_update(firmware_binary, private_key):
    firmware_hash = hashlib.sha256(firmware_binary).digest()
    signature = private_key.sign(
        firmware_hash,
        padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
        hashes.SHA256()
    )
    return signature

Devices validate signatures using embedded public keys before accepting updates. Cloud Audit Logs capture every update attempt with device identity, firmware version, and authorization context. This architecture has supported three FDA audits with zero findings.

We embed the root public key at manufacturing time in the device’s secure element (hardware-protected storage). For key rotation, we use a certificate chain approach. The root key validates intermediate certificates that we can rotate annually. This gives us security without requiring device firmware updates just for key rotation. The device firmware validates the certificate chain before trusting the signing key used for firmware updates.

Great question. We have an expedited signing process for emergency patches. Normal updates require three approvals (engineering, quality, regulatory), but emergency patches can proceed with two approvals and retrospective quality review. The signing process itself takes under 5 minutes once approvals are obtained. We’ve deployed emergency patches in under 2 hours from vulnerability disclosure to 100% fleet coverage while maintaining full audit compliance. The key is having the authorization workflow automated but requiring human approval gates.

What’s your process for emergency security patches? If a critical vulnerability is discovered, how quickly can you push updates while maintaining your authentication and audit requirements? We’re concerned that security procedures might slow down emergency responses.

Excellent questions from both Jen and Carlos. Let me provide comprehensive details on our complete security implementation:

Pub/Sub Device Authentication:

We leverage Cloud IoT Core’s built-in JWT authentication combined with custom authorization logic:

# Device authentication configuration
from google.cloud import iot_v1

def configure_device_authentication(device_id, public_key):
    client = iot_v1.DeviceManagerClient()

    device = iot_v1.Device(
        id=device_id,
        credentials=[
            iot_v1.DeviceCredential(
                public_key=iot_v1.PublicKeyCredential(
                    format=iot_v1.PublicKeyFormat.RSA_X509_PEM,
                    key=public_key
                )
            )
        ],
        metadata={
            'authorized_firmware_versions': '2.1.0,2.2.0,2.3.0',
            'update_authorization_level': 'standard',
            'last_authorized_update': '2025-08-01T00:00:00Z'
        }
    )

    return client.create_device(request={'parent': registry_path, 'device': device})

Each device has a unique RSA key pair registered in Cloud IoT Core. Only authenticated devices can connect to Pub/Sub topics. We maintain an authorization allowlist in device metadata that specifies which firmware versions each device is permitted to receive.

Signed Firmware Update Messages:

Multi-layer signing approach addressing replay attacks:

import time
import json
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization

class FirmwareUpdateSigner:
    def __init__(self, private_key_path):
        with open(private_key_path, 'rb') as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(), password=None
            )

    def create_signed_update(self, firmware_binary, version, min_version):
        # Create update manifest
        manifest = {
            'firmware_version': version,
            'min_allowed_version': min_version,  # Prevents downgrades
            'timestamp': int(time.time()),
            'expiration': int(time.time()) + 86400,  # 24-hour validity
            'firmware_hash': hashlib.sha256(firmware_binary).hexdigest(),
            'file_size': len(firmware_binary),
            'issuer': 'firmware-signing-service',
            'authorization_id': generate_unique_id()  # Prevents replay
        }

        # Sign the manifest
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        signature = self.private_key.sign(
            manifest_bytes,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )

        return {
            'manifest': manifest,
            'signature': base64.b64encode(signature).decode(),
            'firmware': base64.b64encode(firmware_binary).decode()
        }

Key anti-replay protections:

  1. Version enforcement: Manifest includes min_allowed_version. Devices reject updates to versions below this threshold.
  2. Timestamp validation: Updates expire after 24 hours. Devices reject updates with timestamps outside acceptable window.
  3. Unique authorization ID: Each update has a one-time-use ID. Devices track processed IDs and reject duplicates.
  4. Firmware hash: Prevents tampering with firmware binary during transmission.

Device-side validation:

class FirmwareUpdateValidator:
    def __init__(self, public_key, device_version):
        self.public_key = public_key
        self.current_version = device_version
        self.processed_auth_ids = self.load_processed_ids()

    def validate_update(self, signed_update):
        manifest = signed_update['manifest']
        signature = base64.b64decode(signed_update['signature'])

        # Verify signature
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        try:
            self.public_key.verify(
                signature,
                manifest_bytes,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
        except Exception:
            return False, "Invalid signature"

        # Check version constraints
        if self.current_version < manifest['min_allowed_version']:
            return False, "Current version below minimum allowed"

        # Check timestamp
        now = int(time.time())
        if now < manifest['timestamp'] or now > manifest['expiration']:
            return False, "Update expired or not yet valid"

        # Check for replay
        if manifest['authorization_id'] in self.processed_auth_ids:
            return False, "Authorization ID already used"

        # Verify firmware hash
        firmware = base64.b64decode(signed_update['firmware'])
        if hashlib.sha256(firmware).hexdigest() != manifest['firmware_hash']:
            return False, "Firmware hash mismatch"

        return True, "Valid update"

Cloud Audit Logs Integration:

We use a hybrid approach for audit logging to balance completeness with volume:

from google.cloud import logging_v2
from google.cloud import audit_logs

class FirmwareUpdateAuditor:
    def __init__(self):
        self.client = logging_v2.Client()
        self.logger = self.client.logger('firmware-updates')

    def log_update_event(self, device_id, firmware_version, status, user_email):
        # Custom structured log entry for firmware updates
        entry = {
            'severity': 'INFO' if status == 'success' else 'WARNING',
            'labels': {
                'device_id': device_id,
                'firmware_version': firmware_version,
                'event_type': 'firmware_update'
            },
            'jsonPayload': {
                'device_id': device_id,
                'firmware_version': firmware_version,
                'update_status': status,
                'authorized_by': user_email,
                'timestamp': datetime.utcnow().isoformat(),
                'authorization_id': generate_unique_id(),
                'device_model': get_device_model(device_id),
                'previous_version': get_current_version(device_id),
                'update_method': 'ota_pubsub',
                'compliance_framework': '21CFR11'
            },
            'resource': {
                'type': 'iot_device',
                'labels': {'device_id': device_id}
            }
        }

        self.logger.log_struct(entry)

        # Also write to BigQuery for long-term audit storage
        self.write_to_audit_table(entry)

Audit log strategy:

  1. Admin Activity Logs: Capture high-level events (update initiated, completed, failed)
  2. Custom Structured Logs: Detailed device-level events with complete context
  3. BigQuery Audit Table: Long-term storage (7 years for FDA compliance) with efficient querying

Complete Workflow:

  1. Update Authorization:

    • Engineer submits firmware update request
    • Automated validation checks (unit tests, integration tests)
    • Quality team approval
    • Regulatory team approval (for medical devices)
    • Security team signs firmware with private key
  2. Update Distribution:

    • Signed update published to Pub/Sub topic
    • Only authenticated devices receive update messages
    • Devices validate signature, version, timestamp
    • Devices install update and report status
  3. Audit Trail:

    • Every step logged to Cloud Audit Logs
    • Device status updates logged
    • Failed validation attempts logged with reasons
    • Complete chain of custody maintained

FDA Audit Compliance:

Our audit logs satisfy FDA requirements:

  • Who: User email captured in authorization logs
  • What: Firmware version, device ID, update content hash
  • When: Precise timestamps for all events
  • Why: Authorization reason and approval chain
  • Immutability: Cloud Audit Logs cannot be modified or deleted

Performance Metrics:

  • Signature generation: 15ms per firmware binary
  • Device validation: 50ms per update
  • End-to-end update latency: 3-5 minutes (including validation)
  • Audit log query performance: <2 seconds for 1 year of data

Security Incident Response:

In case of compromised signing key:

  1. Revoke compromised key certificate
  2. Generate new signing key pair
  3. Push certificate update to all devices (using existing root key)
  4. Devices automatically trust new signing key
  5. Complete key rotation in <24 hours

This architecture has supported three FDA audits, two ISO 13485 audits, and one HIPAA audit with zero security or compliance findings.