大数据 | Spark基本使用示例

发布时间 2023-09-27 10:28:42作者: LittleDonkey

欢迎参观我的博客,一个Vue 与 SpringBoot结合的产物:https://poetize.cn

原文链接:https://poetize.cn/article?id=47

基本概念

SparkSubmit(进程)

应用提交的客户端程序。

Driver(线程)

含有 SparkContext 实例的线程。它负责创建逻辑和物理计划,并与集群管理器协调调度任务。

Executor(进程)

Executor 是一个执行 Task 的容器,负责调用 Task 的 runTask 方法来执行 Task 的运算逻辑。

Task

一段计算逻辑的封装对象。

Shuffle

在 Spark 中,Shuffle 是指在不同阶段之间重新分配数据的过程。它通常发生在需要对数据进行聚合或分组操作的时候,例如 reduceByKey 或 groupByKey 等操作。

在 Shuffle 过程中,Spark 会将数据按照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处理属于它自己分区的数据。

Spark执行流程

  • 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone或Yarn)注册并申请运行Executor资源。
  • 资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上。
  • SparkContext构建DAG图,将DAG图分解成多个Stage,并把每个Stage的TaskSet(任务集)发送给Task Scheduler (任务调度器)。
  • Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor,同时,SparkContext将应用程序代码发放给Executor。
  • Task在Executor上运行,把执行结果反馈给Task Scheduler,然后再反馈给DAG Scheduler。
  • 当一个阶段完成后,Spark会根据数据依赖关系将结果传输给下一个阶段,并开始执行下一个阶段的任务。
  • 最后,当所有阶段都完成后,Spark会将最终结果返回给驱动程序,并完成作业的执行。

创建Spark客户端

SparkConf sparkConf = new SparkConf();
sparkConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
sparkConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
sparkConf.set("spark.sql.warehouse.dir", basePath);
sparkConf.set("hive.metastore.uris", metaUri);
System.setProperty("HADOOP_USER_NAME", "root");
SparkSession spark = SparkSession.builder()
        .appName(appName)
        .master(master)
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate();

Spark下载文件到本地

Dataset<Row> result = spark.sql("select * from tableName");
result.write().mode(SaveMode.Overwrite).csv("file:///D:\\Spark\\data");

Spark广播变量

广播变量的好处:不是每个Task一份变量副本,而是变成每个节点的Executor才一份副本。这样的话,就可以让变量产生的副本大大减少。

List<String> data = new ArrayList<>();
Broadcast<List<String>> dataBc = javaSparkContext.broadcast(data);

Spark累加器

//生成累加器
LongAccumulator change = spark.sparkContext().longAccumulator();

//在算子中累加
change.add(1L);

//获取结果
long count = change.count()

Spark Dataset 算子

Dataset<JSONObject> jSONObjectDataset = sourceDataset.filter((FilterFunction<JSONObject>) row -> true)
        .map((MapFunction<JSONObject, JSONObject>) row -> new JSONObject(), Encoders.javaSerialization(JSONObject.class));

Dataset<String> result = jSONObjectDataset.map((MapFunction<JSONObject, String>) row -> JSON.toJSONString(row), Encoders.STRING());

创建StructType

List<String> fields = ...;
List<StructField> structFields = fields.stream().map(f -> DataTypes.createStructField(f, DataTypes.StringType, true)).collect(Collectors.toList());
StructType structType = DataTypes.createStructType(structFields);

使用StructType创建Row

StructType schema = new StructType(new StructField[]{
        new StructField("id", DataTypes.StringType, true, Metadata.empty()),
        new StructField("title", DataTypes.StringType, true, Metadata.empty())
});

JavaRDD<Row> rowJavaRDD = jsonJavaRDD.map(json -> RowFactory.create(
        json.getString("id"),
        json.getString("title")
        )
);

Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, schema);
dataFrame.write().partitionBy("partition_time_hive").mode(SaveMode.Overwrite).orc(tablePath);

使用Java对象创建Row

//JavaData需要实现Serializable
JavaRDD<JavaData> javaData = ...;
Dataset<Row> rowDataset = sparkSession.createDataFrame(javaData, JavaData.class);

从JVM内存中创建Dataset

List<JSONObject> cleanList = ...;
Dataset<JSONObject> dataset = spark.createDataset(cleanList, Encoders.javaSerialization(JSONObject.class));

Spark分区写入Hdfs数据

public class Partitioner extends Partitioner {

    private final int numPartitions;

    public PersonPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
    	//传入的分区数
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
    	//根据key获取分区
        int partition = ...;
        return partition;
    }
}

public static void writeToHdfs(Dataset<JSONObject> result, String path) {
    JavaPairRDD<String, String> pairData = result.toJavaRDD().mapToPair(map -> new Tuple2<>(map.getString("status"), JSON.toJSONString(map)));
    pairData.partitionBy(new Partitioner(2)).map(tuple -> tuple._2).saveAsTextFile(path);
}

Spark分区写入Mysql数据

public static final String TABLE_TEMP_SUFFIX = "_temp";

public static void writeToMysql(Dataset<Row> result, JdbcConfig jdbcConfig, String table, DataSource dataSource) {
    try {
        try (Connection connection = dataSource.getConnection()) {
            JdbcUtils.executeSql(connection, "create table " + table + EtlConstant.TABLE_TEMP_SUFFIX + " like " + table);
        }

        result.write().format("jdbc")
                .mode(SaveMode.Overwrite)
                .option("driver", jdbcConfig.getDriverClassName())
                .option("url", jdbcConfig.getUrl())
                .option("dbtable", table + EtlConstant.TABLE_TEMP_SUFFIX)
                .option("user", jdbcConfig.getUsername())
                .option("password", jdbcConfig.getPassword())
                //JDBC批大小,默认 1000,灵活调整该值可以提高写入性能
                .option("batchsize", 1000)
                //事务级别,默认为 READ_UNCOMMITTED,无事务要求可以填 NONE 以提高性能
                .option("isolationLevel", "NONE")
                //需要注意该配置项,Overwrite 模式下,不设置为 true 会删表重建
                .option("truncate", "true")
                .save();

        try (Connection connection = dataSource.getConnection()) {
            JdbcUtils.executeSql(connection, "drop table if exists " + table);
            JdbcUtils.executeSql(connection, "rename table " + table + EtlConstant.TABLE_TEMP_SUFFIX + " to " + table);
        }
    } finally {
        try (Connection connection = dataSource.getConnection()) {
            JdbcUtils.executeSql(connection, "drop table if exists " + table + EtlConstant.TABLE_TEMP_SUFFIX);
        }
    }
}

Spark任务提交的Yarn

/opt/spark-3.0.0/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.am.waitTime=6000 \
--conf spark.sql.broadcastTimeout=6000 \
--conf spark.network.timeout=600 \
--num-executors 1 \
--driver-memory 3G \
--executor-memory 3G \
--jars /home/developer/base-spark.jar \
--class com.task.dwd.Task \
/home/developer/spark.jar dev yarn