spark sql使用--创建SparkDataFrame

发布时间 2023-10-17 16:12:55作者: whiteY

Spark SQL模块

这个模块是Spark中用来处理结构化数据的,提供一个叫SparkDataFrame的东西并且自动解析为分布式SQL查询数据。

在RDD阶段,程序的执行入口对象是: SparkContext

在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象

SparkSession对象的作用:

  • 用于SparkSQL编程作为入口对象

  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

DataFrame

DataFrame和RDD的对比
  • 相同点:DataFrame和RDD都是:弹性的、分布式的、数据集

  • 不同点:DataFrame存储的数据结构限定为:二维表结构化数据;而RDD可以存储的数据则没有任何限制

也就是说,DataFrame 是按照二维表格的形式存储数据;RDD则是存储对象本身

DataFrame的组成

在结构层面:

  • StructType对象描述整个DataFrame的表结构

  • StructField对象描述一个列的信息

在数据层面:

  • Row对象记录一行数据

  • Column对象记录一列数据并包含列的信息(包含StructField)

schema = StructType().\
add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)

# StructType是由多个StructField组成的
# 通过add方法向StructType中添加StructField
# 一个StructField记录由列名、列类型、列是否运行为空组成

创建SparkDataFrame

几种创建SparkDataFrame的方法,分别是使用RDD来创建、使用python的pandas创建DataFrame、使用List来创建、读取数据文件来创建、通过读取数据库来创建。

1.使用RDD来创建

主要使用RDD的toDF方法

#appName:任务名称
#config:设置一些属性
#master:Spark运行模式
#getOrCreate:创建SparkSession对象

from pyspark.sql import SparkSession

# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
    appName("spark_sql").\
    config("spark.sql.shuffle.partitions", "4").\
    master("local[*]").\
    getOrCreate()

# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext

#定义rdd
rdd = sc.parallelize([("Sam", 28, 88, "M"),
                      ("Flora", 28, 90, "F"),
                      ("Run", 1, 60, None),
                      ("Peter", 55, 100, "M"),
                      ("Mei", 54, 95, "F")])

df = rdd.toDF(["Name", "age", "score", "sex"])

df.show()
df.printSchema()

#结果如下

+-----+---+-----+----+
| Name|age|score| sex|
+-----+---+-----+----+
|  Sam| 28|   88|   M|
|Flora| 28|   90|   F|
|  Run|  1|   60|null|
|Peter| 55|  100|   M|
|  Mei| 54|   95|   F|
+-----+---+-----+----+

root
 |-- Name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)
 |-- sex: string (nullable = true)

2.使用python的pandas创建DataFrame


from pyspark.sql import SparkSession
import pandas as pd

# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
    appName("spark_sql").\
    config("spark.sql.shuffle.partitions", "4").\
    master("local[*]").\
    getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext

# 首先构建pandas的df对象
pdf = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["aa", "bb", "cc"],
        "age": [11, 12, 13]
    }
)
# 然后创建spark的df对象
df = spark.createDataFrame(pdf)

df.show()
df.printSchema()


#执行结果
+---+----+---+
| id|name|age|
+---+----+---+
|  1|  aa| 11|
|  2|  bb| 12|
|  3|  cc| 13|
+---+----+---+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

3.使用list来创建


from pyspark.sql import SparkSession


# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
    appName("spark_sql").\
    config("spark.sql.shuffle.partitions", "4").\
    master("local[*]").\
    getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext

list_values = [["aa", 28, 88], ["bb", 28, 90], ["cc", 1, 60]]
df = spark.createDataFrame(list_values, ["name", "age", "score"])
df.show()
df.printSchema()

#执行结果如下
+----+---+-----+
|name|age|score|
+----+---+-----+
|  aa| 28|   88|
|  bb| 28|   90|
|  cc|  1|   60|
+----+---+-----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)

4.读取数据文件来创建


from pyspark.sql import SparkSession


# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
    appName("spark_sql").\
    config("spark.sql.shuffle.partitions", "4").\
    master("local[*]").\
    getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext

df = spark.read.format("text")\
    .option("interSchema", "true")\
    .option("delimiter", " ")\
    .schema("name STRING")\
    .load("./word.txt")


df.show()
df.printSchema()

#执行结果如下

+------------------+
|              name|
+------------------+
| hive spark hadoop|
|hadoop spark hbase|
+------------------+

root
 |-- name: string (nullable = true)


#读取json数据:format("json")
json自带schema,直接load即可:
df = spark.read.format("json").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.json")

#读取csv数据:format("csv")
df = spark.read.format("csv").\
option("sep", ";").\ # 列分隔符
option("header", True).\ # 是否有CSV标头
option("encoding", "utf-8").\ # 编码
schema("name STRING, age INT, job STRING").\ # 列名和类型
load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.csv")


#读取parquet数据:format("parquet")
parquet自带schema,直接load即可:
df = spark.read.format("parquet").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/users.parquet")

5.通过读取数据库来创建


from pyspark.sql import SparkSession


# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
    appName("spark_sql").\
    config("spark.sql.shuffle.partitions", "4").\
    master("local[*]").\
    getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext

url = "jdbc:mysql://192.168.10.106:3306/test"

df = spark.read.format("jdbc")\
    .option("url", url)\
    .option("dbtable", "test")\
    .option("user", "root")\
    .option("password", "123456")\
    .load()\

df.show()
df.printSchema()


#执行结果如下

+--------------------+-------------+---------------+
|          order_code|serial_number|             ip|
+--------------------+-------------+---------------+
| 4849818240345981248|  123456     |192.168.10.106 |
| 4849818240345981248|  123456     |192.168.10.106 |
+--------------------+-------------+---------------+
only showing top 20 rows

root
 |-- order_code: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- ip: string (nullable = true)


###操作数据库需要将数据库驱动添加到java对应目录下否则会报错:java.sql.SQLException: No suitable driver

#####DataFrame 通过JDBC读写数据库

首先需要在anaconda中安装mysql的驱动:

将mysql的驱动包放在anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/路径下

mysql-connector-java.jar

或者放在java指定目录下

C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext

![](https://img2023.cnblogs.com/blog/2069550/202310/2069550-20231017160039645-1898787876.png)