Files
captain-hook-indexer/main.py
2026-01-30 22:38:08 +01:00

78 lines
2.5 KiB
Python

import store
import vector
import time
import os
from minio.datatypes import Object
def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset or empty.

    Returns:
        The variable's value parsed as an integer, or ``default``.

    Raises:
        ValueError: If the variable is set to something that is not an integer.
    """
    raw = os.getenv(name)
    # Unset and empty are treated alike: compose/env files often pass "" for
    # variables that were never given a value.
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError as err:
        # Name the offending variable; bare int() errors do not say which one.
        raise ValueError(f"{name} must be an integer, got {raw!r}") from err


# Seconds to sleep between two indexing passes.
TIME_INTERVAL = _env_int("INDEXER_INTERVAL", 60)

# Object storage connection settings (passed to store.S3Storage).
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
S3_BUCKET = os.getenv("S3_BUCKET")

# Embedding service settings (passed to vector.EmbeddedClient).
EMBEDDING_AUTHORIZATION = os.getenv("EMBEDDING_AUTHORIZATION")
EMBEDDING_URL = os.getenv("EMBEDDING_URL")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL")

# Qdrant connection settings (passed to vector.Qdrant).
QDRANT_HOST = os.getenv("QDRANT_HOST")
# 6333 is Qdrant's documented default REST port.
QDRANT_PORT = _env_int("QDRANT_PORT", 6333)
QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME")
def filter_not_indexed(object: Object) -> bool:
    """Tell whether a storage object still has to be indexed.

    Args:
        object: Object entry to inspect.

    Returns:
        ``True`` when the object has a name and the ``"status"`` entry of its
        tags is anything other than ``"indexed"``; ``False`` otherwise.
    """
    name = object.object_name
    if name is None:
        # Nameless entries cannot be looked up, so they are never selected.
        return False
    # NOTE(review): assumes s3.get_tag returns a dict-like object - confirm.
    tags = s3.get_tag(name)
    return tags.get("status") != "indexed"
# Object storage client for the bucket that is scanned for objects to index.
s3 = store.S3Storage(
    endpoint=S3_ENDPOINT,
    access_key=S3_ACCESS_KEY,
    secret_key=S3_SECRET_KEY,
    bucket=S3_BUCKET,
)

# Client handed to each chunk's embed() call.
embedded_client = vector.EmbeddedClient(
    authorization=EMBEDDING_AUTHORIZATION,
    url=EMBEDDING_URL,
    model=EMBEDDING_MODEL,
)

# Qdrant wrapper used to build and upsert points.
qdrant = vector.Qdrant(
    host=QDRANT_HOST,
    port=QDRANT_PORT,
    collection_name=QDRANT_COLLECTION_NAME,
)

# Ensure the target collection exists before the first upsert.
qdrant.create_if_not_exists_collection()
# Poll forever: list the objects not yet tagged as indexed, embed and upsert
# each one, tag it as indexed, then sleep until the next pass.
while True:
    print("- Indexing objects...")
    objects = s3.list_filtered(filter_not_indexed, recursive=True)

    # Count while iterating. `objects` may be a one-shot iterator, in which
    # case measuring it after the loop would always report 0.
    count_objects = 0
    for obj in objects:
        count_objects += 1
        name = obj.object_name
        if name is None:
            continue

        print(f" - Downloading object {name}...")
        content = s3.get(name)
        if content is None:
            # Nothing was downloaded; the object stays untagged, so it is
            # picked up again on the next pass.
            continue

        print(f" - Chunking object {name}...")
        chunks = vector.Chunks().chunk(content)

        print(" - Embedding chunks...")
        count_chunks = len(chunks)
        count_embedded = 0
        for position, chunk in enumerate(chunks, start=1):
            print(f" - Embedding chunk {position}/{count_chunks}...")
            chunk.embed(embedded_client)
            print(f" - Chunk {position}/{count_chunks} embedded!")
            count_embedded += 1
        print(f" - {count_embedded}/{count_chunks} chunks embedded!")

        print(" - Creating points...")
        points = qdrant.create_points(chunks, S3_BUCKET, name)
        print(f" - Upserting {len(points)} points to Qdrant...")
        qdrant.upsert_points(points)
        print(" - Points upserted!")

        # Tag only after a successful upsert, so a failure above leaves the
        # object eligible for the next pass.
        s3.set_tag(name, {"status": "indexed"})

    print(f"{count_objects} objects to index! Sleeping for {TIME_INTERVAL} seconds...")
    time.sleep(TIME_INTERVAL)