
I want to read an indented JSON file into an RDD, but Spark throws an exception when I do.

import json

# txts = sc.textFile('data/jsons_without_indentation') # works
txts = sc.textFile('data/jsons_with_indentation')      # fails
txts_dicts = txts.map(lambda data: json.loads(data))
txts_dicts.collect()

sc.wholeTextFiles does not work either. Is it possible to load indented JSON without first converting it into a file without indentation?

An example JSON file looks like this:

{
    "data": {
        "text": {
            "de": "Ein Text.",
            "en": "A text."
        }
    }
}
rebeling
  • So the file is composed of multiple JSON documents, and each one of them spans multiple lines? – Udy Nov 08 '15 at 23:50
  • No, it is a folder of JSON files, and the JSON inside each file has an indentation level of 4. Spark does not like it, neither in one file nor in multiple files. One JSON document per line (JSONL) works out of the box, no question. – rebeling Nov 09 '15 at 00:00
  • Can you add an example of a file/line? – Udy Nov 09 '15 at 00:07
  • Which exception is thrown? – mrhn Nov 09 '15 at 00:10

1 Answer


If there is just a single JSON document per file, all you need is SparkContext.wholeTextFiles. First, let's create some dummy data:

import tempfile
import json 

input_dir = tempfile.mkdtemp()

docs = [
    {'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}},
    {'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}},
    {'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}]

for doc in docs:
    # delete=False keeps each file on disk after the with-block closes it
    with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", dir=input_dir, delete=False) as fw:
        json.dump(doc, fw, indent=4)

Now let's read the data:

rdd = sc.wholeTextFiles(input_dir).values()

and make sure that the files are indeed indented:

print(rdd.top(1)[0])

## {
##     "data": {
##         "text": {
##             "de": "Ein Text.", 
##             "en": "A text."
##         }
##     }
## }

Finally we can parse:

parsed = rdd.map(json.loads)

and check if everything worked as expected:

parsed.takeOrdered(3)

## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}},
##  {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
##  {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}]
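
For context on why the whole-file read matters: sc.textFile produces one record per line, so each line of an indented document reaches json.loads on its own. The following is a minimal plain-Python sketch of that difference, without Spark; the helper name parses is made up for illustration and the sample text only mirrors the question's example.

```python
import json

# Same shape as the example document in the question, indented by 4 spaces.
text = json.dumps(
    {"data": {"text": {"de": "Ein Text.", "en": "A text."}}}, indent=4)


def parses(s):
    """Return True if s is a complete JSON document (hypothetical helper)."""
    try:
        json.loads(s)
        return True
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return False


# Roughly what a line-based read hands to json.loads: one line at a time.
per_line = [parses(line) for line in text.splitlines()]

# Roughly what a whole-file read hands over: the complete file content.
whole = parses(text)

print(per_line)
print(whole)
```

None of the individual lines is valid JSON by itself, while the complete text is, which is why a single document per file has to be read as one record.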

If you still experience problems, it is most likely due to malformed entries. The simplest thing you can do is discard them using flatMap with a custom wrapper. For example, parsing an RDD that contains a truncated record fails:

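rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd)

# a plain map fails on the malformed record once an action runs, e.g.:
rdd_malformed.map(json.loads).collect()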

## org.apache.spark.api.python.PythonException: Traceback (most recent call ...
##     ...
## ValueError: Expecting property name: line 1 column 2 (char 1)

With the parser wrapped in seq_try (defined in the answer to "What is the equivalent to scala.util.Try in pyspark?"), malformed entries are dropped instead:

rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect()

## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
##  {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}},
##  {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]
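
The definition of seq_try is not reproduced here. As a rough sketch of what such a wrapper might look like (an assumption, not copied from the linked answer), it can return a one-element list on success and an empty list on failure, so that flatMap keeps good records and silently drops bad ones. The example below emulates flatMap with a plain list comprehension so it runs without Spark; the records list is made-up sample data.

```python
import json


def seq_try(f, *args, **kwargs):
    """Call f; return [result] on success and [] if it raises.

    Assumed shape of the helper, written for illustration only.
    """
    try:
        return [f(*args, **kwargs)]
    except Exception:
        return []


records = [
    "{u'data': {u'text': {u'de':",           # malformed, as in the example
    '{"data": {"text": {"en": "A dog."}}}',  # well-formed
]

# Plain-Python stand-in for rdd.flatMap(lambda x: seq_try(json.loads, x))
parsed = [doc for rec in records for doc in seq_try(json.loads, rec)]

print(parsed)
```

Catching every Exception hides the reason a record was rejected, so for debugging it may be worth logging or counting the failures rather than discarding them without a trace.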
zero323