
Reading from Elasticsearch v6.2 into Spark using the prescribed Spark connector org.elasticsearch:elasticsearch-spark-20_2.11:6.3.2 is horrendously slow. This is from a 3-node ES cluster with the following index:

curl https://server/_cat/indices?v
health status index uuid                     pri rep docs.count docs.deleted store.size pri.store.size
green  open   db    MmVwAwYfTz4eE_L-tncbwQ     5   1  199983131      9974871    105.1gb         51.8gb

Reading on a Spark cluster (10 nodes, 1 TB memory, >50 vCPUs):

val query = """{
  "query": {
    "match_all": {}
  }
}"""

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes","server")
  .option("es.port", "443")
  .option("es.net.ssl","true")
  .option("es.nodes.wan.only","true")
  .option("es.input.use.sliced.partitions", "false")
  .option("es.scroll.size", "1000")
  .option("es.read.field.include", "f1,f2,f3")
  .option("es.query",query)
  .load("db")

df.take(1)

That took 10 minutes to execute (see the attached Spark UI screenshot).

Is this how (slowly) it's supposed to work, or am I doing something wrong?

ixaxaar
  • Hi @ixaxaar, did you find the reason or any solution? – Rami Jun 28 '19 at 12:58
  • Nope, could not really figure it out. Tried various settings and configs including `es.input.use.sliced.partition`, finally went ahead with dumping the whole index into s3 (https://github.com/taskrabbit/elasticsearch-dump) and then loading in spark. – ixaxaar Jul 01 '19 at 05:51
  • Try to check CPU utilization with `https://server/_cat/nodes?v&h=id,cpu&s=id` – StanislavKo Dec 11 '19 at 09:07

1 Answer


This is not how slow it is supposed to be, and the answer can be found in the screenshot you shared:

The Stages: Succeeded/Total column in the Spark UI shows only one task running the read operation. I don't think that is what you would expect; otherwise, what's the point of having a whole cluster?

I have faced the same problem, and it took me a while to figure out that Spark assigns one task (partition) to each shard of the Elasticsearch index.
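
As a quick sanity check (a minimal sketch reusing the connection options from the question, not a verified setup), you can confirm this one-partition-per-shard behaviour by asking Spark how many partitions the DataFrame has:

// Sketch: the partition count of the Elasticsearch-backed DataFrame should
// roughly match the number of primary shards of the "db" index
// (connection options copied from the question above).
val esDf = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "server")
  .option("es.port", "443")
  .option("es.net.ssl", "true")
  .option("es.nodes.wan.only", "true")
  .load("db")

println(esDf.rdd.getNumPartitions)  // e.g. 5 for a 5-shard index => 5 read tasks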

There we have our answer: to go faster we should parallelise the process. How? By distributing our source index across multiple shards.

By default, recent Elasticsearch versions create an index with a single shard (the 6.x series defaulted to five), but you can customise this when creating the index:

PUT /index-name
{
  "settings": {
    "index": {
      "number_of_shards": x,
      "number_of_replicas": xx
    }
  }
}

The number of shards can be higher than the number of Elasticsearch nodes; this is all transparent to Spark. If the index already exists, try creating a new index with more shards and then use the Elasticsearch Reindex API, as sketched below.
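
For example (a sketch only; index-name-old and index-name-new are placeholder names), the reindex call looks like this:

POST /_reindex
{
  "source": { "index": "index-name-old" },
  "dest":   { "index": "index-name-new" }
}

Create index-name-new first with the desired number_of_shards, since reindex copies documents but not the source index's settings.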

I hope this solved your problem.

  • Well, in the above issue I was trying to fetch one document. `df.take(1)`. Anyway, after lots of hiccups faced by the team, we ended up phasing out ES in favor of S3. Life was pretty good after that. – ixaxaar May 16 '21 at 11:45
  • This is what I wanted to highlight: https://i.imgur.com/WrGkPoz.png shows one task running on one partition. `df.take(1)` won't stop Spark from reading all the data and then filtering; on the other hand, if we increase the number of shards, Spark will associate a partition (and so a task) with each one, and those tasks then run in parallel. – bechir nahali May 16 '21 at 19:08
  • @ixaxaar I do share your opinion on S3; you have surely saved yourselves a lot of unnecessary trouble. – bechir nahali May 16 '21 at 19:15