Concurrent - Phaser - arrive()

原创转载请注明出处:http://agilestyle.iteye.com/blog/2344634

arrive()

arrive()的作用是使已到达的parties计数(即getArrivedParties()的值)加1,并且不在屏障处等待,直接继续运行后面的代码;此外,Phaser在每次通过屏障后具有计数重置功能


PhaserTest8.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;

/**
 * Single-threaded demonstration of {@link Phaser#arrive()} on a two-party phaser.
 *
 * <p>The phaser state (phase number, registered parties, arrived parties) is printed
 * immediately before and after every {@code arrive()} call, so the console shows the
 * arrived count going 0 -> 1 -> 0 as each phase completes, and {@code onAdvance}
 * being invoked on every second arrival.
 */
public class PhaserTest8 {
    /** Labels of the three rounds; each round performs two arrivals (steps 1 and 2). */
    private static final String[] ROUNDS = {"A", "B", "C"};

    /** Number of arrivals per round; equals the number of registered parties. */
    private static final int STEPS_PER_ROUND = 2;

    public static void main(String[] args) {
        Phaser barrier = new Phaser(STEPS_PER_ROUND) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Trace every phase transition, then defer to the default termination rule.
                System.out.println("phase=" + phase + " registeredParties=" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (String round : ROUNDS) {
            for (int step = 1; step <= STEPS_PER_ROUND; step++) {
                String tag = round + step;
                report(tag, barrier);   // state before arriving
                barrier.arrive();       // non-blocking arrival
                report(tag, barrier);   // state after arriving
            }
        }
    }

    /**
     * Prints one line describing the current phase, registered parties and arrived
     * parties of the given phaser, prefixed with {@code tag}.
     */
    private static void report(String tag, Phaser barrier) {
        StringBuilder line = new StringBuilder(tag);
        line.append(" phaser.getPhase()=").append(barrier.getPhase());
        line.append("\tphaser.getRegisteredParties()=").append(barrier.getRegisteredParties());
        line.append("\tphaser.getArrivedParties()=").append(barrier.getArrivedParties());
        System.out.println(line);
    }
}

Run


Note:

arrive()功能是使getArrivedParties()计数加1,不等待其他线程到达屏障。

控制台中多次出现phaser.getArrivedParties()=0的运行结果,可以分析出Phaser在经过屏障点后计数被重置。

PhaserTest9.java

package org.fool.java.concurrent.phaser;

import java.util.concurrent.Phaser;

/**
 * Multi-threaded demonstration of mixing {@link Phaser#arrive()} with
 * {@link Phaser#arriveAndAwaitAdvance()} on one shared three-party phaser.
 *
 * <p>Two threads run {@link Service#testMethodA()} (sleep, then arrive and wait) and one
 * thread runs {@link Service#testMethodB()} (arrive three times without waiting). The
 * non-waiting thread completes a whole phase on its own, after which the two waiting
 * threads can only ever contribute two of the three required arrivals; in practice they
 * therefore stay blocked and the program does not exit on its own.
 */
public class PhaserTest9 {
    /** Number of arrive steps each service method performs. */
    private static final int STEPS = 3;

    /** Pause before each waiting arrival in {@link Service#testMethodA()}, in milliseconds. */
    private static final long PAUSE_MILLIS = 3000;

    /** Holds the shared phaser and the two behaviours the worker threads execute. */
    public static class Service {
        private final Phaser phaser;

        /**
         * @param phaser the phaser shared by all worker threads
         */
        public Service(Phaser phaser) {
            this.phaser = phaser;
        }

        /**
         * Performs three steps; each step sleeps, prints the current arrived count and
         * then blocks in {@code arriveAndAwaitAdvance()} until the phase advances.
         *
         * <p>If the thread is interrupted while sleeping, the stack trace is printed,
         * the remaining steps are skipped and the thread's interrupt status is restored
         * so that callers can still observe the interruption.
         */
        public void testMethodA() {
            String name = Thread.currentThread().getName();
            try {
                for (int step = 1; step <= STEPS; step++) {
                    System.out.println(name + " begin A" + step + " " + System.currentTimeMillis());

                    Thread.sleep(PAUSE_MILLIS);

                    System.out.println(name + "\tphaser.getArrivedParties()=" + phaser.getArrivedParties());

                    phaser.arriveAndAwaitAdvance();

                    System.out.println(name + " end A" + step + " " + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Thread.sleep cleared the interrupt flag when it threw; put it back.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Performs three steps; each step calls {@code arrive()}, which records an
         * arrival and returns immediately without waiting for other parties.
         */
        public void testMethodB() {
            String name = Thread.currentThread().getName();
            for (int step = 1; step <= STEPS; step++) {
                // The printed labels use the same "A<n>" prefix as testMethodA.
                System.out.println(name + " begin A" + step + " " + System.currentTimeMillis());

                phaser.arrive();

                System.out.println(name + " end A" + step + " " + System.currentTimeMillis());
            }
        }
    }

    /** Worker that runs the waiting variant, {@link Service#testMethodA()}. */
    public static class ThreadA implements Runnable {

        private final Service service;

        public ThreadA(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethodA();
        }
    }

    /** Second worker that also runs the waiting variant, {@link Service#testMethodA()}. */
    public static class ThreadB implements Runnable {

        private final Service service;

        public ThreadB(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethodA();
        }
    }

    /** Worker that runs the non-waiting variant, {@link Service#testMethodB()}. */
    public static class ThreadC implements Runnable {

        private final Service service;

        public ThreadC(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethodB();
        }
    }

    /**
     * Starts two waiting workers and one non-waiting worker on a three-party phaser.
     */
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Service service = new Service(phaser);

        Thread t1 = new Thread(new ThreadA(service));
        Thread t2 = new Thread(new ThreadB(service));
        Thread t3 = new Thread(new ThreadC(service));

        t1.start();
        t2.start();
        t3.start();
    }
}

Run


Note:

线程2连续调用3次arrive(),使已到达的parties计数达到3,Phaser随即进入下一阶段,计数自动重置为0;此后线程0和线程1调用arriveAndAwaitAdvance()时,已到达计数最多只能达到2,达不到parties为3的条件,所以依然呈等待状态。

Reference

Java并发编程核心方法与框架 

猜你喜欢

转载自agilestyle.iteye.com/blog/2344634