What am I doing wrong? I get an error at startup on this line:

    textfile.saveToCassandra("logs", "logstable")

Full code:

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._

    object App extends java.io.Serializable {
      final val APP_NAME = "SparkBatchTest"

      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]")
          .set("spark.cassandra.connection.host", "localhost")
          .set("spark.cassandra.connection.native.port", "9042")
        val sc = new SparkContext(conf)
        val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*")
        textfile.foreach(record => seperateFields(record))
        textfile.saveToCassandra("logs", "logstable") // the error is raised here
      }

      def seperateFields(line: String): Tuple5[Object, Object, Object, Object, Object] = {
        println("Waiting...")
        Thread sleep 3000
        println("Saving...")
        val split = line.split(" ").toArray[Object]
        println(line)
        return (split(0) + " " + split(1), if (line contains "Down") "0" else "1", split(5), split(6), split(4))
      }
    }

Before processing, the file contents look like this:

    2015-12-10 12:04:48.299 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 a63253686jypmO68380 Down System Device ID: 2490 Top > mariscos puerto vallarta 2-Standard

The table in Cassandra looks like this:

     datetime            | location | logid   | status              | systemid
    ---------------------+----------+---------+---------------------+----------
     2015-12-23 15:10:01 | 1        | RAP-109 | a73225704jypmO54359 | Aruba
     2015-12-23 15:55:54 | 0        | RAP-109 | a62710684jypmO66318 | Aruba
     2015-12-23 15:10:02 | 1        | RAP-109 | a2222705jypmO69412  | Aruba
     2015-12-23 15:09:45 | 1        | RAP-109 | a80296226jypmO21003 | Aruba
     2015-12-23 15:25:29 | 1        | RAP-109 | a11170884jypmO9634  | Aruba
     2015-12-23 15:55:53 | 1        | RAP-109 | a18255961jypmO91299 | Aruba
     2015-12-23 16:17:27 | 1        | RAP-109 | a41956492jypmO85560 | Aruba
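The question's `seperateFields` splits each line on spaces and picks fields 0+1, 1 or 0 depending on "Down", then fields 5, 6 and 4, which lines up with the table's columns (datetime, location, logid, status, systemid). A minimal plain-Scala sketch of that extraction (no Spark needed), run on the sample line above, makes the mapping visible. `parseLine` is a hypothetical name; unlike the original it returns `String`s rather than `Object`s, drops the debug printing and sleeping, and assumes fields are separated by single spaces.

```scala
object ParseSketch {
  // Hypothetical helper mirroring seperateFields from the question.
  // Assumes single-space-separated fields; trim guards against a leading space.
  // Result order: (datetime, location, logid, status, systemid).
  def parseLine(line: String): (String, String, String, String, String) = {
    val f = line.trim.split(" ")
    val datetime = f(0) + " " + f(1)                       // date + time
    val location = if (line.contains("Down")) "0" else "1" // "Down" -> 0, otherwise 1
    (datetime, location, f(5), f(6), f(4))
  }

  def main(args: Array[String]): Unit = {
    val sample = "2015-12-10 12:04:48.299 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 a63253686jypmO68380 Down System Device ID: 2490 Top > mariscos puerto vallarta 2-Standard"
    println(parseLine(sample))
    // prints (2015-12-10 12:04:48.299,0,RAP-109,a63253686jypmO68380,Aruba)
  }
}
```

Each tuple element lines up with one column, which is the shape the connector can map onto the table; a bare `String` per record has nothing to map onto five columns, which is presumably why calling `saveToCassandra` on the raw `RDD[String]` failed.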

    Answer:

    Solved it by rewriting these lines:

        val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*")
        // parse only the first line and wrap the resulting tuple in an RDD
        val res = sc.parallelize(Seq(seperateFields(textfile.first())))
        res.saveToCassandra("logs", "logstable",
          SomeColumns("datetime", "location", "logid", "status", "systemid"))

    and changed the return type of the parsing function:

        def seperateFields(line: String): Tuple5[String, String, String, String, String] = {...

    and it worked!
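    Note that `textfile.first()` returns only the first line, so the rewritten code stores a single row. In the original code, `textfile.foreach(record => seperateFields(record))` computed the tuples and threw them away, and `saveToCassandra` was then called on the unparsed lines. To save every line, the usual approach is to `map` the parse function over the RDD, which with the connector would presumably be something like `textfile.map(seperateFields).saveToCassandra("logs", "logstable", SomeColumns("datetime", "location", "logid", "status", "systemid"))` (not verified here; the `println`/`Thread sleep` calls would also slow that down considerably). The plain-Scala sketch below uses a `Seq` as a stand-in for the RDD, since `foreach` and `map` behave analogously on both. `MapVsForeach` and `parseLine` are hypothetical names, and the second input line is made up for illustration.

```scala
object MapVsForeach {
  // Hypothetical, simplified stand-in for seperateFields (no printing or sleeping).
  def parseLine(line: String): (String, String, String, String, String) = {
    val f = line.trim.split(" ")
    (f(0) + " " + f(1), if (line.contains("Down")) "0" else "1", f(5), f(6), f(4))
  }

  def main(args: Array[String]): Unit = {
    // A Seq stands in for the RDD[String] returned by sc.textFile(...).
    val lines = Seq(
      "2015-12-10 12:04:48.299 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 a63253686jypmO68380 Down System",
      "2015-12-10 12:05:00.000 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 exampleId Up System" // made-up line
    )

    // foreach runs parseLine only for its side effects and returns Unit,
    // so the parsed tuples are discarded (as in the question's code).
    lines.foreach(parseLine)

    // map keeps one parsed tuple per input line; on an RDD this is the
    // collection that would be passed to saveToCassandra.
    val rows: Seq[(String, String, String, String, String)] = lines.map(parseLine)
    rows.foreach(println)
  }
}
```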