Java并发——Callable和Future

Callable和Future
Executor框架将工作单元划分为任务,即任务是逻辑上的工作单元,而线程是任务异步执行的机制。Runnable是任务的一个抽象,并且理想状态下任务是独立的执行,但是Runnable的run( )不能返回一个结果或者抛出一个受检查的异常,这与我们有些实际任务是不相符的。在通过线程或者executor执行Runnable任务中,不仅仅是不能返回任务的执行结果,有时我们希望可以控制某个任务,或取消或终止,但在executor中一旦提交任务,我们将很难单一的控制任务的生命周期,虽然ExecutorService扩展了Executor接口,添加了生命周期的控制,但是基于线程池的,针对的是所有任务,是无法单一的控制某个任务的。
JDK还提供了另外一种更佳的任务抽象Callable,它和Runnable十分的相似,但也存在一些差异。Callable任务可以返回一个执行结果,但我们向executor提交一个Callable任务时,就会得到一个Future对象,这就像这个Callable提交到executor之后的一个发票回执,利用这个回执,在以后我们能获取任务的执行结果,或者当我们想取消该任务时,也可以利用这个任务提交时得到的Future对象去取消这个任务,而且利用这个Future我们还能在提交后的任意时间得到任务的状态(是否被取消,是否完成)。

Callable和Runnable的异同点:

  • Callable定义的方法是call( ),而Runnable定义的方法是run( )。
  • Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
  • Callable的call方法可抛出受检查的异常,而Runnable的run方法不能抛出异常。  

在工具类Executors中有一些工具方法可以把Runnable任务转成Callable。你可以使用executor去执行一个Callable任务,也可以将Callable转成FutureTask对象,然后交由线程去执行。


Future是异步计算的结果,它描述了任务的生命周期,并提供了相关的方法来获得任务执行的结果、取消任务以及检查任务是否已经完成或者取消。

有多种方式可以创建一个Future。ExecutorService中的所有submit方法都会返回一个Future,利用这个返回的Future你可以获取任务的执行结果,或者取消任务。可以显示将Runnable或者Callable实例化一个FutureTask。

下面的例子演示了Callable和Future的一些方法,程序中定义了两个任务c1和c2,并且模拟c2的执行时间是8秒左右,然后依次调用future的相关方法

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFuture {

	public static void main(String[] args) {
		
		ExecutorService es = Executors.newFixedThreadPool(5);
		
		Callable<Integer> c1 = new Target(false);
		Callable<Integer> c2 = new Target(true);

		Future<Integer> f1 = es.submit(c1);
		Future<Integer> f2 = es.submit(c2);
		
		int res = 0;
		try {
			res = f1.get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		boolean isCancelled = f1.isCancelled();
		boolean isDone = f1.isDone();
		
		System.out.println(res);
		System.out.println(isCancelled);
		System.out.println(isDone);
		
		System.out.println("---------------------------");
		
		try {
			boolean cancel = f2.cancel(true);
			int res2 = f2.get();
			isCancelled = f1.isCancelled();
			isDone = f1.isDone();
			
			System.out.println(res2);
			System.out.println(cancel);
			System.out.println(isCancelled);
			System.out.println(isDone);
		} catch (CancellationException e) {
			// TODO Auto-generated catch block
			System.out.println("任务被取消.");
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("任务被中断.");
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			System.out.println("任务执行异常.");
		}
		
	}
	
}

class Target implements Callable<Integer> {
	
	private boolean sleep = false;
	
	public Target(boolean sleep) {
		// TODO Auto-generated constructor stub
		this.sleep = sleep;
	}
	
	@Override
	public Integer call() throws Exception {
		// TODO Auto-generated method stub
		if(sleep) {
			Thread.sleep(8000);
		}
		int i = new Random().nextInt(1000);
		return i;
	}
	
}

任务的执行结果:

982
false
true
---------------------------
任务被取消.

Future接口的相关方法

cancel( )方法可以试图取消任务的执行,如果当前任务已经完成、或已经被取消、或由于某些原因无法取消,则取消操作失败,返回false;如果该任务尚未运行,调用cancel( )方法将会使该任务永不会运行;如果调用cancel( )方法时,该任务已经运行,那么取决于参数boolean的值,如果是true,则表示立即中断该任务的执行,否则,等待该运行的任务结束后,尝试cancel并返回false。

isCancel( ),如果在任务正常完成前将其取消,那么返回true,否则,返回false。

isDone( ) , 如果任务已完成,则返回true,由于正常终止、异常或取消而完成,也会返回true。

get( ) , 如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,get会阻塞直到它完成。如果任务抛出了异常,get会将该异常封装为ExecutionException,然后重新抛出;如果任务被取消,get会抛出CancellationException。


FutureTask

FutureTask类相当于同时实现了Runnable和Future接口,提供了Future接口的具体实现,可以使用FutureTask去包装Callable或Runnable任务。因为FutureTask实现了Runnable接口,所以可以将其交给Executor去执行,或者直接调用run( )去执行。

使用FutureTask的一个示例

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class MyFutureTask {

	public static void main(String[] args) throws Exception {
		
		Executor executor = Executors.newFixedThreadPool(5);
		
		Callable<Integer> callable = new MyTarget();
		FutureTask<Integer> ft = new FutureTask<>(callable);
		
		executor.execute(ft);
		System.out.println(ft.get());
		
//		直接调用run
//		ft.run();
//		System.out.println(ft.get());
		
		System.out.println("-----------------------");
		
		Runnable runnable = new MyRunnableTarget();
		FutureTask<String> ft2 = new FutureTask<String>(runnable, "SUCCESS");
		executor.execute(ft2);
		System.out.println(ft2.get());
		
	}
	
}

class MyTarget implements Callable<Integer> {
	
	@Override
	public Integer call() throws Exception {
		// TODO Auto-generated method stub
		int i = new Random().nextInt(1000);
		return i;
	}
	
}

class MyRunnableTarget implements Runnable {
	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("Runnable is invoke...");
	}
	
}

程序输出:

280
-----------------------
Runnable is invoke...
SUCCESS


CompletionService

有时候我们需要利用executor去执行一批任务,每个任务都有一个返回值,利用Future就可以解决这个问题,为此我们需要保存每个任务提交后的Future,然后依次调用get方法轮询,获得已经执行完毕的任务的结果,这样的过程显得无趣。我们希望一次提交一批任务后,executor执行结束也是返回给我们一个已经执行完毕的Future集合。

CompletionService整合了Executor和BlockingQueue的功能。你可以将一批Callable任务交给它去执行,然后使用类似于队列中的take和poll方法,在结果完成时获得这个结果,就像一个打包的Future。将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit方法 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。ExecutorCompletionService类是一个实现了CompletionService接口的实现类,它将计算任务交给一个传入的Executor去执行。

下面是一个ExecutorCompletionService的示例

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;

public class TestCompletionService {

	private class Target implements Callable<Integer> {
		
		@Override
		public Integer call() throws Exception {
			// TODO Auto-generated method stub
			int i = new Random().nextInt(1000);
			return i;
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		Executor executor = Executors.newFixedThreadPool(5);
		ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(executor);
		
		Callable<Integer> c1 = new TestCompletionService().new Target();
		Callable<Integer> c2 = new TestCompletionService().new Target();
		Callable<Integer> c3 = new TestCompletionService().new Target();
		
		ecs.submit(c1);
		ecs.submit(c2);
		ecs.submit(c3);
		
		System.out.println(ecs.take().get());
		System.out.println(ecs.poll().get());
		System.out.println(ecs.take().get());
		
	}
	
}

这样将Future分离开来,已经完成的任务的Future就会被加入到BlockingQueue中供用户直接获取。

关于poll方法和get方法的区别,poll方法是非阻塞的,有则返回,无则返回NULL,take方法是阻塞的,没有的话则会等待。


批处理与任务执行时限

在有些应用场景中,我们需要同时处理多个任务,并获取结果,使用上面的CompletionService将完成的任务与未完成的任务分隔开似乎能够解决,但是如果其中有一个任务相当耗时,就会影响整个批处理任务的完成速度。比如,在一个页面中,我们需要从多个数据源获取数据,并在页面展示,同时我们希望整个页面的加载过程不超过2秒,那么那些超过2秒没有响应成功的数据源数据则用默认值替换,ExecutorService提供了invokeAll( )来完成这个任务。

下面我们通过一个示例演示invokeAll方法,程序中定义了3个任务,c1、c2、c3模拟执行时间分别为1、2、3秒,程序允许的最大执行时间是2秒,超过2秒的任务就会被取消。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TestCompletionService {

	private class Target implements Callable<Integer> {
		
		private int a = 0;
		
		public Target(int a) {
			// TODO Auto-generated constructor stub
			this.a = a;
		}
		
		@Override
		public Integer call() throws Exception {
			// TODO Auto-generated method stub
			Thread.sleep(1000*a);
			return a;
		}
		
	}
	
	public static void main(String[] args) {
		ExecutorService es = Executors.newFixedThreadPool(5);
		
		Callable<Integer> c1 = new TestCompletionService().new Target(1);
		Callable<Integer> c2 = new TestCompletionService().new Target(2);
		Callable<Integer> c3 = new TestCompletionService().new Target(3);
		
		List<Callable<Integer>> list = new ArrayList<>();
		list.add(c1);
		list.add(c2);
		list.add(c3);
		
		try {
			List<Future<Integer>> res = es.invokeAll(list, 2, TimeUnit.SECONDS);
			
			Iterator<Future<Integer>> it = res.iterator();
			while(it.hasNext()) {
				Future<Integer> f = it.next();
				int i = f.get();
				System.out.println(i);
			}
		} catch (CancellationException e ) {
			System.out.println("任务取消");
		} catch (InterruptedException e) {
			System.out.println("中断异常");
		} catch (ExecutionException e) {
			System.out.println("执行异常");
		}
		
	}
	
}

程序的输出:

1
2
任务取消

需要注意的是,java.util.concurrent中所有的关于时间的方法都将负数作为0处理,不需要额外的处理



郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。