OAuth
- Trace the authorization code flow through all four parties (resource owner, client, authorization server, resource server) and explain what each step accomplishes.
- Explain what PKCE adds to the authorization code flow and why it is necessary for public clients such as mobile apps.
- Describe the client credentials flow and explain when to use it instead of the authorization code flow.
- Identify the risks of storing access tokens insecurely and describe what an attacker can do with a stolen token.
When you use your identity on one site to sign in on another, you're using OAuth. This protocol has become the standard way for applications to access user data without ever seeing the user's password. Understanding it is essential for understanding single sign-on, API access control, and third-party integrations on the internet.
OAuth solves a fundamental problem: how do you give a third-party application limited access to your account without sharing your credentials? Before the protocol was developed, users had to give their username and password to every app, which created security nightmares. OAuth introduced the concept of time-limited credentials called tokens with specific scopes that can be revoked without changing your password.
The OAuth Flow
OAuth involves four parties working together. The resource owner is typically the user who owns the data being accessed. The client application wants to access that data. The authorization server authenticates the user and issues tokens. The resource server hosts the protected resources and enforces access control.
FIXME: diagram
The authorization flow works like this:
- When the client requests authorization, the application redirects the user to the authorization server.
- The user authenticates (logs in) and grants permission.
- The server issues a one-time authorization code and sends it to the client.
- The client authenticates itself and exchanges that one-time code for an access token.
- The client then uses the token to access protected resources, e.g., to call protected APIs.
The key feature is that the client never sees the user's password, and the token has limited scope and lifetime.
Message Types
The most common OAuth flow is the Authorization Code flow designed for server-side applications. Before looking at the servers themselves, we need to understand the data types they exchange.
The foundation is a helper function that generates random strings for codes and tokens:
def generate_token(prefix: str = "tok") -> str:
"""Generate a random token."""
random_part = "".join(random.choices(string.ascii_letters + string.digits, k=16))
return f"{prefix}_{random_part}"
The six message dataclasses come in three request-response pairs,
one for each phase of the protocol.
AuthorizationRequest carries the client's identity, the desired scope,
and a state token for cross-site request forgery (CSRF) protection.
TokenRequest carries the one-time code along with the client's own credentials
so the authorization server can verify who it is talking to.
ResourceRequest carries the access token so the resource server can check
whether the caller is allowed to see the requested data:
@dataclass
class AuthorizationRequest:
"""Request for user authorization."""
client_id: str
redirect_uri: str
scope: list[str]
state: str # CSRF protection
response_queue: Queue
def __str__(self):
return f"AuthRequest(client={self.client_id}, scope={self.scope})"
@dataclass
class AuthorizationResponse:
"""Response with authorization code."""
code: str
state: str
def __str__(self):
return f"AuthResponse(code={self.code[:8]}...)"
@dataclass
class TokenRequest:
"""Request to exchange code for access token."""
code: str
client_id: str
client_secret: str
redirect_uri: str
response_queue: Queue
def __str__(self):
return f"TokenRequest(client={self.client_id})"
@dataclass
class TokenResponse:
"""Response with access token."""
access_token: str
token_type: str = "Bearer"
expires_in: int = 3600
refresh_token: str | None = None
scope: list[str] | None = None
def __str__(self):
return f"TokenResponse(token={self.access_token[:8]}...)"
@dataclass
class ResourceRequest:
"""Request to access protected resource."""
access_token: str
resource_path: str
response_queue: Queue
def __str__(self):
return f"ResourceRequest(path={self.resource_path})"
@dataclass
class ResourceResponse:
"""Response from resource server."""
success: bool
data: Any = None
error: str | None = None
def __str__(self):
if self.success:
return "ResourceResponse(success=True)"
return f"ResourceResponse(error={self.error})"
The servers also need two internal record-keeping types.
An AuthorizationCode tracks whether a code has already been used
(codes are one-time-only) and when it expires.
An AccessToken records which client owns it, what scopes it covers,
and when it expires.
Both have an is_valid method that encapsulates this check:
@dataclass
class AuthorizationCode:
"""Authorization code with metadata."""
code: str
client_id: str
redirect_uri: str
scope: list[str]
expires_at: float
used: bool = False
def is_valid(self, now: float) -> bool:
"""Check if code is still valid."""
return not self.used and now < self.expires_at
@dataclass
class AccessToken:
"""Access token with metadata."""
token: str
client_id: str
scope: list[str]
expires_at: float
def is_valid(self, now: float) -> bool:
"""Check if token is still valid."""
return now < self.expires_at
The authorization code is short-lived (typically 10 minutes), while access tokens last longer (often 1 hour).
Authorization Server
The authorization server is the trust anchor of OAuth:
it verifies the user's identity, obtains consent, and issues tokens.
Its init method creates two incoming queues and three dictionaries
for tracking registered clients, issued codes, and active tokens:
class AuthorizationServer(Process):
"""OAuth 2.0 authorization server."""
def init(self):
self.auth_queue = Queue(self._env)
self.token_queue = Queue(self._env)
# Registered clients
self.clients: Dict[str, Dict] = {}
# Issued authorization codes
self.auth_codes: Dict[str, AuthorizationCode] = {}
# Issued access tokens
self.access_tokens: Dict[str, AccessToken] = {}
# User credentials (simplified - real systems use secure storage)
self.users = {
"alice@example.com": "password123",
"bob@example.com": "secret456",
}
print(f"[{self.now:.1f}] Authorization Server started")
Before any client can use the server, it must be registered with a client ID, a shared secret, and a list of allowed redirect URIs. The redirect URI check is an important security constraint: it prevents an attacker from registering a lookalike app and redirecting authorization codes to a server they control:
def register_client(
self, client_id: str, client_secret: str, redirect_uris: List[str]
):
"""Register a new OAuth client."""
self.clients[client_id] = {
"secret": client_secret,
"redirect_uris": redirect_uris,
}
print(f"[{self.now:.1f}] Registered client: {client_id}")
The server's event loop uses asimpy's FirstOf to wait on both queues simultaneously.
Whichever queue receives a message first wins;
the other request is automatically cancelled:
async def run(self):
"""Main server loop."""
while True:
# Handle both authorization and token requests
name, request = await FirstOf(
self._env, auth=self.auth_queue.get(), token=self.token_queue.get()
)
if name == "auth":
await self.handle_authorization_request(request)
elif name == "token":
await self.handle_token_request(request)
When an authorization request arrives,
handle_authorization_request delegates to two helpers and handles the user consent simulation in between:
async def handle_authorization_request(self, request: AuthorizationRequest):
"""Handle authorization request from client."""
print(f"[{self.now:.1f}] AuthServer: Received {request}")
if not self._validate_auth_request(request):
return
# Simulate user authentication and consent
await self.timeout(0.5) # User login time
print(f"[{self.now:.1f}] AuthServer: User authenticated, showing consent")
await self.timeout(0.3) # User consent time
print(f"[{self.now:.1f}] AuthServer: User granted permissions: {request.scope}")
await self._issue_authorization_code(request)
_validate_auth_request checks that the client is registered and that the redirect URI is one the client pre-registered.
Checking the redirect URI here is an important security constraint:
without it, an attacker could register a lookalike application and redirect authorization codes to a server they control:
def _validate_auth_request(self, request: AuthorizationRequest) -> bool:
"""Check that the client is registered and the redirect URI is allowed."""
if request.client_id not in self.clients:
print(f"[{self.now:.1f}] AuthServer: Unknown client {request.client_id}")
return False
client = self.clients[request.client_id]
if request.redirect_uri not in client["redirect_uris"]:
print(f"[{self.now:.1f}] AuthServer: Invalid redirect URI")
return False
return True
_issue_authorization_code generates the code, records it in auth_codes with its expiry and scope,
and sends it back through the response queue embedded in the request:
async def _issue_authorization_code(self, request: AuthorizationRequest):
"""Generate, store, and return a one-time authorization code."""
code = generate_token("code")
auth_code = AuthorizationCode(
code=code,
client_id=request.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
expires_at=self.now + 600, # 10 minute expiry
)
self.auth_codes[code] = auth_code
response = AuthorizationResponse(code=code, state=request.state)
await request.response_queue.put(response)
print(f"[{self.now:.1f}] AuthServer: Issued authorization code")
When the client then presents that code to exchange it for a token,
handle_token_request is similarly short because all the checking is delegated:
async def handle_token_request(self, request: TokenRequest):
"""Exchange authorization code for access token."""
print(f"[{self.now:.1f}] AuthServer: Received {request}")
auth_code = await self._validate_token_request(request)
if auth_code is None:
return
await self._issue_access_token(request, auth_code)
_validate_token_request works through a chain of checks:
the client must exist, the secret must match, the code must exist and still be valid,
and the client and redirect URI in the token request must exactly match those in the original authorization request.
Any mismatch sends an error response and returns None so the caller can exit early.
The method constructs the error response once at the top rather than repeating it at each branch:
async def _validate_token_request(
self, request: TokenRequest
) -> AuthorizationCode | None:
"""Validate client credentials and authorization code; return code or None."""
error = TokenResponse(access_token="", token_type="error")
if request.client_id not in self.clients:
print(f"[{self.now:.1f}] AuthServer: Unknown client")
await request.response_queue.put(error)
return None
client = self.clients[request.client_id]
if client["secret"] != request.client_secret:
print(f"[{self.now:.1f}] AuthServer: Invalid client secret")
await request.response_queue.put(error)
return None
if request.code not in self.auth_codes:
print(f"[{self.now:.1f}] AuthServer: Invalid authorization code")
await request.response_queue.put(error)
return None
auth_code = self.auth_codes[request.code]
if not auth_code.is_valid(self.now):
print(f"[{self.now:.1f}] AuthServer: Authorization code expired or used")
await request.response_queue.put(error)
return None
if auth_code.client_id != request.client_id:
print(f"[{self.now:.1f}] AuthServer: Code issued to different client")
await request.response_queue.put(error)
return None
if auth_code.redirect_uri != request.redirect_uri:
print(f"[{self.now:.1f}] AuthServer: Redirect URI mismatch")
await request.response_queue.put(error)
return None
return auth_code
_issue_access_token marks the code as used before doing anything else.
This is the line that makes authorization codes one-time-only:
re-using a code is one of the classic attack vectors against OAuth,
so the flag must be set before the token is issued, not after:
async def _issue_access_token(
self, request: TokenRequest, auth_code: AuthorizationCode
):
"""Mark the code used, generate an access token, store it, and send it."""
auth_code.used = True
access_token = generate_token("access")
token = AccessToken(
token=access_token,
client_id=request.client_id,
scope=auth_code.scope,
expires_at=self.now + 3600, # 1 hour expiry
)
self.access_tokens[access_token] = token
response = TokenResponse(
access_token=access_token,
token_type="Bearer",
expires_in=3600,
scope=auth_code.scope,
)
await request.response_queue.put(response)
print(f"[{self.now:.1f}] AuthServer: Issued access token")
Resource Server
The resource server hosts the protected API.
Its init method takes a reference to the authorization server
so it can look up tokens, and it stores a dictionary of protected resources
where each entry specifies the required scope and the data to return:
class ResourceServer(Process):
"""OAuth 2.0 resource server (protected API)."""
def init(self, auth_server: AuthorizationServer):
self.auth_server = auth_server
self.resource_queue = Queue(self._env)
# Protected resources
self.resources = {
"/api/profile": {
"scope_required": ["profile"],
"data": {"name": "Alice", "email": "alice@example.com"},
},
"/api/photos": {
"scope_required": ["photos"],
"data": {"photos": ["photo1.jpg", "photo2.jpg", "photo3.jpg"]},
},
"/api/messages": {
"scope_required": ["messages"],
"data": {"messages": ["Hello!", "How are you?"]},
},
}
print(f"[{self.now:.1f}] Resource Server started")
Critically, the resource server doesn't need to know anything about users, only about tokens and scopes. Its event loop is simpler than the authorization server's because it has only one incoming queue:
async def run(self):
"""Main server loop."""
while True:
request = await self.resource_queue.get()
await self.handle_resource_request(request)
When a request arrives, handle_resource_request delegates to two helpers
and sends the successful response only if both pass:
async def handle_resource_request(self, request: ResourceRequest):
"""Handle API request with access token."""
print(f"[{self.now:.1f}] ResourceServer: Received {request}")
token = await self._validate_token(request)
if token is None:
return
resource = await self._check_resource_access(request, token)
if resource is None:
return
print(f"[{self.now:.1f}] ResourceServer: Returning {request.resource_path}")
await request.response_queue.put(
ResourceResponse(success=True, data=resource["data"])
)
_validate_token looks the token up in the authorization server's table and checks it hasn't expired.
Because the resource server shares a reference to the authorization server,
it can read access_tokens directly without a network call—a simplification that real deployments
replace with a token introspection endpoint or a shared cache:
async def _validate_token(
self, request: ResourceRequest
) -> AccessToken | None:
"""Check that the token exists and has not expired; send error if not."""
if request.access_token not in self.auth_server.access_tokens:
print(f"[{self.now:.1f}] ResourceServer: Invalid token")
await request.response_queue.put(
ResourceResponse(success=False, error="invalid_token")
)
return None
token = self.auth_server.access_tokens[request.access_token]
if not token.is_valid(self.now):
print(f"[{self.now:.1f}] ResourceServer: Token expired")
await request.response_queue.put(
ResourceResponse(success=False, error="token_expired")
)
return None
return token
_check_resource_access confirms the path exists and then checks scope using set operations.
The token must cover all scopes the resource requires, but may cover more,
so a token issued with ["profile", "photos"] scope passes a check for a resource
that requires only ["profile"]:
async def _check_resource_access(
self, request: ResourceRequest, token: AccessToken
) -> dict | None:
"""Check that the resource exists and the token's scope covers it."""
if request.resource_path not in self.resources:
print(f"[{self.now:.1f}] ResourceServer: Resource not found")
await request.response_queue.put(
ResourceResponse(success=False, error="not_found")
)
return None
resource = self.resources[request.resource_path]
required_scopes = set(resource["scope_required"])
token_scopes = set(token.scope)
if not required_scopes.issubset(token_scopes):
print(f"[{self.now:.1f}] ResourceServer: Insufficient scope")
await request.response_queue.put(
ResourceResponse(success=False, error="insufficient_scope")
)
return None
return resource
OAuth Client
The OAuth client orchestrates the authorization flow.
Its init method stores the client credentials and references to both servers,
and initializes access_token to None since no token has been issued yet:
class OAuthClient(Process):
"""OAuth 2.0 client application."""
def init(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
auth_server: AuthorizationServer,
resource_server: ResourceServer,
):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.auth_server = auth_server
self.resource_server = resource_server
self.access_token: Optional[str] = None
print(f"[{self.now:.1f}] Client '{client_id}' started")
The client's run method calls three helper methods in sequence,
aborting if any step fails.
It requests authorization for the profile and photos scopes,
exchanges the resulting code for an access token,
then uses that token to make three API calls,
including one for messages that it was not granted access to:
async def run(self):
"""Demonstrate complete OAuth flow."""
# Step 1: Request authorization
scopes = ["profile", "photos"]
code = await self.request_authorization(scopes)
if not code:
print(f"[{self.now:.1f}] Client: Authorization failed")
return
# Step 2: Exchange code for token
token_response = await self.exchange_code_for_token(code)
if not token_response or token_response.token_type == "error":
print(f"[{self.now:.1f}] Client: Token exchange failed")
return
self.access_token = token_response.access_token
print(f"[{self.now:.1f}] Client: Got access token!")
# Step 3: Access protected resources
await self.timeout(0.5)
await self.access_resource("/api/profile")
await self.timeout(0.5)
await self.access_resource("/api/photos")
# Try accessing resource without permission
await self.timeout(0.5)
await self.access_resource("/api/messages")
request_authorization generates a state token before sending the request.
This state value is a random string that the client includes in the request
and the server echoes back in the response.
The client checks that the echoed state matches the one it sent,
which detects CSRF attacks where an attacker
tricks a user's browser into sending a forged callback:
async def request_authorization(self, scopes: List[str]) -> Optional[str]:
"""Step 1: Request user authorization."""
print(f"[{self.now:.1f}] Client: Requesting authorization for {scopes}")
state = generate_token("state") # CSRF protection
response_queue = Queue(self._env)
request = AuthorizationRequest(
client_id=self.client_id,
redirect_uri=self.redirect_uri,
scope=scopes,
state=state,
response_queue=response_queue,
)
await self.auth_server.auth_queue.put(request)
response = await response_queue.get()
# Validate state to prevent CSRF
if response.state != state:
print(f"[{self.now:.1f}] Client: State mismatch - possible CSRF attack!")
return None
print(f"[{self.now:.1f}] Client: Received authorization code")
return response.code
exchange_code_for_token packages the code with the client credentials
and sends the bundle to the token endpoint.
The client must authenticate itself here—not just the user—because the token endpoint
needs to confirm that the party asking for the token is the same one
that originally registered with the server:
async def exchange_code_for_token(self, code: str) -> Optional[TokenResponse]:
"""Step 2: Exchange authorization code for access token."""
print(f"[{self.now:.1f}] Client: Exchanging code for token")
response_queue = Queue(self._env)
request = TokenRequest(
code=code,
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uri=self.redirect_uri,
response_queue=response_queue,
)
await self.auth_server.token_queue.put(request)
response = await response_queue.get()
return response
access_resource wraps the stored token in a ResourceRequest and
waits for the response.
The client never stores the user's password at any point in this flow;
it only ever handles codes and tokens:
async def access_resource(self, path: str):
"""Step 3: Access protected resource with token."""
print(f"[{self.now:.1f}] Client: Accessing {path}")
if not self.access_token:
print(f"[{self.now:.1f}] Client: No access token!")
return
response_queue = Queue(self._env)
request = ResourceRequest(
access_token=self.access_token,
resource_path=path,
response_queue=response_queue,
)
await self.resource_server.resource_queue.put(request)
response = await response_queue.get()
if response.success:
print(f"[{self.now:.1f}] Client: Success! Data: {response.data}")
else:
print(f"[{self.now:.1f}] Client: Failed - {response.error}")
Basic OAuth Simulation
Putting it all together, the simulation creates an authorization server and resource server,
registers a client application called photo_app,
and then runs the complete three-step flow:
def main():
"""Demonstrate basic OAuth 2.0 authorization code flow."""
env = Environment()
# Create authorization server
auth_server = AuthorizationServer(env)
# Create resource server
resource_server = ResourceServer(env, auth_server)
# Register client application
client_id = "photo_app"
client_secret = "secret_xyz"
redirect_uri = "https://photoapp.example.com/callback"
auth_server.register_client(
client_id=client_id, client_secret=client_secret, redirect_uris=[redirect_uri]
)
# Create client
OAuthClient(
env,
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
auth_server=auth_server,
resource_server=resource_server,
)
# Run simulation
env.run(until=20)
The output shows the servers processing requests in sequence,
the user authentication pause,
the code exchange,
and finally the resource access attempts—including the denied request for messages,
which the client was never granted permission to read.
[0.0] Authorization Server started
[0.0] Resource Server started
[0.0] Registered client: photo_app
[0.0] Client 'photo_app' started
[0.0] Client: Requesting authorization for ['profile', 'photos']
[0.0] AuthServer: Received AuthRequest(client=photo_app, scope=['profile', 'photos'])
[0.5] AuthServer: User authenticated, showing consent
[0.8] AuthServer: User granted permissions: ['profile', 'photos']
[0.8] AuthServer: Issued authorization code
[0.8] Client: Received authorization code
[0.8] Client: Exchanging code for token
[0.8] AuthServer: Received TokenRequest(client=photo_app)
[0.8] AuthServer: Issued access token
[0.8] Client: Got access token!
[1.3] Client: Accessing /api/profile
[1.3] ResourceServer: Received ResourceRequest(path=/api/profile)
[1.3] ResourceServer: Returning /api/profile
[1.3] Client: Success! Data: {'name': 'Alice', 'email': 'alice@example.com'}
[1.8] Client: Accessing /api/photos
[1.8] ResourceServer: Received ResourceRequest(path=/api/photos)
[1.8] ResourceServer: Returning /api/photos
[1.8] Client: Success! Data: {'photos': ['photo1.jpg', 'photo2.jpg', 'photo3.jpg']}
[2.3] Client: Accessing /api/messages
[2.3] ResourceServer: Received ResourceRequest(path=/api/messages)
[2.3] ResourceServer: Insufficient scope
[2.3] Client: Failed - insufficient_scope
PKCE for Public Clients
The authorization code flow above assumes the client has a client_secret—
a shared secret between the client and the authorization server.
Mobile apps and single-page apps cannot keep a secret:
anyone who installs the app or reads the JavaScript source code can extract it.
PKCE (Proof Key for Code Exchange, RFC 7636) solves this.
Before making the authorization request, the client generates a random code verifier and computes a code challenge by hashing it with SHA-256:
def generate_code_verifier(length: int = 64) -> str:
"""Generate a cryptographically random code verifier.
The verifier must be 43–128 characters of URL-safe random data.
In production, use os.urandom and encode as base64url.
We use a simpler approach here since we are not cryptographically
concerned in a simulation.
"""
raw = os.urandom(length)
return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")[:length]
def compute_code_challenge(verifier: str) -> str:
"""Compute SHA-256 hash of the verifier, base64url-encoded (no padding).
This is the S256 method from RFC 7636. The 'plain' method (sending the
verifier as the challenge) is allowed by the spec but provides no
additional security and should not be used.
"""
digest = hashlib.sha256(verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
def verify_challenge(verifier: str, stored_challenge: str) -> bool:
"""Verify that the verifier matches the stored challenge."""
return compute_code_challenge(verifier) == stored_challenge
The client sends the challenge (not the verifier) in the authorization request. When exchanging the code for a token, it sends the original verifier. The server hashes it and checks that it matches the stored challenge. An attacker who intercepts the code cannot use it without knowing the verifier, which was never sent over the network.
class PKCEClient(Process):
"""OAuth client that uses PKCE for the authorization code flow.
This client generates a code verifier before requesting authorization,
sends the code challenge with the authorization request,
and proves knowledge of the verifier when exchanging the code for a token.
"""
def init(
self,
client_id: str,
redirect_uri: str,
auth_server_auth_queue: Queue,
auth_server_token_queue: Queue,
) -> None:
self.client_id = client_id
self.redirect_uri = redirect_uri
self.auth_server_auth_queue = auth_server_auth_queue
self.auth_server_token_queue = auth_server_token_queue
# PKCE clients have no client_secret (they are public clients).
self.access_token: Optional[str] = None
self._code_verifier: Optional[str] = None
print(f"[{self.now:.1f}] PKCEClient '{client_id}' started (no client secret)")
async def run(self) -> None:
"""Demonstrate the PKCE flow."""
scopes = ["profile", "photos"]
# Step 1: Generate verifier and challenge BEFORE the authorization request.
self._code_verifier = generate_code_verifier()
challenge = compute_code_challenge(self._code_verifier)
print(
f"[{self.now:.1f}] PKCEClient: Generated code verifier "
f"(challenge={challenge[:12]}...)"
)
# Step 2: Send authorization request with the challenge.
code = await self._request_authorization(scopes, challenge)
if code is None:
print(f"[{self.now:.1f}] PKCEClient: Authorization failed")
return
# Step 3: Exchange code for token using the verifier.
token = await self._exchange_code(code)
if token is None:
print(f"[{self.now:.1f}] PKCEClient: Token exchange failed")
return
self.access_token = token
print(f"[{self.now:.1f}] PKCEClient: Token acquired successfully via PKCE")
async def _request_authorization(
self, scopes: list[str], challenge: str
) -> Optional[str]:
"""Send authorization request including the code challenge."""
from oauth_types import AuthorizationRequest
state = generate_token("state")
response_queue: Queue = Queue(self._env)
# In a real request this would be an HTTP redirect to the authorization
# server with code_challenge and code_challenge_method=S256 as query params.
request = AuthorizationRequest(
client_id=self.client_id,
redirect_uri=self.redirect_uri,
scope=scopes,
state=state,
response_queue=response_queue,
)
# Attach PKCE fields (would be query parameters in real HTTP).
request.code_challenge = challenge # type: ignore[attr-defined]
request.code_challenge_method = "S256" # type: ignore[attr-defined]
await self.auth_server_auth_queue.put(request)
response = await response_queue.get()
if response.state != state:
print(f"[{self.now:.1f}] PKCEClient: State mismatch — CSRF?")
return None
return response.code
async def _exchange_code(self, code: str) -> Optional[str]:
"""Exchange the authorization code for a token, proving the verifier."""
from oauth_types import TokenRequest
response_queue: Queue = Queue(self._env)
request = TokenRequest(
code=code,
client_id=self.client_id,
client_secret="", # Public clients have no secret.
redirect_uri=self.redirect_uri,
response_queue=response_queue,
)
# Attach the code verifier — the server will hash it and compare.
request.code_verifier = self._code_verifier # type: ignore[attr-defined]
await self.auth_server_token_queue.put(request)
response: TokenResponse = await response_queue.get()
if response.token_type == "error":
print(f"[{self.now:.1f}] PKCEClient: Token exchange error")
return None
return response.access_token
Token storage: Once a client has an access token, it must store it securely.
On mobile apps, use the platform's secure storage (iOS Keychain, Android Keystore).
In web apps, prefer sessionStorage over localStorage—
tokens in localStorage are accessible to any JavaScript on the page,
including third-party analytics scripts.
Never store tokens in non-HttpOnly cookies,
which are readable by JavaScript and vulnerable to XSS.
Client Credentials Flow
The authorization code flow authenticates users. Server-to-server communication—batch jobs, microservices, scheduled tasks—often has no user. The Client Credentials flow handles this case: the application authenticates itself directly and receives a token representing the application, not any particular user.
from dataclasses import dataclass
@dataclass
class ClientCredentialsRequest:
"""Token request for the client credentials flow.
Unlike an authorization code request, there is no code to exchange.
The client authenticates directly with its credentials.
"""
client_id: str
client_secret: str
scope: list[str]
response_queue: Queue
class ClientCredentialsClient(Process):
"""OAuth client using the client credentials flow.
Suitable for server-to-server communication where there is no user.
The client authenticates itself and receives a token that represents
the application's own identity rather than any particular user.
"""
def init(
self,
client_id: str,
client_secret: str,
auth_server_token_queue: Queue,
scopes: Optional[list[str]] = None,
) -> None:
self.client_id = client_id
self.client_secret = client_secret
self.auth_server_token_queue = auth_server_token_queue
self.scopes = scopes or ["read"]
self.access_token: Optional[str] = None
self.token_expiry: float = 0.0
print(
f"[{self.now:.1f}] M2M client '{client_id}' started "
f"(scopes: {self.scopes})"
)
async def run(self) -> None:
"""Acquire a token and use it; refresh when it expires."""
await self._acquire_token()
if self.access_token:
print(
f"[{self.now:.1f}] M2M client: Token acquired, "
f"making API calls..."
)
# Simulate periodic API calls.
for _ in range(3):
await self.timeout(1.0)
await self._make_api_call()
else:
print(f"[{self.now:.1f}] M2M client: Failed to acquire token")
async def _acquire_token(self) -> None:
"""Request a token directly from the token endpoint."""
print(f"[{self.now:.1f}] M2M client: Requesting token with client credentials")
response_queue: Queue = Queue(self._env)
request = ClientCredentialsRequest(
client_id=self.client_id,
client_secret=self.client_secret,
scope=self.scopes,
response_queue=response_queue,
)
await self.auth_server_token_queue.put(request)
response = await response_queue.get()
if hasattr(response, "access_token") and response.access_token:
self.access_token = response.access_token
self.token_expiry = self.now + 60.0 # tokens typically last 1 hour
print(
f"[{self.now:.1f}] M2M client: Token acquired "
f"(expires at {self.token_expiry:.0f})"
)
else:
print(f"[{self.now:.1f}] M2M client: Token request failed")
async def _make_api_call(self) -> None:
"""Make an API call, refreshing the token if it has expired."""
if self.now >= self.token_expiry:
print(f"[{self.now:.1f}] M2M client: Token expired, refreshing...")
await self._acquire_token()
if self.access_token:
print(
f"[{self.now:.1f}] M2M client: API call with token "
f"{self.access_token[:8]}..."
)
else:
print(f"[{self.now:.1f}] M2M client: Cannot call API, no token")
The flow is simpler than authorization code:
there is no redirect, no authorization code, and no user consent screen.
The client sends its client_id and client_secret directly to the token endpoint.
This means the client_secret is the application's password and must be protected accordingly.
Refresh Tokens
Access tokens expire quickly (typically 1 hour).
Refresh tokens allow clients to get new access tokens without user interaction.
A RefreshToken has the same structure as an access token
but is stored separately and used only to request a replacement:
@dataclass
class RefreshToken:
"""Refresh token for obtaining new access tokens."""
token: str
client_id: str
scope: list[str]
expires_at: float
def is_valid(self, now: float) -> bool:
return now < self.expires_at
The authorization server can issue refresh tokens alongside access tokens. Clients use refresh tokens to get new access tokens when they expire, maintaining long-lived sessions without storing passwords.
In the Real World
Real OAuth implementations must address several security concerns that our simulation ignores. For example, they must ensure that an authorization response matches the original request to prevent CSRF attacks. They also need token management, so that a resource server can check token validity with an authorization server or a user can revoke a token in order to do things like sign out of all devices.
Exercises
-
In the basic OAuth simulation, the authorization server accepts any redirect URI that was pre-registered. Modify the example to register two redirect URIs for
photo_app:"https://photo.example.com/callback"and"https://photo.example.com/mobile". Verify that a request with a URI that is not in the registered list is rejected. Why is this check important? -
The authorization code is single-use. Find the line in
authorization_server.pythat marks a code as used. What would happen if this line were removed? Write a test that reuses the same code twice and verify that the second use fails. -
Trace through the PKCE flow for this interception scenario: An attacker has intercepted the authorization code but does not know the code verifier. What happens when the attacker sends the code to the token endpoint without a verifier? What happens if the attacker sends the code with a wrong verifier? Verify by modifying
PKCEClient._exchange_codeto send a tampered verifier. -
The client credentials flow is used for machine-to-machine authentication. Modify
ClientCredentialsClientto automatically retry token acquisition if the first attempt fails, with exponential backoff. What should the retry limit be, and why? -
Access tokens expire (after 60 seconds in the simulation). What happens if a client tries to use an expired token? Trace through
resource_server.pyto find the expiry check. Now modifyOAuthClientto detect a token-expired error in the resource response and automatically request a new token before retrying. (This is the "token refresh" pattern; you can use a refresh token or re-run the full flow.)