Feature/remove version from ingestion end pt (#936)
* sync model

* remove ability to set version

* tweak versions impl

* fix version bug
emrgnt-cmplxty authored Aug 22, 2024
1 parent cb3ba3c commit b8501ff
Showing 6 changed files with 45 additions and 31 deletions.
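Taken together, the changes below remove a caller's ability to choose a starting version: newly ingested documents now begin at `STARTING_VERSION` ("v0"), and versions advance only through the update path (`update_files`). As a hypothetical sketch of that "v<N>" bump scheme (R2R's actual version-increment helper is not part of this diff):

```python
def increment_version(current: str) -> str:
    # Hypothetical "v<N>" bump, e.g. "v0" -> "v1"; the real helper
    # behind update_files is not shown in this commit.
    return f"v{int(current.lstrip('v')) + 1}"

assert increment_version("v0") == "v1"
```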
py/cli/commands/ingestion.py (10 changes: 2 additions & 8 deletions)

@@ -21,21 +21,15 @@
 @click.option(
     "--metadatas", type=JSON, help="Metadatas for ingestion as a JSON string"
 )
-@click.option(
-    "--versions",
-    multiple=True,
-    help="Starting version for ingested files (e.g. `v1`)",
-)
 @click.pass_obj
-def ingest_files(client, file_paths, document_ids, metadatas, versions):
+def ingest_files(client, file_paths, document_ids, metadatas):
     """Ingest files into R2R."""
     with timer():
         file_paths = list(file_paths)
         document_ids = list(document_ids) if document_ids else None
-        versions = list(versions) if versions else None
 
         response = client.ingest_files(
-            file_paths, metadatas, document_ids, versions
+            file_paths, metadatas, document_ids
         )
         click.echo(json.dumps(response, indent=2))
 
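For illustration, a minimal sketch of exercising the slimmed-down command through click's test runner, assuming `ingest_files` above is importable, that file paths are positional arguments, and that a small fake object can stand in for the R2R client the CLI group injects:

```python
from click.testing import CliRunner


class FakeClient:
    # Hypothetical stand-in for the R2R client object passed via @click.pass_obj;
    # its ingest_files mirrors the positional call shown in the diff.
    def ingest_files(self, file_paths, metadatas, document_ids):
        return {"results": f"ingested {file_paths}"}


runner = CliRunner()
# "--versions" no longer exists; passing it now fails with "No such option".
result = runner.invoke(
    ingest_files,  # the click command from py/cli/commands/ingestion.py
    ["my_doc.txt", "--metadatas", '[{"title": "My Doc"}]'],
    obj=FakeClient(),
)
print(result.output)
```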
py/core/main/api/routes/ingestion/base.py (6 changes: 1 addition & 5 deletions)

@@ -34,7 +34,7 @@ def load_openapi_extras(self):
 
     def setup_routes(self):
         # Note, we use the following verbose input parameters because FastAPI struggles to handle `File` input and `Body` inputs
-        # at the same time. Therefore, we must ues `Form` inputs for the metadata, document_ids, and versions inputs.
+        # at the same time. Therefore, we must use `Form` inputs for the metadata and document_ids inputs.
         ingest_files_extras = self.openapi_extras.get("ingest_files", {})
         ingest_files_descriptions = ingest_files_extras.get(
             "input_descriptions", {}
@@ -53,9 +53,6 @@ async def ingest_files_app(
                 None,
                 description=ingest_files_descriptions.get("document_ids"),
             ),
-            versions: Optional[Json[list[str]]] = Form(
-                None, description=ingest_files_descriptions.get("versions")
-            ),
             metadatas: Optional[Json[list[dict]]] = Form(
                 None, description=ingest_files_descriptions.get("metadatas")
             ),
@@ -107,7 +104,6 @@ async def ingest_files_app(
                 files=files,
                 metadatas=metadatas,
                 document_ids=document_ids,
-                versions=versions,
                 user=auth_user,
                 chunking_provider=chunking_provider,
             )
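A minimal sketch of calling the route after this change, assuming it is mounted at `/v2/ingest_files` (the path is not shown in this diff), that the upload parameter is named `files`, and that a server is listening on localhost:8000:

```python
import json

import requests

# Form fields are JSON-encoded strings, matching the
# Optional[Json[...]] = Form(...) parameters above; a "versions"
# form field would no longer be accepted.
response = requests.post(
    "http://localhost:8000/v2/ingest_files",  # assumed mount path
    files=[("files", open("my_doc.txt", "rb"))],
    data={"metadatas": json.dumps([{"title": "My Doc"}])},
)
print(response.json())
```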
py/core/main/services/ingestion_service.py (16 changes: 9 additions & 7 deletions)

@@ -28,7 +28,7 @@
 
 logger = logging.getLogger(__name__)
 MB_CONVERSION_FACTOR = 1024 * 1024
-
+STARTING_VERSION = "v0"
 
 class IngestionService(Service):
     def __init__(
@@ -56,7 +56,6 @@ async def ingest_files(
         user: UserResponse,
         metadatas: Optional[list[dict]] = None,
         document_ids: Optional[list[UUID]] = None,
-        versions: Optional[list[str]] = None,
         chunking_provider: Optional[ChunkingProvider] = None,
         *args: Any,
         **kwargs: Any,
@@ -89,7 +88,6 @@ async def ingest_files(
         # ingests all documents in parallel
         return await self.ingest_documents(
             documents,
-            versions,
             chunking_provider=chunking_provider,
             *args,
             **kwargs,
@@ -178,10 +176,11 @@ async def update_files(
             )
             documents.append(document)
 
+
         ingestion_results = await self.ingest_documents(
             documents,
-            versions=new_versions,
             chunking_provider=chunking_provider,
+            versions=new_versions,
             *args,
             **kwargs,
         )
@@ -252,7 +251,7 @@ async def ingest_documents(
         }
 
         for iteration, document in enumerate(documents):
-            version = versions[iteration] if versions else "v0"
+            version = versions[iteration] if versions else STARTING_VERSION
 
             # Check for duplicates within the current batch
             if document.id in processed_documents:
@@ -263,7 +262,8 @@
 
             if (
                 document.id in existing_document_info
-                and existing_document_info[document.id].version == version
+                # apply `geq` check to prevent re-ingestion of updated documents
+                and (existing_document_info[document.id].version >= version)
                 and existing_document_info[document.id].status == "success"
             ):
                 logger.error(
@@ -305,6 +305,9 @@
             processed_documents[document.id] = document.metadata.get(
                 "title", str(document.id)
            )
+            # Add version to metadata to propagate through pipeline
+            document.metadata["version"] = version
+
 
         if duplicate_documents:
             duplicate_details = [
@@ -336,7 +339,6 @@
                    not in [skipped["id"] for skipped in skipped_documents]
                ],
            ),
-            versions=[info.version for info in document_infos],
            run_manager=self.run_manager,
            *args,
            **kwargs,
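One subtlety in the `geq` check above: document versions are plain strings, so `>=` compares them lexicographically. A short illustration of where that holds and where it breaks under the `v<N>` scheme implied by `STARTING_VERSION`:

```python
# Lexicographic comparison behaves for single-digit suffixes...
assert "v1" >= "v0"
assert "v2" >= "v1"

# ...but not once a suffix reaches two digits, since strings compare
# character by character: "1" < "9", so "v10" sorts before "v9".
assert not ("v10" >= "v9")


def version_geq(a: str, b: str) -> bool:
    """Compare 'v<N>' versions by numeric suffix (a sketch, not R2R's code)."""
    return int(a.lstrip("v")) >= int(b.lstrip("v"))


assert version_geq("v10", "v9")
```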
py/core/pipes/ingestion/parsing_pipe.py (6 changes: 2 additions & 4 deletions)

@@ -72,11 +72,9 @@ async def _run_logic(
         input: Input,
         state: AsyncState,
         run_id: UUID,
-        versions: Optional[list[str]] = None,
         *args,
         **kwargs,
     ) -> AsyncGenerator[DocumentExtraction, None]:
         async for document in input.message:
-            version = versions[0] if versions else "v0"
-            async for result in self._parse(document, run_id, version):
-                yield result
+            async for result in self._parse(document, run_id, document.metadata.get("version", "1.0")):
+                yield result
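The parsing pipe now reads each document's version from its metadata (set by `ingest_documents` in the service above) instead of from a separate `versions` argument. A minimal sketch of that contract, using a stand-in `Document` class rather than R2R's real type:

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    # Stand-in for R2R's Document model, which also carries metadata.
    id: str
    metadata: dict = field(default_factory=dict)


doc = Document(id="doc-1")
doc.metadata["version"] = "v0"  # what ingest_documents now sets

# What the parsing pipe reads back; note its fallback default in the
# diff is "1.0", while the service side defaults to "v0".
version = doc.metadata.get("version", "1.0")
assert version == "v0"
```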
py/sdk/ingestion.py (7 changes: 0 additions & 7 deletions)

@@ -15,7 +15,6 @@ async def ingest_files(
         file_paths: list[str],
         document_ids: Optional[list[Union[str, UUID]]] = None,
         metadatas: Optional[list[dict]] = None,
-        versions: Optional[list[str]] = None,
         chunking_settings: Optional[Union[dict, ChunkingConfig]] = None,
     ) -> dict:
         """
@@ -25,7 +24,6 @@
             file_paths (List[str]): List of file paths to ingest.
             document_ids (Optional[List[str]]): List of document IDs.
             metadatas (Optional[List[dict]]): List of metadata dictionaries for each file.
-            versions (Optional[List[str]]): List of version strings for each file.
             chunking_settings (Optional[Union[dict, ChunkingConfig]]): Custom chunking configuration.
 
         Returns:
@@ -39,10 +37,6 @@
             raise ValueError(
                 "Number of metadatas must match number of document IDs."
             )
-        if versions is not None and len(file_paths) != len(versions):
-            raise ValueError(
-                "Number of versions must match number of document IDs."
-            )
         if (
             chunking_settings is not None
             and chunking_settings is not ChunkingConfig
@@ -80,7 +74,6 @@
                 if document_ids
                 else None
             ),
-            "versions": json.dumps(versions) if versions else None,
             "chunking_settings": (
                 json.dumps(
                     chunking_settings.model_dump()
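After this change the SDK method simply drops `versions`; a minimal sketch of the new call (the `R2RClient` name and import path are assumed from the R2R Python SDK, and the server address is illustrative):

```python
import asyncio

from r2r import R2RClient  # assumed import path


async def main():
    client = R2RClient("http://localhost:8000")
    response = await client.ingest_files(
        file_paths=["my_doc.txt"],
        metadatas=[{"title": "My Doc"}],
        # versions=... is no longer a parameter; new documents start
        # at the service-side STARTING_VERSION ("v0").
    )
    print(response)


asyncio.run(main())
```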
py/sdk/models.py (31 changes: 31 additions & 0 deletions)

@@ -205,6 +205,37 @@ class Token(BaseModel):
     token_type: str
 
 
+class IndexMeasure(str, Enum):
+    """
+    An enum representing the types of distance measures available for indexing.
+
+    Attributes:
+        cosine_distance (str): The cosine distance measure for indexing.
+        l2_distance (str): The Euclidean (L2) distance measure for indexing.
+        max_inner_product (str): The maximum inner product measure for indexing.
+    """
+
+    cosine_distance = "cosine_distance"
+    l2_distance = "l2_distance"
+    max_inner_product = "max_inner_product"
+
+
+class HybridSearchSettings(BaseModel):
+    full_text_weight: float = Field(
+        default=1.0, description="Weight to apply to full text search"
+    )
+    semantic_weight: float = Field(
+        default=5.0, description="Weight to apply to semantic search"
+    )
+    full_text_limit: int = Field(
+        default=200,
+        description="Maximum number of results to return from full text search",
+    )
+    rrf_k: int = Field(
+        default=50, description="K-value for RRF (Reciprocal Rank Fusion)"
+    )
+
+
 class VectorSearchSettings(BaseModel):
     use_vector_search: bool = Field(
         default=True, description="Whether to use vector search"
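For context on the new `rrf_k` field: Reciprocal Rank Fusion merges the full-text and semantic result lists by summing reciprocal ranks, with `k` damping the influence of top ranks. A minimal sketch of the scoring (the actual fusion runs server-side in R2R; this only illustrates the formula):

```python
def rrf_score(ranks: list[int], k: int = 50) -> float:
    """Reciprocal Rank Fusion: sum 1 / (k + rank) over every result
    list in which the document appears (ranks are 1-based)."""
    return sum(1.0 / (k + rank) for rank in ranks)


# A document ranked 1st in full-text search and 3rd in semantic search
# outscores one ranked 4th in both lists:
assert rrf_score([1, 3]) > rrf_score([4, 4])
```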
