0

I have a spark Scala job running in EMR that I am trying to improve. As of right now it runs on m5.8xlarge with no issues. I recently tried upgrading to the Graviton based EC2 instances m6g.8xlarge and while the job does succeed, I am seeing some weird issues. Some of the issues I see is tasks failing due to a timeout, stages running in a strange order, and it looks like the memory is strained. The stage that runs out of order is the one with failed tasks, stage 6 runs then fails, then stages 4 & 5 complete, and then stage 6 retry succeeds. In the m5.8xlarge run that currently is working, stages 4 & 5 get skipped. I'm not sure why this is happening since the only change I made was going from an m5 instance type to an m6g, so I wanted to see if anyone experienced something similar or has solutions. I will also post some of the errors from the failed tasks, but I think they are related to the oom.

Here is the main error I am seeing:

ERROR TransportClientFactory:261 - Exception while bootstrapping client after 60041 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:263)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
    at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:116)
    at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:89)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:257)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:109)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:264)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:614)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:609)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:442)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:160)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:66)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:173)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
    at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:259)
    ... 39 more
Julie
  • 123
  • 8
sgallagher
  • 137
  • 10

1 Answers1

1

I think this is not an out of memory problem. Both m6g.8xlarge and m5.8xlarge have 120 GB of memory following their specs: https://aws.amazon.com/ec2/instance-types/m6g and https://aws.amazon.com/ec2/instance-types/m5

I see in the backtrace that the timeout is during the authentication process:

First it fails to authenticate with Spark's auth protocol in doBootstrap(AuthClientBootstrap.java:89 https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java#L99

Bootstraps a {@link TransportClient} by performing authentication using Spark's auth protocol. This bootstrap falls back to using the SASL bootstrap if the server throws an error during authentication, and the configuration allows it. This is used for backwards compatibility with external shuffle services that do not support the new protocol.

and then it also fails to authenticate with SASL in doBootstrap(SaslClientBootstrap.java:70 https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java#L54

  • Bootstraps a {@link TransportClient} by performing SASL authentication on the connection. The server should be setup with a {@link SaslRpcHandler} with matching keys for the given appId.
  • Performs SASL authentication by sending a token, and then proceeding with the SASL challenge-response tokens until we either successfully authenticate or throw an exception due to mismatch.
sebpop
  • 31
  • 3
  • @sgallagher could you please post how to reproduce the fails you are seeing? – sebpop Mar 03 '22 at 17:13
  • I'm not sure how to reproduce the errors, I only see these when I run with the m6g AWS instance types. I'm not sure if those instance types require a different security role that might be causing the Authentication issue? – sgallagher Mar 07 '22 at 17:41