C/C++教程

并发编程-FutureTask&CompletableFuture

本文主要是介绍并发编程-FutureTask&CompletableFuture,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

并发编程-FutureTask&CompletableFuture

今天会聊到【Future/callable】并且分析他们的原理,同时也会聊到【CompletableFuture】的使用和原理,在这一章中,我们聊并发就到此结束,下面我可能会去关注一些中间件,因为这些在分布式系统中起到了很重要的作用。

Future/callable使用

它是在原来线程的基础上,提供了一个带有返回值的线程。

public class FutureCallableExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CalculationCallable calculationCallable=new CalculationCallable(1,2);
         FutureTask<Integer> futureTask=new FutureTask(calculationCallable);
        System.out.println("开始--->"+new Date());
        //FutureTask本质上还是一个线程
        new Thread(futureTask).start();
        //这里是一个阻塞方法 ,我们要拿到返回结果,肯定要等到call方法执行完成才能进行返回,所以会阻塞
        Integer result= futureTask.get();
        System.out.println(result);
        System.out.println("结束--->"+new Date());
    }

    static class CalculationCallable implements Callable<Integer>{
        int x;
        int y;
        public CalculationCallable(int x, int y) {
            this.x = x;
            this.y = y;
        }
        @Override
        public Integer call() throws Exception {
            System.out.println("begin call--->"+new Date());
            TimeUnit.SECONDS.sleep(2);
            return x+y;
        }
    }
}

如何实现&源码解析

把重写Callable中的call方法传递到FutureTask中,然后把FutureTask放在线程中启动->FutureTas本身就是一个实现了Runnable的类,所以才能被启动,而传递这个对象进去,肯定是要调用他的call方法,那这个时候就要有一个共享变量记录他的状态,从而可以判断时候执行完毕or not

get 方法去获取结果的时候->通过上面的状态判读时候执行完成or not 如果没有执行完成则加入一个队列中进行等待,因为可能有多个线程来请求get,

整体流程:

  • 当执行run的时候执行call方法,把执行结果放在一个全局变量中,在set结果的时候唤醒被阻塞的线程,上下文切换的时候线程阻塞到了get方法中,这个时候进入get方法的下一次自旋中,然后获得结果从而返回结果
  • 当执行get方法的时候判断当前状态时候是执行完毕状态,是的话则直接返回数据,不是,则把当前节点加入到队列中进行阻塞

run()

public void run() {
    //这里就是进行状态的判断
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行传递进来的Callable中的call方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {

        runner = null;
       
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

进行线程的唤醒

//这里进行唤醒操作
private void finishCompletion() {
    // assert state > COMPLETING;
    //遍历整个链表
    for (WaitNode q; (q = waiters) != null;) {
        //改成null,->不断的把节点释放掉
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //进行释放
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //需要进行等待
        s = awaitDone(false, 0L);
    return report(s);
}

进行线程的阻塞

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //自旋,因为可能有多个线程来调用get方法,cas的时候可能失败,所以自旋重试
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //这里是已经执行完成就直接返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            //把当前节点设置到waiters(一个队列中)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞,在set方法完成结果的时候就会去唤醒
            LockSupport.park(this);
    }
}

CompletableFuture

我们发现FutureTask在获取结果的时候会阻塞 ,我们是否可以让一个线程执行完成结果后主动通知我们呢(异步回调),是的,它就是做这个事情的,异步回调你的方法,不进行阻塞的获得结果,完成你获得结果后想做的事情。这个类图可以看出CompletableFuture融合了两者的实现,既可以通过future等待执行结果,又可以使用completionStage去增强异步回调的功能。

构建一个CompletableFuture

  • supplyAsync(Supplier<U> supplier):没有返回值
  • runAsync(Runnable runnable,Executor executor) :异步执行一个任务,并且可以自定义线程池,默认用【ForkJoinPool.commonPool
        //不带返回值(但是在调用get方法的时候也会阻塞)
        CompletableFuture.runAsync(() -> System.out.println("异步执行一个任务:" + Thread.currentThread().getName())).get();
        //带返回值(但是在调用get方法的时候也会阻塞)
      // System.out.println(CompletableFuture.supplyAsync(() -> "Glen is the handsomest person").get());
        //使用链式调用
        CompletableFuture.supplyAsync(() -> "Glen is the handsomest person").thenAccept(System.out::println);=

CompletionStage的api

纯消费类型】:只消费上个的执行结果,不返回新的数据 【一定包含了accept关键字

 这里只演示几个用法大致相同,除了包含either的方法,他是只要前面任何一个执行完成,就完成。

public class AcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(()->"Then accept message").thenAcceptAsync(System.out::println);
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Then accept both");
        CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> "message");
        //这里指的是两个任务都执行结束,可以在函数中获取两个函数的结果进行操作
        stringCompletableFuture.thenAcceptBoth(stringCompletableFuture1,(x1,x2)-> System.out.println(x1+"-->"+x2));

    }
}

有返回值的方法】: 即消费上个任务,也返回新的数据【包含apply

public class ApplyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //还是获取上一个的执行结果,在下一个链中操作,然后返回
        System.out.println(CompletableFuture.supplyAsync(() -> "apply").thenApply((result) -> result + "->Glen").get());
        //这里你可以增加一个CompletableFuture,和上面相似
        System.out.println(CompletableFuture.supplyAsync(() -> "combine")
                .thenCombine(CompletableFuture.supplyAsync(() -> "Glen"), (x1, x2) -> x1 + ":" + x2).get());
    }
}

不消费也不返回】: 上个任务的结果我不需要,我只是想等你结束后,然后我执行,我的结果业务也不需要,我也不用返回

public class RunExample {
    public static void main(String[] args) {
        //注意:这里不消费也不返回,并且接受不到之前的结果
        CompletableFuture.supplyAsync(()->"Both").runAfterBoth(CompletableFuture.supplyAsync(()->"message"),()->{
            System.out.println("there isn't previous result");
        });
    }
}

组合类型】:组合多个类型任务,比如多个任务执行完成了,后面的任务才能执行 ,这个和上面的那些差不多

针对异常处理

public class ExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture completableFuture= CompletableFuture.supplyAsync(()->{
            throw  new RuntimeException("occur exception");
        });/*.runAfterBoth(CompletableFuture.supplyAsync(()->"message"),()->{
            System.out.println("I have done all");
        });*/
        //这个时候获取就会抛出异常
        /*System.out.println(completableFuture.get());*/
        completableFuture.whenComplete((rs,exception)->{
            if (exception!=null){
                System.out.println("前置任务正常异常");
            }else {
                System.out.println("前置任务正常执行!!"+rs);
            }
        });
        System.out.println(completableFuture.handleAsync((result, exception) -> exception != null ? "任务异常" : result).get());
    }
}

 应用在实际中(模拟一个查询商品,但是现实中应该不会这样用,这里只是把上面的这些东西整合使用一下)

@Data
@ToString
public class Commodity {
    public Commodity(Integer id, String name, BigDecimal price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }
    Integer id;
    String name;
    BigDecimal price;
    Integer repo;
    Integer buyerNum;
    List<String> remarks;
}

几个模拟获取数据的service

@Service
public class CommodityService {
    List<Commodity> getRemarkById(){
        return Arrays.asList(
                new Commodity(1,"电视",new BigDecimal(500)),
                new Commodity(2,"手机",new BigDecimal(1000)),
                new Commodity(3,"电脑",new BigDecimal(2000)),
                new Commodity(4,"台灯",new BigDecimal(50)),
                new Commodity(5,"水杯",new BigDecimal(56))
        );
    }

}
@Service
public class RemarkService {
    List<String> getRemarkById(Integer goodId){
        return Arrays.asList("好","非常好","还可以");
    }
}
@Service
public class RepoService {
    Integer getRepoById(Integer goodId){
        return new Random().nextInt(1000);
    }

}

controller层,对上面的技术进行使用

/**
 * @author : lizi
 * @date : 2021-07-02 23:34
 **/
@RestController
public class GoodsController {
    @Autowired
    CommodityService commodityService;
    @Autowired
    RemarkService remarkService;
    @Autowired
    RepoService repoService;

    @GetMapping("/goods")
    public List<Commodity> goods() throws ExecutionException, InterruptedException {
        //获取物品列表
        CompletableFuture<List<Commodity>> listCompletableFuture = CompletableFuture.supplyAsync(() -> commodityService.getRemarkById());

        //这里面都是在异步执行
        //thenApply 指的是获取上面的listCompletableFuture后再进行操作
        CompletableFuture<List<Commodity>> listCompletableFuture1 = listCompletableFuture.thenApply(goods -> {
            goods.stream().map(goods1 -> CompletableFuture.supplyAsync(() -> {
                goods1.setRepo(repoService.getRepoById(goods1.getId()));
                return goods1;
            }).thenCompose(goods2 -> CompletableFuture.supplyAsync(() -> {
                goods2.setRemarks(remarkService.getRemarkById(goods2.getId()));
                return goods2;
            }))).toArray();
            return goods;
        });

      /*  for (;;){
            System.out.println("这里可以写我们的逻辑,但是上面的东西是交给一个线程去跑的,所以就可以直接执行向下执行,这样从某种意义上讲反应就快了");
        }*/

        //在这里get的时候进行阻塞
        return (List<Commodity>) listCompletableFuture1.handleAsync((good, exception) -> exception != null ? "系统繁忙" : good).get();
    }
}

  原理分析

他是基于cas的无锁并发栈(Completion是一个栈结构,存储的我们传递进去的任务,这里的任务指的是我们传递进CompletableFuture中方法的一系列操作)

我们看completion的的关系类图,他是使用【forkjoin】来作为底层的,具体如下:

比如你使用

thenApply】:这种传递传递进来的任务会使用【UniCompletion】他下面的相关的类进行包装,

CoCompletion】:针对两个任务的combination的实现,比如【applyToEither】的任务 so on 

因为不同类型链式的结构执行的方式不一样,包装完成后,当【forkjoin】从上向下走的时候,他才能对不同的类型使用不同的执行策略,那是如何组装这些的呢?

  

举个例子(这就类似一个树的样子,我们每次点出一个新的方法,他就想当于树上的树枝,然后再次点出来的则数据树枝上的叶子,或者树枝)

/**
 * @author : lizi
 * @date : 2021-07-03 13:09
 **/
public class PrincipleExample {
    public static void main(String[] args) {
        CompletableFuture<String> base_future = CompletableFuture.completedFuture("Base Future");
        //这个时候构建的一定是一个UniCompletion对他进行包装(因为他是一个只有单个任务的 )
        base_future.thenApply(r->"then apply:"+r);

        //这里的第一个thenAccept,就是入栈(Completion)到base_future中的第二个,然后他返回一个新的CompletableFuture
        //这里的第二个thenAccept则是入栈到新的CompletableFuture中。
        base_future.thenAccept(r-> System.out.println(r)).thenAccept(none-> System.out.println("no result"));

        //这里的第一个thenApply则成为第三个入栈到到base_future中的,然后同理返回新的,thenAccept则进去新的completableFuture中,
        // 也就是现在在base_future中有三个子节点由UniCompletion进行包装,而其中的两个都有一个新的子节点,这样就类似于一个树的样子
        base_future.thenApply(r->"message").thenAccept(r-> System.out.println("where  should i go "));

    }
}

 

 

 当任务执行的时候,则是逐步出栈(等于一个个树枝进行捋,如果树枝下面有还有树枝,则继续)

 

 

这篇关于并发编程-FutureTask&CompletableFuture的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!