Spark SQL模块
这个模块是Spark中用来处理结构化数据的,提供一个叫SparkDataFrame的东西并且自动解析为分布式SQL查询数据。
在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象的作用:
-
用于SparkSQL编程作为入口对象
-
用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
DataFrame
DataFrame和RDD的对比
-
相同点:DataFrame和RDD都是:弹性的、分布式的、数据集
-
不同点:DataFrame存储的数据结构限定为:二维表结构化数据;而RDD可以存储的数据则没有任何限制
也就是说,DataFrame 是按照二维表格的形式存储数据;RDD则是存储对象本身
DataFrame的组成
在结构层面:
-
StructType对象描述整个DataFrame的表结构
-
StructField对象描述一个列的信息
在数据层面:
-
Row对象记录一行数据
-
Column对象记录一列数据并包含列的信息(包含StructField)
schema = StructType().\
add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
# StructType是由多个StructField组成的
# 通过add方法向StructType中添加StructField
# 一个StructField记录由列名、列类型、列是否运行为空组成
创建SparkDataFrame
几种创建SparkDataFrame的方法,分别是使用RDD来创建、使用python的pandas创建DataFrame、使用List来创建、读取数据文件来创建、通过读取数据库来创建。
1.使用RDD来创建
主要使用RDD的toDF方法
#appName:任务名称
#config:设置一些属性
#master:Spark运行模式
#getOrCreate:创建SparkSession对象
from pyspark.sql import SparkSession
# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
appName("spark_sql").\
config("spark.sql.shuffle.partitions", "4").\
master("local[*]").\
getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext
#定义rdd
rdd = sc.parallelize([("Sam", 28, 88, "M"),
("Flora", 28, 90, "F"),
("Run", 1, 60, None),
("Peter", 55, 100, "M"),
("Mei", 54, 95, "F")])
df = rdd.toDF(["Name", "age", "score", "sex"])
df.show()
df.printSchema()
#结果如下
+-----+---+-----+----+
| Name|age|score| sex|
+-----+---+-----+----+
| Sam| 28| 88| M|
|Flora| 28| 90| F|
| Run| 1| 60|null|
|Peter| 55| 100| M|
| Mei| 54| 95| F|
+-----+---+-----+----+
root
|-- Name: string (nullable = true)
|-- age: long (nullable = true)
|-- score: long (nullable = true)
|-- sex: string (nullable = true)
2.使用python的pandas创建DataFrame
from pyspark.sql import SparkSession
import pandas as pd
# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
appName("spark_sql").\
config("spark.sql.shuffle.partitions", "4").\
master("local[*]").\
getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext
# 首先构建pandas的df对象
pdf = pd.DataFrame(
{
"id": [1, 2, 3],
"name": ["aa", "bb", "cc"],
"age": [11, 12, 13]
}
)
# 然后创建spark的df对象
df = spark.createDataFrame(pdf)
df.show()
df.printSchema()
#执行结果
+---+----+---+
| id|name|age|
+---+----+---+
| 1| aa| 11|
| 2| bb| 12|
| 3| cc| 13|
+---+----+---+
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
3.使用list来创建
from pyspark.sql import SparkSession
# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
appName("spark_sql").\
config("spark.sql.shuffle.partitions", "4").\
master("local[*]").\
getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext
list_values = [["aa", 28, 88], ["bb", 28, 90], ["cc", 1, 60]]
df = spark.createDataFrame(list_values, ["name", "age", "score"])
df.show()
df.printSchema()
#执行结果如下
+----+---+-----+
|name|age|score|
+----+---+-----+
| aa| 28| 88|
| bb| 28| 90|
| cc| 1| 60|
+----+---+-----+
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- score: long (nullable = true)
4.读取数据文件来创建
from pyspark.sql import SparkSession
# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
appName("spark_sql").\
config("spark.sql.shuffle.partitions", "4").\
master("local[*]").\
getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext
df = spark.read.format("text")\
.option("interSchema", "true")\
.option("delimiter", " ")\
.schema("name STRING")\
.load("./word.txt")
df.show()
df.printSchema()
#执行结果如下
+------------------+
| name|
+------------------+
| hive spark hadoop|
|hadoop spark hbase|
+------------------+
root
|-- name: string (nullable = true)
#读取json数据:format("json")
json自带schema,直接load即可:
df = spark.read.format("json").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.json")
#读取csv数据:format("csv")
df = spark.read.format("csv").\
option("sep", ";").\ # 列分隔符
option("header", True).\ # 是否有CSV标头
option("encoding", "utf-8").\ # 编码
schema("name STRING, age INT, job STRING").\ # 列名和类型
load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.csv")
#读取parquet数据:format("parquet")
parquet自带schema,直接load即可:
df = spark.read.format("parquet").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/users.parquet")
5.通过读取数据库来创建
from pyspark.sql import SparkSession
# 构建SparkSession执行环境入口对象
spark = SparkSession.builder.\
appName("spark_sql").\
config("spark.sql.shuffle.partitions", "4").\
master("local[*]").\
getOrCreate()
# 通过SparkSession可以获SparkContext对象
sc = spark.sparkContext
url = "jdbc:mysql://192.168.10.106:3306/test"
df = spark.read.format("jdbc")\
.option("url", url)\
.option("dbtable", "test")\
.option("user", "root")\
.option("password", "123456")\
.load()\
df.show()
df.printSchema()
#执行结果如下
+--------------------+-------------+---------------+
| order_code|serial_number| ip|
+--------------------+-------------+---------------+
| 4849818240345981248| 123456 |192.168.10.106 |
| 4849818240345981248| 123456 |192.168.10.106 |
+--------------------+-------------+---------------+
only showing top 20 rows
root
|-- order_code: string (nullable = true)
|-- serial_number: string (nullable = true)
|-- ip: string (nullable = true)
###操作数据库需要将数据库驱动添加到java对应目录下否则会报错:java.sql.SQLException: No suitable driver
#####DataFrame 通过JDBC读写数据库
首先需要在anaconda中安装mysql的驱动:
将mysql的驱动包放在anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/路径下
mysql-connector-java.jar
或者放在java指定目录下
C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext
