feat(auth): Add OAuth2 authentication

This commit is contained in:
Björn Benouarets
2026-01-27 16:35:46 +01:00
commit 50c85e9b7f
36 changed files with 1599 additions and 0 deletions

176
secnex/kit/auth/auth.py Normal file
View File

@@ -0,0 +1,176 @@
from rich.console import Console
import os
import json
import secrets
import webbrowser as browser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from threading import Thread
import time
from typing import Optional
import requests
import keyring
import keyring.errors
from secnex.kit.auth.session import Session
from secnex.kit.config.config import Config
console = Console()
class RedirectHandler(BaseHTTPRequestHandler):
def __init__(self, *args, auth_instance=None, **kwargs):
self.auth_instance = auth_instance
super().__init__(*args, **kwargs)
def do_GET(self):
"""Handle the redirect from the OAuth provider"""
if self.auth_instance is None:
self.send_response(500)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<html><body><h1>Error</h1><p>Auth instance not available.</p></body></html>")
return
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
code = query_params.get("code", [None])[0]
state = query_params.get("state", [None])[0]
if code and state:
self.auth_instance.received_code = code
self.auth_instance.received_state = state
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<html><body><h1>Login successful!</h1><p>You can close this window.</p></body></html>")
if self.auth_instance.server:
self.auth_instance.server.shutdown()
else:
self.send_response(400)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<html><body><h1>Error</h1><p>Missing code or state parameter.</p></body></html>")
def log_message(self, format, *args):
"""Suppress default logging"""
pass
class Auth:
SERVICE_NAME = "secnex-cli"
def __init__(self, config: Config) -> None:
self.config = config
self.received_code = None
self.received_state = None
self.server = None
self.session = None
def _get_keyring_username(self) -> str:
"""Get the username for keyring storage"""
return f"{self.config.name}_{self.config.client_id}"
def save_token(self, token: str) -> None:
"""Save access token to keyring"""
try:
username = self._get_keyring_username()
keyring.set_password(self.SERVICE_NAME, username, token)
console.print(f"Access token saved securely", style="bold green")
except Exception as e:
console.print(f"Failed to save token to keyring: {e}", style="bold red")
def get_token(self) -> Optional[str]:
"""Get access token from keyring"""
try:
username = self._get_keyring_username()
token = keyring.get_password(self.SERVICE_NAME, username)
return token
except Exception as e:
console.print(f"Failed to get token from keyring: {e}", style="bold red")
return None
def delete_token(self) -> None:
"""Delete access token from keyring"""
try:
username = self._get_keyring_username()
keyring.delete_password(self.SERVICE_NAME, username)
console.print(f"Access token deleted", style="bold green")
except keyring.errors.PasswordDeleteError:
# Token doesn't exist, which is fine
pass
except Exception as e:
console.print(f"Failed to delete token from keyring: {e}", style="bold red")
def exchange_code_for_token(self, code: str, redirect_uri: str) -> Optional[Session]:
"""Exchange authorization code for access token"""
token_url = f"{self.config.ui}/token"
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
}
try:
response = requests.post(token_url, data=data)
response.raise_for_status()
token_data = response.json()
access_token = token_data.get("access_token")
if access_token:
self.session = Session(access_token)
console.print(f"Successfully obtained access token!", style="bold green")
# Save token to keyring
self.save_token(access_token)
return self.session
else:
console.print(f"Token response missing access_token", style="bold red")
return None
except requests.exceptions.RequestException as e:
console.print(f"Failed to exchange code for token! Please check the logs for more information.", style="bold red")
return None
def login(self) -> None:
"""Login to the SecNex API"""
response_type = "code"
scope = "openid profile email"
state = secrets.token_hex(10)
redirect_uri = "http://localhost:8001"
url = f"{self.config.ui}/authorize?client_id={self.config.client_id}&response_type={response_type}&scope={scope}&state={state}&redirect_uri={redirect_uri}"
# Start HTTP server on port 8001
def create_handler(*args, **kwargs):
return RedirectHandler(*args, auth_instance=self, **kwargs)
self.server = HTTPServer(("localhost", 8001), create_handler)
server_thread = Thread(target=self.server.serve_forever, daemon=True)
server_thread.start()
console.print(f"Waiting for redirect on http://localhost:8001...", style="bold yellow")
console.print(f"Opening browser to: {url}", style="bold green")
browser.open(url)
# Wait for the redirect (with timeout)
timeout = 300 # 5 minutes
start_time = time.time()
while self.received_code is None and (time.time() - start_time) < timeout:
time.sleep(0.1)
if self.received_code:
console.print(f"Received authorization code!", style="bold green")
# Exchange code for token
console.print(f"Exchanging code for token...", style="bold yellow")
session = self.exchange_code_for_token(self.received_code, redirect_uri)
if session:
console.print(f"Login successful!", style="bold green")
else:
console.print(f"Login failed: Could not obtain access token. Please try again.", style="bold red")
else:
console.print(f"Timeout waiting for authorization code. Please try again.", style="bold red")

View File

@@ -0,0 +1,12 @@
import webbrowser as browser
import secrets
import requests
from rich.console import Console
class Interactive:
def __init__(self, base_url: str = "http://localhost:3000", redirect_url: str = "http://localhost:8001") -> None:
self.base_url = base_url
self.redirect_url = redirect_url

View File

@@ -0,0 +1,11 @@
class Session:
def __init__(self, session: str) -> None:
self.session = session
def __str__(self) -> str:
return self.session
def __eq__(self, other: object) -> bool:
if not isinstance(other, Session):
return False
return self.session == other.session

View File

@@ -0,0 +1,56 @@
import requests
from typing import Optional
from datetime import datetime#
import secnex.utils as utils
from secnex.kit.config.config import Config
from secnex.kit.models.application import Application
class ApplicationClient:
def __init__(self, config: Config) -> None:
self.config = config
def create(self, name: str, tenant_id: str, secret: str) -> tuple[Optional[str], int, bool]:
"""Create a new application"""
url = f"{self.config.host}/apps"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
response = requests.post(url, headers=headers, json={"name": name, "tenant": tenant_id, "secret": secret})
result = response.json()
success = False
app_id = None
if "id" in result["body"]:
success = True
app_id = result["body"]["id"]
return app_id, response.status_code, success
def get_all(self) -> tuple[Optional[list[Application]], int, bool]:
"""Get all applications"""
url = f"{self.config.host}/apps"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
response = requests.get(url, headers=headers)
result = response.json()
success = False
applications = []
if "applications" in result["body"]:
success = True
applications = [Application(**application) for application in result["body"]["applications"]]
return applications, response.status_code, success
def remove(self, id: str) -> tuple[bool, int]:
"""Remove an application"""
url = f"{self.config.host}/apps/{id}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
response = requests.delete(url, headers=headers)
success = response.status_code == 200
return success, response.status_code

View File

View File

@@ -0,0 +1,40 @@
import requests
from typing import Optional
from datetime import datetime#
from secnex.kit.config.config import Config
from secnex.kit.models.tenant import Tenant
class TenantClient:
def __init__(self, config: Config) -> None:
self.config = config
def create(self, name: str) -> tuple[Optional[str], int, bool]:
"""Create a new tenant"""
url = f"{self.config.host}/tenants"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
response = requests.post(url, headers=headers, json={"name": name})
result = response.json()
success = False
tenant_id = None
if "id" in result["body"]:
success = True
tenant_id = result["body"]["id"]
return tenant_id, response.status_code, success
def get_all(self) -> list[Tenant]:
"""Get all tenants"""
url = f"{self.config.host}/tenants"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
response = requests.get(url, headers=headers)
result = response.json()
tenants = result["body"]["tenants"]
return [Tenant(**tenant) for tenant in tenants]

View File

@@ -0,0 +1,38 @@
import requests
from typing import Optional
from datetime import datetime#
from secnex.kit.config.config import Config
from secnex.kit.models.user import User
class UserClient:
def __init__(self, config: Config) -> None:
self.config = config
def get_all(self) -> list[User]:
"""Get all users"""
url = f"{self.config.host}/users"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
response = requests.get(url, headers=headers)
result = response.json()
users = result["body"]["users"]
return [User(**u) for u in users]
def add_user_to_tenant(self, tenant_id: str, user_id: str) -> tuple[bool, int]:
"""Add a user to a tenant"""
url = f"{self.config.host}/users/{user_id}/tenant"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer 1234567890",
}
data = {
"tenant": tenant_id,
}
response = requests.put(url, headers=headers, json=data)
success = response.status_code == 200
return success, response.status_code

View File

@@ -0,0 +1,73 @@
import os
import json
from typing import Optional
from rich.console import Console
from rich.prompt import Prompt
console = Console()
class Config:
def __init__(self, name: str, path: str = os.path.expanduser("~/.secnex")) -> None:
self.name = name
self.path = path
self.config = self.__load_config()
if self.config is None:
host = Prompt.ask("Enter the host of the SecNex API", default="http://localhost:3003")
ui = Prompt.ask("Enter the UI to use", default="http://localhost:3000")
client_id = Prompt.ask("Enter the ID of your client application")
client_secret = Prompt.ask("Enter the secret of your client application")
tenant_id = Prompt.ask("Enter the ID of your tenant")
self.config = {
"host": host,
"ui": ui,
"client": {
"id": client_id,
"secret": client_secret,
"tenant": tenant_id,
}
}
self.__save_config()
@property
def host(self) -> Optional[str]:
if self.config is not None:
return self.config["host"]
return None
@property
def ui(self) -> Optional[str]:
if self.config is not None:
return self.config["ui"]
return None
@property
def client_id(self) -> Optional[str]:
if self.config is not None:
return self.config["client"]["id"]
return None
@property
def client_secret(self) -> Optional[str]:
if self.config is not None:
return self.config["client"]["secret"]
return None
@property
def tenant(self) -> Optional[str]:
if self.config is not None:
return self.config["client"]["tenant"]
return None
def __load_config(self) -> dict | None:
path = os.path.join(self.path, f"{self.name}.json")
if not os.path.exists(path):
return None
with open(path, "r") as f:
return json.load(f)
def __save_config(self) -> None:
path = os.path.join(self.path, f"{self.name}.json")
with open(path, "w") as f:
json.dump(self.config, f)

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
class Application:
def __init__(self, id: str, name: str, secret: str, tenant_id: str, expires_at: datetime | str | None, created_at: datetime | str, updated_at: datetime | str, deleted_at: datetime | str | None) -> None:
self.id = id
self.name = name
self.secret = secret
self.tenant_id = tenant_id
self.expires_at = expires_at
self.created_at = self._parse_datetime(created_at)
self.updated_at = self._parse_datetime(updated_at)
self.deleted_at = self._parse_datetime(deleted_at) if deleted_at is not None else None
def __str__(self) -> str:
return f"{self.name} - {self.id} - {self.secret} - {self.tenant_id} - {self.expires_at} - {self.created_at} - {self.updated_at} - {self.deleted_at}"
@staticmethod
def _parse_datetime(value: Any) -> Any:
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.fromisoformat(value)
except ValueError:
return value

33
secnex/kit/models/key.py Normal file
View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
class ApiKey:
def __init__(self,
id: str,
key: str,
enabled: bool,
created_at: datetime | str | None,
updated_at: datetime | str | None,
deleted_at: datetime | str | None,
) -> None:
self.id = id
self.key = key
self.enabled = enabled
self.created_at = self._parse_datetime(created_at)
self.updated_at = self._parse_datetime(updated_at)
self.deleted_at = self._parse_datetime(deleted_at) if deleted_at is not None else None
def __str__(self) -> str:
return f"{self.id} - {self.key} - {self.enabled} - {self.created_at} - {self.updated_at} - {self.deleted_at}"
@staticmethod
def _parse_datetime(value: Any) -> Any:
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.fromisoformat(value)
except ValueError:
return value

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
class Tenant:
def __init__(
self,
id: str,
name: str,
enabled: bool,
allow_self_registration: bool,
allow_self_registration_domains: Optional[list[str]],
created_at: datetime | str,
updated_at: datetime | str,
deleted_at: datetime | str | None,
) -> None:
self.id = id
self.name = name
self.enabled = enabled
self.allow_self_registration = allow_self_registration
self.allow_self_registration_domains = allow_self_registration_domains or []
self.created_at = self._parse_datetime(created_at)
self.updated_at = self._parse_datetime(updated_at)
self.deleted_at = self._parse_datetime(deleted_at) if deleted_at is not None else None
def __str__(self) -> str:
return f"{self.name} - {self.id} - {self.enabled} - {self.allow_self_registration} - {self.allow_self_registration_domains} - {self.created_at} - {self.updated_at} - {self.deleted_at}"
@staticmethod
def _parse_datetime(value: Any) -> Any:
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.fromisoformat(value)
except ValueError:
return value
return value

47
secnex/kit/models/user.py Normal file
View File

@@ -0,0 +1,47 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
class User:
def __init__(
self,
id: str,
first_name: str,
last_name: str,
username: str,
email: str,
password: str,
verified: bool,
external_id: Optional[str],
tenant_id: Optional[str],
created_at: datetime | str,
updated_at: datetime | str,
deleted_at: datetime | str | None,
) -> None:
self.id = id
self.first_name = first_name
self.last_name = last_name
self.username = username
self.email = email
self.password = password
self.verified = verified
self.external_id = external_id
self.tenant_id = tenant_id
self.created_at = self._parse_datetime(created_at)
self.updated_at = self._parse_datetime(updated_at)
self.deleted_at = self._parse_datetime(deleted_at) if deleted_at is not None else None
def __str__(self) -> str:
return f"{self.first_name} {self.last_name} - {self.email} - {self.id} - {self.tenant_id} - {self.created_at} - {self.updated_at} - {self.deleted_at}"
@staticmethod
def _parse_datetime(value: Any) -> Any:
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.fromisoformat(value)
except ValueError:
return value
return value