OAuth
When you use your identity on one site to sign in on another, you're almost certainly relying on OAuth. This protocol has become the standard way for applications to access user data without ever seeing the user's password. Understanding it is essential for understanding single sign-on, API access control, and third-party integrations on the internet.
OAuth solves a fundamental problem: how do you give a third-party application limited access to your account without sharing your credentials? Before the protocol was developed, users had to give their username and password to every app, which created security nightmares. OAuth replaced shared passwords with tokens: time-limited credentials that carry specific scopes and can be revoked without changing your password.
The OAuth Flow
OAuth involves four parties working together. The resource owner is typically the user who owns the data being accessed. The client application wants to access that data. The authorization server authenticates the user and issues tokens. The resource server hosts the protected resources and enforces access control.
The authorization flow works like this:
- The client requests authorization by redirecting the user to the authorization server.
- The user authenticates (logs in) and grants permission.
- The authorization server issues a one-time authorization code and sends it to the client by redirecting the user back to the client's registered redirect URI.
- The client authenticates itself and exchanges that one-time code for an access token.
- The client then uses the token to access protected resources, e.g., to call protected APIs.
The key feature is that the client never sees the user's password, and the token has limited scope and lifetime.
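In an HTTP deployment, the first step of this flow is a browser redirect to a URL like the one built below. This is a minimal sketch: the query parameter names are the standard OAuth 2.0 ones, but the endpoint https://auth.example.com/authorize and the helper name build_authorization_url are assumptions made for illustration.

```python
import secrets
from urllib.parse import parse_qs, urlencode, urlparse


def build_authorization_url(authorize_endpoint, client_id, redirect_uri, scopes, state):
    """Build the URL the client redirects the user's browser to."""
    params = {
        "response_type": "code",       # ask for an authorization code
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),     # scopes travel as one space-separated string
        "state": state,                # echoed back by the server for CSRF protection
    }
    return f"{authorize_endpoint}?{urlencode(params)}"


state = secrets.token_urlsafe(16)
url = build_authorization_url(
    "https://auth.example.com/authorize",  # hypothetical endpoint
    "photo_app",
    "https://photoapp.example.com/callback",
    ["profile", "photos"],
    state,
)
query = parse_qs(urlparse(url).query)
print(query["scope"])
```

Note that the user's password appears nowhere in this request: the user types it only on the authorization server's own login page.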
Message Types
The most common OAuth flow is the Authorization Code flow, which was designed for server-side applications. Before looking at the servers themselves, we need to understand the data types they exchange.
The foundation is a helper function that generates random strings for codes and tokens:
import random
import string

def generate_token(prefix: str = "tok") -> str:
    """Generate a random token."""
    random_part = "".join(random.choices(string.ascii_letters + string.digits, k=16))
    return f"{prefix}_{random_part}"
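The helper above draws from Python's random module, which is fine for a simulation but is not designed for security. As a hedged sketch, a production-style variant might use the standard library's secrets module instead; the name generate_secure_token and the 32-byte default are assumptions for illustration.

```python
import secrets


def generate_secure_token(prefix: str = "tok", nbytes: int = 32) -> str:
    """Generate an unguessable token from the operating system's secure RNG."""
    # token_urlsafe returns URL-safe Base64 text built from nbytes random bytes
    return f"{prefix}_{secrets.token_urlsafe(nbytes)}"


code = generate_secure_token("code")
other = generate_secure_token("code")
print(code.startswith("code_"), code != other)
```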
The six message dataclasses come in three request-response pairs,
one for each phase of the protocol.
AuthorizationRequest carries the client's identity, the desired scope,
and a state token for cross-site request forgery (CSRF) protection.
TokenRequest carries the one-time code along with the client's own credentials
so the authorization server can verify who it is talking to.
ResourceRequest carries the access token so the resource server can check
whether the caller is allowed to see the requested data:
from dataclasses import dataclass
from typing import Any

@dataclass
class AuthorizationRequest:
    """Request for user authorization."""
    client_id: str
    redirect_uri: str
    scope: list[str]
    state: str  # CSRF protection
    response_queue: Queue

    def __str__(self):
        return f"AuthRequest(client={self.client_id}, scope={self.scope})"

@dataclass
class AuthorizationResponse:
    """Response with authorization code."""
    code: str
    state: str

    def __str__(self):
        return f"AuthResponse(code={self.code[:8]}...)"

@dataclass
class TokenRequest:
    """Request to exchange code for access token."""
    code: str
    client_id: str
    client_secret: str
    redirect_uri: str
    response_queue: Queue

    def __str__(self):
        return f"TokenRequest(client={self.client_id})"

@dataclass
class TokenResponse:
    """Response with access token."""
    access_token: str
    token_type: str = "Bearer"
    expires_in: int = 3600
    refresh_token: str | None = None
    scope: list[str] | None = None

    def __str__(self):
        return f"TokenResponse(token={self.access_token[:8]}...)"

@dataclass
class ResourceRequest:
    """Request to access protected resource."""
    access_token: str
    resource_path: str
    response_queue: Queue

    def __str__(self):
        return f"ResourceRequest(path={self.resource_path})"

@dataclass
class ResourceResponse:
    """Response from resource server."""
    success: bool
    data: Any = None
    error: str | None = None

    def __str__(self):
        if self.success:
            return "ResourceResponse(success=True)"
        return f"ResourceResponse(error={self.error})"
The servers also need two internal record-keeping types.
An AuthorizationCode tracks whether a code has already been used
(codes are one-time-only) and when it expires.
An AccessToken records which client owns it, what scopes it covers,
and when it expires.
Both have an is_valid method that encapsulates this check:
@dataclass
class AuthorizationCode:
    """Authorization code with metadata."""
    code: str
    client_id: str
    redirect_uri: str
    scope: list[str]
    expires_at: float
    used: bool = False

    def is_valid(self, now: float) -> bool:
        """Check if code is still valid."""
        return not self.used and now < self.expires_at

@dataclass
class AccessToken:
    """Access token with metadata."""
    token: str
    client_id: str
    scope: list[str]
    expires_at: float

    def is_valid(self, now: float) -> bool:
        """Check if token is still valid."""
        return now < self.expires_at
The authorization code is short-lived (typically 10 minutes), while access tokens last longer (often 1 hour).
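To make the two validity conditions concrete, the sketch below exercises a trimmed-down copy of AuthorizationCode. The fields are cut to the ones the check needs and the timestamps are made-up simulation times, so treat it as an illustration rather than part of the simulation itself.

```python
from dataclasses import dataclass


@dataclass
class AuthorizationCode:
    """Trimmed copy of the record above: only the fields is_valid reads."""
    code: str
    expires_at: float
    used: bool = False

    def is_valid(self, now: float) -> bool:
        return not self.used and now < self.expires_at


issued_at = 0.0
code = AuthorizationCode(code="code_abc", expires_at=issued_at + 600)

fresh = code.is_valid(now=1.0)      # inside the 10-minute window
expired = code.is_valid(now=600.0)  # the expiry instant itself is already too late
code.used = True
replayed = code.is_valid(now=1.0)   # a used code fails even though it has not expired

print(fresh, expired, replayed)
```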
Authorization Server
The authorization server is the trust anchor of OAuth:
it verifies the user's identity, obtains consent, and issues tokens.
Its init method creates two incoming queues, three dictionaries
for tracking registered clients, issued codes, and active tokens,
and a simplified table of user credentials:
class AuthorizationServer(Process):
    """OAuth 2.0 authorization server."""

    def init(self):
        self.auth_queue = Queue(self._env)
        self.token_queue = Queue(self._env)
        # Registered clients
        self.clients: dict[str, dict] = {}
        # Issued authorization codes
        self.auth_codes: dict[str, AuthorizationCode] = {}
        # Issued access tokens
        self.access_tokens: dict[str, AccessToken] = {}
        # User credentials (simplified - real systems use secure storage)
        self.users = {
            "alice@example.com": "password123",
            "bob@example.com": "secret456",
        }
        print(f"[{self.now:.1f}] Authorization Server started")
Before any client can use the server, it must be registered with a client ID, a shared secret, and a list of allowed redirect URIs. The server checks every later request against this record:
    def register_client(
        self, client_id: str, client_secret: str, redirect_uris: list[str]
    ):
        """Register a new OAuth client."""
        self.clients[client_id] = {
            "secret": client_secret,
            "redirect_uris": redirect_uris,
        }
        print(f"[{self.now:.1f}] Registered client: {client_id}")
The server's event loop uses asimpy's FirstOf to wait on both queues simultaneously.
Whichever queue receives a message first wins;
the other request is automatically cancelled:
    async def run(self):
        """Main server loop."""
        while True:
            # Handle both authorization and token requests
            name, request = await FirstOf(
                self._env, auth=self.auth_queue.get(), token=self.token_queue.get()
            )
            if name == "auth":
                await self.handle_authorization_request(request)
            elif name == "token":
                await self.handle_token_request(request)
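For readers who know the standard library better than asimpy, the same "wait on several queues, take whichever answers first" idea can be approximated with asyncio. The sketch below is an analogy only: wait_for_first is a hypothetical helper written for this illustration, and it is not how asimpy implements FirstOf.

```python
import asyncio


async def wait_for_first(**getters):
    """Wait on several awaitables; return (name, value) for the first to finish."""
    tasks = {asyncio.ensure_future(coro): name for name, coro in getters.items()}
    done, pending = await asyncio.wait(list(tasks), return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # abandon the losers so they cannot swallow a later message
    # If several tasks finish in the same step, this sketch keeps only one result
    winner = next(iter(done))
    return tasks[winner], winner.result()


async def demo():
    auth_queue, token_queue = asyncio.Queue(), asyncio.Queue()
    await token_queue.put("token-request")  # only the token queue has work
    return await wait_for_first(auth=auth_queue.get(), token=token_queue.get())


result = asyncio.run(demo())
print(result)
```

Cancelling the pending get matters: without it, the abandoned waiter could later consume a message that the next loop iteration expects to receive.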
When an authorization request arrives,
handle_authorization_request delegates to two helpers and handles the user consent simulation in between:
    async def handle_authorization_request(self, request: AuthorizationRequest):
        """Handle authorization request from client."""
        print(f"[{self.now:.1f}] AuthServer: Received {request}")
        if not self._validate_auth_request(request):
            return
        # Simulate user authentication and consent
        await self.timeout(0.5)  # User login time
        print(f"[{self.now:.1f}] AuthServer: User authenticated, showing consent")
        await self.timeout(0.3)  # User consent time
        print(f"[{self.now:.1f}] AuthServer: User granted permissions: {request.scope}")
        await self._issue_authorization_code(request)
_validate_auth_request checks that the client is registered and that the redirect URI is one the client pre-registered.
The redirect URI check is an important security constraint:
without it, an attacker could send a request that names a legitimate client but supplies a redirect URI pointing at a server they control, and so capture the authorization code:
    def _validate_auth_request(self, request: AuthorizationRequest) -> bool:
        """Check that the client is registered and the redirect URI is allowed."""
        if request.client_id not in self.clients:
            print(f"[{self.now:.1f}] AuthServer: Unknown client {request.client_id}")
            return False
        client = self.clients[request.client_id]
        if request.redirect_uri not in client["redirect_uris"]:
            print(f"[{self.now:.1f}] AuthServer: Invalid redirect URI")
            return False
        return True
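The membership test in _validate_auth_request is an exact string match, and that strictness is deliberate. The sketch below contrasts it with a looser prefix rule to show what can go wrong; the function names and the evil.example host are hypothetical and exist only for this illustration.

```python
def exact_match(requested: str, registered: list) -> bool:
    """Accept only URIs identical to a registered one (the rule used above)."""
    return requested in registered


def prefix_match(requested: str, registered: list) -> bool:
    """Looser rule shown only to illustrate the risk; do not use it."""
    return any(requested.startswith(uri) for uri in registered)


registered = ["https://photoapp.example.com"]
legitimate = "https://photoapp.example.com"
# This URI starts with the registered string but its host is evil.example
lookalike = "https://photoapp.example.com.evil.example/callback"

print(exact_match(legitimate, registered))   # accepted
print(exact_match(lookalike, registered))    # rejected
print(prefix_match(lookalike, registered))   # wrongly accepted
```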
_issue_authorization_code generates the code, records it in auth_codes with its expiry and scope,
and sends it back through the response queue embedded in the request:
    async def _issue_authorization_code(self, request: AuthorizationRequest):
        """Generate, store, and return a one-time authorization code."""
        code = generate_token("code")
        auth_code = AuthorizationCode(
            code=code,
            client_id=request.client_id,
            redirect_uri=request.redirect_uri,
            scope=request.scope,
            expires_at=self.now + 600,  # 10 minute expiry
        )
        self.auth_codes[code] = auth_code
        response = AuthorizationResponse(code=code, state=request.state)
        await request.response_queue.put(response)
        print(f"[{self.now:.1f}] AuthServer: Issued authorization code")
When the client then presents that code to exchange it for a token,
handle_token_request is similarly short because all the checking is delegated:
    async def handle_token_request(self, request: TokenRequest):
        """Exchange authorization code for access token."""
        print(f"[{self.now:.1f}] AuthServer: Received {request}")
        auth_code = await self._validate_token_request(request)
        if auth_code is None:
            return
        await self._issue_access_token(request, auth_code)
_validate_token_request works through a chain of checks:
the client must exist, the secret must match, the code must exist and still be valid,
and the client and redirect URI in the token request must exactly match those in the original authorization request.
Any mismatch sends an error response and returns None so the caller can exit early.
The method constructs the error response once at the top rather than repeating it at each branch:
    async def _validate_token_request(
        self, request: TokenRequest
    ) -> AuthorizationCode | None:
        """Validate client credentials and authorization code; return code or None."""
        error = TokenResponse(access_token="", token_type="error")
        if request.client_id not in self.clients:
            print(f"[{self.now:.1f}] AuthServer: Unknown client")
            await request.response_queue.put(error)
            return None
        client = self.clients[request.client_id]
        if client["secret"] != request.client_secret:
            print(f"[{self.now:.1f}] AuthServer: Invalid client secret")
            await request.response_queue.put(error)
            return None
        if request.code not in self.auth_codes:
            print(f"[{self.now:.1f}] AuthServer: Invalid authorization code")
            await request.response_queue.put(error)
            return None
        auth_code = self.auth_codes[request.code]
        if not auth_code.is_valid(self.now):
            print(f"[{self.now:.1f}] AuthServer: Authorization code expired or used")
            await request.response_queue.put(error)
            return None
        if auth_code.client_id != request.client_id:
            print(f"[{self.now:.1f}] AuthServer: Code issued to different client")
            await request.response_queue.put(error)
            return None
        if auth_code.redirect_uri != request.redirect_uri:
            print(f"[{self.now:.1f}] AuthServer: Redirect URI mismatch")
            await request.response_queue.put(error)
            return None
        return auth_code
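The secret check above uses the != operator, which is fine in a simulation. A server exposed to real attackers would more likely use a constant-time comparison so that response timing does not reveal how many leading characters matched. A minimal sketch, assuming the secret is held as plain text as it is here (real systems usually store a hash), with secrets_match as a hypothetical helper name:

```python
import hmac


def secrets_match(stored: str, presented: str) -> bool:
    """Compare two secrets in time that does not depend on where they differ."""
    # Encoding to bytes lets compare_digest accept non-ASCII input as well
    return hmac.compare_digest(stored.encode("utf-8"), presented.encode("utf-8"))


print(secrets_match("secret_xyz", "secret_xyz"))
print(secrets_match("secret_xyz", "secret_abc"))
```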
_issue_access_token marks the code as used before doing anything else.
This is the line that makes authorization codes one-time-only:
re-using a code is one of the classic attack vectors against OAuth,
so the flag must be set before the token is issued, not after:
    async def _issue_access_token(
        self, request: TokenRequest, auth_code: AuthorizationCode
    ):
        """Mark the code used, generate an access token, store it, and send it."""
        auth_code.used = True
        access_token = generate_token("access")
        token = AccessToken(
            token=access_token,
            client_id=request.client_id,
            scope=auth_code.scope,
            expires_at=self.now + 3600,  # 1 hour expiry
        )
        self.access_tokens[access_token] = token
        response = TokenResponse(
            access_token=access_token,
            token_type="Bearer",
            expires_in=3600,
            scope=auth_code.scope,
        )
        await request.response_queue.put(response)
        print(f"[{self.now:.1f}] AuthServer: Issued access token")
Resource Server
The resource server hosts the protected API.
Its init method takes a reference to the authorization server
so it can look up tokens, and it stores a dictionary of protected resources
where each entry specifies the required scope and the data to return:
class ResourceServer(Process):
    """OAuth 2.0 resource server (protected API)."""

    def init(self, auth_server: AuthorizationServer):
        self.auth_server = auth_server
        self.resource_queue = Queue(self._env)
        # Protected resources
        self.resources = {
            "/api/profile": {
                "scope_required": ["profile"],
                "data": {"name": "Alice", "email": "alice@example.com"},
            },
            "/api/photos": {
                "scope_required": ["photos"],
                "data": {"photos": ["photo1.jpg", "photo2.jpg", "photo3.jpg"]},
            },
            "/api/messages": {
                "scope_required": ["messages"],
                "data": {"messages": ["Hello!", "How are you?"]},
            },
        }
        print(f"[{self.now:.1f}] Resource Server started")
Critically, the resource server doesn't need to know anything about users, only about tokens and scopes. Its event loop is simpler than the authorization server's because it has only one incoming queue:
    async def run(self):
        """Main server loop."""
        while True:
            request = await self.resource_queue.get()
            await self.handle_resource_request(request)
When a request arrives, handle_resource_request delegates to two helpers
and sends the successful response only if both pass:
    async def handle_resource_request(self, request: ResourceRequest):
        """Handle API request with access token."""
        print(f"[{self.now:.1f}] ResourceServer: Received {request}")
        token = await self._validate_token(request)
        if token is None:
            return
        resource = await self._check_resource_access(request, token)
        if resource is None:
            return
        print(f"[{self.now:.1f}] ResourceServer: Returning {request.resource_path}")
        await request.response_queue.put(
            ResourceResponse(success=True, data=resource["data"])
        )
_validate_token looks the token up in the authorization server's table and checks it hasn't expired.
Because the resource server shares a reference to the authorization server,
it can read access_tokens directly without a network call—a simplification that real deployments
replace with a token introspection endpoint or a shared cache:
    async def _validate_token(
        self, request: ResourceRequest
    ) -> AccessToken | None:
        """Check that the token exists and has not expired; send error if not."""
        if request.access_token not in self.auth_server.access_tokens:
            print(f"[{self.now:.1f}] ResourceServer: Invalid token")
            await request.response_queue.put(
                ResourceResponse(success=False, error="invalid_token")
            )
            return None
        token = self.auth_server.access_tokens[request.access_token]
        if not token.is_valid(self.now):
            print(f"[{self.now:.1f}] ResourceServer: Token expired")
            await request.response_queue.put(
                ResourceResponse(success=False, error="token_expired")
            )
            return None
        return token
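When the two servers do not share memory, the resource server has to ask the authorization server about each token. The sketch below shows the kind of answer a token introspection endpoint might give. The field names active, client_id, scope, and exp follow the OAuth token introspection convention, but the introspect function, the dictionary-based token table, and the use of simulation time for exp are assumptions for illustration.

```python
def introspect(token_table: dict, token: str, now: float) -> dict:
    """Describe a token the way an introspection endpoint might."""
    record = token_table.get(token)
    if record is None or now >= record["expires_at"]:
        # Unknown and expired tokens look the same to the caller
        return {"active": False}
    return {
        "active": True,
        "client_id": record["client_id"],
        "scope": " ".join(record["scope"]),  # space-separated, as in the protocol
        "exp": int(record["expires_at"]),
    }


token_table = {
    "access_abc": {
        "client_id": "photo_app",
        "scope": ["profile", "photos"],
        "expires_at": 3600.0,
    }
}

live = introspect(token_table, "access_abc", now=10.0)
stale = introspect(token_table, "access_abc", now=3600.0)
unknown = introspect(token_table, "access_zzz", now=10.0)
print(live, stale, unknown)
```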
_check_resource_access confirms the path exists and then checks scope using set operations.
The token must cover all scopes the resource requires, but may cover more,
so a token issued with ["profile", "photos"] scope passes a check for a resource
that requires only ["profile"]:
    async def _check_resource_access(
        self, request: ResourceRequest, token: AccessToken
    ) -> dict | None:
        """Check that the resource exists and the token's scope covers it."""
        if request.resource_path not in self.resources:
            print(f"[{self.now:.1f}] ResourceServer: Resource not found")
            await request.response_queue.put(
                ResourceResponse(success=False, error="not_found")
            )
            return None
        resource = self.resources[request.resource_path]
        required_scopes = set(resource["scope_required"])
        token_scopes = set(token.scope)
        if not required_scopes.issubset(token_scopes):
            print(f"[{self.now:.1f}] ResourceServer: Insufficient scope")
            await request.response_queue.put(
                ResourceResponse(success=False, error="insufficient_scope")
            )
            return None
        return resource
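The subset rule is easy to try on its own. This small sketch pulls the comparison out of the server into a standalone function; scope_allows is a hypothetical name, and the scope lists are the ones used elsewhere in this chapter.

```python
def scope_allows(token_scopes: list, required_scopes: list) -> bool:
    """True if every required scope is among the scopes the token carries."""
    return set(required_scopes).issubset(token_scopes)


granted = ["profile", "photos"]

print(scope_allows(granted, ["profile"]))             # extra scopes are harmless
print(scope_allows(granted, ["profile", "photos"]))   # exact cover
print(scope_allows(granted, ["messages"]))            # never granted
print(scope_allows(granted, ["photos", "messages"]))  # partial cover is not enough
```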
OAuth Client
The OAuth client orchestrates the authorization flow.
Its init method stores the client credentials and references to both servers,
and initializes access_token to None since no token has been issued yet:
class OAuthClient(Process):
    """OAuth 2.0 client application."""

    def init(
        self,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        auth_server: AuthorizationServer,
        resource_server: ResourceServer,
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.auth_server = auth_server
        self.resource_server = resource_server
        self.access_token: str | None = None
        print(f"[{self.now:.1f}] Client '{client_id}' started")
The client's run method calls three helper methods in sequence,
aborting if any step fails.
It requests authorization for the profile and photos scopes,
exchanges the resulting code for an access token,
then uses that token to make three API calls,
including one for messages that it was not granted access to:
    async def run(self):
        """Demonstrate complete OAuth flow."""
        # Step 1: Request authorization
        scopes = ["profile", "photos"]
        code = await self.request_authorization(scopes)
        if not code:
            print(f"[{self.now:.1f}] Client: Authorization failed")
            return
        # Step 2: Exchange code for token
        token_response = await self.exchange_code_for_token(code)
        if not token_response or token_response.token_type == "error":
            print(f"[{self.now:.1f}] Client: Token exchange failed")
            return
        self.access_token = token_response.access_token
        print(f"[{self.now:.1f}] Client: Got access token!")
        # Step 3: Access protected resources
        await self.timeout(0.5)
        await self.access_resource("/api/profile")
        await self.timeout(0.5)
        await self.access_resource("/api/photos")
        # Try accessing resource without permission
        await self.timeout(0.5)
        await self.access_resource("/api/messages")
request_authorization generates a state token before sending the request.
This state value is a random string that the client includes in the request
and the server echoes back in the response.
The client checks that the echoed state matches the one it sent,
which detects CSRF attacks where an attacker
tricks a user's browser into sending a forged callback:
    async def request_authorization(self, scopes: list[str]) -> str | None:
        """Step 1: Request user authorization."""
        print(f"[{self.now:.1f}] Client: Requesting authorization for {scopes}")
        state = generate_token("state")  # CSRF protection
        response_queue = Queue(self._env)
        request = AuthorizationRequest(
            client_id=self.client_id,
            redirect_uri=self.redirect_uri,
            scope=scopes,
            state=state,
            response_queue=response_queue,
        )
        await self.auth_server.auth_queue.put(request)
        response = await response_queue.get()
        # Validate state to prevent CSRF
        if response.state != state:
            print(f"[{self.now:.1f}] Client: State mismatch - possible CSRF attack!")
            return None
        print(f"[{self.now:.1f}] Client: Received authorization code")
        return response.code
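In a web application the authorization response arrives as query parameters on the callback URL rather than as an object on a queue. The sketch below shows how a client might pull out the code only after the state check passes. The helper name extract_code and the example URLs are assumptions; the code and state parameter names are the standard ones.

```python
import hmac
from typing import Optional
from urllib.parse import parse_qs, urlparse


def extract_code(callback_url: str, expected_state: str) -> Optional[str]:
    """Return the authorization code only if the echoed state matches ours."""
    query = parse_qs(urlparse(callback_url).query)
    received_state = query.get("state", [""])[0]
    # Constant-time comparison; a missing state compares as the empty string
    if not hmac.compare_digest(received_state.encode(), expected_state.encode()):
        return None
    codes = query.get("code")
    return codes[0] if codes else None


expected = "state_K3yq8Z"
good = "https://photoapp.example.com/callback?code=code_abc&state=state_K3yq8Z"
forged = "https://photoapp.example.com/callback?code=code_evil&state=state_guess"
missing = "https://photoapp.example.com/callback?code=code_evil"

print(extract_code(good, expected))
print(extract_code(forged, expected))
print(extract_code(missing, expected))
```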
exchange_code_for_token packages the code with the client credentials
and sends the bundle to the token endpoint.
The client must authenticate itself here—not just the user—because the token endpoint
needs to confirm that the party asking for the token is the same one
that originally registered with the server:
    async def exchange_code_for_token(self, code: str) -> TokenResponse | None:
        """Step 2: Exchange authorization code for access token."""
        print(f"[{self.now:.1f}] Client: Exchanging code for token")
        response_queue = Queue(self._env)
        request = TokenRequest(
            code=code,
            client_id=self.client_id,
            client_secret=self.client_secret,
            redirect_uri=self.redirect_uri,
            response_queue=response_queue,
        )
        await self.auth_server.token_queue.put(request)
        response = await response_queue.get()
        return response
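Over HTTP, the TokenRequest corresponds to a form-encoded POST sent directly from the client's server to the token endpoint, never through the user's browser. The sketch below only builds the request body and sends nothing. The field names are the standard ones for the authorization code grant; the helper name is hypothetical, and placing the client secret in the body is one permitted option (many servers prefer HTTP Basic authentication for it instead).

```python
from urllib.parse import parse_qs, urlencode


def build_token_request_body(
    code: str, client_id: str, client_secret: str, redirect_uri: str
) -> str:
    """Form-encode the fields of an authorization-code token request."""
    return urlencode(
        {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,  # must match the one sent in step 1
            "client_id": client_id,
            "client_secret": client_secret,
        }
    )


body = build_token_request_body(
    "code_abc", "photo_app", "secret_xyz", "https://photoapp.example.com/callback"
)
fields = parse_qs(body)
print(fields["grant_type"], fields["code"])
```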
access_resource wraps the stored token in a ResourceRequest and
waits for the response.
The client never stores the user's password at any point in this flow;
it only ever handles codes and tokens:
    async def access_resource(self, path: str):
        """Step 3: Access protected resource with token."""
        print(f"[{self.now:.1f}] Client: Accessing {path}")
        if not self.access_token:
            print(f"[{self.now:.1f}] Client: No access token!")
            return
        response_queue = Queue(self._env)
        request = ResourceRequest(
            access_token=self.access_token,
            resource_path=path,
            response_queue=response_queue,
        )
        await self.resource_server.resource_queue.put(request)
        response = await response_queue.get()
        if response.success:
            print(f"[{self.now:.1f}] Client: Success! Data: {response.data}")
        else:
            print(f"[{self.now:.1f}] Client: Failed - {response.error}")
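The token_type of "Bearer" in TokenResponse hints at how the token travels over HTTP: in an Authorization header. As a rough sketch, the two helpers below build that header on the client side and parse it on the resource server side; both function names are hypothetical.

```python
from typing import Optional


def bearer_header(access_token: str) -> dict:
    """Build the HTTP header a client attaches to each API call."""
    return {"Authorization": f"Bearer {access_token}"}


def parse_bearer_header(headers: dict) -> Optional[str]:
    """Extract the token on the server side, or None if the header is unusable."""
    value = headers.get("Authorization", "")
    scheme, _, token = value.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(parse_bearer_header(bearer_header("access_abc")))
print(parse_bearer_header({"Authorization": "Basic dXNlcjpwdw=="}))
print(parse_bearer_header({}))
```

Whoever holds a bearer token can use it, which is why these headers should only ever be sent over an encrypted connection.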
Basic OAuth Simulation
Putting it all together, the simulation creates an authorization server and resource server,
registers a client application called photo_app,
and then runs the complete three-step flow:
def main():
    """Demonstrate basic OAuth 2.0 authorization code flow."""
    env = Environment()
    # Create authorization server
    auth_server = AuthorizationServer(env)
    # Create resource server
    resource_server = ResourceServer(env, auth_server)
    # Register client application
    client_id = "photo_app"
    client_secret = "secret_xyz"
    redirect_uri = "https://photoapp.example.com/callback"
    auth_server.register_client(
        client_id=client_id, client_secret=client_secret, redirect_uris=[redirect_uri]
    )
    # Create client
    OAuthClient(
        env,
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        auth_server=auth_server,
        resource_server=resource_server,
    )
    # Run simulation
    env.run(until=20)
The output shows the servers processing requests in sequence,
the user authentication pause,
the code exchange,
and finally the resource access attempts—including the denied request for messages,
which the client was never granted permission to read.
[0.0] Authorization Server started
[0.0] Resource Server started
[0.0] Registered client: photo_app
[0.0] Client 'photo_app' started
[0.0] Client: Requesting authorization for ['profile', 'photos']
[0.0] AuthServer: Received AuthRequest(client=photo_app, scope=['profile', 'photos'])
[0.5] AuthServer: User authenticated, showing consent
[0.8] AuthServer: User granted permissions: ['profile', 'photos']
[0.8] AuthServer: Issued authorization code
[0.8] Client: Received authorization code
[0.8] Client: Exchanging code for token
[0.8] AuthServer: Received TokenRequest(client=photo_app)
[0.8] AuthServer: Issued access token
[0.8] Client: Got access token!
[1.3] Client: Accessing /api/profile
[1.3] ResourceServer: Received ResourceRequest(path=/api/profile)
[1.3] ResourceServer: Returning /api/profile
[1.3] Client: Success! Data: {'name': 'Alice', 'email': 'alice@example.com'}
[1.8] Client: Accessing /api/photos
[1.8] ResourceServer: Received ResourceRequest(path=/api/photos)
[1.8] ResourceServer: Returning /api/photos
[1.8] Client: Success! Data: {'photos': ['photo1.jpg', 'photo2.jpg', 'photo3.jpg']}
[2.3] Client: Accessing /api/messages
[2.3] ResourceServer: Received ResourceRequest(path=/api/messages)
[2.3] ResourceServer: Insufficient scope
[2.3] Client: Failed - insufficient_scope
Refresh Tokens
Access tokens expire quickly (typically 1 hour).
Refresh tokens allow clients to get new access tokens without user interaction.
A RefreshToken has the same structure as an access token
but is stored separately and used only to request a replacement:
@dataclass
class RefreshToken:
"""Refresh token for obtaining new access tokens."""
token: str
client_id: str
scope: list[str]
expires_at: float
def is_valid(self, now: float) -> bool:
return now < self.expires_at
The authorization server can issue refresh tokens alongside access tokens. Clients use refresh tokens to get new access tokens when they expire, maintaining long-lived sessions without storing passwords.
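The chapter does not show the server-side handling of a refresh request, so the following is only a sketch of what it might look like. The function refresh_access_token, the dictionary-based token store, and the shape of the returned dictionary are all assumptions; the RefreshToken class mirrors the one defined above.

```python
import secrets
from dataclasses import dataclass
from typing import Optional


@dataclass
class RefreshToken:
    token: str
    client_id: str
    scope: list
    expires_at: float

    def is_valid(self, now: float) -> bool:
        return now < self.expires_at


def refresh_access_token(
    refresh_tokens: dict, presented: str, client_id: str, now: float, lifetime: int = 3600
) -> Optional[dict]:
    """Issue a new access token if the refresh token is known, live, and ours."""
    record = refresh_tokens.get(presented)
    if record is None or not record.is_valid(now):
        return None
    if record.client_id != client_id:
        return None  # a refresh token only works for the client it was issued to
    return {
        "access_token": f"access_{secrets.token_urlsafe(16)}",
        "token_type": "Bearer",
        "expires_in": lifetime,
        "scope": record.scope,  # the new token inherits the original scope
    }


store = {"refresh_abc": RefreshToken("refresh_abc", "photo_app", ["profile"], 86400.0)}
renewed = refresh_access_token(store, "refresh_abc", "photo_app", now=4000.0)
wrong_client = refresh_access_token(store, "refresh_abc", "other_app", now=4000.0)
too_late = refresh_access_token(store, "refresh_abc", "photo_app", now=90000.0)
print(renewed is not None, wrong_client, too_late)
```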
In the Real World
Real OAuth implementations must address several security concerns that our simulation simplifies or ignores. For example, our client compares the state value in memory, but a real client must tie that value to the user's browser session so that an authorization response can be matched to the original request across HTTP redirects, which is what prevents CSRF attacks. They also need token management: a resource server must be able to check token validity with the authorization server, and a user must be able to revoke tokens in order to do things like sign out of all devices.
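Revocation can be pictured as deleting entries from the token table. The sketch below removes every token issued to one client; a "sign out of all devices" feature would do the same thing keyed on the user instead, which our simulation's tokens do not record. The function name and the dictionary layout are assumptions for illustration.

```python
def revoke_tokens_for_client(access_tokens: dict, client_id: str) -> int:
    """Delete every token issued to client_id; return how many were removed."""
    # Collect first, then delete, so the dictionary is not mutated while iterating
    doomed = [
        token for token, record in access_tokens.items()
        if record["client_id"] == client_id
    ]
    for token in doomed:
        del access_tokens[token]
    return len(doomed)


access_tokens = {
    "access_1": {"client_id": "photo_app", "scope": ["profile"]},
    "access_2": {"client_id": "photo_app", "scope": ["photos"]},
    "access_3": {"client_id": "mail_app", "scope": ["messages"]},
}

removed = revoke_tokens_for_client(access_tokens, "photo_app")
print(removed, sorted(access_tokens))
```

Because the resource server looks tokens up on every request, a deleted token stops working immediately, without waiting for it to expire.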