diff --git a/generations/library_rag/mcp_tools/retrieval_tools.py b/generations/library_rag/mcp_tools/retrieval_tools.py
index f7bd618..1a70f5c 100644
--- a/generations/library_rag/mcp_tools/retrieval_tools.py
+++ b/generations/library_rag/mcp_tools/retrieval_tools.py
@@ -773,12 +773,13 @@ async def get_document_handler(
     with log_tool_invocation("get_document", tool_inputs) as invocation:
         try:
             with get_weaviate_client() as client:
-                documents = client.collections.get("Document")
+                # Use Work collection (Document was merged into Work)
+                works = client.collections.get("Work")
 
-                # Query Document by sourceId
+                # Query Work by sourceId
                 query_start = time.perf_counter()
                 doc_filter = Filter.by_property("sourceId").equal(input_data.source_id)
-                result = documents.query.fetch_objects(
+                result = works.query.fetch_objects(
                     filters=doc_filter,
                     limit=1,
                 )
@@ -786,7 +787,7 @@
 
                 log_weaviate_query(
                     operation="fetch_objects",
-                    collection="Document",
+                    collection="Work",
                     filters={"sourceId": input_data.source_id},
                     result_count=len(result.objects),
                     duration_ms=query_duration_ms,
@@ -794,7 +795,7 @@
 
                 if not result.objects:
                     logger.warning(
-                        "Document not found",
+                        "Work not found",
                         extra={"source_id": input_data.source_id},
                     )
                     output = GetDocumentOutput(
@@ -802,24 +803,23 @@
                         chunks=[],
                         chunks_total=0,
                         found=False,
-                        error=f"Document not found: {input_data.source_id}",
+                        error=f"Work not found: {input_data.source_id}",
                     )
                     invocation.set_result(output.model_dump())
                     return output
 
-                # Extract document properties
+                # Extract Work properties (Document was merged into Work)
                 doc_obj = result.objects[0]
                 props = doc_obj.properties
-                work_data = get_nested_dict(props, "work")
 
-                # Parse TOC and hierarchy (stored as JSON strings)
+                # Parse TOC and hierarchy (stored as JSON strings) - may not exist in Work
                 toc_data = safe_json_parse(props.get("toc"))
                 hierarchy_data = safe_json_parse(props.get("hierarchy"))
 
                 document_info = DocumentInfo(
                     source_id=safe_str(props.get("sourceId"), input_data.source_id),
-                    work_title=safe_str(work_data.get("title"), "Unknown"),
-                    work_author=safe_str(work_data.get("author"), "Unknown"),
+                    work_title=safe_str(props.get("title"), "Unknown"),
+                    work_author=safe_str(props.get("author"), "Unknown"),
                     edition=safe_str(props.get("edition")) or None,
                     pages=safe_int(props.get("pages"), 0),
                     language=safe_str(props.get("language"), "unknown"),
@@ -956,24 +956,17 @@ async def list_documents_handler(
     with log_tool_invocation("list_documents", tool_inputs) as invocation:
         try:
             with get_weaviate_client() as client:
-                documents_collection = client.collections.get("Document")
+                # Use Work collection (Document was merged into Work)
+                works_collection = client.collections.get("Work")
 
-                # Build filters for nested object properties
+                # Build filters (Work has author/title directly, not nested)
                 filters: Any = None
 
                 if input_data.author_filter:
-                    filters = (
-                        Filter.by_property("work")
-                        .by_property("author")  # type: ignore[attr-defined]
-                        .equal(input_data.author_filter)
-                    )
+                    filters = Filter.by_property("author").equal(input_data.author_filter)
 
                 if input_data.work_filter:
-                    work_f = (
-                        Filter.by_property("work")
-                        .by_property("title")  # type: ignore[attr-defined]
-                        .equal(input_data.work_filter)
-                    )
+                    work_f = Filter.by_property("title").equal(input_data.work_filter)
                     filters = (filters & work_f) if filters else work_f
 
                 if input_data.language_filter:
@@ -985,7 +978,7 @@
                 # First, get total count (requires fetching all matching objects)
                 # Weaviate v4 doesn't have a direct count API, so we fetch with high limit
                 query_start = time.perf_counter()
-                count_result = documents_collection.query.fetch_objects(
+                count_result = works_collection.query.fetch_objects(
                     filters=filters,
                     limit=10000,  # High limit to get all for counting
                 )
@@ -995,7 +988,7 @@
                 # Weaviate v4 fetch_objects doesn't support offset directly,
                 # so we fetch limit + offset and slice
                 fetch_limit = input_data.limit + input_data.offset
-                result = documents_collection.query.fetch_objects(
+                result = works_collection.query.fetch_objects(
                     filters=filters,
                     limit=fetch_limit,
                 )
@@ -1003,7 +996,7 @@
 
                 log_weaviate_query(
                     operation="fetch_objects",
-                    collection="Document",
+                    collection="Work",
                     filters={
                         "author": input_data.author_filter,
                         "work": input_data.work_filter,
@@ -1016,16 +1009,15 @@
                 # Apply offset by slicing
                 paginated_objects = result.objects[input_data.offset:]
 
-                # Convert results to output schema
+                # Convert results to output schema (Work has properties directly)
                 document_summaries: List[DocumentSummary] = []
                 for obj in paginated_objects[:input_data.limit]:
                     props = obj.properties
-                    work_data = get_nested_dict(props, "work")
 
                     doc_summary = DocumentSummary(
                         source_id=safe_str(props.get("sourceId"), "unknown"),
-                        work_title=safe_str(work_data.get("title"), "Unknown"),
-                        work_author=safe_str(work_data.get("author"), "Unknown"),
+                        work_title=safe_str(props.get("title"), "Unknown"),
+                        work_author=safe_str(props.get("author"), "Unknown"),
                         pages=safe_int(props.get("pages"), 0),
                         chunks_count=safe_int(props.get("chunksCount"), 0),
                         language=safe_str(props.get("language"), "unknown"),
@@ -1258,8 +1250,8 @@ async def filter_by_author_handler(
     with log_tool_invocation("filter_by_author", tool_inputs) as invocation:
         try:
             with get_weaviate_client() as client:
+                # Use Work collection (Document was merged into Work)
                 works_collection = client.collections.get("Work")
-                documents_collection = client.collections.get("Document")
                 chunks_collection = client.collections.get("Chunk")
 
                 # Query Work collection by author
@@ -1297,44 +1289,21 @@
                         genre=safe_str(work_props.get("genre")) or None,
                     )
 
-                    # Query Documents for this work
-                    doc_filter = (
-                        Filter.by_property("work")
-                        .by_property("title")  # type: ignore[attr-defined]
-                        .equal(work_title)
-                    )
-                    doc_filter = (
-                        doc_filter
-                        & Filter.by_property("work")
-                        .by_property("author")  # type: ignore[attr-defined]
-                        .equal(input_data.author)
+                    # Work now contains Document data (sourceId, pages, etc.)
+                    # Each Work IS a document since they were merged
+                    chunks_count = safe_int(work_props.get("chunksCount"), 0)
+
+                    doc_summary = DocumentSummary(
+                        source_id=safe_str(work_props.get("sourceId"), "unknown"),
+                        work_title=work_title,
+                        work_author=safe_str(work_props.get("author"), input_data.author),
+                        pages=safe_int(work_props.get("pages"), 0),
+                        chunks_count=chunks_count,
+                        language=safe_str(work_props.get("language"), "unknown"),
                     )
-                    docs_result = documents_collection.query.fetch_objects(
-                        filters=doc_filter,
-                        limit=100,  # Reasonable limit per work
-                    )
-
-                    # Build document summaries
-                    work_documents: List[DocumentSummary] = []
-                    work_chunks_total = 0
-
-                    for doc_obj in docs_result.objects:
-                        doc_props = doc_obj.properties
-                        doc_work_data = get_nested_dict(doc_props, "work")
-                        chunks_count = safe_int(doc_props.get("chunksCount"), 0)
-
-                        doc_summary = DocumentSummary(
-                            source_id=safe_str(doc_props.get("sourceId"), "unknown"),
-                            work_title=safe_str(doc_work_data.get("title"), work_title),
-                            work_author=safe_str(
-                                doc_work_data.get("author"), input_data.author
-                            ),
-                            pages=safe_int(doc_props.get("pages"), 0),
-                            chunks_count=chunks_count,
-                            language=safe_str(doc_props.get("language"), "unknown"),
-                        )
-                        work_documents.append(doc_summary)
-                        work_chunks_total += chunks_count
+                    # Build document summaries (one document per work now)
+                    work_documents: List[DocumentSummary] = [doc_summary]
+                    work_chunks_total = chunks_count
 
                     # If include_chunk_counts is False and we don't have chunksCount,
@@ -1527,21 +1497,23 @@ async def delete_document_handler(
                     )
 
                 # Delete the document itself
+                # Delete from Work collection (Document was merged into Work)
+                work_deleted = False
                 try:
-                    doc_collection = client.collections.get("Document")
-                    doc_filter = Filter.by_property("sourceId").equal(
+                    work_collection = client.collections.get("Work")
+                    work_filter = Filter.by_property("sourceId").equal(
                         input_data.source_id
                     )
-                    doc_result = doc_collection.data.delete_many(where=doc_filter)
-                    document_deleted = doc_result.successful > 0
-                    if document_deleted:
+                    work_result = work_collection.data.delete_many(where=work_filter)
+                    work_deleted = work_result.successful > 0
+                    if work_deleted:
                         logger.info(
-                            f"Deleted document {input_data.source_id}",
+                            f"Deleted work {input_data.source_id}",
                             extra={"source_id": input_data.source_id},
                         )
                 except Exception as e:
                     logger.warning(
-                        f"Error deleting document: {e}",
+                        f"Error deleting work: {e}",
                         extra={
                             "source_id": input_data.source_id,
                             "error": str(e),
                         },
                     )
@@ -1552,7 +1524,7 @@
 
                 log_weaviate_query(
                     operation="delete_many",
-                    collection="Chunk,Summary,Document",
+                    collection="Chunk,Summary,Work",
                     filters={"sourceId": input_data.source_id},
                     result_count=chunks_deleted + summaries_deleted,
                     duration_ms=query_duration_ms,
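Note on the filter change: with Document merged into Work, author and title are top-level properties, so the nested by_property("work").by_property(...) chains (and their type: ignore comments) go away. A minimal sketch of the flat style, combining two conditions the same way list_documents_handler does ("Jane Austen" and "Persuasion" are sample values, not values from this repo):

    # Sketch only: flat-property filters on the merged Work collection
    # (weaviate-client v4; property names taken from this patch).
    from weaviate.classes.query import Filter

    author_f = Filter.by_property("author").equal("Jane Austen")
    title_f = Filter.by_property("title").equal("Persuasion")
    combined = author_f & title_f  # AND of both conditions, as in the handler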
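The limit=10000 counting workaround kept in list_documents_handler predates this change; the weaviate-client v4 aggregate API can return a match count without fetching objects. A sketch under that assumption (local connection and sample filter value are illustrative):

    # Sketch only: count matching Works without fetching up to 10,000 objects
    # (assumes weaviate-client v4 and a locally reachable instance).
    import weaviate
    from weaviate.classes.query import Filter

    with weaviate.connect_to_local() as client:
        works = client.collections.get("Work")
        agg = works.aggregate.over_all(
            filters=Filter.by_property("author").equal("Jane Austen"),  # sample value
            total_count=True,
        )
        print(agg.total_count)  # total matches, no object payloads transferred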
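Similarly, the "fetch limit + offset and slice" comment may be stale: recent v4 client releases accept offset directly on fetch_objects. If the pinned client version supports it, pagination simplifies to a single server-side call; otherwise keep the slice approach. A sketch under that assumption:

    # Sketch only: server-side pagination, assuming the pinned weaviate-client v4
    # release accepts offset on fetch_objects (verify against the pinned version).
    import weaviate
    from weaviate.classes.query import Filter

    with weaviate.connect_to_local() as client:
        works = client.collections.get("Work")
        page = works.query.fetch_objects(
            filters=Filter.by_property("language").equal("en"),  # sample value
            limit=20,
            offset=20,  # second page of 20
        )
        for obj in page.objects:
            print(obj.properties.get("sourceId"), obj.properties.get("title"))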
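For reference, the cascade in delete_document_handler now spans Chunk, Summary, and Work, all keyed by sourceId. A condensed sketch of that pattern (collection names come from this patch; delete_by_source_id and the sample id are hypothetical):

    # Sketch only: the sourceId-keyed cascade delete this patch moves to
    # (weaviate-client v4; delete_by_source_id is a hypothetical helper).
    import weaviate
    from weaviate.classes.query import Filter

    def delete_by_source_id(client: weaviate.WeaviateClient, source_id: str) -> dict:
        flt = Filter.by_property("sourceId").equal(source_id)
        deleted = {}
        for name in ("Chunk", "Summary", "Work"):
            result = client.collections.get(name).data.delete_many(where=flt)
            deleted[name] = result.successful  # count of objects removed
        return deleted

    with weaviate.connect_to_local() as client:
        print(delete_by_source_id(client, "sample-source-id"))  # hypothetical id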