PostgreSQL JSON to Spark RDD


I had the privilege of trying to get one of my Zeppelin notebooks to work in a different Zeppelin installation. The data model was very similar, but not the same.

My previous notebook expected the data to be plain fields in rows, fairly simple. It was read in with sqlContext.read.json(path) and the schema was inferred.
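For illustration, the old setup looked roughly like this; the path and table name are placeholders, not from the original notebook:

// a minimal sketch of the old setup; path and table name are made up
val events = sqlContext.read.json("/path/to/data.json")
events.printSchema()            // Spark infers the schema from the JSON documents
events.registerTempTable("events")
sqlContext.sql("select * from events limit 10").show()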

The new format was rows of PostgreSQL jsonb, basically org.apache.spark.sql.DataFrame = [id: bigint, data: string]. It is possible to query this directly too, with queries like select data->>'field' from table, BUT I would have had to rewrite my paragraphs.
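That rejected route would have meant pushing the jsonb extraction down into the JDBC query, something like this sketch (the table and field names are hypothetical):

// hypothetical pushdown variant: let PostgreSQL extract the jsonb fields;
// every field would have to be spelled out by hand in the subquery
val extracted = sqlContext.read.format("jdbc").options(
    Map("url" -> "jdbc:postgresql://localhost:5432/my_db",
    "driver" -> "org.postgresql.Driver",
    "user" -> "user", "password" -> "password",
    "dbtable" -> "(select id, data->>'field' as field from my_table) as tmp")).load()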

After extensive googling and asking around, I did find a way to convert the data to the required schema and register it as a temp table. I'm not sure it is the best solution, but it was good enough to get my analysis running and producing results. Hopefully it helps others out there!

// read the source table of (id, jsonb-as-string) rows over JDBC
val df = sqlContext.read.format("jdbc").options(
    Map("url" -> "jdbc:postgresql://localhost:5432/my_db",
    "driver" -> "org.postgresql.Driver",
    "user" -> "user", "password" -> "password",
    "dbtable" -> "my_db" )).load()

// give me just the data and ignore the ids
val jsons = df.map(t => t.get(1))

// convert RDD[Any] to RDD[String] as jsonRDD would be confused otherwise
val jsonsStrings = jsons.map(x => x.toString)

// load the jsons, infer the schema
val jsonSchemaRDD = sqlContext.jsonRDD(jsonsStrings)

// put into a table in memory
jsonSchemaRDD.registerTempTable("my_table")
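From there the temp table behaves like any other: the jsonb fields are now top-level columns, so the old paragraphs can query it unchanged. For example (column names depend on whatever schema gets inferred):

// sanity check: inspect the inferred schema and peek at the data
jsonSchemaRDD.printSchema()
sqlContext.sql("select * from my_table limit 10").show()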
