from rich.console import Console import os import json import secrets import webbrowser as browser from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs from threading import Thread import time from typing import Optional import requests import keyring import keyring.errors from secnex.kit.auth.session import Session from secnex.kit.config.config import Config console = Console() class RedirectHandler(BaseHTTPRequestHandler): def __init__(self, *args, auth_instance=None, **kwargs): self.auth_instance = auth_instance super().__init__(*args, **kwargs) def do_GET(self): """Handle the redirect from the OAuth provider""" if self.auth_instance is None: self.send_response(500) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(b"

Error

Auth instance not available.

") return parsed_url = urlparse(self.path) query_params = parse_qs(parsed_url.query) code = query_params.get("code", [None])[0] state = query_params.get("state", [None])[0] if code and state: self.auth_instance.received_code = code self.auth_instance.received_state = state self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(b"

Login successful!

You can close this window.

") if self.auth_instance.server: self.auth_instance.server.shutdown() else: self.send_response(400) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(b"

Error

Missing code or state parameter.

") def log_message(self, format, *args): """Suppress default logging""" pass class Auth: SERVICE_NAME = "secnex-cli" def __init__(self, config: Config) -> None: self.config = config self.received_code = None self.received_state = None self.server = None self.session = None def _get_keyring_username(self) -> str: """Get the username for keyring storage""" return f"{self.config.name}_{self.config.client_id}" def save_token(self, token: str) -> None: """Save access token to keyring""" try: username = self._get_keyring_username() keyring.set_password(self.SERVICE_NAME, username, token) console.print(f"Access token saved securely", style="bold green") except Exception as e: console.print(f"Failed to save token to keyring: {e}", style="bold red") def get_token(self) -> Optional[str]: """Get access token from keyring""" try: username = self._get_keyring_username() token = keyring.get_password(self.SERVICE_NAME, username) return token except Exception as e: console.print(f"Failed to get token from keyring: {e}", style="bold red") return None def delete_token(self) -> None: """Delete access token from keyring""" try: username = self._get_keyring_username() keyring.delete_password(self.SERVICE_NAME, username) console.print(f"Access token deleted", style="bold green") except keyring.errors.PasswordDeleteError: # Token doesn't exist, which is fine pass except Exception as e: console.print(f"Failed to delete token from keyring: {e}", style="bold red") def exchange_code_for_token(self, code: str, redirect_uri: str) -> Optional[Session]: """Exchange authorization code for access token""" token_url = f"{self.config.ui}/token" data = { "grant_type": "authorization_code", "code": code, "redirect_uri": redirect_uri, "client_id": self.config.client_id, "client_secret": self.config.client_secret, } try: response = requests.post(token_url, data=data) response.raise_for_status() token_data = response.json() access_token = token_data.get("access_token") if access_token: self.session = Session(access_token) console.print(f"Successfully obtained access token!", style="bold green") # Save token to keyring self.save_token(access_token) return self.session else: console.print(f"Token response missing access_token", style="bold red") return None except requests.exceptions.RequestException as e: console.print(f"Failed to exchange code for token! Please check the logs for more information.", style="bold red") return None def login(self) -> None: """Login to the SecNex API""" response_type = "code" scope = "openid profile email" state = secrets.token_hex(10) redirect_uri = "http://localhost:8001" url = f"{self.config.ui}/authorize?client_id={self.config.client_id}&response_type={response_type}&scope={scope}&state={state}&redirect_uri={redirect_uri}" # Start HTTP server on port 8001 def create_handler(*args, **kwargs): return RedirectHandler(*args, auth_instance=self, **kwargs) self.server = HTTPServer(("localhost", 8001), create_handler) server_thread = Thread(target=self.server.serve_forever, daemon=True) server_thread.start() console.print(f"Waiting for redirect on http://localhost:8001...", style="bold yellow") console.print(f"Opening browser to: {url}", style="bold green") browser.open(url) # Wait for the redirect (with timeout) timeout = 300 # 5 minutes start_time = time.time() while self.received_code is None and (time.time() - start_time) < timeout: time.sleep(0.1) if self.received_code: console.print(f"Received authorization code!", style="bold green") # Exchange code for token console.print(f"Exchanging code for token...", style="bold yellow") session = self.exchange_code_for_token(self.received_code, redirect_uri) if session: console.print(f"Login successful!", style="bold green") else: console.print(f"Login failed: Could not obtain access token. Please try again.", style="bold red") else: console.print(f"Timeout waiting for authorization code. Please try again.", style="bold red")