OAuth

When you use your account on one site to sign in to another, you are relying on OAuth, usually together with OpenID Connect, an identity layer built on top of it. OAuth has become the standard way for applications to access user data without ever seeing the user's password. Understanding it is essential for understanding single sign-on, API access control, and third-party integrations.

OAuth solves a fundamental problem: how do you give a third-party application limited access to your account without sharing your credentials? Before OAuth, users had to hand their username and password to every app that needed access, which gave each app full control of the account and made access impossible to withdraw without changing the password. OAuth replaces the password with tokens: time-limited credentials that carry specific scopes and can be revoked individually.

The OAuth Flow

OAuth involves four parties working together. The resource owner is typically the user who owns the data being accessed. The client application wants to access that data. The authorization server authenticates the user and issues tokens. The resource server hosts the protected resources and enforces access control.


The authorization flow works like this:

  1. The client requests authorization by redirecting the user to the authorization server.
  2. The user authenticates (logs in) and grants permission.
  3. The authorization server issues a one-time authorization code and sends it to the client by redirecting the user back to the client's redirect URI.
  4. The client authenticates itself and exchanges that one-time code for an access token.
  5. The client then uses the token to access protected resources, e.g., to call protected APIs.

The key feature is that the client never sees the user's password, and the token has limited scope and lifetime.
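
Step 1 of the list above can be sketched as building the URL to which the client redirects the user's browser. This is a minimal illustration, not part of the simulation: the endpoint address and the function name build_authorization_url are assumptions, while the parameter names (response_type, client_id, redirect_uri, scope, state) are the standard OAuth 2.0 ones.

```python
from urllib.parse import parse_qs, urlencode, urlparse

# Hypothetical endpoint, used only for illustration.
AUTHORIZE_ENDPOINT = "https://auth.example.com/authorize"


def build_authorization_url(
    client_id: str, redirect_uri: str, scopes: list[str], state: str
) -> str:
    """Build the URL the client sends the user's browser to in step 1."""
    params = {
        "response_type": "code",    # ask for an authorization code
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),  # OAuth 2.0 scopes are space-delimited
        "state": state,             # echoed back for CSRF protection
    }
    return f"{AUTHORIZE_ENDPOINT}?{urlencode(params)}"


url = build_authorization_url(
    "photo_app",
    "https://photoapp.example.com/callback",
    ["profile", "photos"],
    "state_abc123",
)
query = parse_qs(urlparse(url).query)
print(query["scope"])  # ['profile photos']
```

Note that the user's password never appears in this URL; the user types it only on the authorization server's own login page.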

OAuth 2.0 defines other flows and extensions for other scenarios, including Proof Key for Code Exchange (PKCE), an extension to the authorization code flow for mobile and single-page apps that can't securely store secrets, and the Client Credentials flow for machine-to-machine authentication without user involvement.
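
To give a flavor of PKCE, the sketch below shows its core idea under the S256 method: the client invents a random verifier, sends only a hash of it (the challenge) with the authorization request, and reveals the verifier itself when redeeming the code. The function names are illustrative assumptions and are not part of the simulation in this chapter.

```python
import base64
import hashlib
import secrets


def make_code_verifier() -> str:
    """Create a high-entropy verifier (43 URL-safe characters from 32 bytes)."""
    return secrets.token_urlsafe(32)


def make_code_challenge(verifier: str) -> str:
    """Derive the S256 challenge: base64url(SHA-256(verifier)), no padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify_pkce(verifier: str, stored_challenge: str) -> bool:
    """Server-side check when the code is exchanged for a token."""
    return secrets.compare_digest(make_code_challenge(verifier), stored_challenge)


verifier = make_code_verifier()
challenge = make_code_challenge(verifier)       # sent with the authorization request
print(verify_pkce(verifier, challenge))         # True
print(verify_pkce("wrong-verifier", challenge)) # False
```

Because an attacker who intercepts the authorization code does not know the verifier, the code alone is useless to them even though no client secret is involved.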

Message Types

The flow described above is the Authorization Code flow, the most common OAuth flow, which was designed for server-side applications. Before looking at the servers themselves, we need to understand the data types they exchange.

The foundation is a helper function that generates random strings for codes and tokens:

import random
import string


def generate_token(prefix: str = "tok") -> str:
    """Generate a random token (simulation only: random is not a secure source)."""
    random_part = "".join(random.choices(string.ascii_letters + string.digits, k=16))
    return f"{prefix}_{random_part}"
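
The random module is adequate for a simulation, but its output is predictable and should not protect real accounts. As a hedged sketch, a production-style variant could draw from Python's secrets module instead. The name generate_secure_token and the nbytes parameter are assumptions introduced here for illustration.

```python
import secrets


def generate_secure_token(prefix: str = "tok", nbytes: int = 32) -> str:
    """Generate an unguessable token from the OS's secure random source."""
    return f"{prefix}_{secrets.token_urlsafe(nbytes)}"


code = generate_secure_token("code")
access = generate_secure_token("access")
print(code.startswith("code_"), access.startswith("access_"))  # True True
```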

The six message dataclasses come in three request-response pairs, one for each phase of the protocol. AuthorizationRequest carries the client's identity, the desired scope, and a state token for cross-site request forgery (CSRF) protection. TokenRequest carries the one-time code along with the client's own credentials so the authorization server can verify who it is talking to. ResourceRequest carries the access token so the resource server can check whether the caller is allowed to see the requested data:

from dataclasses import dataclass
from typing import Any


@dataclass
class AuthorizationRequest:
    """Request for user authorization."""

    client_id: str
    redirect_uri: str
    scope: list[str]
    state: str  # CSRF protection
    response_queue: Queue

    def __str__(self):
        return f"AuthRequest(client={self.client_id}, scope={self.scope})"


@dataclass
class AuthorizationResponse:
    """Response with authorization code."""

    code: str
    state: str

    def __str__(self):
        return f"AuthResponse(code={self.code[:8]}...)"


@dataclass
class TokenRequest:
    """Request to exchange code for access token."""

    code: str
    client_id: str
    client_secret: str
    redirect_uri: str
    response_queue: Queue

    def __str__(self):
        return f"TokenRequest(client={self.client_id})"


@dataclass
class TokenResponse:
    """Response with access token."""

    access_token: str
    token_type: str = "Bearer"
    expires_in: int = 3600
    refresh_token: str | None = None
    scope: list[str] | None = None

    def __str__(self):
        return f"TokenResponse(token={self.access_token[:8]}...)"


@dataclass
class ResourceRequest:
    """Request to access protected resource."""

    access_token: str
    resource_path: str
    response_queue: Queue

    def __str__(self):
        return f"ResourceRequest(path={self.resource_path})"


@dataclass
class ResourceResponse:
    """Response from resource server."""

    success: bool
    data: Any = None
    error: str | None = None

    def __str__(self):
        if self.success:
            return "ResourceResponse(success=True)"
        return f"ResourceResponse(error={self.error})"

The servers also need two internal record-keeping types. An AuthorizationCode tracks whether a code has already been used (codes are one-time-only) and when it expires. An AccessToken records which client owns it, what scopes it covers, and when it expires. Both have an is_valid method that encapsulates the validity check:

@dataclass
class AuthorizationCode:
    """Authorization code with metadata."""

    code: str
    client_id: str
    redirect_uri: str
    scope: list[str]
    expires_at: float
    used: bool = False

    def is_valid(self, now: float) -> bool:
        """Check if code is still valid."""
        return not self.used and now < self.expires_at


@dataclass
class AccessToken:
    """Access token with metadata."""

    token: str
    client_id: str
    scope: list[str]
    expires_at: float

    def is_valid(self, now: float) -> bool:
        """Check if token is still valid."""
        return now < self.expires_at

The authorization code is short-lived (typically 10 minutes), while access tokens last longer (often 1 hour).
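
The behavior of is_valid is easy to see in isolation. The sketch below uses a trimmed copy of AuthorizationCode (only the fields the check needs) and made-up timestamps to show the three cases: fresh, expired, and already used.

```python
from dataclasses import dataclass


@dataclass
class AuthorizationCode:
    """Trimmed copy of the chapter's class, kept self-contained."""

    code: str
    expires_at: float
    used: bool = False

    def is_valid(self, now: float) -> bool:
        return not self.used and now < self.expires_at


issued_at = 0.8
auth_code = AuthorizationCode(code="code_demo", expires_at=issued_at + 600)

fresh = auth_code.is_valid(now=1.0)      # inside the 10-minute window
expired = auth_code.is_valid(now=700.0)  # past expires_at
auth_code.used = True
replayed = auth_code.is_valid(now=1.0)   # already redeemed once
print(fresh, expired, replayed)  # True False False
```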

Authorization Server

The authorization server is the trust anchor of OAuth: it verifies the user's identity, obtains consent, and issues tokens. Its init method creates two incoming queues, three dictionaries for tracking registered clients, issued codes, and active tokens, and a simplified table of user credentials:

class AuthorizationServer(Process):
    """OAuth 2.0 authorization server."""

    def init(self):
        self.auth_queue = Queue(self._env)
        self.token_queue = Queue(self._env)

        # Registered clients
        self.clients: Dict[str, Dict] = {}

        # Issued authorization codes
        self.auth_codes: Dict[str, AuthorizationCode] = {}

        # Issued access tokens
        self.access_tokens: Dict[str, AccessToken] = {}

        # User credentials (simplified - real systems use secure storage)
        self.users = {
            "alice@example.com": "password123",
            "bob@example.com": "secret456",
        }

        print(f"[{self.now:.1f}] Authorization Server started")

Before any client can use the server, it must be registered with a client ID, a shared secret, and a list of allowed redirect URIs. The server later rejects any authorization request whose redirect URI is not on this list:

    def register_client(
        self, client_id: str, client_secret: str, redirect_uris: List[str]
    ):
        """Register a new OAuth client."""
        self.clients[client_id] = {
            "secret": client_secret,
            "redirect_uris": redirect_uris,
        }
        print(f"[{self.now:.1f}] Registered client: {client_id}")

The server's event loop uses asimpy's FirstOf to wait on both queues simultaneously. Whichever queue receives a message first wins; the other request is automatically cancelled:

    async def run(self):
        """Main server loop."""
        while True:
            # Handle both authorization and token requests
            name, request = await FirstOf(
                self._env, auth=self.auth_queue.get(), token=self.token_queue.get()
            )

            if name == "auth":
                await self.handle_authorization_request(request)
            elif name == "token":
                await self.handle_token_request(request)

When an authorization request arrives, handle_authorization_request delegates to two helpers and handles the user consent simulation in between:

    async def handle_authorization_request(self, request: AuthorizationRequest):
        """Handle authorization request from client."""
        print(f"[{self.now:.1f}] AuthServer: Received {request}")

        if not self._validate_auth_request(request):
            return

        # Simulate user authentication and consent
        await self.timeout(0.5)  # User login time
        print(f"[{self.now:.1f}] AuthServer: User authenticated, showing consent")

        await self.timeout(0.3)  # User consent time
        print(f"[{self.now:.1f}] AuthServer: User granted permissions: {request.scope}")

        await self._issue_authorization_code(request)

_validate_auth_request checks that the client is registered and that the redirect URI is one the client pre-registered. This check is an important security constraint: without it, an attacker could build an authorization link that uses a legitimate client's ID but points the redirect URI at a server they control, and the authorization code would be delivered there:

    def _validate_auth_request(self, request: AuthorizationRequest) -> bool:
        """Check that the client is registered and the redirect URI is allowed."""
        if request.client_id not in self.clients:
            print(f"[{self.now:.1f}] AuthServer: Unknown client {request.client_id}")
            return False

        client = self.clients[request.client_id]
        if request.redirect_uri not in client["redirect_uris"]:
            print(f"[{self.now:.1f}] AuthServer: Invalid redirect URI")
            return False

        return True
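
The membership test above is an exact string match, and that strictness matters. The sketch below contrasts it with a looser prefix check, which is an assumption introduced purely to show what goes wrong. The attacker URI is likewise hypothetical.

```python
# Hypothetical registration data for illustration.
REGISTERED_URIS = ["https://photoapp.example.com/callback"]
REGISTERED_ORIGIN = "https://photoapp.example.com"


def exact_match(redirect_uri: str) -> bool:
    """The check the simulation performs: the URI must be on the list."""
    return redirect_uri in REGISTERED_URIS


def prefix_match(redirect_uri: str) -> bool:
    """A deliberately weak check, shown only for contrast."""
    return redirect_uri.startswith(REGISTERED_ORIGIN)


# The host here is evil.example; the registered name is only a subdomain label.
attacker_uri = "https://photoapp.example.com.evil.example/callback"
print(prefix_match(attacker_uri))  # True: the weak check is fooled
print(exact_match(attacker_uri))   # False: the exact check rejects it
```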

_issue_authorization_code generates the code, records it in auth_codes with its expiry and scope, and sends it back through the response queue embedded in the request:

    async def _issue_authorization_code(self, request: AuthorizationRequest):
        """Generate, store, and return a one-time authorization code."""
        code = generate_token("code")
        auth_code = AuthorizationCode(
            code=code,
            client_id=request.client_id,
            redirect_uri=request.redirect_uri,
            scope=request.scope,
            expires_at=self.now + 600,  # 10 minute expiry
        )
        self.auth_codes[code] = auth_code

        response = AuthorizationResponse(code=code, state=request.state)
        await request.response_queue.put(response)

        print(f"[{self.now:.1f}] AuthServer: Issued authorization code")

When the client then presents that code to exchange it for a token, handle_token_request is similarly short because all the checking is delegated:

    async def handle_token_request(self, request: TokenRequest):
        """Exchange authorization code for access token."""
        print(f"[{self.now:.1f}] AuthServer: Received {request}")

        auth_code = await self._validate_token_request(request)
        if auth_code is None:
            return

        await self._issue_access_token(request, auth_code)

_validate_token_request works through a chain of checks: the client must exist, the secret must match, the code must exist and still be valid, and the client and redirect URI in the token request must exactly match those in the original authorization request. Any mismatch sends an error response and returns None so the caller can exit early. The method constructs the error response once at the top rather than repeating it at each branch:

    async def _validate_token_request(
        self, request: TokenRequest
    ) -> AuthorizationCode | None:
        """Validate client credentials and authorization code; return code or None."""
        error = TokenResponse(access_token="", token_type="error")

        if request.client_id not in self.clients:
            print(f"[{self.now:.1f}] AuthServer: Unknown client")
            await request.response_queue.put(error)
            return None

        client = self.clients[request.client_id]
        if client["secret"] != request.client_secret:
            print(f"[{self.now:.1f}] AuthServer: Invalid client secret")
            await request.response_queue.put(error)
            return None

        if request.code not in self.auth_codes:
            print(f"[{self.now:.1f}] AuthServer: Invalid authorization code")
            await request.response_queue.put(error)
            return None

        auth_code = self.auth_codes[request.code]

        if not auth_code.is_valid(self.now):
            print(f"[{self.now:.1f}] AuthServer: Authorization code expired or used")
            await request.response_queue.put(error)
            return None

        if auth_code.client_id != request.client_id:
            print(f"[{self.now:.1f}] AuthServer: Code issued to different client")
            await request.response_queue.put(error)
            return None

        if auth_code.redirect_uri != request.redirect_uri:
            print(f"[{self.now:.1f}] AuthServer: Redirect URI mismatch")
            await request.response_queue.put(error)
            return None

        return auth_code
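
The simulation compares secrets with !=, which is fine here but can leak timing information in a real server because the comparison stops at the first differing character. A hedged sketch of the usual alternative uses hmac.compare_digest from the standard library. The helper name secrets_match is an assumption.

```python
import hmac


def secrets_match(stored: str, presented: str) -> bool:
    """Compare two secrets in time that does not depend on where they differ."""
    return hmac.compare_digest(stored.encode("utf-8"), presented.encode("utf-8"))


print(secrets_match("secret_xyz", "secret_xyz"))  # True
print(secrets_match("secret_xyz", "secret_abc"))  # False
```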

_issue_access_token marks the code as used before doing anything else. This is the line that makes authorization codes one-time-only: re-using a code is one of the classic attack vectors against OAuth, so the flag must be set before the token is issued, not after:

    async def _issue_access_token(
        self, request: TokenRequest, auth_code: AuthorizationCode
    ):
        """Mark the code used, generate an access token, store it, and send it."""
        auth_code.used = True

        access_token = generate_token("access")
        token = AccessToken(
            token=access_token,
            client_id=request.client_id,
            scope=auth_code.scope,
            expires_at=self.now + 3600,  # 1 hour expiry
        )
        self.access_tokens[access_token] = token

        response = TokenResponse(
            access_token=access_token,
            token_type="Bearer",
            expires_in=3600,
            scope=auth_code.scope,
        )
        await request.response_queue.put(response)

        print(f"[{self.now:.1f}] AuthServer: Issued access token")
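
In the simulation the TokenResponse object travels through a queue. Over HTTP, a token endpoint returns the same information as a JSON body. The sketch below shows one plausible serialization using the standard OAuth 2.0 field names; the function name token_response_json is an assumption, and the scope list is joined with spaces because OAuth 2.0 transmits scopes as a space-delimited string.

```python
import json


def token_response_json(access_token: str, expires_in: int, scope: list[str]) -> str:
    """Serialize a successful token response as a JSON document."""
    body = {
        "access_token": access_token,
        "token_type": "Bearer",
        "expires_in": expires_in,   # lifetime in seconds
        "scope": " ".join(scope),   # space-delimited on the wire
    }
    return json.dumps(body)


payload = token_response_json("access_demo", 3600, ["profile", "photos"])
decoded = json.loads(payload)
print(decoded["scope"])  # profile photos
```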

Resource Server

The resource server hosts the protected API. Its init method takes a reference to the authorization server so it can look up tokens, and it stores a dictionary of protected resources where each entry specifies the required scope and the data to return:

class ResourceServer(Process):
    """OAuth 2.0 resource server (protected API)."""

    def init(self, auth_server: AuthorizationServer):
        self.auth_server = auth_server
        self.resource_queue = Queue(self._env)

        # Protected resources
        self.resources = {
            "/api/profile": {
                "scope_required": ["profile"],
                "data": {"name": "Alice", "email": "alice@example.com"},
            },
            "/api/photos": {
                "scope_required": ["photos"],
                "data": {"photos": ["photo1.jpg", "photo2.jpg", "photo3.jpg"]},
            },
            "/api/messages": {
                "scope_required": ["messages"],
                "data": {"messages": ["Hello!", "How are you?"]},
            },
        }

        print(f"[{self.now:.1f}] Resource Server started")

Critically, the resource server doesn't need to know anything about users, only about tokens and scopes. Its event loop is simpler than the authorization server's because it has only one incoming queue:

    async def run(self):
        """Main server loop."""
        while True:
            request = await self.resource_queue.get()
            await self.handle_resource_request(request)

When a request arrives, handle_resource_request delegates to two helpers and sends the successful response only if both pass:

    async def handle_resource_request(self, request: ResourceRequest):
        """Handle API request with access token."""
        print(f"[{self.now:.1f}] ResourceServer: Received {request}")

        token = await self._validate_token(request)
        if token is None:
            return

        resource = await self._check_resource_access(request, token)
        if resource is None:
            return

        print(f"[{self.now:.1f}] ResourceServer: Returning {request.resource_path}")
        await request.response_queue.put(
            ResourceResponse(success=True, data=resource["data"])
        )

_validate_token looks the token up in the authorization server's table and checks it hasn't expired. Because the resource server shares a reference to the authorization server, it can read access_tokens directly without a network call—a simplification that real deployments replace with a token introspection endpoint or a shared cache:

    async def _validate_token(
        self, request: ResourceRequest
    ) -> AccessToken | None:
        """Check that the token exists and has not expired; send error if not."""
        if request.access_token not in self.auth_server.access_tokens:
            print(f"[{self.now:.1f}] ResourceServer: Invalid token")
            await request.response_queue.put(
                ResourceResponse(success=False, error="invalid_token")
            )
            return None

        token = self.auth_server.access_tokens[request.access_token]

        if not token.is_valid(self.now):
            print(f"[{self.now:.1f}] ResourceServer: Token expired")
            await request.response_queue.put(
                ResourceResponse(success=False, error="token_expired")
            )
            return None

        return token
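
As a rough illustration of what an introspection-style lookup might return, the sketch below answers "is this token active?" from a plain dictionary. The function name introspect and the table layout are assumptions. The response shape (an active flag plus metadata for live tokens) follows the general pattern of token introspection, with simulated time standing in for real timestamps.

```python
def introspect(token: str, table: dict[str, dict], now: float) -> dict:
    """Report whether a token is active, revealing metadata only if it is."""
    record = table.get(token)
    if record is None or now >= record["expires_at"]:
        # Unknown and expired tokens look the same to the caller.
        return {"active": False}
    return {
        "active": True,
        "client_id": record["client_id"],
        "scope": " ".join(record["scope"]),
        "exp": record["expires_at"],
    }


table = {
    "access_demo": {
        "client_id": "photo_app",
        "scope": ["profile", "photos"],
        "expires_at": 3600.8,
    }
}
print(introspect("access_demo", table, now=1.3)["active"])     # True
print(introspect("access_demo", table, now=4000.0)["active"])  # False
print(introspect("access_bogus", table, now=1.3)["active"])    # False
```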

_check_resource_access confirms the path exists and then checks scope using set operations. The token must cover all scopes the resource requires, but may cover more, so a token issued with ["profile", "photos"] scope passes a check for a resource that requires only ["profile"]:

    async def _check_resource_access(
        self, request: ResourceRequest, token: AccessToken
    ) -> dict | None:
        """Check that the resource exists and the token's scope covers it."""
        if request.resource_path not in self.resources:
            print(f"[{self.now:.1f}] ResourceServer: Resource not found")
            await request.response_queue.put(
                ResourceResponse(success=False, error="not_found")
            )
            return None

        resource = self.resources[request.resource_path]
        required_scopes = set(resource["scope_required"])
        token_scopes = set(token.scope)

        if not required_scopes.issubset(token_scopes):
            print(f"[{self.now:.1f}] ResourceServer: Insufficient scope")
            await request.response_queue.put(
                ResourceResponse(success=False, error="insufficient_scope")
            )
            return None

        return resource
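
The subset test can be tried on its own. This small sketch wraps it in a helper (the name has_required_scopes is an assumption) and runs the cases discussed above.

```python
def has_required_scopes(required: list[str], granted: list[str]) -> bool:
    """True when every required scope appears among the granted scopes."""
    return set(required).issubset(granted)


granted = ["profile", "photos"]
print(has_required_scopes(["profile"], granted))            # True
print(has_required_scopes(["profile", "photos"], granted))  # True
print(has_required_scopes(["messages"], granted))           # False
```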

OAuth Client

The OAuth client orchestrates the authorization flow. Its init method stores the client credentials and references to both servers, and initializes access_token to None since no token has been issued yet:

class OAuthClient(Process):
    """OAuth 2.0 client application."""

    def init(
        self,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        auth_server: AuthorizationServer,
        resource_server: ResourceServer,
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.auth_server = auth_server
        self.resource_server = resource_server

        self.access_token: Optional[str] = None

        print(f"[{self.now:.1f}] Client '{client_id}' started")

The client's run method calls three helper methods in sequence, aborting if any step fails. It requests authorization for the profile and photos scopes, exchanges the resulting code for an access token, then uses that token to make three API calls, including one for messages that it was not granted access to:

    async def run(self):
        """Demonstrate complete OAuth flow."""
        # Step 1: Request authorization
        scopes = ["profile", "photos"]
        code = await self.request_authorization(scopes)

        if not code:
            print(f"[{self.now:.1f}] Client: Authorization failed")
            return

        # Step 2: Exchange code for token
        token_response = await self.exchange_code_for_token(code)

        if not token_response or token_response.token_type == "error":
            print(f"[{self.now:.1f}] Client: Token exchange failed")
            return

        self.access_token = token_response.access_token
        print(f"[{self.now:.1f}] Client: Got access token!")

        # Step 3: Access protected resources
        await self.timeout(0.5)
        await self.access_resource("/api/profile")

        await self.timeout(0.5)
        await self.access_resource("/api/photos")

        # Try accessing resource without permission
        await self.timeout(0.5)
        await self.access_resource("/api/messages")

request_authorization generates a state token before sending the request. This state value is a random string that the client includes in the request and the server echoes back in the response. The client checks that the echoed state matches the one it sent, which detects CSRF attacks where an attacker tricks a user's browser into sending a forged callback:

    async def request_authorization(self, scopes: List[str]) -> Optional[str]:
        """Step 1: Request user authorization."""
        print(f"[{self.now:.1f}] Client: Requesting authorization for {scopes}")

        state = generate_token("state")  # CSRF protection
        response_queue = Queue(self._env)

        request = AuthorizationRequest(
            client_id=self.client_id,
            redirect_uri=self.redirect_uri,
            scope=scopes,
            state=state,
            response_queue=response_queue,
        )

        await self.auth_server.auth_queue.put(request)
        response = await response_queue.get()

        # Validate state to prevent CSRF
        if response.state != state:
            print(f"[{self.now:.1f}] Client: State mismatch - possible CSRF attack!")
            return None

        print(f"[{self.now:.1f}] Client: Received authorization code")
        return response.code

exchange_code_for_token packages the code with the client credentials and sends the bundle to the token endpoint. The client must authenticate itself here—not just the user—because the token endpoint needs to confirm that the party asking for the token is the same one that originally registered with the server:

    async def exchange_code_for_token(self, code: str) -> Optional[TokenResponse]:
        """Step 2: Exchange authorization code for access token."""
        print(f"[{self.now:.1f}] Client: Exchanging code for token")

        response_queue = Queue(self._env)

        request = TokenRequest(
            code=code,
            client_id=self.client_id,
            client_secret=self.client_secret,
            redirect_uri=self.redirect_uri,
            response_queue=response_queue,
        )

        await self.auth_server.token_queue.put(request)
        response = await response_queue.get()

        return response

access_resource wraps the stored token in a ResourceRequest and waits for the response. The client never stores the user's password at any point in this flow; it only ever handles codes and tokens:

    async def access_resource(self, path: str):
        """Step 3: Access protected resource with token."""
        print(f"[{self.now:.1f}] Client: Accessing {path}")

        if not self.access_token:
            print(f"[{self.now:.1f}] Client: No access token!")
            return

        response_queue = Queue(self._env)

        request = ResourceRequest(
            access_token=self.access_token,
            resource_path=path,
            response_queue=response_queue,
        )

        await self.resource_server.resource_queue.put(request)
        response = await response_queue.get()

        if response.success:
            print(f"[{self.now:.1f}] Client: Success! Data: {response.data}")
        else:
            print(f"[{self.now:.1f}] Client: Failed - {response.error}")
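
Outside a simulation, the token usually travels in an HTTP Authorization header using the Bearer scheme, which matches the token_type the server returned. The sketch below only constructs such a request with the standard library and does not send it. The API URL and the function name build_api_request are assumptions.

```python
from urllib.request import Request


def build_api_request(url: str, access_token: str) -> Request:
    """Attach the access token to a request as a Bearer credential."""
    return Request(url, headers={"Authorization": f"Bearer {access_token}"})


# Hypothetical API address; nothing is sent over the network here.
req = build_api_request("https://api.example.com/api/profile", "access_demo")
print(req.get_header("Authorization"))  # Bearer access_demo
```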

Basic OAuth Simulation

Putting it all together, the simulation creates an authorization server and resource server, registers a client application called photo_app, and then runs the complete three-step flow:

def main():
    """Demonstrate basic OAuth 2.0 authorization code flow."""
    env = Environment()

    # Create authorization server
    auth_server = AuthorizationServer(env)

    # Create resource server
    resource_server = ResourceServer(env, auth_server)

    # Register client application
    client_id = "photo_app"
    client_secret = "secret_xyz"
    redirect_uri = "https://photoapp.example.com/callback"

    auth_server.register_client(
        client_id=client_id, client_secret=client_secret, redirect_uris=[redirect_uri]
    )

    # Create client
    OAuthClient(
        env,
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        auth_server=auth_server,
        resource_server=resource_server,
    )

    # Run simulation
    env.run(until=20)

The output shows the servers processing requests in sequence, the user authentication pause, the code exchange, and finally the resource access attempts, including the denied request for messages, which the client was never granted permission to read:

[0.0] Authorization Server started
[0.0] Resource Server started
[0.0] Registered client: photo_app
[0.0] Client 'photo_app' started
[0.0] Client: Requesting authorization for ['profile', 'photos']
[0.0] AuthServer: Received AuthRequest(client=photo_app, scope=['profile', 'photos'])
[0.5] AuthServer: User authenticated, showing consent
[0.8] AuthServer: User granted permissions: ['profile', 'photos']
[0.8] AuthServer: Issued authorization code
[0.8] Client: Received authorization code
[0.8] Client: Exchanging code for token
[0.8] AuthServer: Received TokenRequest(client=photo_app)
[0.8] AuthServer: Issued access token
[0.8] Client: Got access token!
[1.3] Client: Accessing /api/profile
[1.3] ResourceServer: Received ResourceRequest(path=/api/profile)
[1.3] ResourceServer: Returning /api/profile
[1.3] Client: Success! Data: {'name': 'Alice', 'email': 'alice@example.com'}
[1.8] Client: Accessing /api/photos
[1.8] ResourceServer: Received ResourceRequest(path=/api/photos)
[1.8] ResourceServer: Returning /api/photos
[1.8] Client: Success! Data: {'photos': ['photo1.jpg', 'photo2.jpg', 'photo3.jpg']}
[2.3] Client: Accessing /api/messages
[2.3] ResourceServer: Received ResourceRequest(path=/api/messages)
[2.3] ResourceServer: Insufficient scope
[2.3] Client: Failed - insufficient_scope

Refresh Tokens

Access tokens expire quickly (typically 1 hour). Refresh tokens allow clients to get new access tokens without user interaction. A RefreshToken has the same structure as an access token but is stored separately and used only to request a replacement:

@dataclass
class RefreshToken:
    """Refresh token for obtaining new access tokens."""

    token: str
    client_id: str
    scope: list[str]
    expires_at: float

    def is_valid(self, now: float) -> bool:
        return now < self.expires_at

The authorization server can issue refresh tokens alongside access tokens. When an access token expires, the client presents its refresh token to get a new one, maintaining a long-lived session without storing the user's password.
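
A refresh exchange could look roughly like the sketch below, which is not part of the chapter's simulation. It keeps refresh tokens in a plain dictionary, checks that the presented token exists, is unexpired, and belongs to the calling client, and then mints a new access token with the same scope. The function name refresh_access_token and the returned dictionary layout are assumptions.

```python
import secrets
from dataclasses import dataclass
from typing import Optional


@dataclass
class RefreshToken:
    token: str
    client_id: str
    scope: list[str]
    expires_at: float

    def is_valid(self, now: float) -> bool:
        return now < self.expires_at


def refresh_access_token(
    refresh_tokens: dict[str, RefreshToken],
    presented: str,
    client_id: str,
    now: float,
) -> Optional[dict]:
    """Return a new access-token response, or None if the refresh is refused."""
    record = refresh_tokens.get(presented)
    if record is None or not record.is_valid(now):
        return None  # unknown or expired refresh token
    if record.client_id != client_id:
        return None  # token was issued to a different client
    return {
        "access_token": f"access_{secrets.token_urlsafe(16)}",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": record.scope,  # never wider than the original grant
    }


store = {
    "refresh_demo": RefreshToken(
        token="refresh_demo",
        client_id="photo_app",
        scope=["profile", "photos"],
        expires_at=86400.0,
    )
}
renewed = refresh_access_token(store, "refresh_demo", "photo_app", now=4000.0)
print(renewed["scope"])  # ['profile', 'photos']
```

The user is not involved at any point in this exchange, which is what allows a session to outlive a single access token.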

In the Real World

Real OAuth implementations must address several security concerns that our simulation ignores or only sketches. For example, the simulation checks that the state value in an authorization response matches the one in the request, but a real client must also tie that value to the user's browser session so that a forged callback cannot be matched to someone else's request. Real systems also need token management, so that a resource server can check a token's validity with the authorization server and a user can revoke tokens in order to do things like sign out of all devices.
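
As a minimal sketch of revocation, suppose each stored token also recorded the user on whose behalf it was issued. The simulation's AccessToken has no such field, so the user key below is an assumption, as is the function name. "Sign out of all devices" then amounts to deleting every token that user owns.

```python
def revoke_user_tokens(access_tokens: dict[str, dict], user: str) -> int:
    """Delete every token issued on behalf of user; return how many were removed."""
    revoked = [tok for tok, rec in access_tokens.items() if rec["user"] == user]
    for tok in revoked:
        del access_tokens[tok]
    return len(revoked)


tokens = {
    "access_laptop": {"user": "alice@example.com", "client_id": "photo_app"},
    "access_phone": {"user": "alice@example.com", "client_id": "photo_app"},
    "access_other": {"user": "bob@example.com", "client_id": "photo_app"},
}
count = revoke_user_tokens(tokens, "alice@example.com")
print(count, sorted(tokens))  # 2 ['access_other']
```

Once a token is removed from the table, any resource server that looks it up will treat it as invalid, regardless of its original expiry time.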