Write the code
package com.itunic.sql

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by c on 2017/1/3.
  * Spark SQL: write data to MySQL.
  * Author's blog: 我本沉默, a blog about the internet and hands-on IT experience,
  * covering system administration, programming, project development, and system architecture.
  * The blog's motto: share the most practical experience with the people who need it most,
  * and may every visitor take something away!
  */
object JdbcRDD {
  def main(args: Array[String]) {
    // setMaster("local") is for local testing; remove or change it when submitting to a
    // cluster, because a master set in code takes precedence over --master on spark-submit.
    val conf = new SparkConf().setAppName("JdbcRDD").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    // Specify each field's schema directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Create a Properties object holding the database connection settings
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.155.1:3306/test", "test.t_person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}
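To confirm the write succeeded, the table can be read back over the same JDBC connection. The sketch below is only an illustration and assumes the same URL, credentials, and table name as the writer above; the object name JdbcReadCheck is made up for this example.

package com.itunic.sql

import java.util.Properties

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object JdbcReadCheck {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JdbcReadCheck").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Connection settings are assumed to match the writer above
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")

    // Load the table back into a DataFrame and print its contents
    val personDF = sqlContext.read.jdbc("jdbc:mysql://192.168.155.1:3306/test", "test.t_person", prop)
    personDF.show()

    sc.stop()
  }
}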
Package and submit to the Spark cluster
$SPARK_HOME/bin/spark-submit \
  --class com.itunic.sql.JdbcRDD \
  --master spark://ResourceManagerServer1:7077 \
  --jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
  --driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
  /root/spark-mvn-1.0-SNAPSHOT.jar
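A note on the two connector flags: roughly speaking, --driver-class-path makes the MySQL connector visible to the driver JVM, while --jars ships the same jar to the executors that perform the inserts, so both are typically needed for a JDBC write. Once the job finishes without errors, the three rows should appear in test.t_person; you can verify this with the read-back sketch above or with any MySQL client.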