鸿 网 互 联 www.68idc.cn

当前位置 : 服务器租用 > 编程语言开发 > erlang > >

Java并发编程——Executor接口及线程池的使用

来源:互联网 作者:佚名 时间:2017-02-11 10:29
在如今的程序里,单线程的程序,应该已经比较少了,而Java语言是内置支持多线程并发的,大家都说Java语言内置支持多线程,非常非常的强大和方便,但一直没有深入研究jdk内concurrent包。今天就认真学习了一下java.util.concurrent包,发现jdk多线程编程果然

在如今的程序里,单线程的程序,应该已经比较少了,而Java语言是内置支持多线程并发的,大家都说Java语言内置支持多线程,非常非常的强大和方便,但一直没有深入研究jdk内concurrent包。今天就认真学习了一下java.util.concurrent包,发现jdk多线程编程果然是强大和方便。本文是学习java.util.concurrent包内线程池及相关接口的一些总结。

任务接口抽象

Runnable接口

在java.lang包内,为多线程提供了Runnable接口。

public interface Runnable {
    public abstract void run();
}

Runnable接口只是简单地提供了一个任务运行的入口。但对于任务执行的结果以及任务的状态,都是没有定义的。但在jdk 1.5之后,Java针对多线程任务,提供了更强大的接口支持。那就是提供了Callable和Future接口。这两个接口,为Task提供了更多的抽象。可以更方便进行Task抽象。

Callable接口

Callable接口,虽然定义仍然很简单。但提供了Task运行的返回值,同时,也支持抛出异常。Callable接口的定义如下:

public interface Callable<V> {
    V call() throws Exception;
}

Future接口

Future接口,是对异步任务结果的抽象。Future接口可以查询任务执行结果,或者等待任务结果,并且可以获取任务的执行结果。Future接口有5个方法。
1. cancel 用于取消任务,mayInterruptIfRunning参数表示,是否通过中断取消正在运行的任务,如果任务已经完成,则此方法返回false;
2. isCancelled 任务是否被取消,如果任务在正常完成前被取消,则返回true;
3. isDone 任务是否完成,无论任务是通过正常执行完,或者中途抛出异常,或者被取消,都认为任务已经完成;
4. get 获取任务执行结果,如果任务正在执行,则等待任务执行完成。get可以指定超时时间,如果超时,则抛出TimeoutException异常,如果任务被取消,则抛出CancellationException异常,如果任务在中途抛出异常,则get方法将异常封装在ExecutionException异常内,并抛出;如果当前线程被中断,则抛出InterruptedException异常。

Future接口定义如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Executor(执行器接口)

使用Thread和Runnable接口创建异步任务,线程和线程执行任务之间都是紧耦合的,同时,多个任务的执行策略,需要程序员自己进行繁琐地控制。为了将任务提交和任务执行之间进行解耦,Java在jdk1.5之后,提供了Executor接口。Executor接口定义如下:

public interface Executor {
    void execute(Runnable command);
}

Executor接口是基于生产者-消费者模式,生产者只需要调用execute方法提交任务,至于消费者什么时候在哪里执行方法是和生产者解耦的。

线程池概述

在Java的java.util.concurrent包内,通过Executors类的静态方法,主要提供了4种线程池。具体如下:

方法 作用
newFixedThreadPool 创建一个定长的线程池,当提交一个任务就会创建一个线程,直到达到池子的最大长度,然后线程池内的线程数不会再变化,如果有线程由于异常Exception而结束,线程池会补充一个新的线程。
newCachedThreadPool 创建一个可缓存的线程池,如果当前线程池内空闲线程过多,它可以灵活地回收空闲的线程,当需求增加时,它可以灵活地添加新的线程,而不会对池的长度作任何限制。
newSingleThreadExecutor 创建一个单线程化的executor,并只创建唯一的工作线程来执行任务,如果这个线程异常结束,会另外创建一个取代它。executor会保证任务依照任务队列所规定的顺序(FIFO,LIFO,优先级)执行。
newScheduledThreadPool 创建一个定长的线程池,而且支持定时的和周期性任务执行,类似于Timer。

ExecutorService接口

在上面四种线程池,newFixedThreadPool,newCachedThreadPool和newSingleThreadExecutor都是返回ExecutorService接口的对象。

ExecutorService接口是继承Executor接口的,ExecutorService接口提供了更多更丰富的方法。主要方法如下:
1. submit submit方法通过重载,可以接受Callable任务对象和Runnable对象;submit方法返回一个Future对象。通过返回的Future对象,可以进行线程池任务状态的查询,以及取消任务。
2. invokeAll invokeAll方法可以进行任务的匹配提交,接受一个任务列表,同时可以可选提供一个超时时间;invokeAll方法返回一个Future对象列表。
3. isShutdown 线程池是否已经关闭
4. isTerminated 线程池所有任务是否已经执行完全。
5. shutdown 关闭线程池,线程池关闭后,则不能再接受任务。此方法只是使线程池不再接收新的任务,但已经提交的任务,仍然会继续运行。
6. shutdownNow 立刻关闭线程池,正在运行的任务会继续执行。未执行的任务将不会再执行,并将等待执行的任务返回。
7. awaitTermination 在指定时间内,等待线程池任务完成。

下面是定长线程池以及Future接口和Callable接口的代码示例:

package com.test.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class Job1 implements Callable<Boolean> {
    int id = 0;
    public Job1(int id ){
        this.id = id;
    }
    public Boolean call() throws Exception {
        System.out.println("Job1 running.id:"+id+" Current time:"+System.currentTimeMillis());
        return false;
    }
}

class Job2 implements Callable<Boolean> {
    int id =0;
    public Job2(int id ){
        this.id = id;
    }
    public Boolean call() throws Exception {
        System.out.println("Job2 running.id:"+id+" Current time:"+System.currentTimeMillis());
        Thread.sleep(15*1000);
        System.out.println("Job2 end.id:"+id+" Current time:"+System.currentTimeMillis());
        return true;
    }
}

class Job3 implements Callable<Boolean> {
    int id =0;
    public Job3(int id ){
        this.id = id;
    }
    public Boolean call() throws Exception {
        System.out.println("Job3 running.id:"+id+" Current time:"+System.currentTimeMillis());
        throw new RuntimeException("Job3 throw exception.");
    }
}

public class ThreadPool {
    static void addTask() throws InterruptedException, ExecutionException {
        int id = 0;
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<Boolean> future1 = executor.submit(new Job1(++id));
        Future<Boolean> future2 = executor.submit(new Job2(++id));
        Future<Boolean> future3 = executor.submit(new Job3(++id));
        System.out.println("job1 is done:"+future1.isDone());
        System.out.println("job1 result:"+future1.get());
        System.out.println("job2 is done:"+future2.isDone());
        System.out.println("job2 result:"+future2.get());
        try {
            System.out.println("job3 result:"+future3.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
            e.getCause();
        }

        List<Callable<Boolean>> taskList = new ArrayList<Callable<Boolean>>();
        taskList.add(new Job1(++id));
        taskList.add(new Job2(++id));
        List<Future<Boolean>> futures = executor.invokeAll(taskList);
        for (Future<Boolean> future : futures) {
            System.out.println(future.get());
        }

        executor.shutdown();
        executor.submit(new Job1(2));
    }

    public static void main( String[] args ) throws InterruptedException, ExecutionException {
        addTask();
    }
}

上面的代码,执行结果如下:

Job1 running.id:1 Current time:1468632394429
Job2 running.id:2 Current time:1468632394430
job1 is done:true
Job3 running.id:3 Current time:1468632394431
job1 result:false
job2 is done:false
Job2 end.id:2 Current time:1468632409431
job2 result:true
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Job3 throw exception.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    at com.test.concurrent.ThreadPool.addTask(ThreadPool.java:56)
    at com.test.concurrent.ThreadPool.main(ThreadPool.java:77)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.RuntimeException: Job3 throw exception.
    at com.test.concurrent.Job3.call(ThreadPool.java:40)
    at com.test.concurrent.Job3.call(ThreadPool.java:33)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.lang.Thread.run(Thread.java:745)
Job1 running.id:4 Current time:1468632409434
Job2 running.id:5 Current time:1468632409434
Job2 end.id:5 Current time:1468632424435
false
true
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@67854d1f rejected from java.util.concurrent.ThreadPoolExecutor@608a6351[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 5]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at com.test.concurrent.ThreadPool.addTask(ThreadPool.java:73)
    at com.test.concurrent.ThreadPool.main(ThreadPool.java:77)

ScheduledExecutorService接口

ScheduledExecutorService接口继承ExecutorService接口,但额外提供了三个用于周期调度任务的接口。

方法 说明
schedule(Callable callable, long delay, TimeUnit unit) 提交任务,并且指定任务延迟执行的时间,此方法提交的任务,只会被执行一次
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 提交一个任务,同时指定延迟时间initialDelay,每隔任务开始period时间,如果,任务已经结束,则会再次调度任务;如果任务没有结束,则任务结束后,则会立刻调度。如果没有空闲进程,则仍旧会等待
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 提交任务,指定延迟时间initialDelay,每次任务结束后,再间隔delay时间,再次调度任务

scheduleAtFixedRate和scheduleWithFixedDelay方法的区别,主要是scheduleAtFixedRate方法的任务间隔是以任务起始时间算,scheduleWithFixedDelay方法是以任务结束时间算的。

package com.test.concurrent;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledTask1 implements Runnable{
    AtomicInteger index = new AtomicInteger(0);
    public void run() {
        int tmp_index = index.addAndGet(1);
        System.out.println("Thread name:"+Thread.currentThread().getName()+". Task1 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task1 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
    }
}

class ScheduledTask2 implements Runnable{
    AtomicInteger index = new AtomicInteger(100);
    public void run() {
        int tmp_index = index.addAndGet(1);
        System.out.println("Thread name:"+Thread.currentThread().getName()+". Task2 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task2 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
    }
}

class ScheduledTask3 implements Runnable{
    AtomicInteger index = new AtomicInteger(1000);
    public void run() {
        int tmp_index = index.addAndGet(1);
        System.out.println("Thread name:"+Thread.currentThread().getName()+". Task3 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task3 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
    }
}

public class ScheduledPool {
    static public void addTask() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.schedule(new ScheduledTask1(), 20, TimeUnit.SECONDS);
        executor.scheduleAtFixedRate(new ScheduledTask2(), 0, 5, TimeUnit.SECONDS);

        ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(1);
        executor2.scheduleWithFixedDelay(new ScheduledTask3(), 0, 5, TimeUnit.SECONDS);

        Thread.sleep(50*1000);
        executor.shutdown();
        executor.shutdown();
    }

    public static void main( String[] args ) throws InterruptedException {
        addTask();
    }
}

上面代码,运行结果如下:

Thread name:pool-1-thread-1. Task2 begin running. index:101 Current time:1468637711181
Thread name:pool-2-thread-1. Task3 begin running. index:1001 Current time:1468637711182
Task2 end running. index:101 Current time:1468637721183
Thread name:pool-1-thread-1. Task2 begin running. index:102 Current time:1468637721183
Task3 end running. index:1001 Current time:1468637721183
Thread name:pool-2-thread-1. Task3 begin running. index:1002 Current time:1468637726184
Task2 end running. index:102 Current time:1468637731184
Thread name:pool-1-thread-1. Task2 begin running. index:103 Current time:1468637731184
Task3 end running. index:1002 Current time:1468637736185
Task2 end running. index:103 Current time:1468637741185
Thread name:pool-1-thread-1. Task2 begin running. index:104 Current time:1468637741185
Thread name:pool-2-thread-1. Task3 begin running. index:1003 Current time:1468637741185
Task2 end running. index:104 Current time:1468637751185
Thread name:pool-1-thread-1. Task1 begin running. index:1 Current time:1468637751186
Task3 end running. index:1003 Current time:1468637751186
Thread name:pool-2-thread-1. Task3 begin running. index:1004 Current time:1468637756187
Task1 end running. index:1 Current time:1468637761186
Task3 end running. index:1004 Current time:1468637766187

可以看出,ScheduledTask2任务,会在上次任务结束后,立马开始运行,而ScheduledTask3任务,则会在上次任务结束后,再等待5秒再运行,当executor线程池中,添加了一个新的阻塞任务后,则ScheduledTask2任务,会等待上个任务运行结束。同时,从上面的线程池名称可以看出,线程池中,线程的数量是固定的。

网友评论
<