First pass at batched versions of insert / upsert #134
Grab latest master fixes
    executor.submit(insert_batch, batch, i)
    for i, batch in enumerate(batched_list)
]
response_list = [future.result() for future in futures]
If some futures error, it would be nice to return the results of the successful futures and the exceptions of the failed ones, especially if the flag partial_failures_allowed is true.
The returned value could be List[Union[API_RESPONSE, Exception]], where each Exception corresponds to one batch (a batch-size group of documents).
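A minimal sketch of this suggestion, not the PR's actual code: `run_batches` is a hypothetical helper, `insert_batch` is any callable taking a batch and its index, and `API_RESPONSE` is redefined here as a stand-in alias. Each future's outcome is captured individually so that one failing batch does not discard the others.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Union

API_RESPONSE = Dict[str, Any]  # stand-in alias for this sketch only


def run_batches(
    insert_batch: Callable[[List[Any], int], API_RESPONSE],  # hypothetical callable
    batched_list: List[List[Any]],
    concurrency: int = 2,
    partial_failures_allowed: bool = True,
) -> List[Union[API_RESPONSE, Exception]]:
    """Return one item per batch: the response, or the exception it raised."""
    results: List[Union[API_RESPONSE, Exception]] = []
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [
            executor.submit(insert_batch, batch, i)
            for i, batch in enumerate(batched_list)
        ]
        # Iterate in submission order so results[i] matches batched_list[i].
        for future in futures:
            try:
                results.append(future.result())
            except Exception as exc:
                if not partial_failures_allowed:
                    raise
                results.append(exc)
    return results
```

With `partial_failures_allowed=False` the first failure (in submission order) is re-raised, which matches the behaviour of the plain `future.result()` list comprehension.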
The Cassandra Python driver enriches that with a boolean to easily tell them apart, so the return type is List[Tuple[bool, Union[API_RESPONSE, Exception]]] (see here). I wonder if that would be a good model to adopt here.
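For illustration only, the boolean-tagged shape can be derived from the plain list of results and exceptions with a small adapter. `tag_outcomes` is a hypothetical name, not part of astrapy or the Cassandra driver.

```python
from typing import Any, List, Tuple, Union


def tag_outcomes(
    results: List[Union[Any, Exception]],
) -> List[Tuple[bool, Union[Any, Exception]]]:
    """Pair each item with True on success, False if it is an Exception."""
    return [(not isinstance(item, Exception), item) for item in results]
```

Since the flag is fully determined by an `isinstance` check, the two return shapes carry the same information; the tuple form only saves the caller from writing that check.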
Why not. I find that it's not too hard to test if the result is an instance of Exception, but I don't have a strong opinion on this.
Same here - it can be left as simple as possible, no boolean is all good for me.
astrapy/db.py
Outdated
# Perform the bulk upsert with concurrency
with ThreadPoolExecutor(max_workers=concurrency) as executor:
    futures = [executor.submit(self.upsert, document) for document in documents]
    response_list = [future.result() for future in futures]
Same here, the returned value could be List[Union[str, Exception]] with an optional partial_failures_allowed flag.
tests/astrapy/test_db_dml.py
Outdated
) -> None:
    _id0 = str(uuid.uuid4())
    _id2 = str(uuid.uuid4())
    documents: List[API_DOC] = [
This long list could be replaced by a repeated element.
Something like
[{"_id": str(uuid.uuid4()), "name": "Abba"} for _ in range(30)]
or even
[{"name": "Abba"}] * 30
as we can let Cassandra set the id.
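One caveat worth checking before picking the shorter form: list multiplication repeats a reference to the same dict, whereas the comprehension builds a fresh dict each time. The sketch below only demonstrates that Python behaviour. Whether it matters here depends on whether the insert path mutates the documents it receives, which is an assumption not confirmed in this thread.

```python
import uuid

# Thirty references to one and the same dict object.
repeated = [{"name": "Abba"}] * 30

# Thirty independent dicts, each with its own client-side id.
distinct = [{"_id": str(uuid.uuid4()), "name": "Abba"} for _ in range(30)]

# Count distinct objects by identity.
shared_object_count = len({id(doc) for doc in repeated})
distinct_object_count = len({id(doc) for doc in distinct})

# Mutating one element of `repeated` is visible through every index.
repeated[0]["touched"] = True
all_touched = all(doc.get("touched") for doc in repeated)
```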
I have rewritten the whole test code.
tests/astrapy/test_db_dml.py
Outdated
    },
]

response = writable_vector_collection.batched_concurrent_insert_many(documents)
Set concurrency=2?
astrapy/db.py
Outdated
return self.insert_many(
    documents=batch,
    options=options,
If concurrency > 1, we could maybe set options["ordered"] = False, as the insertions will be unordered anyway.
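A possible shape for that tweak, as a sketch under assumptions: `options_for_batch` is a hypothetical helper, and the only option key used is the `"ordered"` one mentioned in the comment. The caller's dict is copied so it is not modified as a side effect.

```python
from typing import Any, Dict, Optional


def options_for_batch(
    options: Optional[Dict[str, Any]], concurrency: int
) -> Dict[str, Any]:
    """Return a copy of `options`, forcing ordered=False for concurrent runs."""
    merged = dict(options or {})
    if concurrency > 1:
        # Batches run in parallel, so global ordering cannot be guaranteed anyway.
        merged["ordered"] = False
    return merged
```

Copying rather than assigning into `options` directly keeps a dict that the caller reuses across calls from silently changing.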
I'd like to raise a point which goes back to the suggestion to collate the response from the various API calls. I am not sure what the best design would be.
To elaborate more on the last point: the caller sends 45 documents and the second API call raises an exception. What is more useful to the caller:
Probably any choice other than keeping the response/exception separate by chunk sacrifices some info, and I would avoid that, at the cost of exposing the "surprising" chunked structure in the response item.
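To make the per-chunk option concrete, here is a sketch of how a caller could map chunk-level outcomes back to document positions. `failed_document_ranges` is a hypothetical helper, and the chunk size of 20 in the usage note is an assumed value, not one stated in this thread.

```python
from typing import Any, List, Tuple, Union


def failed_document_ranges(
    chunk_results: List[Union[Any, Exception]],
    chunk_size: int,
    total_documents: int,
) -> List[Tuple[int, int, Exception]]:
    """Return (start, end, exception) for each failed chunk; end is exclusive."""
    failures: List[Tuple[int, int, Exception]] = []
    for chunk_index, outcome in enumerate(chunk_results):
        if isinstance(outcome, Exception):
            start = chunk_index * chunk_size
            end = min(start + chunk_size, total_documents)
            failures.append((start, end, outcome))
    return failures
```

For the 45-document scenario with an assumed chunk size of 20, a failure in the second call maps to documents 20 through 39, so the caller knows exactly which slice to retry. That information would be lost if the responses were merged into a single item.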
…ystematic test coverage for the chunked insert and upsert_many