头歌-Spark SQL 多数据源操作(Scala)

06-02 1595阅读

第1关:加载与保存操作

编程要求

打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完善程序。读取本地文件 file:///data/bigfiles/demo.json,根据年龄字段 age 设置降序,输出结果。

demo.json 文件内容如下所示:

{"name": "zhangsan", "age": 20, "sex": "m"},

{"name": "lisi", "age": 21, "sex": "m"},

{"name": "tiantian", "age": 22, "sex": "f"},

{"name": "lihua", "age": 23, "sex": "f"},

{"name": "zhaoliu", "age": 24, "sex": "m"},

{"name": "liguanqing", "age": 25, "sex": "f"},

{"name": "zhangqi", "age": 26, "sex": "m"},

{"name": "zhaoai", "age": 27, "sex": "m"},

{"name": "wangjiu", "age": 28, "sex": "f"}

开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object First_Question {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("First_Question")
      .master("local[*]")
      .getOrCreate()
    /******************* Begin *******************/
    val df: DataFrame = spark.read.json("file:///data/bigfiles/demo.json")
    val sortedDf = df.orderBy(df("age").desc)
    sortedDf.show()
    /******************* End *******************/
    spark.stop()
  }
}

第2关:Parquet 格式文件

编程要求

打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,根据下列要求,完善程序。

读取本地文件 file:///data/bigfiles/demo.json,使用 Parquet 完成分区,列名为 student=1,保存到本地路径file:///result/下。

读取本地文件 file:///data/bigfiles/demo2.json,使用 Parquet 完成分区,列名为 student=2,保存到本地路径file:///result/下。

demo.json 文件内容如下所示:

{"name": "zhangsan", "age": 20, "sex": "m"},

{"name": "lisi", "age": 21, "sex": "m"},

{"name": "tiantian", "age": 22, "sex": "f"},

{"name": "lihua", "age": 23, "sex": "f"},

{"name": "zhaoliu", "age": 24, "sex": "m"},

{"name": "liguanqing", "age": 25, "sex": "f"},

{"name": "zhangqi", "age": 26, "sex": "m"},

{"name": "zhaoai", "age": 27, "sex": "m"},

{"name": "wangjiu", "age": 28, "sex": "f"}

demo2.json 文件内容如下所示:

{"name": "hongkong", "age": 20, "sex": "m"},

{"name": "kulu", "age": 21, "sex": "m"},

{"name": "huxiaotian", "age": 22, "sex": "f"},

{"name": "yueming", "age": 23, "sex": "f"},

{"name": "wangsan", "age": 24, "sex": "m"},

{"name": "zhaojiu", "age": 25, "sex": "f"},

{"name": "wangqiqi", "age": 26, "sex": "m"},

{"name": "wangxiantian", "age": 27, "sex": "m"},

{"name": "zhaoba", "age": 28, "sex": "f"}

开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Second_Question {
  def main(args: Array[String]): Unit = {
    
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Second_Question")
      .master("local[*]")
      .getOrCreate()
 
    /******************* Begin *******************/  
    spark.read.json("file:///data/bigfiles/demo.json").write.parquet("file:///result/student=1") spark.read.json("file:///data/bigfiles/demo2.json").write.parquet("file:///result/student=2")
    /******************* End *******************/
    spark.stop()
 
  }
}

第3关:ORC 格式文件

编程要求

根据下列要求,完善程序。

创建 Orc 格式的 Hive 数据表 student,添加字段id(int),name(string),age(int),class(string)。

按顺序插入如下数据:

1001,"王刚",19,"大数据一班"

1002,"李虹",18,"大数据一班"

1003,"张子萱",20,"大数据一班"

1004,"赵云",18,"大数据一班"

1005,"李晓玲",19,"大数据一班"

1006,"张惠",18,"大数据二班"

1007,"秦散",19,"大数据二班"

1008,"王丽",18,"大数据二班"

1009,"田忌",20,"大数据二班"

1010,"张花",18,"大数据二班"

打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,编写 spark sql 程序,读取创建的 student 表并按字段 id 升序输出。

开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Third_Question {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession,并启用 Hive 支持
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Third_Question")
      .master("local[*]")
      .enableHiveSupport()  // 启用 Hive 支持
      .getOrCreate()
    /******************* Begin *******************/
    // 创建 Hive 数据表 student
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS student (
        |  id INT,
        |  name STRING,
        |  age INT,
        |  class STRING
        |)
        |STORED AS ORC
      """.stripMargin)
    // 插入数据到 student 表
    spark.sql(
      """
        |INSERT INTO student VALUES
        |(1001, '王刚', 19, '大数据一班'),
        |(1002, '李虹', 18, '大数据一班'),
        |(1003, '张子萱', 20, '大数据一班'),
        |(1004, '赵云', 18, '大数据一班'),
        |(1005, '李晓玲', 19, '大数据一班'),
        |(1006, '张惠', 18, '大数据二班'),
        |(1007, '秦散', 19, '大数据二班'),
        |(1008, '王丽', 18, '大数据二班'),
        |(1009, '田忌', 20, '大数据二班'),
        |(1010, '张花', 18, '大数据二班')
      """.stripMargin)
    // 查询并按 id 字段升序输出 student 表数据
    val studentDF: DataFrame = spark.sql("SELECT * FROM student ORDER BY id ASC")
    studentDF.show()
    /******************* End *******************/
    spark.stop()
  }
}

第4关:JSON 格式文件

编程要求

打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完善程序。读取本地文件 file:///data/bigfiles/test.json,不改变原数据排列顺序进行输出。

test.json 文件内容如下所示:

{"id":1001,"name":"王刚","age":19,"class":"大数据一班"},

{"id":1002,"name":"李虹","age":18,"class":"大数据一班"},

{"id":1003,"name":"张子萱","age":20,"class":"大数据一班"},

{"id":1004,"name":"赵云","age":18,"class":"大数据一班"},

{"id":1005,"name":"李晓玲","age":19,"class":"大数据一班"},

{"id":1006,"name":"张惠","age":18,"class":"大数据二班"},

{"id":1007,"name":"秦散","age":19,"class":"大数据二班"},

{"id":1008,"name":"王丽","age":18,"class":"大数据二班"},

{"id":1009,"name":"田忌","age":20,"class":"大数据二班"},

{"id":1010,"name":"张花","age":18,"class":"大数据二班"}

开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Forth_Question {
  def main(args: Array[String]): Unit = {
    
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Forth_Question")
      .master("local[*]")
      .getOrCreate()
    /******************* Begin *******************/  
    // 读取 JSON 文件
    val df = spark.read.json("file:///data/bigfiles/test.json")
    df.select("id", "name", "age", "class").show()
    /******************* End *******************/
    spark.stop()
  }
}

第5关:JDBC 操作数据库

编程要求

打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完善程序。读取本地 csv 文件 file:///data/bigfiles/job58_data.csv(有表头),将加载的数据以覆盖的方式保存到本地 Mysql 数据库的 work.job_data 表中,数据库连接信息如下:

账号:root

密码:123123

端口:3306

注意设置 useSSL=false。

开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Fifth_Question {
  def main(args: Array[String]): Unit = {
    
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Fifth_Question")
      .master("local[*]")
      .getOrCreate()
    /******************* Begin *******************/  
    // 1. 读取本地 CSV 文件
    val df: DataFrame = spark.read
      .option("header", "true") 
      .csv("file:///data/bigfiles/job58_data.csv")
    // 2. MySQL 连接信息
    val jdbcUrl = "jdbc:mysql://localhost:3306/work?useSSL=false"
    val dbProperties = new java.util.Properties()
    dbProperties.setProperty("user", "root")
    dbProperties.setProperty("password", "123123")
  
    // 3. 将数据写入 MySQL 数据库的 work.job_data 表中,覆盖原有数据
    df.write
      .mode(SaveMode.Overwrite) 
      .jdbc(jdbcUrl, "job_data", dbProperties)
    /******************* End *******************/
    spark.stop()
  }
}

第6关:Hive 表操作

编程要求

打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,根据下列要求,完善程序。

在 Hive 中创建数据表 employee,添加字段eid(string),ename(string),age(int),part(string)。

插入如下数据:

"A568952","王晓",25,"财务部"

"B256412","张天",28,"人事部"

"C125754","田笑笑",23,"销售部"

"D265412","赵云",24,"研发部"

"F256875","李姿姿",26,"后勤部"

编写 spark sql 程序,直接采用 Spark on Hive 的方式读取创建的 employee 表并按字段 eid 升序输出。

开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &

import org.apache.spark.sql.{DataFrame, SparkSession}
object Sixth_Question {
  def main(args: Array[String]): Unit = {
    /******************* Begin *******************/  
    try {
      // 创建SparkSession,启用Hive支持
      val spark = SparkSession.builder()
        .appName("ReadHiveTable")
        .enableHiveSupport()
        .getOrCreate()
        
      import spark.implicits._
      import spark.sql
      // 如果表不存在则创建
      if (!spark.catalog.tableExists("employee")) {
        sql("""
          CREATE TABLE employee (
            eid STRING, 
            ename STRING, 
            age INT, 
            part STRING
          )
        """)
        
        // 准备数据
        val employeeData = Seq(
          ("A568952","王晓",25,"财务部"),
          ("B256412","张天",28,"人事部"),
          ("C125754","田笑笑",23,"销售部"),
          ("D265412","赵云",24,"研发部"),
          ("F256875","李姿姿",26,"后勤部")
        )
        
        // 转换为DataFrame并写入Hive表
        employeeData.toDF("eid", "ename", "age", "part")
          .write.mode("append").saveAsTable("employee")
      }
      // 读取Hive表并按eid升序排序
      val employeeDF: DataFrame = sql("SELECT * FROM employee ORDER BY eid ASC")
      
      // 显示结果
      println("")
      employeeDF.show()
      
      // 停止SparkSession
      spark.stop()
    } catch {
      case e: Exception =>
        println(s"程序执行出错: ${e.getMessage}")
        e.printStackTrace()
    }
    /******************* End *******************/
  }
}
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码