feat: implement core HTTP client library

- Add HTTPClient class with GET/POST support
- Implement Response class with JSON parsing
- Add Basic and Bearer authentication classes
- Implement OAuth2 interactive provider
- Add Microsoft Entra ID integration
- Support URL parameters and custom headers
- Include browser integration for OAuth flows

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Björn Benouarets
2025-12-01 14:07:36 +01:00
parent f1be0cbd6d
commit 09f1d02e38
9 changed files with 387 additions and 0 deletions

3
src/zerohttp/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .auth import *
from .http import *
from .providers import *

3
src/zerohttp/__main__.py Normal file
View File

@@ -0,0 +1,3 @@
class ZeroHTTP:
APPLICATION = "ZeroHTTP"
VERSION = "0.0.1"

View File

@@ -0,0 +1,3 @@
from .basic import BasicAuthentication
from .bearer import BearerAuthentication
from .interactive import InteractiveProvider

View File

@@ -0,0 +1,34 @@
import base64
class BasicAuthentication:
def __init__(self, username: str, password: str):
self.username = username
self.password = password
self.hash = self.encode()
def encode(self) -> str:
"""
Encode the username and password as a base64 string
"""
# Create a string in the format "username:password"
credentials = f"{self.username}:{self.password}"
# Encode the string as base64
encoded_credentials = base64.b64encode(credentials.encode("utf-8"))
# Convert the base64 bytes to a string
return encoded_credentials.decode("utf-8")
def decode(self, encoded_credentials: str) -> tuple:
"""
Decode the base64 string into a username and password
"""
# Decode the base64 string into bytes
decoded_credentials = base64.b64decode(encoded_credentials)
# Convert the bytes to a string
credentials = decoded_credentials.decode("utf-8")
# Split the string into a username and password
username, password = credentials.split(":")
# Return the username and password
return username, password
def __str__(self) -> str:
return "Basic " + self.hash

View File

@@ -0,0 +1,21 @@
import base64
class BearerAuthentication:
def __init__(self, token: str):
self.token = token
self.type = "Bearer"
# Convert to base64
def encode(self) -> str:
"""
Encode the token as a base64 string
"""
# Encode the string as base64
encoded_token = base64.b64encode(self.token.encode("utf-8"))
# Convert the base64 bytes to a string
self.hash = encoded_token.decode("utf-8")
# Return the hash
return self.hash
def __str__(self) -> str:
return "Bearer " + self.token

View File

@@ -0,0 +1,123 @@
from ..http import HTTPClient
from ..auth import BearerAuthentication
import base64
import string
import random
import threading
import socketserver
import urllib.parse as parse
import http.server
from typing import Any
def handler_factory(state, provider):
def create_handler(*args, **kwargs):
return InteractiveCallbackHandler(*args, state=state, provider=provider, **kwargs)
return create_handler
class InteractiveCallbackHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, request, client_address, server, state: str, provider: Any):
self.state = state
self.provider = provider
super().__init__(request, client_address, server)
def do_GET(self):
if self.path == '/favicon.ico':
self.send_response(404)
self.end_headers()
return
query_params = parse.parse_qs(parse.urlparse(self.path).query)
self.provider.authorization_code = query_params["code"][0]
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<html><head><title>Authentication Successful</title></head>")
self.wfile.write(b"<body><p>You have successfully authenticated with the provider.</p>")
self.wfile.write(b"</body></html>")
self.provider.authorization_received.set()
class InteractiveProvider:
def __init__(self, name: str, client_id: str, client_secret: str, token_url: str, authorization_url: str, scope: str, open: bool = True):
self.name = name
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = "http://localhost:8000/redirect"
self.token_url = token_url
self.authorization_url = authorization_url
self.scope = scope
self.authorization_code = None
self.access_token = None
self.server = None
self.open = open
self.state = self.__generate_state()
self.authorization_received = threading.Event()
def start_server(self):
handler = handler_factory(self.state, self)
self.server = socketserver.TCPServer(("", 8000), handler)
server_thread = threading.Thread(target=self.server.serve_forever)
server_thread.start()
def stop_server(self):
if self.server:
def shutdown_server():
if self.server:
self.server.shutdown()
self.server.server_close()
self.server = None
shutdown_thread = threading.Thread(target=shutdown_server)
shutdown_thread.start()
def __generate_state(self) -> str:
# Generate a random string
state = ''.join(random.choices(string.ascii_uppercase + string.digits, k=16))
# Encode the string as base64
encoded_state = base64.b64encode(state.encode("utf-8"))
# Convert the base64 bytes to a string
return encoded_state.decode("utf-8")
def __get_authorization_code(self) -> str:
server_thread = threading.Thread(target=self.start_server)
server_thread.start()
login_client = HTTPClient(base_url=self.authorization_url)
if self.open:
login_client.webbrowser(params=[("client_id", self.client_id), ("response_type", "code"), ("redirect_uri", self.redirect_uri), ("response_mode", "query"), ("scope", self.scope), ("state", self.__generate_state())])
else:
login_client.webbrowser(params=[("client_id", self.client_id), ("response_type", "code"), ("redirect_uri", self.redirect_uri), ("response_mode", "query"), ("scope", self.scope), ("state", self.__generate_state())])
self.authorization_received.wait() # Wait for the event here
if self.authorization_code is None:
raise RuntimeError("Authorization code was not received")
return self.authorization_code if self.authorization_code else ""
def authenticate(self) -> str:
# Get the authorization code
authorization_code = self.__get_authorization_code()
# Create a client object
client = HTTPClient(base_url="")
client.set_header("Content-Type", "application/x-www-form-urlencoded")
# client_id=11111111-1111-1111-1111-111111111111
# &scope=user.read%20mail.read
# &code=OAAABAAAAiL9Kn2Z27UubvWFPbm0gLWQJVzCTE9UkP3pSx1aXxUjq3n8b2JRLk4OxVXr...
# &redirect_uri=http%3A%2F%2Flocalhost%2Fmyapp%2F
# &grant_type=authorization_code
# &client_secret=HF8Q~Krjqh4r...
# Send the request
data = {
"client_id": self.client_id,
"scope": self.scope,
"code": authorization_code,
"redirect_uri": self.redirect_uri,
"grant_type": "authorization_code",
"client_secret": self.client_secret
}
res = client.post(url=self.token_url, data=data)
# Get the response
res_json = res.json()
self.access_token = res_json["access_token"]
self.stop_server()
return self.access_token

120
src/zerohttp/http.py Normal file
View File

@@ -0,0 +1,120 @@
import urllib.parse as parse
import urllib.request as request
import urllib.error as error
import json as js
import webbrowser as browser
import http.client
from .__main__ import ZeroHTTP
from typing import Any
class Response:
def __init__(self, response: http.client.HTTPResponse):
if response:
self.text = response.read().decode("utf-8")
else:
self.text = ""
def json(self) -> dict:
return js.loads(self.text)
class Param:
def __init__(self, key: str, value: str):
self.key = key
self.value = value
def __str__(self) -> str:
return f"{self.key}={self.value}"
class HTTPClient:
def __init__(self, auth: Any = None, proxy: str = "", base_url: str = ""):
self.auth = auth
self.proxy = proxy
self.base_url = base_url if base_url else ""
self.headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": f"{ZeroHTTP.APPLICATION}/{ZeroHTTP.VERSION}"
}
def set_header(self, key: str, value: str):
self.headers[key] = value
def set_headers(self, headers: dict):
self.headers = headers
def get_header(self, key: str) -> str:
return self.headers[key]
def get_headers(self) -> dict:
return self.headers
def __send(self, method: str, url: str, data: dict = {}) -> Response:
# Create a request object
req = request.Request(url, method=method, headers=self.headers)
# Add the authentication
if self.auth:
req.add_header("Authorization", f"{self.auth.type} {self.auth.token}")
encoded_data: bytes | None = None
if data:
match self.headers["Content-Type"]:
case "application/json":
encoded_data = js.dumps(data).encode("utf-8")
case "application/x-www-form-urlencoded":
encoded_data = parse.urlencode(data).encode("utf-8")
case _:
encoded_data = js.dumps(data).encode("utf-8")
try:
response = request.urlopen(req, data=encoded_data)
except error.HTTPError as e:
print(e.read().decode())
raise
# Return the response
return Response(response)
def get(self, url: str = "", params: list = []) -> Response:
parameter = ""
if params:
parameter = "?"
parameter_length = len(params)
for param in params:
p = Param(param[0], param[1])
parameter += str(p)
if params.index(param) != parameter_length - 1:
parameter += "&"
url = self.base_url + url + parameter
return self.__send("GET", url)
def post(self, url: str = "", data: dict = {}, params: list = []) -> Response:
parameter = ""
if params:
parameter = "?"
parameter_length = len(params)
for param in params:
p = Param(param[0], param[1])
parameter += str(p)
if params.index(param) != parameter_length - 1:
parameter += "&"
url = self.base_url + url + parameter
return self.__send("POST", url, data=data)
def link(self, url: str = "", params: list = []) -> str:
parameter = "?"
parameter_length = len(params)
for param in params:
p = Param(param[0], param[1])
parameter += str(p)
if params.index(param) != parameter_length - 1:
parameter += "&"
return self.base_url + url + parameter
def webbrowser(self, url: str = "", params: list = []):
parameter = "?"
parameter_length = len(params)
for param in params:
p = Param(param[0], param[1])
parameter += str(p)
if params.index(param) != parameter_length - 1:
parameter += "&"
url = self.base_url + url + parameter
browser.open(url)

View File

@@ -0,0 +1 @@
from .entra import *

View File

@@ -0,0 +1,79 @@
from ..http import HTTPClient
from ..auth import InteractiveProvider, BearerAuthentication
import base64
import string
import random
class Entra:
def __init__(self, tenant: str, client: tuple, scope: str, token_url: str = "https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token", authorization_url: str = "https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize", open: bool = True):
self.tenant = tenant
self.client_id = client[0]
self.client_secret = client[1]
self.scope = scope
self.token_url = token_url.format(tenant=self.tenant)
self.authorization_url = authorization_url.format(tenant=self.tenant)
self.__check_urls()
self.provider = None
self.client = None
self.open = open
def __check_urls(self):
if not self.token_url:
self.token_url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
if not self.authorization_url:
self.authorization_url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/authorize"
def __create_provider(self):
self.provider = InteractiveProvider(
name="Microsoft",
client_id=self.client_id,
client_secret=self.client_secret,
token_url=self.token_url,
authorization_url=self.authorization_url,
scope=self.scope,
open=self.open
)
def authenticate(self) -> BearerAuthentication:
if not self.provider:
self.__create_provider()
if not self.provider:
raise RuntimeError("Provider not created")
token = self.provider.authenticate()
if not token:
raise RuntimeError("Authentication failed")
return BearerAuthentication(token)
class EntraApp:
def __init__(self, tenant: str, client: tuple, scope: str = "https://graph.microsoft.com/.default", token_url: str = ""):
self.tenant = tenant
self.client_id = client[0]
self.client_secret = client[1]
self.scope = scope
self.token_url = token_url if token_url else f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
def __check_urls(self):
if not self.token_url:
self.token_url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
def __generate_state(self) -> str:
# Generate a random string
state = ''.join(random.choices(string.ascii_uppercase + string.digits, k=16))
# Encode the string as base64
encoded_state = base64.b64encode(state.encode("utf-8"))
# Convert the base64 bytes to a string
return encoded_state.decode("utf-8")
def authenticate(self) -> BearerAuthentication:
client = HTTPClient(base_url=self.token_url)
client.set_header("Content-Type", "application/x-www-form-urlencoded")
data = {
"client_id": self.client_id,
"scope": self.scope,
"client_secret": self.client_secret,
"grant_type": "client_credentials"
}
response = client.post(data=data)
token = response.json()["access_token"]
return BearerAuthentication(token)