refactor: Rename Chunk_v2/Summary_v2 collections to Chunk/Summary
- Add migrate_rename_collections.py script for data migration - Update flask_app.py to use new collection names - Update weaviate_ingest.py to use new collection names - Update schema.py documentation - Update README.md and ANALYSE_MCP_TOOLS.md Migration completed: 5372 chunks + 114 summaries preserved with vectors. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
337
generations/library_rag/migrate_rename_collections.py
Normal file
337
generations/library_rag/migrate_rename_collections.py
Normal file
@@ -0,0 +1,337 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Rename collections: Chunk_v2 -> Chunk, Summary_v2 -> Summary
|
||||
|
||||
Weaviate doesn't support renaming collections directly, so this script:
|
||||
1. Creates new collections (Chunk, Summary) with identical schema
|
||||
2. Copies all objects with their vectors (batch insert)
|
||||
3. Validates the migration (count check)
|
||||
4. Optionally deletes old collections (--cleanup flag)
|
||||
|
||||
Usage:
|
||||
python migrate_rename_collections.py --dry-run # Preview without changes
|
||||
python migrate_rename_collections.py # Execute migration
|
||||
python migrate_rename_collections.py --cleanup # Delete old collections after validation
|
||||
"""
|
||||
|
||||
import weaviate
|
||||
import weaviate.classes as wvc
|
||||
from weaviate.classes.config import Configure, Property, DataType, VectorDistances
|
||||
from weaviate.classes.query import Filter
|
||||
import sys
|
||||
import argparse
|
||||
from typing import Any
|
||||
import time
|
||||
|
||||
MIGRATIONS = [
|
||||
("Chunk_v2", "Chunk"),
|
||||
("Summary_v2", "Summary"),
|
||||
]
|
||||
|
||||
BATCH_SIZE = 100
|
||||
|
||||
|
||||
def get_collection_count(client: weaviate.WeaviateClient, name: str) -> int:
|
||||
"""Get the number of objects in a collection."""
|
||||
try:
|
||||
coll = client.collections.get(name)
|
||||
return coll.aggregate.over_all().total_count
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def collection_exists(client: weaviate.WeaviateClient, name: str) -> bool:
|
||||
"""Check if a collection exists."""
|
||||
try:
|
||||
client.collections.get(name)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def create_chunk_collection(client: weaviate.WeaviateClient) -> None:
|
||||
"""Create the new Chunk collection with schema from Chunk_v2."""
|
||||
print(" Creating Chunk collection...")
|
||||
|
||||
client.collections.create(
|
||||
name="Chunk",
|
||||
description="Document chunks with manual GPU vectorization (BAAI/bge-m3, 1024-dim)",
|
||||
vectorizer_config=Configure.Vectorizer.none(),
|
||||
vector_index_config=Configure.VectorIndex.hnsw(
|
||||
distance_metric=VectorDistances.COSINE,
|
||||
ef_construction=128,
|
||||
max_connections=32,
|
||||
quantizer=Configure.VectorIndex.Quantizer.rq(),
|
||||
),
|
||||
properties=[
|
||||
Property(name="text", data_type=DataType.TEXT, description="Chunk text content"),
|
||||
Property(name="workTitle", data_type=DataType.TEXT, description="Work title"),
|
||||
Property(name="workAuthor", data_type=DataType.TEXT, description="Work author"),
|
||||
Property(name="sectionPath", data_type=DataType.TEXT, description="Section path"),
|
||||
Property(name="sectionLevel", data_type=DataType.INT, description="Section level"),
|
||||
Property(name="chapterTitle", data_type=DataType.TEXT, description="Chapter title"),
|
||||
Property(name="canonicalReference", data_type=DataType.TEXT, description="Canonical reference"),
|
||||
Property(name="unitType", data_type=DataType.TEXT, description="Unit type"),
|
||||
Property(name="keywords", data_type=DataType.TEXT_ARRAY, description="Keywords"),
|
||||
Property(name="language", data_type=DataType.TEXT, description="Language code"),
|
||||
Property(name="year", data_type=DataType.INT, description="Publication year"),
|
||||
Property(name="orderIndex", data_type=DataType.INT, description="Order index"),
|
||||
Property(name="summary", data_type=DataType.TEXT, description="Chunk summary"),
|
||||
Property(name="document", data_type=DataType.TEXT, description="Document reference"),
|
||||
],
|
||||
)
|
||||
print(" [OK] Chunk collection created")
|
||||
|
||||
|
||||
def create_summary_collection(client: weaviate.WeaviateClient) -> None:
|
||||
"""Create the new Summary collection with schema from Summary_v2."""
|
||||
print(" Creating Summary collection...")
|
||||
|
||||
client.collections.create(
|
||||
name="Summary",
|
||||
description="Section summaries (v2 - sans Document)",
|
||||
vectorizer_config=Configure.Vectorizer.none(),
|
||||
vector_index_config=Configure.VectorIndex.hnsw(
|
||||
distance_metric=VectorDistances.COSINE,
|
||||
ef_construction=128,
|
||||
max_connections=32,
|
||||
quantizer=Configure.VectorIndex.Quantizer.rq(),
|
||||
),
|
||||
properties=[
|
||||
Property(name="text", data_type=DataType.TEXT, description="Summary text (vectorized)"),
|
||||
Property(name="concepts", data_type=DataType.TEXT_ARRAY, description="Key concepts"),
|
||||
Property(name="workTitle", data_type=DataType.TEXT, description="Work title"),
|
||||
Property(name="sectionPath", data_type=DataType.TEXT, description="Section path"),
|
||||
Property(name="title", data_type=DataType.TEXT, description="Section title"),
|
||||
Property(name="level", data_type=DataType.INT, description="Hierarchy level"),
|
||||
Property(name="chunksCount", data_type=DataType.INT, description="Chunks count"),
|
||||
Property(name="language", data_type=DataType.TEXT, description="Language code"),
|
||||
Property(name="workAuthor", data_type=DataType.TEXT, description="Work author"),
|
||||
Property(name="year", data_type=DataType.NUMBER, description="Publication year"),
|
||||
],
|
||||
)
|
||||
print(" [OK] Summary collection created")
|
||||
|
||||
|
||||
def clean_properties(props: dict[str, Any], collection_name: str) -> dict[str, Any]:
|
||||
"""Clean properties to ensure correct types."""
|
||||
cleaned = dict(props)
|
||||
|
||||
# Integer fields that may have been stored as float
|
||||
int_fields = ["sectionLevel", "year", "orderIndex", "level", "chunksCount"]
|
||||
|
||||
for field in int_fields:
|
||||
if field in cleaned and cleaned[field] is not None:
|
||||
try:
|
||||
cleaned[field] = int(cleaned[field])
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
return cleaned
|
||||
|
||||
|
||||
def migrate_objects(
|
||||
client: weaviate.WeaviateClient,
|
||||
source_name: str,
|
||||
target_name: str,
|
||||
dry_run: bool = False
|
||||
) -> int:
|
||||
"""Copy all objects from source to target collection with vectors."""
|
||||
source = client.collections.get(source_name)
|
||||
target = client.collections.get(target_name)
|
||||
|
||||
total = source.aggregate.over_all().total_count
|
||||
print(f" Migrating {total} objects from {source_name} -> {target_name}")
|
||||
|
||||
if dry_run:
|
||||
print(f" [DRY-RUN] Would migrate {total} objects")
|
||||
return total
|
||||
|
||||
migrated = 0
|
||||
errors = 0
|
||||
batch_objects: list[dict[str, Any]] = []
|
||||
|
||||
for obj in source.iterator(include_vector=True):
|
||||
# Get vector (handle both dict and direct vector)
|
||||
vector = obj.vector
|
||||
if isinstance(vector, dict):
|
||||
vector = vector.get("default", list(vector.values())[0] if vector else None)
|
||||
|
||||
# Clean properties to ensure correct types
|
||||
cleaned_props = clean_properties(obj.properties, target_name)
|
||||
|
||||
batch_objects.append({
|
||||
"uuid": obj.uuid,
|
||||
"properties": cleaned_props,
|
||||
"vector": vector,
|
||||
})
|
||||
|
||||
if len(batch_objects) >= BATCH_SIZE:
|
||||
# Insert batch
|
||||
with target.batch.dynamic() as batch:
|
||||
for item in batch_objects:
|
||||
batch.add_object(
|
||||
uuid=item["uuid"],
|
||||
properties=item["properties"],
|
||||
vector=item["vector"],
|
||||
)
|
||||
migrated += len(batch_objects)
|
||||
print(f" Progress: {migrated}/{total} ({100*migrated//total}%)", end='\r')
|
||||
batch_objects = []
|
||||
|
||||
# Insert remaining objects
|
||||
if batch_objects:
|
||||
with target.batch.dynamic() as batch:
|
||||
for item in batch_objects:
|
||||
batch.add_object(
|
||||
uuid=item["uuid"],
|
||||
properties=item["properties"],
|
||||
vector=item["vector"],
|
||||
)
|
||||
migrated += len(batch_objects)
|
||||
|
||||
print(f" Progress: {migrated}/{total} (100%) ")
|
||||
print(f" [OK] Migrated {migrated} objects")
|
||||
|
||||
return migrated
|
||||
|
||||
|
||||
def validate_migration(
|
||||
client: weaviate.WeaviateClient,
|
||||
source_name: str,
|
||||
target_name: str
|
||||
) -> bool:
|
||||
"""Validate that source and target have same object count."""
|
||||
source_count = get_collection_count(client, source_name)
|
||||
target_count = get_collection_count(client, target_name)
|
||||
|
||||
if source_count == target_count:
|
||||
print(f" [OK] Validation passed: {source_name}={source_count}, {target_name}={target_count}")
|
||||
return True
|
||||
else:
|
||||
print(f" [ERROR] Validation FAILED: {source_name}={source_count}, {target_name}={target_count}")
|
||||
return False
|
||||
|
||||
|
||||
def cleanup_old_collections(client: weaviate.WeaviateClient, dry_run: bool = False) -> None:
|
||||
"""Delete old collections after successful migration."""
|
||||
print("\n" + "="*70)
|
||||
print("CLEANUP: Deleting old collections")
|
||||
print("="*70)
|
||||
|
||||
for source_name, target_name in MIGRATIONS:
|
||||
if not collection_exists(client, source_name):
|
||||
print(f" {source_name}: Already deleted")
|
||||
continue
|
||||
|
||||
if not collection_exists(client, target_name):
|
||||
print(f" [ERROR] Cannot delete {source_name}: {target_name} doesn't exist!")
|
||||
continue
|
||||
|
||||
# Validate before deleting
|
||||
if not validate_migration(client, source_name, target_name):
|
||||
print(f" [ERROR] Skipping {source_name} deletion: validation failed")
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
print(f" [DRY-RUN] Would delete {source_name}")
|
||||
else:
|
||||
client.collections.delete(source_name)
|
||||
print(f" [OK] Deleted {source_name}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Rename Weaviate collections")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Preview without making changes")
|
||||
parser.add_argument("--cleanup", action="store_true", help="Delete old collections after validation")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("="*70)
|
||||
print("WEAVIATE COLLECTION RENAME: Chunk_v2 -> Chunk, Summary_v2 -> Summary")
|
||||
print("="*70)
|
||||
print(f"Mode: {'DRY-RUN' if args.dry_run else 'LIVE'}")
|
||||
print(f"Cleanup: {'YES' if args.cleanup else 'NO'}")
|
||||
print()
|
||||
|
||||
client = weaviate.connect_to_local()
|
||||
|
||||
try:
|
||||
# Show current state
|
||||
print("Current collections:")
|
||||
for source_name, target_name in MIGRATIONS:
|
||||
source_count = get_collection_count(client, source_name)
|
||||
target_exists = collection_exists(client, target_name)
|
||||
target_count = get_collection_count(client, target_name) if target_exists else 0
|
||||
print(f" {source_name}: {source_count} objects")
|
||||
print(f" {target_name}: {'exists (' + str(target_count) + ' objects)' if target_exists else 'does not exist'}")
|
||||
print()
|
||||
|
||||
if args.cleanup:
|
||||
cleanup_old_collections(client, dry_run=args.dry_run)
|
||||
return
|
||||
|
||||
# Migration
|
||||
for source_name, target_name in MIGRATIONS:
|
||||
print("="*70)
|
||||
print(f"MIGRATING: {source_name} -> {target_name}")
|
||||
print("="*70)
|
||||
|
||||
# Check source exists
|
||||
if not collection_exists(client, source_name):
|
||||
print(f" [ERROR] Source collection {source_name} does not exist!")
|
||||
continue
|
||||
|
||||
# Check if target already exists
|
||||
if collection_exists(client, target_name):
|
||||
target_count = get_collection_count(client, target_name)
|
||||
if target_count > 0:
|
||||
print(f" Target {target_name} already exists with {target_count} objects")
|
||||
print(f" Skipping (already migrated)")
|
||||
continue
|
||||
else:
|
||||
print(f" Target {target_name} exists but empty, will populate")
|
||||
else:
|
||||
# Create target collection
|
||||
if not args.dry_run:
|
||||
if target_name == "Chunk":
|
||||
create_chunk_collection(client)
|
||||
elif target_name == "Summary":
|
||||
create_summary_collection(client)
|
||||
else:
|
||||
print(f" [DRY-RUN] Would create {target_name} collection")
|
||||
|
||||
# Migrate objects
|
||||
if not args.dry_run:
|
||||
migrate_objects(client, source_name, target_name, dry_run=False)
|
||||
else:
|
||||
migrate_objects(client, source_name, target_name, dry_run=True)
|
||||
|
||||
# Validate
|
||||
if not args.dry_run:
|
||||
validate_migration(client, source_name, target_name)
|
||||
|
||||
print()
|
||||
|
||||
# Final status
|
||||
print("="*70)
|
||||
print("MIGRATION COMPLETE")
|
||||
print("="*70)
|
||||
print("\nFinal state:")
|
||||
for source_name, target_name in MIGRATIONS:
|
||||
source_count = get_collection_count(client, source_name)
|
||||
target_count = get_collection_count(client, target_name)
|
||||
print(f" {source_name}: {source_count} objects")
|
||||
print(f" {target_name}: {target_count} objects")
|
||||
|
||||
if not args.dry_run:
|
||||
print("\nNext steps:")
|
||||
print(" 1. Update code: replace 'Chunk_v2' -> 'Chunk', 'Summary_v2' -> 'Summary'")
|
||||
print(" 2. Test the application")
|
||||
print(" 3. Run: python migrate_rename_collections.py --cleanup")
|
||||
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user