Replace Document collection with Work in retrieval_tools.py

The Document collection was merged into Work during the "SANS_DOCUMENT"
migration. All handlers now use Work instead:

- get_document_handler: queries Work by sourceId
- list_documents_handler: queries Work directly
- filter_by_author_handler: simplified to use Work only
- delete_document_handler: deletes from Work

Work now contains: title, author, year, language, genre, sourceId,
pages, edition (all formerly in Document)
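
For reference, a minimal sketch of what the merged schema could look like
in the Weaviate v4 Python client (property names come from this commit;
data types and collection settings are assumptions, not taken from the
actual migration):

    # Sketch only - data types and collection config are assumptions
    import weaviate
    from weaviate.classes.config import Property, DataType

    with weaviate.connect_to_local() as client:
        client.collections.create(
            "Work",
            properties=[
                Property(name="title", data_type=DataType.TEXT),
                Property(name="author", data_type=DataType.TEXT),
                Property(name="year", data_type=DataType.INT),
                Property(name="language", data_type=DataType.TEXT),
                Property(name="genre", data_type=DataType.TEXT),
                Property(name="sourceId", data_type=DataType.TEXT),  # formerly Document
                Property(name="pages", data_type=DataType.INT),      # formerly Document
                Property(name="edition", data_type=DataType.TEXT),   # formerly Document
            ],
        )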

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
commit c529ab617a
parent c633ab5aff
2026-01-31 21:53:21 +01:00

@@ -773,12 +773,13 @@ async def get_document_handler(
     with log_tool_invocation("get_document", tool_inputs) as invocation:
         try:
             with get_weaviate_client() as client:
-                documents = client.collections.get("Document")
+                # Use Work collection (Document was merged into Work)
+                works = client.collections.get("Work")
 
-                # Query Document by sourceId
+                # Query Work by sourceId
                 query_start = time.perf_counter()
                 doc_filter = Filter.by_property("sourceId").equal(input_data.source_id)
-                result = documents.query.fetch_objects(
+                result = works.query.fetch_objects(
                     filters=doc_filter,
                     limit=1,
                 )
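
Standalone, the new lookup reads roughly like this (a sketch: only the
collection name and the sourceId filter are confirmed by the diff; the
connection helper is an assumption):

    import weaviate
    from weaviate.classes.query import Filter

    with weaviate.connect_to_local() as client:  # connection is an assumption
        works = client.collections.get("Work")
        result = works.query.fetch_objects(
            filters=Filter.by_property("sourceId").equal("some-source-id"),
            limit=1,
        )
        if result.objects:
            print(result.objects[0].properties.get("title"))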
@@ -786,7 +787,7 @@ async def get_document_handler(
                 log_weaviate_query(
                     operation="fetch_objects",
-                    collection="Document",
+                    collection="Work",
                     filters={"sourceId": input_data.source_id},
                     result_count=len(result.objects),
                     duration_ms=query_duration_ms,
@@ -794,7 +795,7 @@ async def get_document_handler(
 
                 if not result.objects:
                     logger.warning(
-                        "Document not found",
+                        "Work not found",
                         extra={"source_id": input_data.source_id},
                     )
                     output = GetDocumentOutput(
@@ -802,24 +803,23 @@ async def get_document_handler(
                         chunks=[],
                         chunks_total=0,
                         found=False,
-                        error=f"Document not found: {input_data.source_id}",
+                        error=f"Work not found: {input_data.source_id}",
                     )
                     invocation.set_result(output.model_dump())
                     return output
 
-                # Extract document properties
+                # Extract Work properties (Document was merged into Work)
                 doc_obj = result.objects[0]
                 props = doc_obj.properties
-                work_data = get_nested_dict(props, "work")
 
-                # Parse TOC and hierarchy (stored as JSON strings)
+                # Parse TOC and hierarchy (stored as JSON strings) - may not exist in Work
                 toc_data = safe_json_parse(props.get("toc"))
                 hierarchy_data = safe_json_parse(props.get("hierarchy"))
 
                 document_info = DocumentInfo(
                     source_id=safe_str(props.get("sourceId"), input_data.source_id),
-                    work_title=safe_str(work_data.get("title"), "Unknown"),
-                    work_author=safe_str(work_data.get("author"), "Unknown"),
+                    work_title=safe_str(props.get("title"), "Unknown"),
+                    work_author=safe_str(props.get("author"), "Unknown"),
                     edition=safe_str(props.get("edition")) or None,
                     pages=safe_int(props.get("pages"), 0),
                     language=safe_str(props.get("language"), "unknown"),
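
safe_str, safe_int, and safe_json_parse are project helpers that the diff
relies on but does not show; plausible implementations (hypothetical, for
illustration only):

    import json
    from typing import Any, Optional

    def safe_str(value: Any, default: str = "") -> str:
        # Coerce to str, falling back to the default for None
        return str(value) if value is not None else default

    def safe_int(value: Any, default: int = 0) -> int:
        # Coerce to int, falling back to the default on bad input
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def safe_json_parse(value: Optional[str]) -> Any:
        # Parse a JSON string stored in Weaviate; None on failure
        if not value:
            return None
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return None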
@@ -956,24 +956,17 @@ async def list_documents_handler(
     with log_tool_invocation("list_documents", tool_inputs) as invocation:
         try:
             with get_weaviate_client() as client:
-                documents_collection = client.collections.get("Document")
+                # Use Work collection (Document was merged into Work)
+                works_collection = client.collections.get("Work")
 
-                # Build filters for nested object properties
+                # Build filters (Work has author/title directly, not nested)
                 filters: Any = None
 
                 if input_data.author_filter:
-                    filters = (
-                        Filter.by_property("work")
-                        .by_property("author")  # type: ignore[attr-defined]
-                        .equal(input_data.author_filter)
-                    )
+                    filters = Filter.by_property("author").equal(input_data.author_filter)
 
                 if input_data.work_filter:
-                    work_f = (
-                        Filter.by_property("work")
-                        .by_property("title")  # type: ignore[attr-defined]
-                        .equal(input_data.work_filter)
-                    )
+                    work_f = Filter.by_property("title").equal(input_data.work_filter)
                     filters = (filters & work_f) if filters else work_f
 
                 if input_data.language_filter:
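
The simplified filters rely on Weaviate v4 filter composition: property
filters combine with & (and |), and Filter.all_of is an equivalent
spelling (sketch with placeholder values):

    from weaviate.classes.query import Filter

    f_author = Filter.by_property("author").equal("Jane Doe")
    f_title = Filter.by_property("title").equal("Example Work")

    combined = f_author & f_title                       # AND
    combined_alt = Filter.all_of([f_author, f_title])   # same effect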
@@ -985,7 +978,7 @@ async def list_documents_handler(
                 # First, get total count (requires fetching all matching objects)
                 # Weaviate v4 doesn't have a direct count API, so we fetch with high limit
                 query_start = time.perf_counter()
-                count_result = documents_collection.query.fetch_objects(
+                count_result = works_collection.query.fetch_objects(
                     filters=filters,
                     limit=10000,  # High limit to get all for counting
                 )
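
Where an exact total is needed, the v4 aggregate API can count matches
without fetching objects, which may be preferable to the high-limit fetch
(a sketch, reusing works_collection and filters from the handler above):

    agg = works_collection.aggregate.over_all(
        filters=filters,
        total_count=True,
    )
    total = agg.total_count  # number of matching Work objects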
@@ -995,7 +988,7 @@ async def list_documents_handler(
                 # Weaviate v4 fetch_objects doesn't support offset directly,
                 # so we fetch limit + offset and slice
                 fetch_limit = input_data.limit + input_data.offset
-                result = documents_collection.query.fetch_objects(
+                result = works_collection.query.fetch_objects(
                     filters=filters,
                     limit=fetch_limit,
                 )
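
Note that current weaviate-client v4 releases do accept an offset
parameter on fetch_objects; where the installed version supports it, the
fetch-and-slice workaround can be dropped (version-dependent sketch):

    result = works_collection.query.fetch_objects(
        filters=filters,
        limit=input_data.limit,
        offset=input_data.offset,
    )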
@@ -1003,7 +996,7 @@ async def list_documents_handler(
 
                 log_weaviate_query(
                     operation="fetch_objects",
-                    collection="Document",
+                    collection="Work",
                     filters={
                         "author": input_data.author_filter,
                         "work": input_data.work_filter,
@@ -1016,16 +1009,15 @@ async def list_documents_handler(
                 # Apply offset by slicing
                 paginated_objects = result.objects[input_data.offset:]
 
-                # Convert results to output schema
+                # Convert results to output schema (Work has properties directly)
                 document_summaries: List[DocumentSummary] = []
                 for obj in paginated_objects[:input_data.limit]:
                     props = obj.properties
-                    work_data = get_nested_dict(props, "work")
 
                     doc_summary = DocumentSummary(
                         source_id=safe_str(props.get("sourceId"), "unknown"),
-                        work_title=safe_str(work_data.get("title"), "Unknown"),
-                        work_author=safe_str(work_data.get("author"), "Unknown"),
+                        work_title=safe_str(props.get("title"), "Unknown"),
+                        work_author=safe_str(props.get("author"), "Unknown"),
                         pages=safe_int(props.get("pages"), 0),
                         chunks_count=safe_int(props.get("chunksCount"), 0),
                         language=safe_str(props.get("language"), "unknown"),
@@ -1258,8 +1250,8 @@ async def filter_by_author_handler(
     with log_tool_invocation("filter_by_author", tool_inputs) as invocation:
         try:
             with get_weaviate_client() as client:
+                # Use Work collection (Document was merged into Work)
                 works_collection = client.collections.get("Work")
-                documents_collection = client.collections.get("Document")
                 chunks_collection = client.collections.get("Chunk")
 
                 # Query Work collection by author
@@ -1297,44 +1289,22 @@ async def filter_by_author_handler(
                         genre=safe_str(work_props.get("genre")) or None,
                     )
 
-                    # Query Documents for this work
-                    doc_filter = (
-                        Filter.by_property("work")
-                        .by_property("title")  # type: ignore[attr-defined]
-                        .equal(work_title)
-                    )
-                    doc_filter = (
-                        doc_filter
-                        & Filter.by_property("work")
-                        .by_property("author")  # type: ignore[attr-defined]
-                        .equal(input_data.author)
-                    )
-                    docs_result = documents_collection.query.fetch_objects(
-                        filters=doc_filter,
-                        limit=100,  # Reasonable limit per work
-                    )
-
-                    # Build document summaries
-                    work_documents: List[DocumentSummary] = []
-                    work_chunks_total = 0
-                    for doc_obj in docs_result.objects:
-                        doc_props = doc_obj.properties
-                        doc_work_data = get_nested_dict(doc_props, "work")
-                        chunks_count = safe_int(doc_props.get("chunksCount"), 0)
-
-                        doc_summary = DocumentSummary(
-                            source_id=safe_str(doc_props.get("sourceId"), "unknown"),
-                            work_title=safe_str(doc_work_data.get("title"), work_title),
-                            work_author=safe_str(
-                                doc_work_data.get("author"), input_data.author
-                            ),
-                            pages=safe_int(doc_props.get("pages"), 0),
-                            chunks_count=chunks_count,
-                            language=safe_str(doc_props.get("language"), "unknown"),
-                        )
-                        work_documents.append(doc_summary)
-                        work_chunks_total += chunks_count
+                    # Work now contains Document data (sourceId, pages, etc.)
+                    # Each Work IS a document since they were merged
+                    chunks_count = safe_int(work_props.get("chunksCount"), 0)
+
+                    doc_summary = DocumentSummary(
+                        source_id=safe_str(work_props.get("sourceId"), "unknown"),
+                        work_title=work_title,
+                        work_author=safe_str(work_props.get("author"), input_data.author),
+                        pages=safe_int(work_props.get("pages"), 0),
+                        chunks_count=chunks_count,
+                        language=safe_str(work_props.get("language"), "unknown"),
+                    )
+
+                    # Build document summaries (one document per work now)
+                    work_documents: List[DocumentSummary] = [doc_summary]
+                    work_chunks_total = chunks_count
 
                     # If include_chunk_counts is False and we don't have chunksCount,
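
When chunksCount is missing on a Work, the count can also be derived from
the Chunk collection itself; a sketch, assuming Chunk objects carry the
same sourceId property as their parent Work:

    from weaviate.classes.query import Filter

    chunk_agg = chunks_collection.aggregate.over_all(
        filters=Filter.by_property("sourceId").equal(source_id),
        total_count=True,
    )
    chunks_count = chunk_agg.total_count or 0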
@@ -1527,21 +1497,23 @@ async def delete_document_handler(
                 )
 
-                # Delete the document itself
+                # Delete from Work collection (Document was merged into Work)
+                work_deleted = False
                 try:
-                    doc_collection = client.collections.get("Document")
-                    doc_filter = Filter.by_property("sourceId").equal(
+                    work_collection = client.collections.get("Work")
+                    work_filter = Filter.by_property("sourceId").equal(
                         input_data.source_id
                     )
-                    doc_result = doc_collection.data.delete_many(where=doc_filter)
-                    document_deleted = doc_result.successful > 0
-                    if document_deleted:
+                    work_result = work_collection.data.delete_many(where=work_filter)
+                    work_deleted = work_result.successful > 0
+                    if work_deleted:
                         logger.info(
-                            f"Deleted document {input_data.source_id}",
+                            f"Deleted work {input_data.source_id}",
                             extra={"source_id": input_data.source_id},
                         )
                 except Exception as e:
                     logger.warning(
-                        f"Error deleting document: {e}",
+                        f"Error deleting work: {e}",
                         extra={
                             "source_id": input_data.source_id,
                             "error": str(e),
@@ -1552,7 +1524,7 @@ async def delete_document_handler(
 
                 log_weaviate_query(
                     operation="delete_many",
-                    collection="Chunk,Summary,Document",
+                    collection="Chunk,Summary,Work",
                     filters={"sourceId": input_data.source_id},
                     result_count=chunks_deleted + summaries_deleted,
                     duration_ms=query_duration_ms,