Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework upsert flow to handle all errors from API #170

Merged
merged 3 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion astrapy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _process_response(self: T) -> API_RESPONSE:
# Cast the response to the expected type.
response_body: API_RESPONSE = cast(API_RESPONSE, self.response.json())

# If the API produced an error, warn and return the API request error class
# If the API produced an error, warn and raise it as an Exception
if "errors" in response_body and not self.skip_error_check:
logger.debug(response_body["errors"])

Expand Down
71 changes: 42 additions & 29 deletions astrapy/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,8 @@ def upsert(self, document: API_DOC) -> str:
"""
Emulate an upsert operation for a single document in the collection.

This method attempts to insert the document. If a document with the same _id exists, it updates the existing document.
This method attempts to insert the document.
If a document with the same _id exists, it updates the existing document.

Args:
document (dict): The document to insert or update.
Expand All @@ -898,20 +899,25 @@ def upsert(self, document: API_DOC) -> str:
# Build the payload for the insert attempt
result = self.insert_one(document, failures_allowed=True)

# If the call failed, then we replace the existing doc
if (
"errors" in result
and "errorCode" in result["errors"][0]
and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS"
):
# Now we attempt to update
result = self.find_one_and_replace(
replacement=document,
filter={"_id": document["_id"]},
)
upserted_id = cast(str, result["data"]["document"]["_id"])
# If the call failed because of preexisting doc, then we replace it
if "errors" in result:
if (
"errorCode" in result["errors"][0]
and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS"
):
# Now we attempt the update
result = self.find_one_and_replace(
replacement=document,
filter={"_id": document["_id"]},
)
upserted_id = cast(str, result["data"]["document"]["_id"])
else:
raise ValueError(result)
else:
upserted_id = cast(str, result["status"]["insertedIds"][0])
if result.get("status", {}).get("insertedIds", []):
upserted_id = cast(str, result["status"]["insertedIds"][0])
else:
raise ValueError("Unexplained empty insertedIds from API")

return upserted_id

Expand All @@ -924,7 +930,8 @@ def upsert_many(
"""
Emulate an upsert operation for multiple documents in the collection.

This method attempts to insert the documents. If a document with the same _id exists, it updates the existing document.
This method attempts to insert the documents.
If a document with the same _id exists, it updates the existing document.

Args:
documents (List[dict]): The documents to insert or update.
Expand Down Expand Up @@ -1749,7 +1756,8 @@ async def upsert(self, document: API_DOC) -> str:
"""
Emulate an upsert operation for a single document in the collection.

This method attempts to insert the document. If a document with the same _id exists, it updates the existing document.
This method attempts to insert the document.
If a document with the same _id exists, it updates the existing document.

Args:
document (dict): The document to insert or update.
Expand All @@ -1760,20 +1768,25 @@ async def upsert(self, document: API_DOC) -> str:
# Build the payload for the insert attempt
result = await self.insert_one(document, failures_allowed=True)

# If the call failed, then we replace the existing doc
if (
"errors" in result
and "errorCode" in result["errors"][0]
and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS"
):
# Now we attempt to update
result = await self.find_one_and_replace(
replacement=document,
filter={"_id": document["_id"]},
)
upserted_id = cast(str, result["data"]["document"]["_id"])
# If the call failed because of preexisting doc, then we replace it
if "errors" in result:
if (
"errorCode" in result["errors"][0]
and result["errors"][0]["errorCode"] == "DOCUMENT_ALREADY_EXISTS"
):
# Now we attempt the update
result = await self.find_one_and_replace(
replacement=document,
filter={"_id": document["_id"]},
)
upserted_id = cast(str, result["data"]["document"]["_id"])
else:
raise ValueError(result)
else:
upserted_id = cast(str, result["status"]["insertedIds"][0])
if result.get("status", {}).get("insertedIds", []):
upserted_id = cast(str, result["status"]["insertedIds"][0])
else:
raise ValueError("Unexplained empty insertedIds from API")

return upserted_id

Expand Down
34 changes: 34 additions & 0 deletions tests/astrapy/test_async_db_dml.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,40 @@ async def test_upsert_document(
assert response1["data"]["document"] == document1


@pytest.mark.describe("upsert should catch general errors from API")
async def test_upsert_api_errors(
async_writable_v_collection: AsyncAstraDBCollection,
) -> None:
_id0 = str(uuid.uuid4())
_id1 = str(uuid.uuid4())

document0a = {
"_id": _id0,
"nature": "good vector",
"$vector": [10, 11],
}
upsert_result0 = await async_writable_v_collection.upsert(document0a)
assert upsert_result0 == _id0

# triggering an API error for the already-exists path of the upsert
document0b = {
"_id": _id0,
"nature": "faulty vector",
"$vector": [10, 11, 999, -153],
}
with pytest.raises(ValueError):
_ = await async_writable_v_collection.upsert(document0b)

# triggering an API error for the already-exists path of the upsert
document1 = {
"_id": _id1,
"nature": "faulty vector from the start",
"$vector": [10, 11, 999, -153],
}
with pytest.raises(ValueError):
_ = await async_writable_v_collection.upsert(document1)


@pytest.mark.describe("update_one to create a subdocument, not through vector (async)")
async def test_update_one_create_subdocument_novector(
async_writable_v_collection: AsyncAstraDBCollection,
Expand Down
32 changes: 32 additions & 0 deletions tests/astrapy/test_db_dml.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,38 @@ def test_upsert_document(writable_v_collection: AstraDBCollection) -> None:
assert response1["data"]["document"] == document1


@pytest.mark.describe("upsert should catch general errors from API")
def test_upsert_api_errors(writable_v_collection: AstraDBCollection) -> None:
_id0 = str(uuid.uuid4())
_id1 = str(uuid.uuid4())

document0a = {
"_id": _id0,
"nature": "good vector",
"$vector": [10, 11],
}
upsert_result0 = writable_v_collection.upsert(document0a)
assert upsert_result0 == _id0

# triggering an API error for the already-exists path of the upsert
document0b = {
"_id": _id0,
"nature": "faulty vector",
"$vector": [10, 11, 999, -153],
}
with pytest.raises(ValueError):
_ = writable_v_collection.upsert(document0b)

# triggering an API error for the already-exists path of the upsert
document1 = {
"_id": _id1,
"nature": "faulty vector from the start",
"$vector": [10, 11, 999, -153],
}
with pytest.raises(ValueError):
_ = writable_v_collection.upsert(document1)


@pytest.mark.describe("update_one to create a subdocument, not through vector")
def test_update_one_create_subdocument_novector(
writable_v_collection: AstraDBCollection,
Expand Down
Loading