4

I am trying to extract twitter data using rest API in zeppelin. Tried both option registerAsTable and registerTempTable, both ways are not working. Please help me to resolve the error. Getting below error while executing zeppelin Tutorial Code:

error: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Tweet] ).foreachRDD(rdd=> rdd.registerAsTable("tweets")

Hamad
  • 5,096
  • 13
  • 37
  • 65
Sonal
  • 41
  • 2

3 Answers3

0

RDD cannot be registered as Table whereas dataframe can. You can convert your RDD into dataframe and then write the resulting dataframe as tempTable or table.

You can convert RDD into Dataframe as below

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

Refer How to convert rdd object to dataframe in spark and http://spark.apache.org/docs/latest/sql-programming-guide.html

Community
  • 1
  • 1
sag
  • 5,333
  • 8
  • 54
  • 91
0

in zepplin interpretors add external dependency of org.apache.bahir:spark-streaming-twitter_2.11:2.0.0 from GUI and after that run following using spark-2.0.1

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.io.Source
//import org.apache.spark.Logging
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess

import scala.collection.mutable.HashMap
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach{ case(key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}


// Configure Twitter credentials , following config values will not work,it is for show off
val apiKey = "7AVLnhssAqumpgY6JtMa59w6Tr"
val apiSecret = "kRLstZgz0BYazK6nqfMkPvtJas7LEqF6IlCp9YB1m3pIvvxrRZl"
val accessToken = "79438845v6038203392-CH8jDX7iUSj9xmQRLpHqLzgvlLHLSdQ"
val accessTokenSecret = "OXUpYu5YZrlHnjSacnGJMFkgiZgi4KwZsMzTwA0ALui365"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkContext._

val ssc = new StreamingContext(sc, Seconds(2))

val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(10))

//twt.print


val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Tweet(createdAt:Long, text:String)

val tweet = twt.map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
)


tweet.foreachRDD(rdd=>rdd.toDF.registerTempTable("tweets"))
ssc.start()
//ssc.stop()

After that run some queries in the table in another zappelin cell

%sql select createdAt, text  from tweets   limit 50
donald
  • 478
  • 8
  • 19
0
val data = sc.textFile("/FileStore/tables/uy43p2971496606385819/testweet.json");

//convert RDD to DF

val inputs= data.toDF();
inputs.createOrReplaceTempView("tweets");
vaquar khan
  • 10,864
  • 5
  • 72
  • 96
  • Thank you for this code snippet, which may provide some immediate help. A proper explanation [would greatly improve](//meta.stackexchange.com/q/114762) its educational value by showing *why* this is a good solution to the problem, and would make it more useful to future readers with similar, but not identical, questions. Please [edit] your answer to add explanation, and give an indication of what limitations and assumptions apply. – Toby Speight Jun 05 '17 at 11:58