From ec6196d27ac49769d1e430c0533b8e07bc835ed9 Mon Sep 17 00:00:00 2001 From: Stefano Lottini Date: Mon, 15 Jan 2024 12:19:27 +0100 Subject: [PATCH 1/2] rework upsert flow to handle all errors from API --- astrapy/api.py | 2 +- astrapy/db.py | 67 ++++++++++++++++++------------ tests/astrapy/test_async_db_dml.py | 34 +++++++++++++++ tests/astrapy/test_db_dml.py | 32 ++++++++++++++ 4 files changed, 107 insertions(+), 28 deletions(-) diff --git a/astrapy/api.py b/astrapy/api.py index c49c4c1b..12bb38f1 100644 --- a/astrapy/api.py +++ b/astrapy/api.py @@ -73,7 +73,7 @@ def _process_response(self: T) -> API_RESPONSE: # Cast the response to the expected type. response_body: API_RESPONSE = cast(API_RESPONSE, self.response.json()) - # If the API produced an error, warn and return the API request error class + # If the API produced an error, warn and raise it as an Exception if "errors" in response_body and not self.skip_error_check: logger.debug(response_body["errors"]) diff --git a/astrapy/db.py b/astrapy/db.py index 001e7050..b5422b31 100644 --- a/astrapy/db.py +++ b/astrapy/db.py @@ -874,7 +874,8 @@ def upsert(self, document: API_DOC) -> str: """ Emulate an upsert operation for a single document in the collection. - This method attempts to insert the document. If a document with the same _id exists, it updates the existing document. + This method attempts to insert the document. + If a document with the same _id exists, it updates the existing document. Args: document (dict): The document to insert or update. 
@@ -887,19 +888,24 @@ def upsert(self, document: API_DOC) -> str: result = self.insert_one(document, failures_allowed=True) # If the call failed, then we replace the existing doc - if ( - "errors" in result - and "errorCode" in result["errors"][0] - and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS" - ): - # Now we attempt to update - result = self.find_one_and_replace( - replacement=document, - filter={"_id": document["_id"]}, - ) - upserted_id = cast(str, result["data"]["document"]["_id"]) + if "errors" in result: + if ( + "errorCode" in result["errors"][0] + and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS" + ): + # Now we attempt the update + result = self.find_one_and_replace( + replacement=document, + filter={"_id": document["_id"]}, + ) + upserted_id = cast(str, result["data"]["document"]["_id"]) + else: + raise ValueError(result) else: - upserted_id = cast(str, result["status"]["insertedIds"][0]) + if result.get("status", {}).get("insertedIds", []): + upserted_id = cast(str, result["status"]["insertedIds"][0]) + else: + raise ValueError("Unexplained empty insertedIds from API") return upserted_id @@ -912,7 +918,8 @@ def upsert_many( """ Emulate an upsert operation for multiple documents in the collection. - This method attempts to insert the documents. If a document with the same _id exists, it updates the existing document. + This method attempts to insert the documents. + If a document with the same _id exists, it updates the existing document. Args: documents (List[dict]): The documents to insert or update. @@ -1716,7 +1723,8 @@ async def upsert(self, document: API_DOC) -> str: """ Emulate an upsert operation for a single document in the collection. - This method attempts to insert the document. If a document with the same _id exists, it updates the existing document. + This method attempts to insert the document. + If a document with the same _id exists, it updates the existing document. 
Args: document (dict): The document to insert or update. @@ -1728,19 +1736,24 @@ async def upsert(self, document: API_DOC) -> str: result = await self.insert_one(document, failures_allowed=True) # If the call failed, then we replace the existing doc - if ( - "errors" in result - and "errorCode" in result["errors"][0] - and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS" - ): - # Now we attempt to update - result = await self.find_one_and_replace( - replacement=document, - filter={"_id": document["_id"]}, - ) - upserted_id = cast(str, result["data"]["document"]["_id"]) + if "errors" in result: + if ( + "errorCode" in result["errors"][0] + and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS" + ): + # Now we attempt the update + result = await self.find_one_and_replace( + replacement=document, + filter={"_id": document["_id"]}, + ) + upserted_id = cast(str, result["data"]["document"]["_id"]) + else: + raise ValueError(result) else: - upserted_id = cast(str, result["status"]["insertedIds"][0]) + if result.get("status", {}).get("insertedIds", []): + upserted_id = cast(str, result["status"]["insertedIds"][0]) + else: + raise ValueError("Unexplained empty insertedIds from API") return upserted_id diff --git a/tests/astrapy/test_async_db_dml.py b/tests/astrapy/test_async_db_dml.py index 4d336081..46141614 100644 --- a/tests/astrapy/test_async_db_dml.py +++ b/tests/astrapy/test_async_db_dml.py @@ -650,6 +650,40 @@ async def test_upsert_document( assert response1["data"]["document"] == document1 +@pytest.mark.describe("upsert should catch general errors from API") +async def test_upsert_api_errors( + async_writable_vector_collection: AsyncAstraDBCollection, +) -> None: + _id0 = str(uuid.uuid4()) + _id1 = str(uuid.uuid4()) + + document0a = { + "_id": _id0, + "nature": "good vector", + "$vector": [10, 11], + } + upsert_result0 = await async_writable_vector_collection.upsert(document0a) + assert upsert_result0 == _id0 + + # triggering an API error for 
the already-exists path of the upsert + document0b = { + "_id": _id0, + "nature": "faulty vector", + "$vector": [10, 11, 999, -153], + } + with pytest.raises(ValueError): + _ = await async_writable_vector_collection.upsert(document0b) + + # triggering an API error for the insert path of the upsert (new _id) + document1 = { + "_id": _id1, + "nature": "faulty vector from the start", + "$vector": [10, 11, 999, -153], + } + with pytest.raises(ValueError): + _ = await async_writable_vector_collection.upsert(document1) + + @pytest.mark.describe("update_one to create a subdocument, not through vector") async def test_update_one_create_subdocument_novector( async_disposable_vector_collection: AsyncAstraDBCollection, diff --git a/tests/astrapy/test_db_dml.py b/tests/astrapy/test_db_dml.py index abaed7a8..92287b8b 100644 --- a/tests/astrapy/test_db_dml.py +++ b/tests/astrapy/test_db_dml.py @@ -698,6 +698,38 @@ def test_upsert_document(writable_vector_collection: AstraDBCollection) -> None: assert response1["data"]["document"] == document1 +@pytest.mark.describe("upsert should catch general errors from API") +def test_upsert_api_errors(writable_vector_collection: AstraDBCollection) -> None: + _id0 = str(uuid.uuid4()) + _id1 = str(uuid.uuid4()) + + document0a = { + "_id": _id0, + "nature": "good vector", + "$vector": [10, 11], + } + upsert_result0 = writable_vector_collection.upsert(document0a) + assert upsert_result0 == _id0 + + # triggering an API error for the already-exists path of the upsert + document0b = { + "_id": _id0, + "nature": "faulty vector", + "$vector": [10, 11, 999, -153], + } + with pytest.raises(ValueError): + _ = writable_vector_collection.upsert(document0b) + + # triggering an API error for the insert path of the upsert (new _id) + document1 = { + "_id": _id1, + "nature": "faulty vector from the start", + "$vector": [10, 11, 999, -153], + } + with pytest.raises(ValueError): + _ = writable_vector_collection.upsert(document1) + + 
@pytest.mark.describe("update_one to create a subdocument, not through vector") def test_update_one_create_subdocument_novector( disposable_vector_collection: AstraDBCollection, From b49380ad2aa7119e2848dc0c5576fd9bb63d06e6 Mon Sep 17 00:00:00 2001 From: Stefano Lottini Date: Mon, 22 Jan 2024 23:54:13 +0100 Subject: [PATCH 2/2] clarified comment in upsert flow --- astrapy/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/astrapy/db.py b/astrapy/db.py index 7d2a34b8..eb003069 100644 --- a/astrapy/db.py +++ b/astrapy/db.py @@ -899,7 +899,7 @@ def upsert(self, document: API_DOC) -> str: # Build the payload for the insert attempt result = self.insert_one(document, failures_allowed=True) - # If the call failed, then we replace the existing doc + # If the call failed because of preexisting doc, then we replace it if "errors" in result: if ( "errorCode" in result["errors"][0] @@ -1768,7 +1768,7 @@ async def upsert(self, document: API_DOC) -> str: # Build the payload for the insert attempt result = await self.insert_one(document, failures_allowed=True) - # If the call failed, then we replace the existing doc + # If the call failed because of preexisting doc, then we replace it if "errors" in result: if ( "errorCode" in result["errors"][0]