Migrating from Static JWT Secrets to JWKS in Supabase

Supabase recently announced their transition from symmetric JWT signing using static secrets to asymmetric JWT signing with JWKS (JSON Web Key Set). This change brings significant security and performance benefits, but requires updating your authentication code.

Why JWKS is Better

Security Benefits:

Performance Benefits:

The Migration Process

Before: Static Secret Validation

Here’s how most FastAPI applications were validating Supabase tokens:

# core/security.py - OLD APPROACH
from jose import jwt, JWTError
import os

SUPABASE_JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET")

def validate_supabase_token(token: str):
    try:
        payload = jwt.decode(
            token,
            SUPABASE_JWT_SECRET,  # Static secret
            algorithms=["HS256"],  # Symmetric algorithm
            options={"verify_aud": False}
        )
        return payload
    except JWTError as e:
        logger.error(f"Token validation error: {str(e)}")
        return None

After: JWKS Validation

Here’s the updated implementation using JWKS:

# core/security.py - NEW APPROACH
from fastapi import Request, HTTPException
from jose import jwt, JWTError, jwk
import os
import logging
import httpx
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)

SUPABASE_JWKS_URL = os.getenv("SUPABASE_JWKS_URL", "https://your-project.supabase.co/auth/v1/.well-known/jwks.json")

# Cache for JWKS keys to avoid frequent network requests
_jwks_cache: Optional[Dict[str, Any]] = None

async def get_jwks() -> Dict[str, Any]:
    """Fetch JWKS from Supabase endpoint with caching"""
    global _jwks_cache
    
    if _jwks_cache is None:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(SUPABASE_JWKS_URL)
                response.raise_for_status()
                _jwks_cache = response.json()
                logger.info("JWKS fetched and cached successfully")
        except Exception as e:
            logger.error(f"Failed to fetch JWKS: {str(e)}")
            raise HTTPException(status_code=500, detail="Failed to fetch authentication keys")
    
    return _jwks_cache

def get_signing_key(kid: str, jwks_data: Dict[str, Any]) -> str:
    """Extract the public key for the given key ID"""
    for key in jwks_data.get("keys", []):
        if key.get("kid") == kid:
            return jwk.construct(key).to_pem()
    raise ValueError(f"Unable to find key with kid: {kid}")

async def validate_supabase_token(token: str) -> Optional[Dict[str, Any]]:
    try:
        # Get the unverified header to extract the key ID
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get("kid")
        
        if not kid:
            logger.error("Token missing key ID (kid) in header")
            return None
        
        # Fetch JWKS and get the signing key
        jwks_data = await get_jwks()
        signing_key = get_signing_key(kid, jwks_data)
        
        # Decode and verify the token with the public key
        payload = jwt.decode(
            token,
            signing_key,  # Public key from JWKS
            algorithms=["ES256"],  # Asymmetric algorithm
            options={"verify_aud": False}
        )
        return payload
    except JWTError as e:
        logger.error(f"Token validation error: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error during token validation: {str(e)}")
        return None

async def get_current_user(request: Request):
    token = request.headers.get("Authorization")
    if not token:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    
    token = token.split("Bearer ")[-1]
    user_data = await validate_supabase_token(token)
    if not user_data:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user_data

Key Changes Explained

1. Environment Variable Update

# Old
SUPABASE_JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET")

# New  
SUPABASE_JWKS_URL = os.getenv("SUPABASE_JWKS_URL", "https://your-project.supabase.co/auth/v1/.well-known/jwks.json")

2. Algorithm Change

# Old: Symmetric HMAC
algorithms=["HS256"]

# New: Asymmetric Elliptic Curve
algorithms=["ES256"]

3. Key Retrieval Process

The new approach:

  1. Extracts the kid (key ID) from the JWT header
  2. Fetches the JWKS from Supabase (with caching)
  3. Finds the matching public key using the kid
  4. Uses the public key to verify the token signature

4. Async Support

The validation function is now async to support HTTP requests for JWKS fetching.

Environment Configuration

Update your environment variables:

# Remove (keep for backwards compatibility during transition)
# SUPABASE_JWT_SECRET=your_old_secret

# Add
SUPABASE_JWKS_URL=https://your-project.supabase.co/auth/v1/.well-known/jwks.json

Testing the Migration

You can test both old and new tokens during the transition period. Here’s a simple test:

import asyncio
from core.security import validate_supabase_token

async def test_token_validation():
    # Test with a real JWT from your Supabase auth
    test_token = "eyJ..."  # Your JWT here
    result = await validate_supabase_token(test_token)
    print(f"Validation result: {result}")

# Run the test
asyncio.run(test_token_validation())

Performance Considerations

Caching Strategy

The implementation includes global caching for JWKS to avoid repeated network calls:

_jwks_cache: Optional[Dict[str, Any]] = None

Consider implementing more sophisticated caching with TTL if needed:

from datetime import datetime, timedelta

class JWKSCache:
    def __init__(self, ttl_seconds: int = 3600):  # 1 hour default
        self.cache: Optional[Dict[str, Any]] = None
        self.expires_at: Optional[datetime] = None
        self.ttl_seconds = ttl_seconds
    
    def is_expired(self) -> bool:
        return self.expires_at is None or datetime.now() > self.expires_at
    
    def set(self, data: Dict[str, Any]):
        self.cache = data
        self.expires_at = datetime.now() + timedelta(seconds=self.ttl_seconds)
    
    def get(self) -> Optional[Dict[str, Any]]:
        return None if self.is_expired() else self.cache

Migration Timeline

According to Supabase’s announcement:

Common Issues and Solutions

Issue: Missing kid in JWT header

if not kid:
    logger.error("Token missing key ID (kid) in header")
    return None

Solution: Ensure you’re using updated Supabase client libraries that include kid in JWT headers.

Issue: JWKS fetch failures

except Exception as e:
    logger.error(f"Failed to fetch JWKS: {str(e)}")
    raise HTTPException(status_code=500, detail="Failed to fetch authentication keys")

Solution: Implement retry logic and fallback mechanisms for production environments.

Issue: Algorithm mismatch

algorithms=["ES256"]  # Make sure this matches your Supabase setup

Solution: Check your Supabase project settings to confirm the signing algorithm.

Dependencies Required

Make sure you have the necessary packages:

pip install python-jose[cryptography] httpx

Or add to your requirements.txt:

python-jose[cryptography]
httpx

Conclusion

Migrating to JWKS provides better security, performance, and scalability for your authentication system. The transition is straightforward but requires updating your validation logic to handle asymmetric keys and JWKS fetching.

The key benefits you’ll gain:

Start the migration now to be ready for Supabase’s upcoming changes and enjoy the immediate performance benefits of local token validation.

By: Gavi Narra on: