CompletableFuture从入门到精通?算了,入个门就行了

发布时间 2023-04-07 22:50:33作者: loveletters

Future vs CompletableFuture

准备工作

为了便于后续更好地调试和学习,我们需要定义一个工具类CommonUtils辅助我们对知识的理解。这个工具类总共四个方法

  • readFile:读取指定路径的文件内容
  • sleepMillis:休眠指定的毫秒数
  • sleepSecond:休眠指定的秒数
  • printThreadLog:打印携带线程信息的日志信息
object CommonUtils {
    fun readFile(pathToFile: String): String {
        return try {
            Files.readString(pathToFile.let { Paths.get(it) })
        } catch (e: Exception) {
            e.printStackTrace()
            ""
        }
    }

    fun sleepMillis(millis: Long) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis)
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }

    fun sleepSecond(seconds: Int) {
        try {
            TimeUnit.SECONDS.sleep(seconds.toLong())
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }

    fun printThreadLog(message: String?) {
        val result = StringJoiner(" | ")
                .add(System.currentTimeMillis().toString())
                .add(String.format("%2d", Thread.currentThread().id))
                .add(Thread.currentThread().name.toString())
                .add(message)
                .toString()
        println(result)
    }
}

Future 的局限性

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中

news.txt

oh my god!completablefuture真tmd好用

filter_words.txt

尼玛,SB,tmd
fun main(args: Array<String>) {
    val executor = Executors.newFixedThreadPool(5)
    // step1: 读取敏感词汇  thread1
    val filterWordFuture = executor.submit<Array<String>> {
        val str: String = CommonUtils.readFile("filter_words.txt")
        str.split(",".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
    }
    // step2: 读取新闻稿 thread2
    val newsFuture: Future<String> = executor.submit<String> { CommonUtils.readFile("news.txt") }

    // step3 : 替换操作 thread3
    val replaceFuture = executor.submit<String> {
        val words = filterWordFuture.get()
        var news = newsFuture.get()
        for (word in words) {
            if (news.indexOf(word) > 0) {
                news = news.replace(word, "**")
            }
        }
        news
    }

    // step 4: 打印输出替换后的新闻稿 main
    val filteredNews = replaceFuture.get()
    println("filteredNews=$filteredNews")
    executor.shutdown()
}

通过上面的代码,我们会发现,Future相比于所有任务都直接在主线程处理,有很多优势,但同时也存在不足,至少表现如下:

  • 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知你它什么时候完成,你如果想要得到结果,必须通过一个get()方法,该方法会阻塞直到结果可用为止。 它不具备将回调函数附加到Future后并在Future的结果可用时自动调用回调的能力。
  • 无法解决任务相互依赖的问题。filterWordFuture和newsFuture的结果不能自动发送给replaceFuture,需要在replaceFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。
  • 不能将多个Future合并在一起。假设你有多种不同的Future,你想在它们全部并行完成后然后再运行某个函数,Future很难独立完成这一需要。
  • 没有异常处理。Future提供的方法中没有专门的API应对异常处理,还是需要开发者自己手动异常处理。

CompletableFuture 的优势

CompletableFuture 实现了FutureCompletionStage接口

CompletableFuture 相对于 Future 具有以下优势:

  • 为快速创建、链接依赖和组合多个Future提供了大量的便利方法。
  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
  • 无缝衔接和亲和 lambda 表达式 和 Stream - API 。
  • 我见过的真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。

创建异步任务

runAsync

如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture.runAsync()方法。它接受一个Runnable接口的实现类对象,方法返回CompletableFuture<Void> 对象

static CompletableFuture<Void> runAsync(Runnable runnable);

演示案例:开启一个不从任务中返回任何内容的CompletableFuture异步任务

fun main() {
    CommonUtils.printThreadLog("main start")
    // 使用Lambda表达式
    CompletableFuture.runAsync {
        CommonUtils.printThreadLog("读取文件开始");
        // 使用睡眠来模拟一个长时间的工作任务(例如读取文件,网络请求等)
        CommonUtils.sleepSecond(3);
        CommonUtils.printThreadLog("读取文件结束");
    }
    CommonUtils.printThreadLog("here are not blocked,main continue");
    CommonUtils.sleepSecond(4); //  此处休眠为的是等待CompletableFuture背后的线程池执行完成。
    CommonUtils.printThreadLog("main end");
}

supplyAsync

CompletableFuture.runAsync() 开启不带返回结果异步任务。但是,如果您想从后台的异步任务中返回一个结果怎么办?此时,CompletableFuture.supplyAsync()是你最好的选择了。

static CompletableFuture<U>	supplyAsync(Supplier<U> supplier)

它入参一个 Supplier 供给者,用于供给带返回值的异步任务
并返回CompletableFuture<U>,其中U是供给者给程序供给值的类型。

需求:开启异步任务读取 news.txt 文件中的新闻稿,返回文件中内容并在主线程打印输出

fun main() {
    CommonUtils.printThreadLog("main start")

    val newsFuture = CompletableFuture.supplyAsync {

        CommonUtils.readFile("news.txt")
    }
    CommonUtils.printThreadLog("here are not blocked, main continue")
    val news = newsFuture.get()
    CommonUtils.printThreadLog("news=$news")
    CommonUtils.printThreadLog("main end")

}

如果想要获取newsFuture结果,可以调用completableFuture.get()方法,get()方法将阻塞,直到newsFuture完成。

异步任务中的线程池

我们已经知道,runAsync()supplyAsync()方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗? 不是吗!

CompletableFuture 会从全局的`F