Wednesday, May 28, 2014

Spark should be better than MapReduce (if only it worked)

Spark is the user-friendly face of Big Data: a distributed programming framework which lets you write collection-oriented algorithms in Scala that (theoretically) execute seamlessly across many machines. Spark has an elegant API (parallel collections with methods like map/reduce/groupByKey) that feels like programming locally. Unlike MapReduce, Spark can cache partial results across the memory of its distributed workers, allowing for significantly faster/lower-latency computations. If Spark worked as promised it would be a huge productivity boost over writing MapReduce pipelines.
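To show what that collection-oriented style feels like, here's a minimal sketch of the classic word count, written against plain Scala collections (so it runs locally with no cluster); the equivalent Spark code would look nearly identical, just with an RDD in place of the `Seq`:

```scala
// Hedged sketch: plain Scala collections standing in for Spark's RDD API.
// With Spark, `words` would be an RDD and `groupBy` would be groupByKey.
val words = Seq("spark", "map", "reduce", "spark", "map", "spark")

val counts: Map[String, Int] =
  words
    .map(w => (w, 1))                                     // pair each word with 1
    .groupBy(_._1)                                        // analogue of groupByKey
    .map { case (w, pairs) => (w, pairs.map(_._2).sum) }  // reduce per key
```

The appeal is exactly this: the distributed program reads like the local one.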

Unfortunately, as I've learned over the past month, Spark will happily generate programs that mysteriously grind to a halt, and tracking down the source of these problems can be a numbingly opaque process. There are at least two distinct problems: Spark's lazy evaluation makes it hard to know which parts of your program are the bottleneck and, even if you can identify a particularly slow expression, it's not always obvious why it's slow or how to make it faster.

Lazy Evaluation

Spark's API lets you express your intentions clearly via many fine-grained calls to transformations such as map, filter, flatMap, distinct, &c. If you ran a long chain of transformations one at a time, you'd incur a large communication overhead and clog each worker's local cache with useless partial results. Spark reconciles its functional style with performance via delayed execution: transformations get bundled together and only run on demand. The unruly downside to Spark's execution model is that big swaths of your program will run as a monolithic glob of computation. And if that computation runs slowly...well, good luck figuring out which of its constituent parts is the culprit. Spark could ameliorate some of this confusion with a non-lazy debug mode or some built-in tooling assistance (e.g. a distributed profiler). For now, however, you're stuck (1) trying to reason your way through the lazy dependency graph ("Oh! This one is a shuffle dependency!") and (2) forcing computations and checking how long they take.
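The same deferred-execution behavior can be demonstrated without a cluster using Scala's lazy collection views, which is a reasonable local analogy for Spark's transformation/action split:

```scala
// Hedged analogy in plain Scala: a lazy view behaves like a chain of Spark
// transformations -- nothing executes until something forces it.
var evaluations = 0

val pipeline = (1 to 10)
  .view                                    // make the collection lazy, like an RDD
  .map { x => evaluations += 1; x * 2 }    // "transformation": not run yet
  .filter(_ % 4 == 0)

// Merely defining the pipeline executed nothing:
assert(evaluations == 0)

// Forcing it (Spark's analogue would be an action like collect() or count())
// runs the whole fused chain at once.
val result = pipeline.toList
assert(evaluations == 10)
```

This is also why timing individual transformations is so awkward: the work only materializes, all at once, at the action.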

Too Many Parameters, Too Many Ways to Fail

Though writing Spark feels like programming on your laptop, it has many performance gotchas you must vigilantly guard against. Make sure your objects aren't using Java serialization (Kryo is faster), pull that object out of the closure, broadcast this array to keep it from being copied repeatedly. Though annoying for beginners, these rules of thumb at least have some consistency which can be learned.

More frustrating, however, is the inevitability with which Spark's many (initially invisible) default parameters will be wrong for your application. Buffers and heap sizes will turn out to be too small. Your reductions will use too few (or too many) workers. Too much memory gets used for data caching, or maybe it's too little. There's no rigorous or systematic way to set these parameters: you wait until things fail and supplicate at the feet of Spark's heuristic knob pantheon.
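To make the knob pantheon concrete, here is an illustrative spark-defaults.conf fragment with the kinds of (circa 2014) parameters that tend to need hand-tuning. The values are invented for illustration, not recommendations; what's right depends entirely on your data sizes and cluster:

```properties
# Illustrative values only -- each of these defaults has bitten someone.
spark.serializer              org.apache.spark.serializer.KryoSerializer
spark.executor.memory         4g
spark.default.parallelism     200
spark.storage.memoryFraction  0.4
spark.shuffle.memoryFraction  0.3
spark.akka.frameSize          64
```

None of these surfaces until something fails, which is precisely the complaint above.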

My Experience Thus Far

Nothing I do can save my cluster from spending hours thrashing its way through a modest input size (before dying under a flood of obscure exceptions). I have recently tried all of the following (and more) to keep Spark from its stubborn predilection toward dying:
After a month of "stab in the dark" debugging, I've learned a lot about the internals of Spark but still don't have a working application. In some weird twist of Stockholm syndrome, I've come to like expressing my algorithms in Spark's abstractions: if only they would actually run! 

Has anyone else had a similar experience with Spark? Alternatively, has anyone had a positive experience with a non-trivial Spark codebase (bigger than tutorial wordcount examples)? If so, what were your tricks to avoid the death-by-a-thousand-shuffles I've been experiencing? How about Spark's close cousins (Scalding, Scoobi, and Scrunch): are they substantially better (or worse)?



10 comments:

  1. I'm sorry to hear about your troubles, as I'm a fan of Spark. However, I don't have enough experience with it to know how to solve your issues, etc. I've used Scalding a lot, which sits on Cascading, which (currently) sits on MapReduce. The API is similar to Spark's, so you might try porting a "troubled" part of your app to Scalding to see if it is noticeably better. It will have MR's slow aspects, but maybe it won't get bogged down.

    ReplyDelete
  2. Hey Alex, it's too bad to hear that you had a bad experience, but definitely useful feedback. For tuning applications, did you take a look at the performance talk at Spark Summit 2013 (https://spark-summit.org/talk/wendell-understanding-the-performance-of-spark-applications/) and the app UI at localhost:4040? They offer a good place to start. The other thing I'd recommend is posting on the user mailing lists (http://spark.apache.org/community.html). There may be other people there who can help with specific issues.

    More generally, the community has worked quite actively over the past few releases to reduce the configuration needed, and is continuing to do so. In the past two releases we've removed many of the cases that required sending data through Akka (so there's no frame size limit), added external spilling to most operations, and added quite a few more monitoring features. More generally, efforts like Spark SQL (https://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html) will let Spark automatically use an efficient storage format for data where you describe the schema, eliminating the need to fine-tune Java object storage. Unfortunately there are just some known challenges in programming high-memory applications in Java, but the language features and API leave room for significant automatic management.

    ReplyDelete
  3. Hey Alex,

    I’m a committer on Stratosphere (http://stratosphere.eu), where we are developing a system that has similar APIs to Spark, but with a different runtime. The runtime always stores objects in serialized form in byte arrays, and the system's operators work on the serialized data rather than on Java objects. This leads to very efficient memory usage. Additionally, all operators (except the in-memory index for the iterations) support graceful degradation to disk (spilling). I would love to see whether Stratosphere solves some of your issues, especially the ones pertaining to memory configuration and data serialization.

    Robert

    ReplyDelete
  4. This comment has been removed by a blog administrator.

    ReplyDelete
  5. The performance issues I can sympathize with; as you say, there's a bunch of random heuristics that you need to learn.

    I haven't had the random failures you've experienced though. My Spark jobs sometimes take much longer than they should, but they never just fail.

    ReplyDelete
  6. Stupid comment box erased my comment after authentication.
    Here's the short version: follow the advice on https://my.vertica.com/docs/4.1/HTML/Master/12962.htm - I had to set vm.max_map_count to 12800000 and then everything started working for me.
    (I've had a very similar experience, "Spark is awesome, if only it worked!". Now it seems to have started working :) ).

    ReplyDelete
  7. This made me laugh, it's quite true: Spark's API and theory are amazing and should make Big Data as easy as writing regular Scala code. I've been building full-sized, complicated, production-quality applications in Spark and found 90% of debugging time is spent on the simplest 10% of Spark's functionality. Once the basics and heuristics are eventually solved, all the complicated stuff is easy. For example, controlling the number of partitions isn't easy (they provide `coalesce` for decreasing, but there is no shuffle-free way of increasing).

    I've also pretty much done everything on your list, and will add to your list that writing an sbt build file that works with IntelliJ and java -cp is a nightmare.

    Nevertheless I continue to use and love Spark. Before Spark I used Scalding for about a year, and although pretty cool, I'd say Spark is actually much easier to use, and orders faster.

    For Spark to solve its problems I feel it needs to focus its efforts better as follows:

    MORE:
    - Documentation on building and running like any other application (i.e. with java -cp, NOT some silly script). I'm fed up with the 10 billion reasons why building can go wrong; they should provide template SBT files for every release that actually work.
    - File input and output formats
    - a `repartitionWithoutShuffle` function
    - default auto-magic optimised serialisation
    - Cache, shuffle, and heap memory fraction usage monitoring

    LESS:
    - wasting effort on the Python API, it's time these dynamically typed hacky scripters get their act together and learn safe production worthy languages.

    ReplyDelete
  8. Thanks for your comment Fredri,

    There are some sensible developments in the 1.2 branch of Spark. Folks in my lab are still having a miserable time getting it running at scale but it seems like there's at least an increased emphasis on monitoring, debuggability, and memory/spilling. Maybe in a year or two Spark's scalability will finally match the elegance of its API?

    ReplyDelete
  9. Hi Alex, thanks for the article - I found your views on lazy evaluation and debugging quite interesting. However I thought that with Spark, the web interface on port 4040 shows a breakdown of the 'stages' of an application. So then if you are trying to find where your Spark program is slow, you can just look at how long it's taking at each stage in the web UI. Does that not solve the problem? Or is the concern that it's difficult to know which transformation is in each stage?

    ReplyDelete
  10. Some great helpful insights ....thx.

    ReplyDelete