Initial project structure with CLI foundation

- Set up main CLI entry point with argparse architecture
- Implement complete tenant management system (CRUD operations)
- Add PostgreSQL database connection layer with configuration
- Create user management interface foundation
- Implement rich terminal UI with formatted tables
- Add interactive prompts with questionary library
- Include comprehensive project documentation
- Set up modular command structure with Parser/Command pattern
This commit is contained in:
Björn Benouarets
2025-12-04 05:44:03 +01:00
commit 86c7872991
10 changed files with 649 additions and 0 deletions

24
.gitignore vendored Normal file
View File

@@ -0,0 +1,24 @@
__pycache__/
*.pyc
*.pyo
*.pyd
*.pyw
*.pyz
*.pywz
*.pyzw
*.pyzwz
*.pyzwzw
*.pyzwzwz
*.pyzwzwzw
*.pyzwzwzwz
.env
.env.local
.env.development.local
.env.test.local
.env.production.local
.env.development
.env.test
.env.production
.venv/

137
CLAUDE.md Normal file
View File

@@ -0,0 +1,137 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
This is the SecNex Automation CLI, a command-line interface for automating interactions with the SecNex cloud environment. The CLI provides tenant and user management capabilities with a PostgreSQL backend and interactive terminal interface.
## Development Policy
### AI-Assisted Development
- **No AI Attribution**: Do not add "Authored-By", "Co-Authored-By", "Generated with Claude Code", or similar AI attribution in commits, code comments, or documentation
- **Clean Code History**: Maintain a clean git history without AI-generated commit messages or attributions
- **Professional Standards**: All code should be written to professional standards without explicit references to AI assistance
## Development Commands
### Running the CLI
```bash
# Run from project root
python src/secnex-cli/index.py
# Tenant management examples
python src/secnex-cli/index.py tenant create
python src/secnex-cli/index.py tenant list
python src/secnex-cli/index.py tenant delete --name "tenant-name"
# User management examples
python src/secnex-cli/index.py user create
```
### Dependencies
The project uses external dependencies but lacks formal dependency management. Key dependencies:
- `psycopg2` - PostgreSQL database adapter
- `rich` - Terminal formatting and tables
- `questionary` - Interactive command-line prompts
- `argon2-cffi` - Password hashing with Argon2 algorithm
Install dependencies manually:
```bash
pip install psycopg2-binary rich questionary argon2-cffi
```
## Project Architecture
### Entry Point (`src/secnex-cli/index.py`)
- **Main Class**: `SecNexCLI` - Initializes database connection and sets up command parsing
- **Database**: Hardcoded PostgreSQL connection (localhost:5432, database="api", user="postgres", password="postgres")
- **Command Pattern**: Uses argparse subparsers with separate Parser and Command classes
- **Match/Case**: Python 3.10+ match-case syntax for command dispatch
### Command Structure
The CLI follows a Parser/Command pattern:
- **Parser Classes** (`cmd/tenant.py`, `cmd/user.py`): Define argparse structure and command-line arguments
- **Command Classes**: Implement business logic and database operations
- **Database Layer** (`database/`): Centralized connection management and configuration
### Database Schema
- **Primary Table**: `core.tenants` with fields: `id`, `name`, `enabled`, `created_at`, `deleted_at`
- **Soft Deletion**: Uses `deleted_at` timestamp for soft deletes
- **Connection Management**: Direct psycopg2 connections with manual cursor management
### Tenant Commands (Fully Implemented)
- `tenant create` - Interactive tenant creation with rich table output
- `tenant list` - Lists active tenants in formatted table
- `tenant get --name <tenant>` - Retrieves specific tenant details
- `tenant delete --name <tenant> [--force]` - Soft delete (default) or hard delete with --force
- `tenant enable/disable --name <tenant>` - Tenant state management
- `tenant restore --name <tenant>` - Restore soft-deleted tenants
### User Commands (Fully Implemented)
- `user create` - Interactive user creation with password hashing, tenant selection, and rich table output
## Key Implementation Details
### UI/UX Features
- **Rich Tables**: Uses `rich.Table` for formatted terminal output with emojis (✅/❌ for enabled/disabled)
- **Interactive Prompts**: Uses `questionary` for text input, password fields, and selection dropdowns
- **Cancellation Handling**: Proper handling when users cancel prompts (Ctrl+C or ESC)
### Database Patterns
- **Parameterized Queries**: All database operations use parameterized queries to prevent SQL injection
- **Manual Connection Management**: Explicit cursor creation, commit, and close operations
- **Error Handling**: Basic database connection error handling with graceful exits
### Security Features
- **Argon2 Password Hashing**: Uses Argon2 algorithm for secure password storage with configurable parameters
- **Secure Hash Configuration**: Time cost 3, memory cost 64MB, parallelism 4, 32-byte hash, 16-byte salt
- **Password Verification**: Includes verification and rehash checking capabilities in `utils/hash.py`
- **Transaction Management**: Proper rollback on database errors to maintain data integrity
### Code Organization
- **Type Hints**: Consistent use of Python type hints throughout codebase
- **Separation of Concerns**: Clear separation between parsing, command execution, and database operations
- **Interactive vs Non-interactive**: Commands support both argument-based and interactive input
## Critical Missing Components
### Dependency Management
- No `requirements.txt` or `pyproject.toml` exists
- Dependencies must be installed manually
- No version pinning or dependency resolution
### Configuration Management
- Database credentials are hardcoded in `index.py:9-15`
- No environment variable support or config file loading
- No development/production configuration separation
### Incomplete Features
- No error handling for database constraint violations (duplicate tenants, etc.)
- No logging or audit trail functionality
- No user authentication/login functionality (only user creation)
### Development Infrastructure
- No test framework or test files
- No linting configuration (flake8, black, etc.)
- No CI/CD pipeline configuration
- No shell completion support
## Database Connection Requirements
The CLI requires a running PostgreSQL instance with:
- Database name: `api`
- Tables:
- `core.tenants` with schema: `id`, `name`, `enabled`, `created_at`, `deleted_at`
- `auth.users` with schema: `id`, `tenant_id`, `name`, `email`, `password`, `created_at`, `enabled`
- User: `postgres` with password `postgres` on localhost:5432 (hardcoded)
## Future Development Priorities
1. **Add formal dependency management** (requirements.txt or pyproject.toml)
2. **Implement configuration system** for database connections and CLI settings
3. **Complete user management** functionality
4. **Add comprehensive error handling** and validation
5. **Implement testing framework** with unit and integration tests
6. **Add environment variable support** for configuration

19
README.md Normal file
View File

@@ -0,0 +1,19 @@
# SecNex Automation CLI
This is a CLI to automate for SecNex cloud environment.
## Commands
### `tenant`
#### `tenant create` / `tenant new`
Create a new tenant.
#### `tenant list` / `tenant ls`
List all tenants.
#### `tenant delete`
Delete a tenant.

View File

@@ -0,0 +1,2 @@
from .tenant import TenantParser, TenantCommand
from .user import UserParser, UserCommand

View File

@@ -0,0 +1,231 @@
import argparse
from rich.console import Console
from rich.table import Table
import questionary
from database.conn import DatabaseConnection
class TenantParser:
def __init__(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
def add_parser(self) -> None:
tenant_subparsers = self.parser.add_subparsers(dest="action", help="tenant actions")
tenant_subparsers.add_parser("list", help="list all tenants")
get_parser = tenant_subparsers.add_parser("get", help="get a tenant")
get_parser.add_argument("--name", type=str, required=True, help="name of the tenant")
create_parser = tenant_subparsers.add_parser("create", help="create a new tenant")
create_parser.add_argument("--name", type=str, required=False, default=None, help="name of the tenant")
delete_parser = tenant_subparsers.add_parser("delete", help="delete a tenant")
delete_parser.add_argument("--name", type=str, required=False, default=None, help="name of the tenant")
delete_parser.add_argument("--force", action="store_true", required=False, default=None, help="force delete a tenant")
enable_parser = tenant_subparsers.add_parser("enable", help="enable a tenant")
enable_parser.add_argument("--name", type=str, required=False, default=None, help="name of the tenant")
disable_parser = tenant_subparsers.add_parser("disable", help="disable a tenant")
disable_parser.add_argument("--name", type=str, required=False, default=None, help="name of the tenant")
class TenantCommand:
def __init__(self, action: str, database_connection: DatabaseConnection) -> None:
self.action = action
self.__database_connection = database_connection
def execute(self, args: argparse.Namespace) -> None:
match self.action:
case "list":
self.tenant_list()
case "get":
self.tenant_get(args.name)
case "create":
self.tenant_create(args.name)
case "delete":
self.tenant_delete(args.name, args.force)
case "restore":
self.tenant_restore(args.name)
case "enable":
self.tenant_enable(args.name)
case "disable":
self.tenant_disable(args.name)
case _:
print(f"Unknown action: {self.action}")
def tenant_create(self, name: str | None = None) -> None:
if name is None:
name = questionary.text("Enter the name of the tenant").ask()
if name is None:
print("Tenant creation cancelled")
return
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("INSERT INTO core.tenants (name, created_at, enabled) VALUES (%s, NOW(), TRUE) RETURNING *", (name,))
tenant = cursor.fetchone()
if tenant is None:
print("Failed to create tenant")
return
self.__database_connection.connection.commit()
cursor.close()
print()
table = Table()
table.add_column("ID", justify="left")
table.add_column("Name", justify="left")
table.add_column("Enabled", justify="center")
table.add_column("Created At", justify="left")
table.add_row(str(tenant[0]), tenant[1], "" if tenant[2] else "", tenant[3].strftime("%Y-%m-%d %H:%M:%S"))
Console().print(table)
def tenant_delete(self, name: str | None = None, force: bool | None = None) -> None:
if name is None:
name = questionary.text("Enter the name of the tenant").ask()
if name is None:
print("Tenant deletion cancelled")
return
if force is None:
force = questionary.confirm("Are you sure you want to delete this tenant?").ask()
if force is None:
print("Tenant deletion cancelled")
return
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
if force:
cursor.execute("DELETE FROM core.tenants WHERE name = %s", (name,))
else:
cursor.execute("UPDATE core.tenants SET deleted_at = NOW() WHERE name = %s AND deleted_at IS NULL", (name,))
cursor.execute("UPDATE core.tenants SET enabled = FALSE WHERE name = %s AND deleted_at IS NOT NULL", (name,))
self.__database_connection.connection.commit()
cursor.close()
print(f"Tenant {name} deleted successfully")
def tenant_restore(self, name: str) -> None:
print(f"Restoring tenant {name}...")
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("UPDATE core.tenants SET deleted_at = NULL WHERE name = %s", (name,))
cursor.execute("UPDATE core.tenants SET enabled = TRUE WHERE name = %s AND deleted_at IS NOT NULL", (name,))
self.__database_connection.connection.commit()
cursor.close()
print(f"Tenant {name} restored successfully")
def tenant_enable(self, name: str | None = None) -> None:
if name is None:
name = questionary.text("Enter the name of the tenant").ask()
if name is None:
print("Tenant enable cancelled")
return
print(f"Enabling tenant {name}...")
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("SELECT id, name, enabled, created_at FROM core.tenants WHERE name = %s", (name,))
tenant = cursor.fetchone()
if tenant is None:
print("Tenant not found")
return
cursor.execute("UPDATE core.tenants SET enabled = TRUE WHERE name = %s", (name,))
self.__database_connection.connection.commit()
cursor.close()
table = Table()
table.add_column("ID", justify="left")
table.add_column("Name", justify="left")
table.add_column("Enabled", justify="center")
table.add_column("Created At", justify="left")
table.add_row(str(tenant[0]), tenant[1], "" if not tenant[2] else "", tenant[3].strftime("%Y-%m-%d %H:%M:%S"))
Console().print(table)
def tenant_disable(self, name: str | None = None) -> None:
if name is None:
name = questionary.text("Enter the name of the tenant").ask()
if name is None:
print("Tenant disable cancelled")
return
print(f"Disabling tenant {name}...")
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("SELECT id, name, enabled, created_at FROM core.tenants WHERE name = %s", (name,))
tenant = cursor.fetchone()
if tenant is None:
print("Tenant not found")
return
cursor.execute("UPDATE core.tenants SET enabled = FALSE WHERE name = %s", (name,))
self.__database_connection.connection.commit()
cursor.close()
table = Table()
table.add_column("ID", justify="left")
table.add_column("Name", justify="left")
table.add_column("Enabled", justify="center")
table.add_column("Created At", justify="left")
table.add_row(str(tenant[0]), tenant[1], "" if not tenant[2] else "", tenant[3].strftime("%Y-%m-%d %H:%M:%S"))
Console().print(table)
def tenant_list(self) -> None:
print("Listing tenants...")
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("SELECT id, name, enabled, created_at FROM core.tenants WHERE deleted_at IS NULL")
tenants = cursor.fetchall()
table = Table()
table.add_column("ID", justify="left")
table.add_column("Name", justify="left")
table.add_column("Enabled", justify="center")
table.add_column("Created At", justify="left")
for tenant in tenants:
table.add_row(str(tenant[0]), tenant[1], "" if tenant[2] else "", tenant[3].strftime("%Y-%m-%d %H:%M:%S"))
Console().print(table)
cursor.close()
def tenant_get(self, name: str) -> None:
print(f"Getting tenant {name}...")
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("SELECT * FROM core.tenants WHERE name = %s", (name,))
tenant = cursor.fetchone()
table = Table()
table.add_column("ID", justify="left")
table.add_column("Name", justify="left")
table.add_column("Enabled", justify="center")
table.add_column("Created At", justify="left")
table.add_row(str(tenant[0]), tenant[1], "" if tenant[2] else "", tenant[3].strftime("%Y-%m-%d %H:%M:%S"))
Console().print(table)
cursor.close()

111
src/secnex-cli/cmd/user.py Normal file
View File

@@ -0,0 +1,111 @@
import argparse
from rich.console import Console
from rich.table import Table
import questionary
from database.conn import DatabaseConnection
from utils.hash import Argon2Hasher
class UserParser:
def __init__(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
def add_parser(self) -> None:
user_subparsers = self.parser.add_subparsers(dest="action", help="user actions")
user_subparsers.add_parser("create", help="create a new user")
class UserCommand:
def __init__(self, action: str, database_connection: DatabaseConnection) -> None:
self.action = action
self.__database_connection = database_connection
self.__hasher = Argon2Hasher()
def execute(self, args: argparse.Namespace) -> None:
match self.action:
case "create":
self.user_create()
case _:
print(f"Unknown action: {self.action}")
def user_create(self) -> None:
if self.__database_connection.connection is None:
print("Database connection is not available")
return
cursor = self.__database_connection.connection.cursor()
cursor.execute("SELECT id, name FROM core.tenants WHERE enabled = TRUE AND deleted_at IS NULL")
tenants = cursor.fetchall()
if len(tenants) == 0:
print("No tenants found")
return
tenant_name = questionary.select("Select a tenant", choices=[tenant[1] for tenant in tenants]).ask()
if tenant_name is None:
print("User creation cancelled")
return
tenant_id = None
for tenant in tenants:
if tenant[1] == tenant_name:
tenant_id = tenant[0]
break
if tenant_id is None:
print("Tenant not found")
return
name = questionary.text("Enter the name of the user").ask()
if name is None:
print("User creation cancelled")
return
email = questionary.text("Enter the email of the user").ask()
if email is None:
print("User creation cancelled")
return
password = questionary.password("Enter the password of the user").ask()
if password is None:
print("User creation cancelled")
return
# Hash the password using Argon2
hashed_password = self.__hasher.hash_password(password)
# Insert the user into the database
try:
cursor.execute(
"INSERT INTO auth.users (tenant_id, name, email, password, created_at, enabled) VALUES (%s, %s, %s, %s, NOW(), TRUE) RETURNING id, name, email, created_at",
(tenant_id, name, email, hashed_password)
)
user = cursor.fetchone()
self.__database_connection.connection.commit()
if user is None:
print("Failed to create user")
return
# Display success message with user details
print()
table = Table()
table.add_column("ID", justify="left")
table.add_column("Name", justify="left")
table.add_column("Email", justify="left")
table.add_column("Tenant", justify="left")
table.add_column("Created At", justify="left")
table.add_row(
str(user[0]),
user[1],
user[2],
tenant_name, # This contains the tenant name for display
user[3].strftime("%Y-%m-%d %H:%M:%S")
)
Console().print(table)
print("✅ User created successfully!")
except Exception as e:
print(f"Error creating user: {e}")
self.__database_connection.connection.rollback()
finally:
cursor.close()

View File

@@ -0,0 +1,16 @@
class DatabaseConfig:
def __init__(self, host: str, port: int, database: str, user: str, password: str) -> None:
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
def to_dict(self) -> dict:
return {
"host": self.host,
"port": self.port,
"database": self.database,
"user": self.user,
"password": self.password,
}

View File

@@ -0,0 +1,16 @@
import psycopg2
from .config import DatabaseConfig
class DatabaseConnection:
def __init__(self, host: str, port: int, database: str, user: str, password: str) -> None:
self.config = DatabaseConfig(host, port, database, user, password)
self.connection = None
def connect(self) -> psycopg2.extensions.connection | None:
try:
self.connection = psycopg2.connect(**self.config.to_dict())
return self.connection
except psycopg2.Error as e:
print(f"Error connecting to database: {e}")
return None

39
src/secnex-cli/index.py Normal file
View File

@@ -0,0 +1,39 @@
import sys
import argparse
from database.conn import DatabaseConnection
from cmd import TenantParser, TenantCommand, UserParser, UserCommand
class SecNexCLI:
def __init__(self) -> None:
self.__database_connection = DatabaseConnection(
host="localhost",
port=5432,
database="api",
user="postgres",
password="postgres"
)
if self.__database_connection.connect() is None:
print("Failed to connect to database")
sys.exit(1)
self.parser = argparse.ArgumentParser(description="SecNex CLI")
self.parser.add_argument("--version", action="version", version="SecNex CLI 1.0.0", help="show version of cli and exit")
self.subparsers = self.parser.add_subparsers(dest="command", help="available commands")
TenantParser(self.subparsers.add_parser("tenant", help="manage tenants")).add_parser()
UserParser(self.subparsers.add_parser("user", help="manage users")).add_parser()
def run(self) -> None:
args = self.parser.parse_args()
match args.command:
case "tenant":
TenantCommand(args.action, self.__database_connection).execute(args)
case "user":
UserCommand(args.action, self.__database_connection).execute(args)
case _:
print(f"Unknown command: {args.command}")
if __name__ == "__main__":
cli = SecNexCLI()
cli.run()

View File

@@ -0,0 +1,54 @@
import argon2
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
class Argon2Hasher:
def __init__(self) -> None:
self.ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # Memory usage in KiB (64MB)
parallelism=4, # Number of parallel threads
hash_len=32, # Hash length in bytes
salt_len=16 # Salt length in bytes
)
def hash_password(self, password: str) -> str:
"""
Hash a password using Argon2.
Args:
password: The plain text password to hash
Returns:
The hashed password as a string
"""
return self.ph.hash(password)
def verify_password(self, hashed_password: str, plain_password: str) -> bool:
"""
Verify a plain password against a hashed password.
Args:
hashed_password: The hashed password from the database
plain_password: The plain text password to verify
Returns:
True if the password is correct, False otherwise
"""
try:
self.ph.verify(hashed_password, plain_password)
return True
except VerifyMismatchError:
return False
def needs_rehash(self, hashed_password: str) -> bool:
"""
Check if a password needs to be rehashed with new parameters.
Args:
hashed_password: The hashed password to check
Returns:
True if the password needs rehashing, False otherwise
"""
return self.ph.check_needs_rehash(hashed_password)