I was trying to deploy Spark 1.4 on Google Cloud. After downloading the Spark 1.4.1 tarball and setting

SPARK_HADOOP2_TARBALL_URI='gs://my_bucket/my-images/spark-1.4.1-bin-hadoop2.6.tgz'

the deployment with bdutil was fine; however, calling sqlContext.parquetFile("gs://my_bucket/some_data.parquet") runs into the following exception:

 java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to org.apache.hadoop.fs.FileSystem
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2595)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.hive.metastore.Warehouse.getFs(Warehouse.java:112)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:144)
at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)
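
For completeness, this is essentially all I run in spark-shell, using the default sqlContext the shell creates (which, for the prebuilt Hive-enabled tarball, is a HiveContext); the path is the same placeholder as above:

scala> // sqlContext is the default context provided by spark-shell
scala> val df = sqlContext.parquetFile("gs://my_bucket/some_data.parquet")
// fails immediately with the ClassCastException shown above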

What confuses me is that GoogleHadoopFileSystem is a subclass of org.apache.hadoop.fs.FileSystem, and I even verified this in the same spark-shell instance:

scala> var gfs = new com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem()
gfs: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem = com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem@46f105c

scala> gfs.isInstanceOf[org.apache.hadoop.fs.FileSystem]
res3: Boolean = true

scala> gfs.asInstanceOf[org.apache.hadoop.fs.FileSystem]
res4: org.apache.hadoop.fs.FileSystem = com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem@46f105c
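
The only other thing I could think to check is which classloaders are involved, since a ClassCastException between a class and its own parent type usually means the same class name was loaded by two different classloaders. A quick sketch of that check, run in the same shell:

scala> // compare the loaders involved; a mismatch would explain the failed cast
scala> gfs.getClass.getClassLoader
scala> classOf[org.apache.hadoop.fs.FileSystem].getClassLoader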

Did I miss anything? Is there any workaround? Thanks in advance!

UPDATE: these are my bdutil (version 1.3.1) settings for the deployment:

import_env hadoop2_env.sh
import_env extensions/spark/spark_env.sh
CONFIGBUCKET="my_conf_bucket"
PROJECT="my_proj"
GCE_IMAGE='debian-7-backports'
GCE_MACHINE_TYPE='n1-highmem-4'
GCE_ZONE='us-central1-f'
GCE_NETWORK='my-network'
GCE_MASTER_MACHINE_TYPE='n1-standard-2'
PREEMPTIBLE_FRACTION=1.0
PREFIX='my-hadoop'
NUM_WORKERS=8
USE_ATTACHED_PDS=true
WORKER_ATTACHED_PDS_SIZE_GB=200
MASTER_ATTACHED_PD_SIZE_GB=200
HADOOP_TARBALL_URI="gs://hadoop-dist/hadoop-2.6.0.tar.gz"
SPARK_MODE="yarn-client"
SPARK_HADOOP2_TARBALL_URI='gs://my_conf_bucket/my-images/spark-1.4.1-bin-hadoop2.6.tgz'
  • In my tests it seems I can at least load a Parquet file just fine, and even call "take(10)" on the resulting dataframe. Are you really seeing the exception immediately when calling sqlContext.parquetFile("...")? Or is it during some particular transform operation on the dataframe? – Dennis Huo Jul 17 '15 at 17:18
  • Thanks for the reply, Dennis. Only thing I did was sqlContext.parquetFile("...") and it failed with the error; I also redeployed still the same error. Any suggestion how to investigate what could cause this? I have added my detail setting for bdutil. Thanks again! – Haiying Wang Jul 17 '15 at 20:03
  • Which bdutil version are you using? – Dennis Huo Jul 17 '15 at 20:19
  • bdutil version is 1.3.1 – Haiying Wang Jul 17 '15 at 21:28
  • I tried with a few different bdutil versions and different Spark versions and so far I've always been able to load my parquet files. Any chance you could share a reproducible sample of the scala commands being run, invocation flags, etc.? You could also share details with gcp-hadoop-contact@google.com to discuss with Google engineers more directly if it's easier – Dennis Huo Jul 17 '15 at 23:01
  • I have also run into this issue. I am sure it has something to do with org.apache.spark.sql.hive.client.IsolatedClientLoader. Is there any discussion happening on some mailing list? – Prikso NAI Jul 20 '15 at 15:14
  • Dennis, I just sent email with details to gcp-hadoop-contact@google.com. – Haiying Wang Jul 20 '15 at 15:33
  • Prikso, thanks for the info. Not sure the gcp email list is publicly accessible. Hopefully a new update will be posted here when available. – Haiying Wang Jul 20 '15 at 15:37
  • I should also mention that (at least in my case) this happens only when creating a Hive SQL context. With spark's SQL context this error is not reproduced. Maybe this information helps with reproducing. – Prikso NAI Jul 20 '15 at 15:37
  • Thanks, both of you! We received the email to gcp-hadoop-contact, and Prikso's additional info about it being only the Hive SQL context indeed helps; I had been habitually creating a new Spark SQL context each time when testing. Hopefully we'll have an update shortly. – Dennis Huo Jul 20 '15 at 20:05
  • Sorry for the delay, it took a while to build a few different flavors of Spark from source for testing; in any case, hopefully the fix will make it into the next Spark version, and my answer lists a few short-term workarounds you can use. – Dennis Huo Jul 21 '15 at 00:17
  • Thanks for the quick response, Dennis! – Haiying Wang Jul 21 '15 at 04:29

1 Answer


Short Answer

Indeed it was related to IsolatedClientLoader, and we've tracked down the root cause and verified a fix. I filed https://issues.apache.org/jira/browse/SPARK-9206 to track this issue, and successfully built a clean Spark tarball from my fork with a simple fix: https://github.com/apache/spark/pull/7549

There are a few short-term options:

  1. Use Spark 1.3.1 for now.
  2. In your bdutil deployment, use HDFS as the default filesystem (--default_fs=hdfs); you'll still be able to directly specify gs:// paths in your jobs, just that HDFS will be used for intermediate data and staging files. There are some minor incompatibilities using raw Hive in this mode, though.
  3. Use the raw val sqlContext = new org.apache.spark.sql.SQLContext(sc) instead of a HiveContext if you don't need HiveContext features; see the sketch just after this list.
  4. git clone https://github.com/dennishuo/spark and run ./make-distribution.sh --name my-custom-spark --tgz --skip-java-test -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver to get a fresh tarball you can specify in your bdutil's spark_env.sh.
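
For option 3, here is roughly what that looks like in spark-shell (the gs:// path is just a placeholder):

// A plain SQLContext has no Hive metastore client, so IsolatedClientLoader never gets involved.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.parquetFile("gs://my_bucket/some_data.parquet")  // placeholder path
df.take(10).foreach(println)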

Long Answer

We've verified that the issue only manifests when fs.default.name and fs.defaultFS are set to a gs:// path, regardless of whether the data is loaded via parquetFile("gs://...") or parquetFile("hdfs://..."); when fs.default.name and fs.defaultFS are set to an HDFS path, loading data from both HDFS and GCS works fine. This is also specific to Spark 1.4+ at the moment, and is not present in Spark 1.3.1 or older.
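
To check which default filesystem a given cluster ended up with, you can inspect the Hadoop configuration the driver is using; fs.defaultFS is the current key and fs.default.name is its deprecated alias:

scala> sc.hadoopConfiguration.get("fs.defaultFS")
scala> sc.hadoopConfiguration.get("fs.default.name")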

The regression appears to have been introduced in https://github.com/apache/spark/commit/9ac8393663d759860c67799e000ec072ced76493, which itself fixes a prior related classloading issue, SPARK-8368. While that fix is correct for normal cases, the method IsolatedClientLoader.isSharedClass, which determines which classloader to use for a given class, interacts with the aforementioned commit to break GoogleHadoopFileSystem classloading.

The following lines in that file treat everything under com.google.* as a "shared class" because of Guava (and possibly protobuf) dependencies, which are indeed loaded as shared libraries. Unfortunately, GoogleHadoopFileSystem should be loaded as a "hive class" in this case, just like org.apache.hadoop.hdfs.DistributedFileSystem; it just happens to unluckily share the com.google.* package namespace.

protected def isSharedClass(name: String): Boolean =
  name.contains("slf4j") ||
  name.contains("log4j") ||
  name.startsWith("org.apache.spark.") ||
  name.startsWith("scala.") ||
  name.startsWith("com.google") ||
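  // note: this prefix also matches com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem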
  name.startsWith("java.lang.") ||
  name.startsWith("java.net") ||
  sharedPrefixes.exists(name.startsWith)

...

/** The classloader that is used to load an isolated version of Hive. */
protected val classLoader: ClassLoader = new URLClassLoader(allJars, rootClassLoader) {
  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    val loaded = findLoadedClass(name)
    if (loaded == null) doLoadClass(name, resolve) else loaded
  }

  def doLoadClass(name: String, resolve: Boolean): Class[_] = {
    ...
    } else if (!isSharedClass(name)) {
      logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
      super.loadClass(name, resolve)
    } else {
      // For shared classes, we delegate to baseClassLoader.
      logDebug(s"shared class: $name")
      baseClassLoader.loadClass(name)
    }
  }
}

This can be verified by adding the following line to ${SPARK_INSTALL}/conf/log4j.properties:

log4j.logger.org.apache.spark.sql.hive.client=DEBUG

And the output shows:

...
15/07/20 20:59:14 DEBUG IsolatedClientLoader: hive class: org.apache.hadoop.hdfs.DistributedFileSystem - jar:file:/home/hadoop/spark-install/lib/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/apache/hadoop/hdfs/DistributedFileSystem.class
...
15/07/20 20:59:14 DEBUG IsolatedClientLoader: shared class: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
java.lang.RuntimeException: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to org.apache.hadoop.fs.FileSystem
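
The fix in the pull request linked above is essentially to stop treating the GCS connector's packages as shared while keeping the rest of com.google.* shared for Guava/protobuf; conceptually it amounts to something like the following in isSharedClass (a sketch of the idea, not necessarily the exact upstream diff):

// Keep Guava/protobuf shared, but let the GCS connector (com.google.cloud.*) be
// loaded as a "hive class", just like org.apache.hadoop.hdfs.DistributedFileSystem.
(name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||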