From ef378b868362fbac1fb5fa6371255119d502a602 Mon Sep 17 00:00:00 2001 From: Julian Simioni Date: Thu, 6 Sep 2018 12:21:03 -0400 Subject: [PATCH] fix(lookupStream): properly use lookupStream end event The pip resolver was being shut down on the Node.js stream 'finish' event, rather than the 'end' event. The finish event is called after all data is _read_ into the stream, but there may be some processing left to do. The end result was that some records could never be processed and code using this stream would hang. --- src/lookupStream.js | 2 +- test/lookupStreamTest.js | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/lookupStream.js b/src/lookupStream.js index fbf73fd..245bba1 100644 --- a/src/lookupStream.js +++ b/src/lookupStream.js @@ -104,7 +104,7 @@ module.exports = function(pipResolver, maxConcurrentReqs) { const end = createPipResolverEnd(pipResolver); const stream = parallelTransform(maxConcurrentReqs || 1, pipResolverStream); - stream.on('finish', end); + stream.on('end', end); return stream; }; diff --git a/test/lookupStreamTest.js b/test/lookupStreamTest.js index 2393c3d..679aaf9 100644 --- a/test/lookupStreamTest.js +++ b/test/lookupStreamTest.js @@ -255,16 +255,31 @@ tape('tests', (test) => { }); test.test('call end to stop child processes', (t) => { + // create a stubbed resolver that implements all required methods const resolver = { + lookup: (centroid, search_layers, callback) => { + setTimeout(callback, 0, null, []); + }, end: function () { t.assert(true, 'called end function'); t.equals(resolver, this, 'this is set to the correct object'); t.end(); } }; + // create one document to pass through the stream + const inputDoc = new Document( 'whosonfirst', 'placetype', '1') + .setCentroid({ lat: 12.121212, lon: 21.212121 }); - stream(resolver).end(); + // create the stream to test + const tested_stream = stream(resolver); - }); + // consume the stream so that all data is processed + tested_stream.on('data', function() {}); + + // write document to stream + tested_stream.write(inputDoc, null, function() {}); + // call end to trigger cleanup + tested_stream.end(); + }); });