78 lines
2.5 KiB
Python
78 lines
2.5 KiB
Python
import store
|
|
import vector
|
|
|
|
import time
|
|
import os
|
|
from minio.datatypes import Object
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Runtime configuration, read once from the process environment at import.
# os.environ.get() yields None for a variable that is not set; the two int()
# conversions below therefore raise TypeError when their variable is missing.
# ---------------------------------------------------------------------------

# Pause between two indexing passes (handed to time.sleep(), i.e. seconds).
TIME_INTERVAL = int(os.environ.get("INDEXER_INTERVAL"))

# Settings forwarded to store.S3Storage.
S3_ENDPOINT = os.environ.get("S3_ENDPOINT")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY")
S3_BUCKET = os.environ.get("S3_BUCKET")

# Settings forwarded to vector.EmbeddedClient.
EMBEDDING_AUTHORIZATION = os.environ.get("EMBEDDING_AUTHORIZATION")
EMBEDDING_URL = os.environ.get("EMBEDDING_URL")
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL")

# Settings forwarded to vector.Qdrant.
QDRANT_HOST = os.environ.get("QDRANT_HOST")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT"))
QDRANT_COLLECTION_NAME = os.environ.get("QDRANT_COLLECTION_NAME")
|
|
|
|
|
|
def filter_not_indexed(object: Object) -> bool:
    """Return True when *object* has a name and is not yet tagged as indexed.

    The object's tags are fetched through the module-level ``s3`` client and
    the ``"status"`` tag is compared with ``"indexed"``. Objects without a
    name are rejected without querying the tags.
    """
    name = object.object_name
    if name is None:
        return False
    # get_tag() must return something with a dict-like .get(); a missing
    # "status" tag yields None there, which counts as "not indexed".
    tags = s3.get_tag(name)
    return tags.get("status") != "indexed"
|
|
|
|
# Object-storage client (project `store` module) bound to the configured bucket.
s3 = store.S3Storage(
    endpoint=S3_ENDPOINT,
    access_key=S3_ACCESS_KEY,
    secret_key=S3_SECRET_KEY,
    bucket=S3_BUCKET,
)

# Client handed to each chunk's embed() call in the indexing loop.
embedded_client = vector.EmbeddedClient(
    authorization=EMBEDDING_AUTHORIZATION,
    url=EMBEDDING_URL,
    model=EMBEDDING_MODEL,
)

# Qdrant wrapper (project `vector` module) for the target collection.
qdrant = vector.Qdrant(
    host=QDRANT_HOST,
    port=QDRANT_PORT,
    collection_name=QDRANT_COLLECTION_NAME,
)
# Presumably creates the collection only when it is missing (per the method
# name) -- confirm against vector.Qdrant.
qdrant.create_if_not_exists_collection()
|
|
|
|
# Main indexing loop: runs forever, one pass followed by a TIME_INTERVAL sleep.
# Each pass lists the objects accepted by filter_not_indexed, then for every
# object: download -> chunk -> embed each chunk -> upsert points into Qdrant ->
# tag the object with status "indexed".
while True:
    print("- Indexing objects...")
    objects = s3.list_filtered(filter_not_indexed, recursive=True)

    # Count while iterating instead of calling len(list(objects)) afterwards:
    # if list_filtered returns a one-shot iterator/generator, it is already
    # exhausted once the loop finishes and the summary would always report 0.
    # For a list return value the count is identical to the old one.
    object_count = 0
    for obj in objects:
        object_count += 1

        name = obj.object_name
        if name is None:
            continue

        print(f" - Downloading object {name}...")
        content = s3.get(name)
        if content is None:
            # Nothing to index for this object; it stays untagged.
            continue

        print(f" - Chunking object {name}...")
        chunks = vector.Chunks().chunk(content)

        print(" - Embedding chunks...")
        count_chunks = len(chunks)
        count_embedded = 0
        for position, chunk in enumerate(chunks, start=1):
            print(f" - Embedding chunk {position}/{count_chunks}...")
            chunk.embed(embedded_client)
            print(f" - Chunk {position}/{count_chunks} embedded!")
            count_embedded += 1
        print(f" - {count_embedded}/{count_chunks} chunks embedded!")

        print(" - Creating points...")
        points = qdrant.create_points(chunks, S3_BUCKET, name)
        print(f" - Upserting {len(points)} points to Qdrant...")
        qdrant.upsert_points(points)
        print(" - Points upserted!")

        # Tag only after a successful upsert so filter_not_indexed skips
        # this object on later passes.
        s3.set_tag(name, {"status": "indexed"})

    print(f"✓ {object_count} objects to index! Sleeping for {TIME_INTERVAL} seconds...")
    time.sleep(TIME_INTERVAL)
|