Skip to content

Commit

Permalink
Merge pull request #161 from poissoncorp/v5.2
Browse files Browse the repository at this point in the history
Bulk Insert
  • Loading branch information
ml054 committed Feb 17, 2023
2 parents 510a12d + ee53910 commit 8daaf25
Show file tree
Hide file tree
Showing 19 changed files with 790 additions and 57 deletions.
443 changes: 443 additions & 0 deletions ravendb/documents/bulk_insert_operation.py

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions ravendb/documents/commands/bulkinsert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
from typing import Optional

import requests

from ravendb.http.server_node import ServerNode
from ravendb.http.raven_command import RavenCommand, VoidRavenCommand


class GetNextOperationIdCommand(RavenCommand[int]):
    """Command that asks the server for the next operation id.

    After the response is processed, ``result`` holds the value of the
    ``Id`` field and ``_node_tag`` the value of the ``NodeTag`` field of
    the JSON body (``None`` for either when the field is absent).
    """

    def __init__(self):
        super().__init__(int)
        # NOTE(review): starts as the int 0, but set_response() later stores
        # whatever "NodeTag" contains -- confirm the intended type.
        self._node_tag = 0

    def is_read_request(self) -> bool:
        """Report the command as a non-read request."""
        return False  # disable caching

    def create_request(self, node: ServerNode) -> requests.Request:
        """Build the GET request against the given node's database."""
        url = f"{node.url}/databases/{node.database}/operations/next-operation-id"
        return requests.Request("GET", url)

    def set_response(self, response: Optional[str], from_cache: bool) -> None:
        """Parse the JSON response and store its ``Id`` and ``NodeTag``."""
        payload = json.loads(response)
        self.result = payload.get("Id")
        self._node_tag = payload.get("NodeTag")


class KillOperationCommand(VoidRavenCommand):
    """Command that posts a kill request for an operation id.

    :param operation_id: id placed in the ``id`` query parameter.
    :param node_tag: optional tag stored on ``_selected_node_tag``.
    """

    def __init__(self, operation_id: int, node_tag: Optional[str] = None):
        super().__init__()
        self._selected_node_tag = node_tag
        self._id = operation_id

    def create_request(self, node: ServerNode) -> requests.Request:
        """Build the POST request against the given node's database."""
        url = f"{node.url}/databases/{node.database}/operations/kill?id={self._id}"
        return requests.Request("POST", url)
5 changes: 4 additions & 1 deletion ravendb/documents/conventions.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ def json_default(o):
elif isinstance(o, (int, float)):
return str(o)
else:
raise TypeError(repr(o) + " is not JSON serializable (Try add a json default method to convention)")
raise TypeError(
repr(o) + " is not JSON serializable (Try add a json default method to convention"
" or try to add methods - to_json & classmethod from_json - to object class)"
)

@staticmethod
def default_transform_plural(name):
Expand Down
4 changes: 4 additions & 0 deletions ravendb/documents/queries/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ def __init__(
self.project_into: Union[None, bool] = None
self.projection_behavior: Union[None, ProjectionBehavior] = None

@classmethod
def custom_function(cls, alias: str, func: str) -> QueryData:
    """Alternate constructor that wraps a single function expression.

    Calls ``cls`` with ``[func]`` and an empty list as the first two
    positional arguments, ``alias`` as the third, ``None`` for the fourth
    and fifth, and ``True`` for the sixth.

    NOTE(review): the constructor's parameter names are not visible from
    here; presumably the trailing ``True`` marks the instance as a custom
    function -- confirm against ``__init__``.
    """
    return cls([func], [], alias, None, None, True)


class QueryResultBase(Generic[_TResult, _TIncludes]):
@abstractmethod
Expand Down
14 changes: 11 additions & 3 deletions ravendb/documents/session/entity_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def convert_entity_to_json_internal_static(
json_node = Utils.entity_to_dict(entity, conventions.json_default_method)
EntityToJson.write_metadata(json_node, document_info)
if remove_identity_property:
EntityToJson.try_remove_identity_property(entity)
EntityToJson.try_remove_identity_property_json(json_node)
return json_node

@staticmethod
Expand All @@ -69,7 +69,7 @@ def _convert_entity_to_json_internal(
json_node = Utils.entity_to_dict(entity, self._session.conventions.json_default_method)
self.write_metadata(json_node, document_info)
if remove_identity_property:
self.try_remove_identity_property(json_node)
self.try_remove_identity_property_json(json_node)
return json_node

# todo: refactor this method, make it more useful/simple and less ugly (like this return...[0])
Expand Down Expand Up @@ -100,6 +100,14 @@ def try_remove_identity_property(document):
except AttributeError:
return False

@staticmethod
def try_remove_identity_property_json(document: Dict) -> bool:
    """Remove the ``"Id"`` key from ``document`` in place.

    :param document: JSON dictionary to modify.
    :return: ``True`` if the key was present and removed, ``False`` otherwise.
    """
    if "Id" not in document:
        return False
    del document["Id"]
    return True

@staticmethod
def write_metadata(json_node: dict, document_info: DocumentInfo):
if document_info is None:
Expand Down Expand Up @@ -169,7 +177,7 @@ def convert_to_entity_static(
# todo: Separate it into two different functions and isolate the return statements from the first part

# I. Extract the object type
metadata = document.pop("@metadata")
metadata = document.get("@metadata")
document_deepcopy = deepcopy(document)

# 1. Get type from metadata
Expand Down
8 changes: 4 additions & 4 deletions ravendb/documents/session/operations/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ravendb.tools.utils import Utils, CaseInsensitiveDict
from ravendb.documents.queries.query import QueryResult
from ravendb.extensions.json_extensions import JsonExtensions
from ravendb.documents.commands.crud import GetDocumentsResult, ConditionalGetDocumentsCommand
from ravendb.documents.commands.crud import GetDocumentsResult, ConditionalGetDocumentsCommand, ConditionalGetResult
from ravendb.documents.session.operations.load_operation import LoadOperation
from ravendb.documents.conventions import DocumentConventions
from ravendb.documents.operations.lazy.lazy_operation import LazyOperation
Expand Down Expand Up @@ -166,7 +166,7 @@ def handle_response(self, response: "GetResponse") -> None:
if response.result is not None:
etag = response.headers.get(constants.Headers.ETAG)

res = ConditionalGetDocumentsCommand.ConditionalGetResult.from_json(json.loads(response.result))
res = ConditionalGetResult.from_json(json.loads(response.result))
document_info = DocumentInfo.get_new_document_info(res.results[0])
r = self.__session.track_entity_document_info(self.__object_type, document_info)

Expand Down Expand Up @@ -227,7 +227,7 @@ def load_starting_with(
def conditional_load(
self, key: str, change_vector: str, object_type: Type[_T] = None
) -> Lazy[ConditionalLoadResult[_T]]:
if not key.isspace():
if not key or key.isspace():
raise ValueError("key cannot be None or whitespace")

if self._delegate.is_loaded(key):
Expand All @@ -236,7 +236,7 @@ def __lazy_factory():
entity = self._delegate.load(key, object_type)
if entity is None:
return ConditionalLoadResult.create(None, None)
cv = self._delegate.get_change_vector_for(entity)
cv = self._delegate.advanced.get_change_vector_for(entity)
return ConditionalLoadResult.create(entity, cv)

return Lazy(__lazy_factory)
Expand Down
4 changes: 2 additions & 2 deletions ravendb/documents/session/operations/load_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def with_time_series(self, time_series: List[TimeSeriesRange]):
self._time_series_to_include = time_series
return self

def by_keys(self, keys: List[str]):
distinct = CaseInsensitiveSet(filter(lambda key: key and key.strip(), keys))
def by_keys(self, keys: List[Optional[str]]):
distinct = CaseInsensitiveSet(filter(lambda x: x and not x.isspace(), keys))
self._keys = list(distinct)
return self

Expand Down
16 changes: 8 additions & 8 deletions ravendb/documents/store/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ravendb import constants, exceptions
from ravendb.changes.database_changes import DatabaseChanges
from ravendb.documents.bulk_insert_operation import BulkInsertOperation
from ravendb.documents.operations.executor import MaintenanceOperationExecutor, OperationExecutor
from ravendb.documents.operations.indexes import PutIndexesOperation
from ravendb.documents.session.event_args import (
Expand Down Expand Up @@ -163,14 +164,10 @@ def maintenance(self) -> MaintenanceOperationExecutor:
def operations(self) -> OperationExecutor:
pass

# todo: changes

# todo: aggressive_caching

# todo: time_series

# todo: bulk_insert

@abstractmethod
def open_session(self, database: Optional[str] = None, session_options: Optional = None):
pass
Expand Down Expand Up @@ -294,26 +291,25 @@ def get_effective_database_static(store: DocumentStoreBase, database: str) -> st
if database is None:
database = store.database

if not database.isspace():
if database and not database.isspace():
return database

raise ValueError(
"Cannot determine database to operate on. "
"Please either specify 'database' directly as an action parameter "
"or set the default database to operate on using 'DocumentStore.setDatabase' method. "
"or set the default database to operate on using 'DocumentStore.database'. "
"Did you forget to pass 'database' parameter?"
)


class DocumentStore(DocumentStoreBase):
def __init__(self, urls: Optional[Union[str, List[str]]] = None, database: Optional[str] = None):
def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] = None):
super(DocumentStore, self).__init__()
self.__subscriptions = DocumentSubscriptions(self)
self.__thread_pool_executor = ThreadPoolExecutor()
self.urls = [urls] if isinstance(urls, str) else urls
self.database = database
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
# todo: database changes
# todo: aggressive cache
self.__maintenance_operation_executor: Union[None, MaintenanceOperationExecutor] = None
self.__operation_executor: Union[None, OperationExecutor] = None
Expand Down Expand Up @@ -519,6 +515,10 @@ def initialize(self) -> DocumentStore:

# todo: aggressively cache

def bulk_insert(self, database_name: Optional[str] = None) -> BulkInsertOperation:
    """Create a bulk insert operation bound to this store.

    Asserts that the store is initialized, resolves the database through
    ``get_effective_database`` and passes it, together with this store, to
    ``BulkInsertOperation``.

    :param database_name: database to target; ``None`` is forwarded to
        ``get_effective_database`` unchanged.
    """
    self.assert_initialized()
    effective_database = self.get_effective_database(database_name)
    return BulkInsertOperation(effective_database, self)

def _assert_valid_configuration(self) -> None:
if not self.urls:
raise ValueError("Document URLs cannot be empty.")
Expand Down
13 changes: 13 additions & 0 deletions ravendb/exceptions/documents/bulkinsert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Optional

from ravendb.exceptions.raven_exceptions import RavenException


class BulkInsertAbortedException(RavenException):
    """Exception type for an aborted bulk insert.

    :param message: error description forwarded to ``RavenException``.
    :param cause: optional underlying exception, forwarded as well.
    """

    def __init__(self, message: str, cause: Optional[Exception] = None):
        super().__init__(message, cause)


class BulkInsertProtocolViolationException(RavenException):
    """Exception type for a bulk insert protocol violation.

    :param message: error description forwarded to ``RavenException``.
    :param cause: optional underlying exception, forwarded as well.
    """

    def __init__(self, message: str, cause: Optional[Exception] = None):
        super().__init__(message, cause)
40 changes: 13 additions & 27 deletions ravendb/http/request_executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import datetime
import inspect
import json
import logging
import os
Expand Down Expand Up @@ -579,33 +580,21 @@ def __send_request_to_server(
session_info: SessionInfo,
request: requests.Request,
url: str,
) -> requests.Response:
) -> Optional[requests.Response]:
try:
self.number_of_server_requests += 1
timeout = command.timeout if command.timeout else self.__default_timeout
if timeout:

if not timeout:
return self.__send(chosen_node, command, session_info, request)

else:
try:
# todo: create Task from lines below and call it
# AggressiveCacheOptions callingTheadAggressiveCaching = aggressiveCaching.get();
# CompletableFuture<CloseableHttpResponse> sendTask = CompletableFuture.supplyAsync(() ->
# AggressiveCacheOptions aggressiveCacheOptionsToRestore = aggressiveCaching.get();
try:
return self.__send(chosen_node, command, session_info, request)
except IOError:
# throw ExceptionsUtils.unwrapException(e);
raise
# finally aggressiveCaching.set(aggressiveCacheOptionsToRestore);
return self.__send(chosen_node, command, session_info, request)
except requests.Timeout as t:
# request.abort()
# net.ravendb.client.exceptions.TimeoutException timeoutException =
# new net.ravendb.client.exceptions.TimeoutException(
# "The request for " + request.getURI() + " failed with timeout after " +
# TimeUtils.durationToTimeSpan(timeout), e);

if not should_retry:
if command.failed_nodes is None:
command.failed_nodes = {}

command.failed_nodes[chosen_node] = t
raise t

Expand All @@ -615,10 +604,6 @@ def __send_request_to_server(
self.__throw_failed_to_contact_all_nodes(command, request)

return None
except IOError as e:
raise e
else:
return self.__send(chosen_node, command, session_info, request)
except IOError as e:
if not should_retry:
raise
Expand All @@ -633,7 +618,7 @@ def __send_request_to_server(
def __send(
self, chosen_node: ServerNode, command: RavenCommand, session_info: SessionInfo, request: requests.Request
) -> requests.Response:
response: requests.Response = None
response: Optional[requests.Response] = None

if self.should_execute_on_all(chosen_node, command):
response = self.__execute_on_all_to_figure_out_the_fastest(chosen_node, command)
Expand Down Expand Up @@ -891,7 +876,8 @@ def __supply_async(

def __create_request(self, node: ServerNode, command: RavenCommand) -> requests.Request:
request = command.create_request(node)
if request.data and not isinstance(request.data, str):
# todo: optimize that if - look for the way to make less ifs each time
if request.data and not isinstance(request.data, str) and not inspect.isgenerator(request.data):
request.data = json.dumps(request.data, default=self.conventions.json_default_method)

# todo: 1117 - 1133
Expand Down Expand Up @@ -1125,10 +1111,10 @@ def __handle_server_down(
self,
url: str,
chosen_node: ServerNode,
node_index: int,
node_index: Optional[int],
command: RavenCommand,
request: requests.Request,
response: requests.Response,
response: Optional[requests.Response],
e: Exception,
session_info: SessionInfo,
should_retry: bool,
Expand Down
9 changes: 7 additions & 2 deletions ravendb/json/json_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ def compare_json(
old_json_props = set(original_json.keys())
new_json_props = set(new_json.keys())

# todo: Discuss about that condition - add/del Id causes changes
if "Id" in old_json_props:
old_json_props.remove("Id")
if "Id" in new_json_props:
new_json_props.remove("Id")

new_fields = new_json_props - old_json_props
removed_fields = old_json_props - new_json_props
removed_fields.discard("Id") # todo: Discuss about that condition - add/del Id causes changes

for field in removed_fields:
if changes is None:
Expand All @@ -64,7 +69,7 @@ def compare_json(
prop == constants.Documents.Metadata.LAST_MODIFIED
or prop == constants.Documents.Metadata.COLLECTION
or prop == constants.Documents.Metadata.CHANGE_VECTOR
or prop == constants.Documents.Metadata.KEY
or prop == constants.Documents.Metadata.ID
):
continue

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from ravendb.tests.test_base import TestBase


class TestBulkInsertAttachments(TestBase):
    """Tests for attachment handling through the bulk insert API."""

    def setUp(self):
        super().setUp()

    def test_store_async_null_id(self):
        """Requesting attachments for a ``None`` document id must fail."""

        def request_attachments_for_none():
            with self.store.bulk_insert() as operation:
                operation.attachments_for(None)

        self.assertRaisesWithMessage(
            request_attachments_for_none,
            ValueError,
            "Document id cannot be None or empty.",
        )
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from dataclasses import dataclass

from typing import List
Expand All @@ -24,21 +24,19 @@ def test_can_add_or_patch(self):
key = "users/1"

with self.store.open_session() as session:
new_user = User(first_name="Hibernating", last_name="Rhinos", login_count=1)
new_user = User(first_name="Hibernating", last_name="Rhinos", last_login=datetime.utcnow())
session.store(new_user, key)
session.save_changes()
self.assertEqual(1, session.advanced.number_of_requests)

with self.store.open_session() as session:
new_user = User(first_name="Hibernating", last_name="Rhinos")
session.advanced.add_or_increment(key, new_user, "login_count", 3)

new_user = User(first_name="Hibernating", last_name="Rhinos", last_login=datetime.utcnow())
new_date = datetime.utcnow() + timedelta(days=365)
session.advanced.add_or_patch(key, new_user, "last_login", new_date)
session.save_changes()

self.assertEqual(1, session.advanced.number_of_requests)

user = session.load(key, User)
self.assertEqual(4, user.login_count)

session.delete(key)
session.save_changes()

Expand Down Expand Up @@ -71,7 +69,7 @@ def test_can_add_or_patch_add_item_to_an_existing_array(self):

with self.store.open_session() as session:
user = User(first_name="Hibernating", last_name="Rhinos")
datetime_now = datetime.now()
datetime_now = datetime.utcnow()
d2000 = datetime(
2000,
datetime_now.month,
Expand Down
Loading

0 comments on commit 8daaf25

Please sign in to comment.