Unfortunately, as I've learned over the past month, Spark will happily generate programs that mysteriously grind to a halt, and tracking down the source of these problems can be a numbingly opaque process. There are at least two distinct problems: Spark's lazy evaluation makes it hard to know which parts of your program are the bottleneck, and, even if you can identify a particularly slow expression, it's not always obvious why it's slow or how to make it faster.
Lazy Evaluation

Spark's API lets you express your intentions clearly via many fine-grained calls to transformations such as map, filter, flatMap, distinct, &c. If you ran a long chain of transformations one at a time, you'd incur a large communication overhead and clog each worker's local cache with useless partial results. Spark reconciles its functional style with performance via delayed execution: transformations get bundled together and only run on demand. The unruly downside to Spark's execution model is that big swaths of your program will run as a monolithic glob of computation. And if that computation runs slowly...well, good luck figuring out which of its constituent parts is the culprit. Spark could ameliorate some of this confusion with a non-lazy debug mode or some built-in tooling assistance (e.g., a distributed profiler). For now, however, you're stuck (1) trying to reason your way through the lazy dependency graph ("Oh! This one is a shuffle dependency!") and (2) forcing computations and checking how long they take.
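To make the "forcing" concrete, here's a minimal sketch (assuming an existing SparkContext `sc` and a made-up input path): none of the transformations run when they're declared, so the action at the end is the only place you can hang a stopwatch.

```scala
// Nothing executes when these transformations are declared...
val lines  = sc.textFile("hdfs:///some/input.txt")            // hypothetical path
val tokens = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
val unique = tokens.distinct()

// ...the whole chain only runs when an action forces it,
// so this is the only place any timing can be measured.
val start = System.nanoTime()
val n = unique.count()
println(s"distinct tokens: $n, took ${(System.nanoTime() - start) / 1e9} seconds")
```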
Too Many Parameters, Too Many Ways to Fail

Though Spark makes it look like you're programming on your laptop, it has many performance gotchas you must vigilantly guard against. Make sure your objects aren't using Java serialization (Kryo is faster), pull that object out of the closure, broadcast this array to keep it from being copied repeatedly. Though annoying for beginners, these rules of thumb at least have some consistency and can be learned.
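The fixes themselves are short; the hard part is knowing you need them. A rough sketch of the usual ones (the lookup table below is invented for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Switch off default Java serialization in favor of Kryo.
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// A shared, read-only object: broadcast it once per worker instead of
// letting it get serialized into every task's closure.
val lookup = Map("a" -> 1, "b" -> 2)          // stand-in for a large shared table
val lookupBc = sc.broadcast(lookup)

val resolved = sc.parallelize(Seq("a", "b", "c"))
  .map(k => lookupBc.value.getOrElse(k, 0))   // closure captures only the small broadcast handle
```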
More frustrating, however, is the inevitability with which Spark's many (initially invisible) default parameters will be wrong for your application. Buffers and heap sizes will turn out to be too small. Your reductions will use too few (or too many) workers. Too much memory gets used for data caching, or maybe it's too little. There's no rigorous or systematic way to set these parameters: you wait until things fail and supplicate at the feet of Spark's heuristic knob pantheon.
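Concretely, the knob-turning ends up looking something like this (the property names are the ones exposed by the Spark version I'm running; the values are guesses, which is exactly the problem):

```scala
import org.apache.spark.SparkConf

// Each of these defaults has, at some point, been wrong for my job.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")          // JVM heap per executor
  .set("spark.storage.memoryFraction", "0.4")  // fraction of heap reserved for cached data
  .set("spark.default.parallelism", "256")     // default task count for reductions
  .set("spark.akka.frameSize", "64")           // max message size (MB) for the actor layer
```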
My Experience Thus Far

Nothing I do can save my cluster from spending hours thrashing its way through a modest input size (before dying under a flood of obscure exceptions). I have recently tried all of the following (and more) to keep Spark from its stubborn predilection toward dying:
- increasing JVM heap size of Spark's executors
- tweaking the "memory fraction" used for caching partial results
- writing custom Kryo serializers for objects stored in parallel collections
- co-partitioning collections before joining them (see the first sketch after this list)
- changing the Akka frame size (the maximum message size of the underlying actor library)
- turning speculative execution on/off
- turning shuffle file consolidation on/off
- changing defaultParallelism (a confusingly named parameter which sometimes affects the number of tasks assigned to a parallel reduction)
- using stripped down structures (with fewer fields) to represent collection elements
- manually interning strings (by transforming them into integers) in hopes of speeding up groupByKey
- broadcasting shared objects (in case serialized closures were getting too heavy)
- changing the HDFS block size of the input data (to increase initial number of data partitions)
- writing a custom partitioner to try uniform binning of keys (in case RangePartitioner was slow; see the second sketch after this list)
- writing a custom RDD (parallel collection) for 3-way join
- ...&c &c.
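For reference, the co-partitioning attempt looked roughly like this sketch (`left` and `right` stand in for my real pair collections): giving both sides the same partitioner ahead of time means the join itself shouldn't need an extra shuffle.

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(256)

// Stand-ins for the real key-value collections.
val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(partitioner).cache()
val right = sc.parallelize(Seq(1 -> "x", 3 -> "y")).partitionBy(partitioner).cache()

// Both sides already share the partitioner, so the join can proceed
// without re-shuffling either collection.
val joined = left.join(right)
```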
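And the uniform-binning partitioner was along these lines (a sketch, assuming integer keys drawn roughly uniformly from [0, maxKey)):

```scala
import org.apache.spark.Partitioner

class UniformBinPartitioner(partitions: Int, maxKey: Long) extends Partitioner {
  override def numPartitions: Int = partitions

  // Map each key to a fixed-width bin instead of sampling key ranges
  // the way RangePartitioner does.
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Long]
    math.min(partitions - 1, (k * partitions / maxKey).toInt)
  }

  // Partitioners are compared for equality to decide whether a shuffle is needed.
  override def equals(other: Any): Boolean = other match {
    case p: UniformBinPartitioner => p.numPartitions == partitions
    case _ => false
  }
  override def hashCode: Int = partitions
}
```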