以编程方式执行Spark SQL查询的两种实现方式

摘 要

在自定义的程序中编写Spark SQL查询程序

1.通过反射推断Schema

  1. package com.itunic.sql
  2. import org.apache.spark.sql.SQLContext
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. /**
  5.   * Created by itunic.com on 2017/1/2.
  6.   * Spark SQL
  7.   * 通过反射推断Schema
  8.   * by me:
  9.   * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
  10.   * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
  11.   * 博客宗旨:把最实用的经验,分享给最需要的你,
  12.   * 希望每一位来访的朋友都能有所收获!
  13.   *
  14.   */
  15. object InferringSchema {
  16.   def main(args: Array[String]): Unit = {
  17.     //创建SparkConf()并设置App名称
  18.     val conf = new SparkConf().setAppName("InferringSchema").setMaster("local")
  19.     //SQLContext要依赖SparkContext
  20.     val sc = new SparkContext(conf)
  21.     //创建SQLContext
  22.     val sqlContext = new SQLContext(sc)
  23.     //从指定的地址创建RDD
  24.     val lineRdd = sc.textFile("F:\\test\\input\\wc.txt").map(f => {
  25.       val fields = f.split("\t")
  26.       //将RDD和case class关联
  27.       Person(fields(0).toLong, fields(1), fields(2).toInt)
  28.     })
  29.     //导入隐式转换,如果不导入无法将RDD转换成DataFrame
  30.     //将RDD转换成DataFrame
  31.     import sqlContext.implicits._
  32.     val personDF = lineRdd.toDF
  33.     //注册表
  34.     personDF.registerTempTable("t_person")
  35.     //传入SQL
  36.     val df = sqlContext.sql("select * from t_person order by age desc limit 2")
  37.     //显示
  38.     df.show()
  39.     //以json方式写入hdfs
  40.     //df.write.json("hdfs://ns1:9000/wc")
  41.     sc.stop()
  42.   }
  43. }
  44. //定义样例类
  45. case class Person(id: Long, userName: String, age: Int)

2.通过StructType直接指定Schema

  1. package com.itunic.sql
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.sql.{Row, SQLContext}
  4. import org.apache.spark.sql.types._
  5. /**
  6.   * Created by itunic.com on 2017/1/2.
  7.   *  Spark SQL
  8.   * 通过StructType直接指定Schema
  9.   * by me:
  10.   * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
  11.   * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
  12.   * 博客宗旨:把最实用的经验,分享给最需要的你,
  13.   * 希望每一位来访的朋友都能有所收获!
  14.   *
  15.   */
  16. object SpecifyingSchema {
  17.   def main(args: Array[String]): Unit = {
  18.     //创建SparkConf()并设置App名称
  19.     val conf = new SparkConf().setAppName("SpecifyingSchema").setMaster("local")
  20.     //SQLContext要依赖SparkContext
  21.     val sc = new SparkContext(conf)
  22.     //创建SQLContext
  23.     val sqlContext = new SQLContext(sc)
  24.     //通过StructType直接指定每个字段的schema
  25.     val schema = StructType(
  26.       List(
  27.         StructField("id", LongType, true),
  28.         StructField("name", StringType, true),
  29.         StructField("age", IntegerType, true)
  30.       )
  31.     )
  32.     val lineRdd = sc.textFile("F:\\test\\input\\wc.txt").map(f => {
  33.       val fields = f.split("\t")
  34.       Row(fields(0).toLong, fields(1), fields(2).toInt)
  35.     })
  36.     //将schema信息应用到lineRdd上
  37.     val personDF = sqlContext.createDataFrame(lineRdd, schema)
  38.     personDF.registerTempTable("t_person")
  39.     //传入SQL
  40.     val df = sqlContext.sql("select * from t_person order by age desc limit 2")
  41.     //显示
  42.     df.show()
  43.     //以json方式写入hdfs
  44.     //df.write.json("hdfs://ns1:9000/wc")
  45.     sc.stop()
  46.   }
  47. }

 

  • 以编程方式执行Spark SQL查询的两种实现方式已关闭评论
  • 25 views
  • A+
所属分类:未分类
avatar