package org.example.file.mult;
//函数值接口
@FunctionalInterface
public interface FuncationCallback {
void callback(String param);
}
package org.example.file.mult;
import java.util.ArrayList;
public class FuncationCallbackImpl {
//函数式 回调参数处理
public FuncationCallbackImpl(ArrayList arrayList, FuncationCallback funcationCallback) {
arrayList.forEach(ele->{
funcationCallback.callback(ele+"456789");
});
}
}
package org.example.file.mult;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Securite {
//有界队列,根据实际业务设置即可
public static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
//静态线程池,一会多线程执行能用到,根据自己的机器性能配置即可
public static Executor executor = new ThreadPoolExecutor(3, 10, 2000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TaskThreadFactory("测试队列", false, 7));
public Securite() {
}
public void exec(Integer ele) {
queue.offer(ele);
}
//全局静态 内存可见性常量,空值任务暂停使用
public static volatile int a = 0;//刷回主内存
//静态内部类,有利于在主程序空值进度
public static class MultTask implements Runnable {
private ArrayBlockingQueue<Integer> arrayBlockingQueue1;
//线程-队列构造器 便于每个线程都能冲全局队列取值
public MultTask(ArrayBlockingQueue<Integer> arrayBlockingQueue1) {
this.arrayBlockingQueue1 = arrayBlockingQueue1;
}
@Override
public void run() {
//循环,这里要注意和arrayBlockingQueue1.take()配合使用,避免空悬打满cpu
while (true) {
try {
//当参数等于8时,后面的线程停止取队列的元素进行操作,来达到外界可控的目的
if (a == 8) {
System.out.println("开始终端了");
Thread.sleep(5000);
System.out.println("5秒后继续");
// a = 51;
return;
}
Integer take = arrayBlockingQueue1.take();
String name = Thread.currentThread().getName();
ArrayList arrayList = new ArrayList();
arrayList.add(take);
//队列每次取值后再回调函数里处理后的值
new FuncationCallbackImpl(arrayList, new FuncationCallback() {
@Override
public void callback(String param) {
System.out.println("返回param:" + param);
}
});
//TODO 根据自己的业务进行后续处理
System.out.println(">>>>>>>>>>>>>>>>>>>>>:" + take + "<><><><><><><>:" + name);
} catch (InterruptedException e) {
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Securite securite = new Securite();
for (int i = 0; i < 10; i++) {
if (i == 8) {
a = 8;
}
securite.exec(i);
executor.execute(new MultTask(queue));
}
System.out.println("10s后在运行一次");
Thread.sleep(1500);
securite.exec(10);
executor.execute(new MultTask(queue));
}
}