
Mongo connector lags behind mongodb #446

Closed
hungvotrung opened this issue May 12, 2016 · 42 comments

@hungvotrung

hungvotrung commented May 12, 2016

Hi guys,
We are using mongo-connector to feed data from mongodb to elasticsearch. Everything went well until today, when we had a big insert/update load on mongodb and mongo-connector started falling behind, taking hours to catch up.
Here is the mongo-connector config:

"mainAddress": "10.a.b.c:27017",
    "oplogFile": "/var/log/mongo-connector/oplog.timestamp",
    "noDump": false,
    "batchSize": -1,
    "verbosity": 2,
    "continueOnError": true,

 "docManagers": [
        {
            "docManager": "elastic2_doc_manager",
            "targetURL": "10.x.y.z:9200",
            "bulkSize": 1000,
            "__uniqueKey": "_id",
            "args": {
                "clientOptions": {"timeout": 60}
                 }
        }

Elasticsearch is setup as cluster of 3 servers.
I'm looking for any way to make mongo-connector tail and update the data faster.

Thanks everyone.

@juggernauts

Same issue here, any updates?

@jmmk

jmmk commented May 27, 2016

The default elastic2_doc_manager indexes documents one at a time, which is inefficient and pretty slow if you have a large volume of operations in the oplog. I was able to increase the speed quite a bit by modifying the doc manager to batch the indexing actions into a bulk request.
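For readers following along, here is a minimal sketch of that batching idea, assuming elasticsearch-py's bulk helper; the class name and buffer size are illustrative, not the actual patch:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class BufferedIndexer(object):  # hypothetical name, for illustration only
    def __init__(self, hosts, bulk_size=1000):
        self.elastic = Elasticsearch(hosts=hosts)
        self.bulk_size = bulk_size
        self.action_buffer = []

    def add_action(self, action):
        """Queue one bulk action, e.g. {"_op_type": "index", ...}."""
        self.action_buffer.append(action)
        if len(self.action_buffer) >= self.bulk_size:
            self.commit()

    def commit(self):
        if self.action_buffer:
            # One network round-trip for the whole batch
            # instead of one request per document.
            bulk(self.elastic, self.action_buffer)
            self.action_buffer = []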

@luisobo

luisobo commented Jun 10, 2016

Same issue here. @jmmk any chance you can share your patch? Thanks in advance :)

@behackett
Contributor

@jmmk, pull requests against elastic2-doc-manager are warmly welcomed. :-)

@luisobo

luisobo commented Jun 13, 2016

I was looking into this, and it seems that mongo-connector itself drives whether the doc manager performs a bulk_upsert or a plain upsert. Shouldn't this update be made to mongo_connector?

Regardless, I think it'd be great to have either mongo_connector or the doc managers buffer updates for a given period of time and then bulk-upsert them.

@jmmk

jmmk commented Jun 17, 2016

@luisobo @behackett I have been out of town for a few weeks, but I will try to tidy up my solution and make an upstream PR.

@luisobo it's not just the upserts that can/should be batched, it's every operation. ElasticSearch allows a "bulk indexing operation" with any number of inserts/updates/deletes that will be run in the order specified (see: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).

Mongo Connector is just passing each oplog operation to the doc manager to handle it however it sees fit. In order to properly replicate the database state, the operations must be run "one at a time" in the exact order they happened. So if Mongo Connector tried to do any batching logic, it may not be correct for how the downstream data store needs to handle the operations. But since we know we can batch them in a bulk indexing request and that they will be run in order, we can do that for ES to prevent unnecessary network calls.
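To make the ordering guarantee concrete, a small illustrative example (index/type names and the host are made up): actions in one bulk request execute in list order, so replaying a slice of the oplog as a single batch preserves the final state.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(hosts=["10.x.y.z:9200"])

actions = [
    {"_op_type": "index", "_index": "mydb", "_type": "mycoll",
     "_id": "1", "_source": {"name": "first"}},
    {"_op_type": "index", "_index": "mydb", "_type": "mycoll",
     "_id": "1", "_source": {"name": "second"}},  # overwrites "first"
    {"_op_type": "delete", "_index": "mydb", "_type": "mycoll",
     "_id": "1"},  # runs last, removing the document
]
bulk(es, actions)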

@sliwinski-milosz

Same issue here. Can't wait for the solution to land :)

@jmmk

jmmk commented Jun 24, 2016

Made a quick gist to demonstrate the improvements to elastic2_doc_manager:
https://gist.github.com/jmmk/b3342508b6a805f51101e53fb9d9df86

You can see a diff of the changes I've made here: https://www.diffchecker.com/qybzkblm

Note: I have not tested this code, but it should be a good starting point.

@hungvotrung
Author

Thanks @jmmk. I'm gonna test this out.

@sliwinski-milosz

sliwinski-milosz commented Jun 27, 2016

What I've noticed is that mongo-connector, on operation == "u", calls docman.update, and the elastic2_doc_manager update function starts with self.commit(), which means it clears the buffer even before it has been filled.
Btw, the problem with calling self.commit() inside elastic2_doc_manager has been mentioned here:
yougov/elastic2-doc-manager#4

I am also not sure if it handles the case where the buffer is not yet full but mongo-connector has already gone through the whole oplog -> I mean, we should send a bulk request and empty the buffer once mongo-connector finishes going through the oplog.

EDIT:

So after I:

  1. commented out self.commit() inside the update function in elastic2_doc_manager
  2. added handling for a commit after the oplog loop is done

it works very well! It can sync 10000 lines from the oplog in ~15 sec.

I also added a check to call commit only if self.action_buffer is not empty (related to point 2).

@jmmk

jmmk commented Jun 27, 2016

@sliwinski-milosz that call to self.commit() looks wrong; I'll change it in the gist. If you want to commit periodically, you should configure the auto_commit_interval. That solves the case where the buffer isn't completely full and therefore never commits.

I also noticed that run_auto_commit() wasn't doing the commit, so I updated that as well.

You can see the two changes here: https://gist.github.com/jmmk/b3342508b6a805f51101e53fb9d9df86/revisions
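For reference, the periodic commit is controlled by the autoCommitInterval option (in seconds) in the mongo-connector config; the target URL below is a placeholder:

"docManagers": [
    {
        "docManager": "elastic2_doc_manager",
        "targetURL": "10.x.y.z:9200",
        "autoCommitInterval": 5
    }
]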

@sliwinski-milosz

Superb, thank you very much @jmmk

Regarding auto commit: I don't want to commit periodically; rather, I would like to commit only when some lines from the oplog have been processed. That is why I mentioned a flush inside mongo-connector after it exits the oplog loop.

@jmmk

jmmk commented Jun 27, 2016

I think auto commit is still your best bet, but you could just add an additional check inside commit or inside run_auto_commit:

from threading import Timer  # needed for run_auto_commit below

def commit(self):
    if len(self.action_buffer):
        pass  # do stuff: flush self.action_buffer to Elasticsearch here

# OR
def run_auto_commit(self):
    if len(self.action_buffer):
        self.commit()
    if self.auto_commit_interval not in [None, 0]:
        Timer(self.auto_commit_interval, self.run_auto_commit).start()

@sliwinski-milosz

sliwinski-milosz commented Jun 27, 2016

@jmmk you are right. Auto commit is my best bet and I have started using it.

There is only one issue in the provided code: self.action_buffer = [] needs to be moved before the first call to run_auto_commit, as it is used by self.commit().

Thank you very much again, now it works much better!

@hungvotrung
Author

hungvotrung commented Jun 28, 2016

I faced an issue when an insert is followed by several updates right after it:
2016-06-28 11:39:46,462 [DEBUG] urllib3.util.retry:164 - Converted retries value: False -> Retry(total=False, connect=None, read=None, redirect=0)
2016-06-28 11:39:46,463 [DEBUG] urllib3.connectionpool:395 - "GET /perftestnet/Packages/5771f1809e9cc54d88dce2c4 HTTP/1.1" 404 90
2016-06-28 11:39:46,463 [WARNING] elasticsearch:82 - GET /perftestnet/Packages/5771f1809e9cc54d88dce2c4 [status:404 request:0.001s]
2016-06-28 11:39:46,463 [DEBUG] elasticsearch:90 - > None
2016-06-28 11:39:46,463 [DEBUG] elasticsearch:93 - < {"_index":"perftestnet","_type":"Packages","_id":"5771f1809e9cc54d88dce2c4","found":false}
mongo_connector.oplog_manager:282 - Unable to process oplog document {content_removed}
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/mongo_connector/oplog_manager.py", line 268, in run
    ns, timestamp)
  File "/usr/lib/python2.7/site-packages/mongo_connector/util.py", line 38, in wrapped
    reraise(new_type, exc_value, exc_tb)
  File "/usr/lib/python2.7/site-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/mongo_connector/doc_managers/elastic2_bulk_doc_manager.py", line 137, in update
    id=u(document_id))
  File "/usr/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 346, in get
    doc_type, id), params=params)
  File "/usr/lib/python2.7/site-packages/elasticsearch/transport.py", line 329, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.py", line 109, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 108, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
OperationFailed: TransportError(404, u'{"_index":"perftestnet","_type":"Packages","_id":"5771f1809e9cc54d88dce2c4","found":false}')

EDIT:
The update needs to get the doc from the elasticsearch server, but it hasn't been committed yet. I managed to get it to continue by adding a docman.commit() for every insert in oplog_manager.py. Any suggestions for a better solution?

@sliwinski-milosz

sliwinski-milosz commented Jun 28, 2016

A workaround, until we have a better solution, would be to set a flag on insert (insert_in_queue); then, once your oplog manager wants to do an "update" and the flag is true, do a commit before the update and set the flag back to false. A rough sketch follows.

Thanks to that, if you have 7000 insert operations followed by one update operation, it will still be able to use bulk.

P.S. Now we know why there was a commit inside the doc manager's update function.
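A sketch of that flag workaround, with hypothetical names (handle_op is illustrative, not a real mongo-connector function; the docman calls follow the DocManager API quoted elsewhere in this thread, with timestamp handling simplified):

insert_in_queue = False  # set when an insert is buffered but not yet committed

def handle_op(docman, op, entry):
    global insert_in_queue
    if op == 'i':
        docman.upsert(entry['o'], entry['ns'], entry['ts'])
        insert_in_queue = True
    elif op == 'u':
        if insert_in_queue:
            # flush buffered inserts so the update can find its document
            docman.commit()
            insert_in_queue = False
        docman.update(entry['o2']['_id'], entry['o'], entry['ns'], entry['ts'])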

@jmmk

jmmk commented Jun 28, 2016

@hungvotrung @sliwinski-milosz I forgot about that - all the inserts must be committed, because the update pulls the document from Elasticsearch. Simply adding the commit, or using the workaround from @sliwinski-milosz, could be slower, but I would test those first to see if they are fast enough for your needs.

Alternatively, you can make some more modifications. Right now, when an update operation comes in, it immediately attempts to fetch the document from ES. You can make this faster by batching the updates, though it is more complex (a sketch follows this list):

  • when several updates come in a row, put them in a queue
  • make a request to ES for all the document ids in the queue
  • run apply_update on each document with the oplog messages in the queue (it's possible that multiple oplog messages will apply to the same document so you need to process them in order)
  • insert the updated documents
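A sketch of those steps, assuming the doc manager keeps an update_queue of (index, type, id, update_spec) tuples and already has the apply_update helper and action_buffer discussed earlier in this thread; names are illustrative:

def flush_update_queue(self):
    if not self.update_queue:
        return
    # 1. One _mget for every queued document id.
    metas = [{"_index": idx, "_type": typ, "_id": _id}
             for idx, typ, _id, _spec in self.update_queue]
    resp = self.elastic.mget(body={"docs": metas})
    sources = dict((d["_id"], d["_source"])
                   for d in resp["docs"] if d.get("found"))

    # 2. Apply the oplog update specs in order; several specs may target
    #    the same document, so later ones must see earlier results.
    for idx, typ, _id, spec in self.update_queue:
        if _id in sources:
            sources[_id] = self.apply_update(sources[_id], spec)

    # 3. Re-buffer the final version of each updated document.
    meta_by_id = dict((m["_id"], m) for m in metas)
    for _id, doc in sources.items():
        m = meta_by_id[_id]
        self.action_buffer.append(
            {"_op_type": "index", "_index": m["_index"],
             "_type": m["_type"], "_id": _id, "_source": doc})
    self.update_queue = []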

@hungvotrung
Author

[screenshot: summary of test results]

Summary from my test lab (mixing 1000 (i) and 13000 (u)): the first commit from @jmmk already did the magic, but the later change is even better. The normal doc manager took ~40 mins to finish, while the new change only needs 5 mins (the actual time is shorter, since mongo-connector has to wait for the application to process the data).
Big thanks to @jmmk and @sliwinski-milosz.
Big thanks to @jmmk and @sliwinski-milosz.

@sliwinski-milosz

Hey Guys,

I prepared a solution based on the hints provided by @jmmk. With my solution, elastic2_doc_manager can do bulk insert and update operations. It also does a multi-get request to ES to fetch the sources for the queued operations.

I have tested it for a while and it seems to work pretty well.

So now I need your help with testing. @hungvotrung, could you please test my solution and provide that nice summary again? This time there should be far fewer Elasticsearch calls.

You can find code here:
https://github.com/sliwinski-milosz/elastic2-doc-manager/blob/master/mongo_connector/doc_managers/elastic2_doc_manager.py

And here is a diff in comparison to default elastic2_doc_manager:
https://www.diffchecker.com/i3eefnah

@llvtt

llvtt commented Jul 5, 2016

@sliwinski-milosz, does the code you've written pass the existing unit tests in mongo-connector and elastic2-doc-manager? Those would be good starting points for testing. You might also consider writing some tests around BulkBuffer. When you're ready, you should turn your code into a pull request against elastic2-doc-manager.

@jmmk @sliwinski-milosz @hungvotrung Thank you all for your hard work on this.

@sliwinski-milosz

sliwinski-milosz commented Jul 6, 2016

I found one case that is not handled by my solution.
Steps:

  1. Insert documents into a collection that is listed in the "namespaces" section of the mongo-connector configuration file. The new documents should not contain any of the fields specified in the "fields" configuration. (Mongo-connector will skip these documents and will not replicate them to elasticsearch.)
  2. Try to update these documents by adding fields that are specified in the mongo-connector configuration file. (It will try to update these documents, and as it is an "update" operation it will try to get the sources from elasticsearch - but as the documents were skipped, the sources are not there.)

I wonder if the above case is even handled by the default elastic2_doc_manager.

@hungvotrung
Author

hungvotrung commented Jul 7, 2016

@sliwinski-milosz: your new code is lightning fast
[screenshot: test results]

Edit:
Not sure why there were two refreshes for each _bulk call:

2016-07-07 10:45:23,177 [INFO] elasticsearch:63 - GET http://10.10.y.z:9200/_mget [status:200 request:0.003s]
2016-07-07 10:45:23,582 [INFO] elasticsearch:63 - POST http://10.10.y.z:9200/_bulk [status:200 request:0.365s]
2016-07-07 10:45:23,623 [INFO] elasticsearch:63 - POST http://10.10.y.z:9200/_refresh [status:200 request:0.037s]
2016-07-07 10:45:23,725 [INFO] elasticsearch:63 - POST http://10.10.y.z:9200/_refresh [status:200 request:0.005s]
2016-07-07 10:45:23,731 [INFO] elasticsearch:63 - GET http://10.10.y.z:9200/_mget [status:200 request:0.004s]
2016-07-07 10:45:24,131 [INFO] elasticsearch:63 - POST http://10.10.y.z:9200/_bulk [status:200 request:0.338s]
2016-07-07 10:45:24,213 [INFO] elasticsearch:63 - POST http://10.10.y.z:9200/_refresh [status:200 request:0.070s]
2016-07-07 10:45:24,887 [INFO] elasticsearch:63 - POST http://10.10.y.z:9200/_refresh [status:200 request:0.002s]

@sliwinski-milosz

sliwinski-milosz commented Jul 7, 2016

@hungvotrung Thank you very much for testing!

There are two threads: mongo-connector and a timer (for auto_commit purposes).

It refreshes Elasticsearch in two cases:

  1. Just before an mget request
  2. After a bulk write

That is why you can see two refresh calls in the logs; one of the two always runs before the mget.

Because of these two threads it has to call refresh before mget: I put the refresh from point 2 outside the lock, just so it does not block mongo-connector. It is therefore possible that the timer calls refresh and, before it finishes, mongo-connector calls mget; in that case, if elasticsearch has not been refreshed, mget might not find the data (or might get stale data). That is why the refresh from point 1 is in the code.
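Roughly, the structure being described (an illustrative sketch; the lock and method names are simplifications of the linked code):

from elasticsearch.helpers import bulk

def commit(self):
    with self.lock:
        bulk(self.elastic, self.action_buffer)
        self.action_buffer = []
    # refresh from point 2, kept outside the lock so the
    # mongo-connector thread is not blocked while ES refreshes
    self.elastic.indices.refresh()

def get_sources(self, metas):
    # refresh from point 1: a concurrent auto-commit may have just
    # written these docs, so refresh before _mget to avoid stale reads
    self.elastic.indices.refresh()
    return self.elastic.mget(body={"docs": metas})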

@hungvotrung
Author

Bug: this still relates to the scenario where the application inserts a new document and then follows it with several updates. In this case mongo-connector fails to send the new doc to elasticsearch, since mget cannot find the doc in elasticsearch and doesn't throw any exception. To fix this (and avoid losing data) I still have to commit immediately after any insert in mongodb. Other than that, bulk insert seems to work well.

@sliwinski-milosz

sliwinski-milosz commented Jul 15, 2016

@hungvotrung that is interesting.

You can print the missing documents here:

            else:
                # Document not found in elasticsearch;
                # seems like something went wrong during replication
                print each_doc
                print self.doc_to_get[index]

I have been using this logic for 9 days without any issues (no missing documents).

  1. Are you sure your case is not related to the case I described in my previous comment? If you insert documents with fields that are not mapped in the mongo-connector configuration, they will not be replicated to elasticsearch, so you will not be able to run updates on them. (https://github.com/mongodb-labs/mongo-connector/wiki/Configuration-Options#fields)
  2. Do you use the default oplog_manager.py?

The logic to handle your case is there - let's investigate the issue.

@hungvotrung
Author

  1. Yes, I use the fields option in the config file. But I don't think it relates to the case you described before, as the objects we insert into mongodb do have the fields from the config.
  2. Yes, just the normal oplog_manager.

So I turned on the debug log to get more detail (1 insert followed by several updates):
2016-07-18 13:53:04,436 [DEBUG] mongo_connector.oplog_manager:229 - OplogThread: Iterating through cursor, document number in this cursor is 0
2016-07-18 13:53:04,436 [DEBUG] mongo_connector.oplog_manager:274 - OplogThread: Operation for this entry is i
2016-07-18 13:53:04,437 [DEBUG] mongo_connector.oplog_manager:329 - OplogThread: Doc is processed.
2016-07-18 13:53:04,470 [DEBUG] mongo_connector.oplog_manager:229 - OplogThread: Iterating through cursor, document number in this cursor is 1
2016-07-18 13:53:04,470 [DEBUG] mongo_connector.oplog_manager:274 - OplogThread: Operation for this entry is u
2016-07-18 13:53:04,476 [DEBUG] mongo_connector.oplog_manager:329 - OplogThread: Doc is processed.
2016-07-18 13:53:04,476 [DEBUG] mongo_connector.oplog_manager:745 - OplogThread: oplog checkpoint updated to Timestamp(1468821184, 4)
2016-07-18 13:53:04,478 [DEBUG] mongo_connector.oplog_manager:341 - OplogThread: updating checkpoint after processing new oplog entries
2016-07-18 13:53:04,479 [DEBUG] mongo_connector.oplog_manager:745 - OplogThread: oplog checkpoint updated to Timestamp(1468821184, 4)
2016-07-18 13:53:04,479 [DEBUG] mongo_connector.oplog_manager:223 - OplogThread: Cursor is still alive and thread is still running.
2016-07-18 13:53:04,488 [DEBUG] mongo_connector.oplog_manager:229 - OplogThread: Iterating through cursor, document number in this cursor is 0
2016-07-18 13:53:04,489 [DEBUG] mongo_connector.oplog_manager:274 - OplogThread: Operation for this entry is u

mget

2016-07-18 13:53:25,831 [DEBUG] urllib3.connectionpool:395 - "GET /_mget HTTP/1.1" 200 920
2016-07-18 13:53:25,831 [INFO] elasticsearch:63 - GET http://10.10.y.z:9200/_mget [status:200 request:0.061s]
2016-07-18 13:53:25,832 [DEBUG] elasticsearch:65 - > {"docs": [{"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}, {"_type": "Collection", "_id": "578c6ec19e9cc51e44b2b461", "_index": "tobeindexdb"}]}
2016-07-18 13:53:25,832 [DEBUG] elasticsearch:66 - < {"docs":[{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false},{"_index":"tobeindexdb","_type":"Collection","_id":"578c6ec19e9cc51e44b2b461","found":false}]}

You can see that mget cannot get the newly inserted doc, hence it cannot apply the later updates, and when the bulk happens the new doc gets overwritten by the later update --> we end up with an empty doc in elasticsearch. So a new insert with updates right after will fail; hence it's important to do the commit for each insert, or else we need to find another way to make sure data is replicated correctly :)
The detailed log is here: http://pastebin.com/HaRJhi7K

@sliwinski-milosz

sliwinski-milosz commented Jul 19, 2016

Hey @hungvotrung

I found the issue. It was about the type of the "_id" variable. The oplog (via the update() function) provides "_id" as an integer, but inside my class the _source is stored locally under a unicode "_id". That is why it could not find the _source locally and tried to get it from elasticsearch.

You can find diff here:
sliwinski-milosz/elastic2-doc-manager@3cc49c5

Could you please test it again? I ran some smoke tests and it seems to work fine.
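In other words (an illustrative sketch; sources_buffer is a hypothetical name for the local cache): coerce _id to text before using it as the cache key, so an integer or ObjectId _id coming from the oplog matches the unicode key the source was stored under.

def _key(document_id):
    # Python 2 here to match the thread's environment; use str() on Python 3
    return unicode(document_id)

# when storing:  self.sources_buffer[_key(doc['_id'])] = source
# when reading:  source = self.sources_buffer.get(_key(document_id))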

@hungvotrung
Author

hungvotrung commented Jul 20, 2016

Got this exception:

mongo_connector.util:92 - Fatal Exception
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/mongo_connector/util.py", line 90, in wrapped
    func(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/mongo_connector/oplog_manager.py", line 303, in run
    ns, timestamp)
  File "/usr/lib/python2.7/site-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/mongo_connector/doc_managers/elastic2_bulk_doc_manager.py", line 145, in update
    self.upsert(updated, namespace, timestamp)
  File "/usr/lib/python2.7/site-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/mongo_connector/doc_managers/elastic2_bulk_doc_manager.py", line 181, in upsert
    self.index(action, meta_action)
  File "/usr/lib/python2.7/site-packages/mongo_connector/doc_managers/elastic2_bulk_doc_manager.py", line 324, in index
    self.commit()
  File "/usr/lib/python2.7/site-packages/mongo_connector/doc_managers/elastic2_bulk_doc_manager.py", line 332, in commit
    successes, errors = bulk(self.elastic, action_buffer)
  File "/usr/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 188, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/usr/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 159, in streaming_bulk
    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
  File "/usr/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 58, in _chunk_actions
    data = serializer.dumps(data)
  File "/usr/lib/python2.7/site-packages/elasticsearch/serializer.py", line 50, in dumps
    raise SerializationError(data, e)
TypeError("Unable to serialize ObjectId('578f2eca9e9cc51314efea67') (type: <class 'bson.objectid.ObjectId'>)",))

@sliwinski-milosz

sliwinski-milosz commented Jul 20, 2016

@hungvotrung many thanks for testing!!

I found the root cause: it was related to storing the _source locally. In one case I stored it after Formatter.format_document, in the other before doing so. Now I store the unformatted source in both cases.

If you still want to try ;-):
sliwinski-milosz/elastic2-doc-manager@817d5c9

This time I tested it more deeply:

  1. I made a dump of the entire db.
  2. Ran some for-loop inserts/updates/deletes.

@hungvotrung
Author

@sliwinski-milosz sweet, just did several runs; it seems to work fine now.

@sliwinski-milosz

Good to hear! :) Thank you for your patience. Now it is time to prepare some tests and make a pull request. I will do that soon.

@julio-vaz

Thanks for everyone's time!
Any update on this matter?

@sliwinski-milosz

Hi, no updates yet, at least from my side. I am still working on tests... maybe this weekend I will finally be able to make a pull request.

@sliwinski-milosz

sliwinski-milosz commented Jan 19, 2017

@hungvotrung could you please, for the very last time, prepare your nice summary with the latest versions of mongo-connector and elastic2_doc_manager (0.2.0 vs 0.3.0)? Then I think we can close this issue.

Please just remember to set autoCommitInterval.

@sorhent

sorhent commented Aug 8, 2017

@sliwinski-milosz
Hello, I was using your solution because my ES had a big lag behind mongodb, but I was facing 2 problems:

  1. When performing bulk index/delete operations, if one of them fails (maybe the document is absent on an update or delete), the ~elasticsearch.Elasticsearch.bulk method called from elastic2_doc_manager.py raises a BulkIndexError, and none of the documents after the one that caused the exception get processed.
     In order to avoid this I did something like the following (a fuller sketch appears after this list):
     kw['raise_on_error'] = False
     kw['raise_on_exception'] = False
     successes, errors = bulk(self.elastic, action_buffer, **kw)
     The 'raise_on_error' and 'raise_on_exception' parameters are set to True by default.
     The 'chunk_size' parameter of the same ~elasticsearch.Elasticsearch.bulk method could be modified as well (500 by default).
     Do you want me to create a PR to solve this issue?

  2. The second issue was that your code does not handle the parent/child and routing notions. For now my code is a little dirty, but I'll try to clean it up and create a PR.
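A fuller version of the call from point 1, as a sketch (self.elastic, action_buffer, and the logger name are assumed from the surrounding doc-manager code):

import logging
from elasticsearch.helpers import bulk

LOG = logging.getLogger(__name__)

kw = {
    'raise_on_error': False,      # don't abort the batch on a failed action
    'raise_on_exception': False,  # don't abort on transport-level errors either
    'chunk_size': 500,            # the helper's default; tune as needed
}
successes, errors = bulk(self.elastic, action_buffer, **kw)
for err in errors:
    # each failed action is reported here instead of raising BulkIndexError
    LOG.error("Bulk action failed: %r", err)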

@sliwinski-milosz

sliwinski-milosz commented Aug 8, 2017

Hi @sorhent :)

Regarding 1: I think you should rather find the reason why those operations fail - if you simply ignore the exceptions you will end up with mongodb<->elasticsearch not fully synchronised.

Regarding 2: it seems the parent-child relationship is not ready yet: yougov/elastic2-doc-manager#25 - maybe you can help them finish the implementation :)

You can check this issue as well: #678. Maybe that is the reason for your exceptions.

@sorhent

sorhent commented Aug 9, 2017

Hi again @sliwinski-milosz,

  1. Actually that was my problem (mongodb<->elasticsearch not fully synchronised) before I modified the code as indicated in my previous comment. It was due to the following scenario:
  • I start mongo-connector for a full transfer to ES of a mongo collection that changes very often
  • before transferring a given document (let's say 'd'), there is an external delete of this document, so it will not be transferred to ES because it no longer exists
  • once mongo-connector reaches the end of the collection, it starts to replay the oplog operations that executed in the meantime
  • when it has to replay the delete operation for document 'd' there is a failure, because this document was never transferred to ES
  • as the two parameters 'raise_on_error' and 'raise_on_exception' are set to True by default and the bulk method is implemented in the ES library, execution of the current 'bulk' call stops at that moment
  • so any operation in the current bulk after a failed operation will not be executed, creating inconsistency between mongo and ES
    Anyway, all the failed operations are logged as errors.
  2. Thanks for the info, I'll check with them

@sliwinski-milosz

That is actually quite an interesting case :)

So steps to reproduce:

  1. Start a full dump of mongo-db.
  2. While the dump is ongoing, delete some documents from MongoDb.
  3. As mongo-connector hasn't yet dumped the mentioned document and it has been deleted in the meantime, it will not be added to ES.
  4. After the dump, mongo-connector goes through the oplog and tries to delete the mentioned document even though it was never added to ES.

@ShaneHarvey do you know if the above case is handled by mongo-connector?

@sorhent

sorhent commented Aug 9, 2017

@sliwinski-milosz, since the issue lives in the https://github.com/mongodb-labs/elastic2-doc-manager repository, I created an issue about the bulk operation (yougov/elastic2-doc-manager#52) on that repository.

@Ricky-Hao

Ricky-Hao commented Nov 19, 2018

@sliwinski-milosz I found an http auth problem in your doc_manager. If sniff_on_start is set to True, the main ElasticSearch host gets configured with http_auth, but the other nodes connect without http_auth.
Adding client_options['http_auth'] = tuple(kwargs['http_auth'].split(':')) to the doc_manager before self.elastic = Elasticsearch(hosts=url, **client_options) solves the problem.
Remember to add an http_auth field in the configuration file under docManagers.args, like "http_auth": "user:pass".
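Put together, the suggested change would look roughly like this (a sketch; url, kwargs, and client_options are assumed to come from the doc manager's existing setup code):

from elasticsearch import Elasticsearch

if 'http_auth' in kwargs:
    # split "user:pass" into the (user, password) tuple elasticsearch-py
    # expects, so nodes discovered via sniffing also authenticate
    client_options['http_auth'] = tuple(kwargs['http_auth'].split(':'))
self.elastic = Elasticsearch(hosts=url, **client_options)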

@sliwinski-milosz

@Ricky-Hao the issue you mention is not related to the lag between ElasticSearch and MongoDb.

Could you please open a new issue for the http auth problem? As it is related to elastic2-doc-manager, I think you should open it in https://github.com/yougov/elastic2-doc-manager/issues
Even better if you would also open a pull request with the fix ;-)

I also think this issue can be closed, as the lagging has been fixed.

@jaraco
Collaborator

jaraco commented Nov 19, 2018

Thanks for the update @sliwinski-milosz .

@jaraco jaraco closed this as completed Nov 19, 2018