From b8501ff2618a15f27c8ae880f33f52441af52ab6 Mon Sep 17 00:00:00 2001
From: emrgnt-cmplxty <68796651+emrgnt-cmplxty@users.noreply.github.com>
Date: Thu, 22 Aug 2024 11:20:28 -0700
Subject: [PATCH] Feature/remove version from ingestion end pt (#936)

* sync model

* remove ability to set version

* tweak versions impl

* fix version bug
---
 py/cli/commands/ingestion.py               | 10 ++-----
 py/core/main/api/routes/ingestion/base.py  |  6 +----
 py/core/main/services/ingestion_service.py | 16 ++++++-----
 py/core/pipes/ingestion/parsing_pipe.py    |  6 ++---
 py/sdk/ingestion.py                        |  7 -----
 py/sdk/models.py                           | 31 ++++++++++++++++++++++
 6 files changed, 45 insertions(+), 31 deletions(-)

diff --git a/py/cli/commands/ingestion.py b/py/cli/commands/ingestion.py
index 403aaa632..77a98946f 100644
--- a/py/cli/commands/ingestion.py
+++ b/py/cli/commands/ingestion.py
@@ -21,21 +21,15 @@
 @click.option(
     "--metadatas", type=JSON, help="Metadatas for ingestion as a JSON string"
 )
-@click.option(
-    "--versions",
-    multiple=True,
-    help="Starting version for ingested files (e.g. `v1`)",
-)
 @click.pass_obj
-def ingest_files(client, file_paths, document_ids, metadatas, versions):
+def ingest_files(client, file_paths, document_ids, metadatas):
     """Ingest files into R2R."""
     with timer():
         file_paths = list(file_paths)
         document_ids = list(document_ids) if document_ids else None
-        versions = list(versions) if versions else None
 
         response = client.ingest_files(
-            file_paths, metadatas, document_ids, versions
+            file_paths, metadatas, document_ids
         )
 
     click.echo(json.dumps(response, indent=2))
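With the `--versions` option gone, caller-side ingestion shrinks accordingly. A minimal sketch of the new call shape through the Python SDK; the base URL, file paths, and metadata values are illustrative, and a locally running R2R server is assumed:

```python
from r2r import R2RClient  # client name as exposed by the SDK; usage here is a sketch

client = R2RClient("http://localhost:8000")

# No `versions` argument anymore: new documents start at "v0" internally,
# and the version advances through the update_files flow instead.
response = client.ingest_files(
    file_paths=["data/report.pdf"],
    metadatas=[{"title": "Q3 Report"}],
)
print(response)
```
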
diff --git a/py/core/main/api/routes/ingestion/base.py b/py/core/main/api/routes/ingestion/base.py
index f0510e944..d9d429049 100644
--- a/py/core/main/api/routes/ingestion/base.py
+++ b/py/core/main/api/routes/ingestion/base.py
@@ -34,7 +34,7 @@ def load_openapi_extras(self):
     def setup_routes(self):
         # Note, we use the following verbose input parameters because FastAPI struggles to handle `File` input and `Body` inputs
-        # at the same time. Therefore, we must ues `Form` inputs for the metadata, document_ids, and versions inputs.
+        # at the same time. Therefore, we must use `Form` inputs for the metadata and document_ids inputs.
         ingest_files_extras = self.openapi_extras.get("ingest_files", {})
         ingest_files_descriptions = ingest_files_extras.get(
             "input_descriptions", {}
@@ -53,9 +53,6 @@ async def ingest_files_app(
                 None,
                 description=ingest_files_descriptions.get("document_ids"),
             ),
-            versions: Optional[Json[list[str]]] = Form(
-                None, description=ingest_files_descriptions.get("versions")
-            ),
             metadatas: Optional[Json[list[dict]]] = Form(
                 None, description=ingest_files_descriptions.get("metadatas")
             ),
@@ -107,7 +104,6 @@ async def ingest_files_app(
                 files=files,
                 metadatas=metadatas,
                 document_ids=document_ids,
-                versions=versions,
                 user=auth_user,
                 chunking_provider=chunking_provider,
             )
diff --git a/py/core/main/services/ingestion_service.py b/py/core/main/services/ingestion_service.py
index c035e2933..abc4da6c1 100644
--- a/py/core/main/services/ingestion_service.py
+++ b/py/core/main/services/ingestion_service.py
@@ -28,7 +28,7 @@
 logger = logging.getLogger(__name__)
 
 MB_CONVERSION_FACTOR = 1024 * 1024
-
+STARTING_VERSION = "v0"
 
 class IngestionService(Service):
     def __init__(
@@ -56,7 +56,6 @@ async def ingest_files(
         user: UserResponse,
         metadatas: Optional[list[dict]] = None,
         document_ids: Optional[list[UUID]] = None,
-        versions: Optional[list[str]] = None,
         chunking_provider: Optional[ChunkingProvider] = None,
         *args: Any,
         **kwargs: Any,
@@ -89,7 +88,6 @@ async def ingest_files(
         # ingests all documents in parallel
         return await self.ingest_documents(
             documents,
-            versions,
             chunking_provider=chunking_provider,
             *args,
             **kwargs,
@@ -178,10 +176,11 @@ async def update_files(
             )
             documents.append(document)
 
+
         ingestion_results = await self.ingest_documents(
             documents,
-            versions=new_versions,
             chunking_provider=chunking_provider,
+            versions=new_versions,
             *args,
             **kwargs,
         )
@@ -252,7 +251,7 @@ async def ingest_documents(
         }
 
         for iteration, document in enumerate(documents):
-            version = versions[iteration] if versions else "v0"
+            version = versions[iteration] if versions else STARTING_VERSION
 
             # Check for duplicates within the current batch
             if document.id in processed_documents:
@@ -263,7 +262,8 @@ async def ingest_documents(
 
             if (
                 document.id in existing_document_info
-                and existing_document_info[document.id].version == version
+                # apply `geq` check to prevent re-ingestion of updated documents
+                and (existing_document_info[document.id].version >= version)
                 and existing_document_info[document.id].status == "success"
             ):
                 logger.error(
@@ -305,6 +305,9 @@ async def ingest_documents(
             processed_documents[document.id] = document.metadata.get(
                 "title", str(document.id)
             )
+            # Add version to metadata to propagate through pipeline
+            document.metadata["version"] = version
+
 
         if duplicate_documents:
             duplicate_details = [
@@ -336,7 +339,6 @@ async def ingest_documents(
                     not in [skipped["id"] for skipped in skipped_documents]
                 ],
            ),
-            versions=[info.version for info in document_infos],
             run_manager=self.run_manager,
             *args,
             **kwargs,
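The duplicate check above now applies `>=` to version strings such as "v0" and "v1", so re-submitting a document at or below its stored version is skipped while genuine updates pass through. A standalone sketch of that decision, with a hypothetical `existing_document_info` store (names and contents are illustrative, not the service's actual types):

```python
STARTING_VERSION = "v0"

# Hypothetical store: document id -> (version, status)
existing_document_info = {"doc-1": ("v1", "success")}

def should_skip(doc_id: str, version: str = STARTING_VERSION) -> bool:
    """Mirror of the service's check: skip when a successful ingestion
    at the same or a newer version already exists."""
    if doc_id not in existing_document_info:
        return False
    stored_version, status = existing_document_info[doc_id]
    return stored_version >= version and status == "success"

print(should_skip("doc-1", "v0"))  # True: "v1" >= "v0" and prior run succeeded
print(should_skip("doc-1", "v2"))  # False: a newer version proceeds to re-ingest
print(should_skip("doc-2"))        # False: unseen document, starts at "v0"
```

Note the comparison is lexicographic on strings, so it only behaves numerically while version numbers stay single-digit ("v10" sorts before "v2").
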
diff --git a/py/core/pipes/ingestion/parsing_pipe.py b/py/core/pipes/ingestion/parsing_pipe.py
index 9b4bdebff..b1b4c8373 100644
--- a/py/core/pipes/ingestion/parsing_pipe.py
+++ b/py/core/pipes/ingestion/parsing_pipe.py
@@ -72,11 +72,9 @@ async def _run_logic(
         input: Input,
         state: AsyncState,
         run_id: UUID,
-        versions: Optional[list[str]] = None,
         *args,
         **kwargs,
     ) -> AsyncGenerator[DocumentExtraction, None]:
         async for document in input.message:
-            version = versions[0] if versions else "v0"
-            async for result in self._parse(document, run_id, version):
-                yield result
+            async for result in self._parse(document, run_id, document.metadata.get("version", "v0")):
+                yield result
\ No newline at end of file
diff --git a/py/sdk/ingestion.py b/py/sdk/ingestion.py
index 48037973e..0f58eb5df 100644
--- a/py/sdk/ingestion.py
+++ b/py/sdk/ingestion.py
@@ -15,7 +15,6 @@ async def ingest_files(
         file_paths: list[str],
         document_ids: Optional[list[Union[str, UUID]]] = None,
         metadatas: Optional[list[dict]] = None,
-        versions: Optional[list[str]] = None,
         chunking_settings: Optional[Union[dict, ChunkingConfig]] = None,
     ) -> dict:
         """
@@ -25,7 +24,6 @@
             file_paths (List[str]): List of file paths to ingest.
             document_ids (Optional[List[str]]): List of document IDs.
             metadatas (Optional[List[dict]]): List of metadata dictionaries for each file.
-            versions (Optional[List[str]]): List of version strings for each file.
             chunking_settings (Optional[Union[dict, ChunkingConfig]]): Custom chunking configuration.
 
         Returns:
@@ -39,10 +37,6 @@
             raise ValueError(
                 "Number of metadatas must match number of document IDs."
             )
-        if versions is not None and len(file_paths) != len(versions):
-            raise ValueError(
-                "Number of versions must match number of document IDs."
-            )
         if (
             chunking_settings is not None
             and chunking_settings is not ChunkingConfig
@@ -80,7 +74,6 @@
                 if document_ids
                 else None
             ),
-            "versions": json.dumps(versions) if versions else None,
             "chunking_settings": (
                 json.dumps(
                     chunking_settings.model_dump()
diff --git a/py/sdk/models.py b/py/sdk/models.py
index e0355fc7e..03b690610 100644
--- a/py/sdk/models.py
+++ b/py/sdk/models.py
@@ -205,6 +205,37 @@ class Token(BaseModel):
     token_type: str
 
 
+class IndexMeasure(str, Enum):
+    """
+    An enum representing the types of distance measures available for indexing.
+
+    Attributes:
+        cosine_distance (str): The cosine distance measure for indexing.
+        l2_distance (str): The Euclidean (L2) distance measure for indexing.
+        max_inner_product (str): The maximum inner product measure for indexing.
+    """
+
+    cosine_distance = "cosine_distance"
+    l2_distance = "l2_distance"
+    max_inner_product = "max_inner_product"
+
+
+class HybridSearchSettings(BaseModel):
+    full_text_weight: float = Field(
+        default=1.0, description="Weight to apply to full text search"
+    )
+    semantic_weight: float = Field(
+        default=5.0, description="Weight to apply to semantic search"
+    )
+    full_text_limit: int = Field(
+        default=200,
+        description="Maximum number of results to return from full text search",
+    )
+    rrf_k: int = Field(
+        default=50, description="K-value for RRF (Reciprocal Rank Fusion)"
+    )
+
+
 class VectorSearchSettings(BaseModel):
     use_vector_search: bool = Field(
         default=True, description="Whether to use vector search"
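The new `HybridSearchSettings` model groups the hybrid-search knobs in one place. A minimal usage sketch; the import path mirrors the file location in `py/sdk/models.py` and is an assumption, so adjust it to however the SDK actually re-exports its models:

```python
from sdk.models import HybridSearchSettings, IndexMeasure  # import path is an assumption

# Defaults weight semantic hits 5:1 over full-text hits; RRF then merges the
# two ranked lists, with each list contributing 1 / (rrf_k + rank) per result.
settings = HybridSearchSettings(
    full_text_weight=1.0,
    semantic_weight=5.0,
    full_text_limit=200,
    rrf_k=50,  # smaller k lets top-ranked results dominate the fused score
)

print(settings.model_dump())
print(IndexMeasure.cosine_distance.value)  # "cosine_distance"
```
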