"""Periodic indexer: embed not-yet-indexed S3 objects and store them in Qdrant.

On every cycle the script lists the objects of the configured bucket that
``filter_not_indexed`` accepts, downloads each one, chunks and embeds its
content, upserts the resulting points into Qdrant and finally tags the object
with ``status=indexed``. It then sleeps ``INDEXER_INTERVAL`` seconds and
starts over.

All configuration is read from environment variables at import time, and the
polling loop starts at import time as well (this module is a script).
"""

import os
import time

from minio.datatypes import Object

import store
import vector


def _require_int_env(name: str) -> int:
    """Return the environment variable *name* parsed as an integer.

    Raises:
        RuntimeError: if the variable is not set. Raised explicitly so the
            failure names the missing variable instead of surfacing as an
            opaque ``TypeError`` from ``int(None)``.
        ValueError: if the value is set but is not a valid integer.
    """
    raw = os.getenv(name)
    if raw is None:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return int(raw)


# Seconds to sleep between two indexing cycles.
TIME_INTERVAL = _require_int_env("INDEXER_INTERVAL")

# S3 connection settings (passed through to store.S3Storage unvalidated).
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
S3_BUCKET = os.getenv("S3_BUCKET")

# Embedding service settings (passed through to vector.EmbeddedClient).
EMBEDDING_AUTHORIZATION = os.getenv("EMBEDDING_AUTHORIZATION")
EMBEDDING_URL = os.getenv("EMBEDDING_URL")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL")

# Qdrant connection settings.
QDRANT_HOST = os.getenv("QDRANT_HOST")
QDRANT_PORT = _require_int_env("QDRANT_PORT")
QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME")


def filter_not_indexed(object: Object) -> bool:
    """Return True when *object* has a name and is not tagged as indexed.

    The parameter keeps its original name (which shadows the builtin) so that
    existing keyword callers keep working.

    Relies on the module-level ``s3`` client, which is created further down
    and therefore must exist by the time this predicate is first called.
    """
    name = object.object_name
    if name is None:
        return False
    # Presumably get_tag returns a dict-like mapping of the object's tags;
    # only its "status" entry is inspected here.
    return s3.get_tag(name).get("status") != "indexed"


s3 = store.S3Storage(
    endpoint=S3_ENDPOINT,
    access_key=S3_ACCESS_KEY,
    secret_key=S3_SECRET_KEY,
    bucket=S3_BUCKET,
)

embedded_client = vector.EmbeddedClient(
    authorization=EMBEDDING_AUTHORIZATION,
    url=EMBEDDING_URL,
    model=EMBEDDING_MODEL,
)

qdrant = vector.Qdrant(
    host=QDRANT_HOST,
    port=QDRANT_PORT,
    collection_name=QDRANT_COLLECTION_NAME,
)

qdrant.create_if_not_exists_collection()

# NOTE: nothing below catches exceptions, so an error raised by any step
# propagates and terminates the script.
while True:
    print("- Indexing objects...")
    objects = s3.list_filtered(filter_not_indexed, recursive=True)

    # Count while iterating instead of calling len(list(objects)) afterwards:
    # if list_filtered returns a one-shot iterator it is already exhausted once
    # the loop ends (the count would always be 0), and if it is lazily
    # re-iterable a second pass would list the bucket and fetch every tag again.
    object_count = 0
    for obj in objects:
        object_count += 1

        object_name = obj.object_name
        if object_name is None:
            continue

        print(f" - Downloading object {object_name}...")
        content = s3.get(object_name)
        if content is None:
            # Skipped objects are left untagged, so they are picked up again
            # on the next cycle.
            continue

        print(f" - Chunking object {object_name}...")
        chunks = vector.Chunks().chunk(content)

        print(" - Embedding chunks...")
        count_chunks = len(chunks)
        count_embedded = 0
        for i, chunk in enumerate(chunks):
            print(f" - Embedding chunk {i+1}/{count_chunks}...")
            chunk.embed(embedded_client)
            print(f" - Chunk {i+1}/{count_chunks} embedded!")
            count_embedded += 1
        print(f" - {count_embedded}/{count_chunks} chunks embedded!")

        print(" - Creating points...")
        points = qdrant.create_points(chunks, S3_BUCKET, object_name)

        print(f" - Upserting {len(points)} points to Qdrant...")
        qdrant.upsert_points(points)
        print(" - Points upserted!")

        # Tag only after a successful upsert so a failed run is retried.
        s3.set_tag(object_name, {"status": "indexed"})

    print(f"✓ {object_count} objects to index! Sleeping for {TIME_INTERVAL} seconds...")
    time.sleep(TIME_INTERVAL)