Java多线程之Exchanger

一. 概念

         Exchanger能在两个线程驱动的任务之间交换对象。交换之前,A任务持有D1对象,B任务持有D2对象。交换之后,A任务持有D2对象,B任务持有D1对象。

        Exchanger可以使用的场景: 

        1. 大数据量、多步骤执行的任务 

            比如把整个任务分割成了n段。第一段子任务在执行到某种程度后,执行Exchanger exchange( )操作,把已经加工好的数据传递给第二段子任务,每段任务由不同的线程来驱动,整体上加快了任务的处理速度。

        2. 基因算法

二. 使用方式

       1. Exchanger( )  

           构造函数,创建一个新的Exchanger对象

/**
 * Creates a new Exchanger.
 */
public Exchanger() {
   participant = new Participant();
}

       2. exchange(V x)

          互换对象。两个任务必须同时调用exchange( )互换才会成功。当一个任务调用了Exchanger对象的exchange( )后,它将被阻塞住,直到另一个任务也调用了同一个Exchanger对象的exchange( )方法,此时x对象的互换操作才会成功

三. 案例

      本例中创建了一个生产者和消费者,生产者生产Fat对象,消费者消费Fat对象,对象之间借助Exchanger,互换List<Fat>集合。

     1. 生产者

    private Generator<T> generator;
    private Exchanger<List<T>> exchanger;
    private List<T> holder;

    public ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> gen, List<T> holder) {
        this.exchanger = exchanger;
        this.generator = gen;
        this.holder = holder;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                for(int i = 0; i < ExchangerDemo.size; i++) {
                    holder.add(generator.next());
                }
                holder = exchanger.exchange(holder);
            }
        } catch (InterruptedException e) {
            System.out.println("OK to terminate this way");
        }
    }
}

    2. 消费者

class ExchangerConsumer<T> implements Runnable {
    private Exchanger<List<T>> exchanger;
    private List<T> holder;
    private volatile T value;
    public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder) {
        this.exchanger = exchanger;
        this.holder = holder;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                holder = exchanger.exchange(holder);
                for (T x : holder) {
                    // 此处在遍历列表时,移除了元素。为了不报错ConcurrentModificationException,
                    // 所以使用CopyOnWriteArrayList
                    value = x;
                    holder.remove(x);
                }
            }
        }catch (InterruptedException e) {
            System.out.println("OK to terminate this way");
        }
        System.out.println("Final value: " + value);
    }
}

    3. main方法

public class ExchangerDemo {
    static int size = 10;
    static int delay = 5;

    public static void main(String[] args) throws Exception{
        if(args.length > 0) {
            size = Integer.parseInt(args[0]);
        }
        if(args.length > 1) {
            delay = Integer.parseInt(args[1]);
        }
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<List<Fat>> xc = new Exchanger<>();
        List<Fat> producerList = new CopyOnWriteArrayList<>();
        List<Fat> consumerList = new CopyOnWriteArrayList<>();
        exec.execute(new ExchangerProducer<>(xc, BasicGenerator.create(Fat.class), producerList));
        exec.execute(new ExchangerConsumer<>(xc, consumerList));
        TimeUnit.SECONDS.sleep(delay);
        exec.shutdownNow();
    }
}

     4. 执行结果

OK to terminate this way
Final value: Fat id:90189
OK to terminate this way
发布了45 篇原创文章 · 获赞 13 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/miaomiao19971215/article/details/104117795