
Add async APIs #28

Merged
henridf merged 5 commits into master from async-apis on Jan 16, 2016

Conversation

@henridf (Owner) commented Jan 16, 2016

#26

This is a breaking change that transforms all methods that "run computation" from synchronous to asynchronous (node-style callback) operation. The existing synchronous methods are renamed with a "Sync" suffix.

The methods converted are:

DataFrame.collect(..)
DataFrame.columns(..)
DataFrame.count(..)
DataFrame.head(..)

DataFrameReader.json(..)
DataFrameReader.text(..)

DataFrameWriter.json(..)
DataFrameWriter.text(..)
DataFrameWriter.saveAsTable(..)
DataFrameWriter.insertInto(..)
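
As a rough sketch of what this means for callers, assuming the sqlContext provided by the spark-node shell (the file path and the exact callback result shapes are illustrative, not taken from this PR):

// Previous behavior, now available under the Sync names: blocks until the job finishes.
var df = sqlContext.read().jsonSync("./data/people.json");
console.log(df.countSync() + " rows");

// New behavior: node-style callbacks, so the event loop stays free while Spark runs the job.
sqlContext.read().json("./data/people.json", function (err, df) {
  if (err) return console.error(err);
  df.count(function (err, n) {
    if (err) return console.error(err);
    console.log(n + " rows");
  });
});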

@henridf (Owner, Author) commented Jan 16, 2016

@tobilg after I merge this, you can use the async APIs to run concurrent jobs. No need for explicit Java threads! If you can try this out, I'd love to hear how it works for you.

The naming scheme is unorthodox (sync versions have bare names, async versions are suffixed "Async") to avoid having to touch tons of lines. This should be fixed, but isn't urgent as none of the raw jvm objects are exposed anywhere.

This change turns the "action" methods (those that run a computation) into async methods taking node-style callbacks. For each of the asyncified methods, an additional synchronous version is added, which has the same name with a 'Sync' suffix.

The methods converted here are:
collect(..)
columns(..)
count(..)
head(..)

This change turns the "action" methods (those that run a computation) into async methods taking node-style callbacks. For each of the asyncified methods, an additional synchronous version is added, which has the same name with a 'Sync' suffix.

The methods converted here are:
json(..)
text(..)
load(..) // undocumented

This change turns the "action" methods (those that run a computation) into async methods taking node-style callbacks. For each of the asyncified methods, an additional synchronous version is added, which has the same name with a 'Sync' suffix.

The methods converted here are:
json(..)
text(..)
saveAsTable
insertInto
save(..) // undocumented
henridf added a commit that referenced this pull request Jan 16, 2016
@henridf henridf merged commit eebfa06 into master Jan 16, 2016
@henridf henridf deleted the async-apis branch January 16, 2016 18:14
@tobilg (Contributor) commented Jan 16, 2016

Thanks! Have you successfully tested that actions get executed in parallel? From my understanding this wasn't possible because of the way Spark works...

Or do the async calls just wait until the predecessor is finished?

@henridf (Owner, Author) commented Jan 16, 2016

I did some basic testing with load() by instrumenting the Spark sources to confirm that multiple async loads were indeed concurrent.

Here are some pointers to why this works: when node-java wraps JVM methods, the asynchronous ones end up in https://github.com/joeferner/node-java/blob/master/src/methodCallBaton.cpp#L64 where they run on a different worker thread. This post provides some good context on Node worker threads: https://www.future-processing.pl/blog/on-problems-with-threads-in-node-js/

As to the statement in the Spark docs that "multiple parallel jobs can run simultaneously if they were submitted from separate threads": my understanding is that they mention threads because that's the way to initiate the underlying calls concurrently in Scala/Java (in the basic Spark API the calls are synchronous, as they were in these JavaScript wrappers before this PR). In our case, we get concurrency via the libuv thread pool.
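
A minimal illustration of that, again assuming the shell's sqlContext and two hypothetical input files: both calls return right away, and the blocking JVM work runs on libuv pool threads, so neither read has to wait for the other.

// Both reads are submitted immediately and proceed concurrently.
sqlContext.read().json("./data/a.json", function (err, df) {
  if (!err) console.log("a.json loaded");
});
sqlContext.read().json("./data/b.json", function (err, df) {
  if (!err) console.log("b.json loaded");
});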

@tobilg (Contributor) commented Jan 16, 2016

That sounds great! I'll try to test this on Monday when I'm back in the office.

One idea: to get rid of the callbacks, what do you think about using ES7 async/await and creating promise wrappers for the async callback action methods? I use this in my project via ad-hoc babel.js transpilation...
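
A minimal sketch of such a wrapper over the callback API from this PR (collect() is from the PR; the wrapper name and the async/await usage are illustrative and would have needed Babel at the time):

// Hand-rolled promise wrapper around a node-style callback method.
function collectP(df) {
  return new Promise(function (resolve, reject) {
    df.collect(function (err, rows) {
      if (err) { reject(err); } else { resolve(rows); }
    });
  });
}

// ES7-style usage, transpiled with Babel:
async function rowCount(df) {
  var rows = await collectP(df);
  return rows.length;
}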

@tobilg (Contributor) commented Jan 16, 2016

One question: Did you use the same context in your tests?

@henridf (Owner, Author) commented Jan 16, 2016

Yes, I used the same context in my tests. Is that what you were planning to do?

@henridf (Owner, Author) commented Jan 16, 2016

Re callbacks, I just did it that way as a simple starting point, from which it's easy for users to get promisified versions.

Definitely not a personal preference for callbacks here :)

I also hesitated to add promisified versions as part of this PR... that's easy to do but I'm just not yet sure if it's best to build those in. It sounds like you think they should be?

@tobilg (Contributor) commented Jan 16, 2016

Great, and yes, that's what I'd want to do. I'll test this on Monday and try to incorporate it into my project.

@tobilg (Contributor) commented Jan 16, 2016

Sorry, my last comment was regarding the context. Concerning the promisified functions, I think they would be a nice add-on, and they would make it really easy to write await-style, quasi-synchronous code.

@henridf (Owner, Author) commented Jan 16, 2016

Filed #29 for promisified functions

@tobilg (Contributor) commented Jan 17, 2016

Could you maybe share your test code? I was trying to promisify collect() and execute two different operations (within the same context) with async.parallel(). No real luck so far, unfortunately...
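
For what it's worth, a minimal async.parallel sketch against the plain callback API might look like this (big and small stand for two already-loaded DataFrames; the async package usage is standard Node, not specific to spark-node):

var async = require("async");

// async.parallel starts both tasks immediately, so the two collect() jobs
// are submitted to Spark concurrently; results arrive in task order.
async.parallel([
  function (cb) { big.collect(cb); },
  function (cb) { small.collect(cb); }
], function (err, results) {
  if (err) return console.error(err);
  console.log(results[0].length + " and " + results[1].length + " rows");
});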

@henridf (Owner, Author) commented Jan 17, 2016

When I originally checked (for DataFrameReader.load()), I added "start" and "stop" printlns to the beginning and end of load() in DataFrameReader.scala, started two loads at the same time, and observed that the sequence of logs was "start, start, stop, stop" (as opposed to "start, stop, start, stop" in the case of sequential execution).

Now I just validated that collect() is concurrent. This time I didn't touch the Spark sources. Instead I did this:

  • create two json files, one big (1m rows) and one small (1 row)
  • load both as dataframes (using the same sqlContext)
  • collect both in succession, starting with the "big" one, and observe that the small one finishes first.
spark-node> var big= sqlContext.read().jsonSync("./data/big.json")
spark-node> var small = sqlContext.read().jsonSync("./data/small.json")
spark-node> big.collect((err, res) => console.log("big has " + res.length + " rows")); small.collect((err, res) => console.log("small has " + res.length + " rows")); 

The output was:

spark-node> small has 1 rows
big has 1000000 rows

This shows that the collect that started second finished first.

@tobilg (Contributor) commented Jan 18, 2016

Thanks! Don't you also need to enable the fair scheduler for this, e.g. "spark.scheduler.mode": "FAIR", as outlined at http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application, or does it work out of the box?
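
For reference, FAIR scheduling is opt-in; the property quoted above is a standard Spark setting that can be supplied outside the application code, e.g. (file and flag shown for illustration):

# spark-defaults.conf
spark.scheduler.mode   FAIR

# or per application, on spark-submit
--conf spark.scheduler.mode=FAIR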

@tobilg (Contributor) commented Jan 18, 2016

Update: I got it to work with my project as well. The problem was that I was using some event listeners which got overwritten when issuing parallel requests... D'oh!

@henridf (Owner, Author) commented Jan 18, 2016

Excellent!

@tobilg (Contributor) commented Jan 18, 2016

Thanks! Regarding the fair scheduler, have you used this as well? With my Spark 1.6.0, the default scheduling mode is FIFO, which I guess is not recommended:

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.

Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings.

@henridf (Owner, Author) commented Jan 18, 2016

I haven't touched the scheduler settings in any way so far. As to FIFO vs fair, presumably fair is better, but it all depends on the needs of your application and users...

tobilg added a commit to tobilg/apache-spark-node that referenced this pull request Jan 20, 2016
@tobilg tobilg mentioned this pull request Jan 20, 2016