Java 生产者与消费者案例 双指针操纵数据缓存区

简介

生产者和消费者共同操纵一个数据缓冲区,其中生成者向缓冲区中写入数据,消费者从缓冲区中读取数据。
本程序实现如下,生产者和消费者实现Runnable接口,不断写入或读取数据。多个生产者和消费者线程通过Executor进行管理。
对数据缓冲区的操作中,使用写入位置指针和读取位置指针分别标明写入或读取位置。

测试程序:

// 声明唯一的数据缓存区
DataCache cache = new DataCache();
// 生产者线程池
ExecutorService exec1 = Executors.newCachedThreadPool();
// 消费者线程池
ExecutorService exec2 = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
    exec1.execute(new Producer(i, cache));
}
for(int i=0; i<5; i++) {
    exec2.execute(new Customer(i, cache));
}
exec1.shutdown();
exec2.shutdown();

数据缓冲区:

class DataCache {
    public int[] datas = new int[10];
    public int inPointer = 0; // 写入位置指针
    public int outPointer = 0; // 读取位置指针
    public int order = 0; // 测试使用,用于生成有序的数据

    // 翻转标记,当写指针先走到缓冲区的末尾时,回到为缓冲区的初始位置,
    // 复用已被读过的空间,此时打开翻转标记。当读指针走到缓冲区的末尾时,
    // 也回到缓冲区的初始位置,关闭翻转标记。
    public boolean reverse = false; 
}

生产者:

class Producer implements Runnable {
    private final int id;
    private DataCache cache;

    public Producer(int id, DataCache cache) {
        this.id = id;
        this.cache = cache;
    }

    /** 执行方法:循环执行写操作 */
    @Override
    public void run() {
        while(true) {
            try {
                this.write();
            } catch (InterruptedException e) {
                System.out.println("[@Writer#" + id + "]  is interrupted...");
            }
        }
    }

    /** 写操作,找到合适的写的位置,并写入数据 */
    public void write() throws InterruptedException {
        synchronized (this.cache) {
            if(this.cache.inPointer < this.cache.datas.length) {
                if(this.cache.reverse) {
                    if(this.cache.inPointer < this.cache.outPointer)
                        writeData();
                    else
                        this.cache.wait();
                } else {
                    writeData();
                }
            } else {
                this.cache.reverse = true;
                this.cache.inPointer = 0;
                if(this.cache.outPointer > 0) {
                    writeData();
                } else {
                    this.cache.wait();
                }
            }
        }
    }

    /** 写入数据 */
    public void writeData() {
        int pos = this.cache.inPointer++;
        int data = this.cache.order++;
        this.cache.datas[pos] = data;
        synchronized (this.cache) {
            this.cache.notifyAll();
        }
        System.out.println("[Writer#" + id + "]  write in [ " + pos + ", " + data + " ]");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("[Writer#" + id + "]  is interrupted...");
        }
    }
}

消费者:

class Customer implements Runnable {
    private final int id;
    private DataCache cache;

    public Customer(int id, DataCache cache) {
        this.id = id;
        this.cache = cache;
    }

    /** 执行方法:循环执行读操作 */
    @Override
    public void run() {
        while(true) {
            try {
                this.read();
            } catch (InterruptedException e) {
                System.out.println("[@Reader#" + id + "]  is interrupted...");
            }
        }
    }

    /** 读操作,找到合适的读的位置,并读出数据 */
    public void read() throws InterruptedException {
        synchronized (this.cache) {
            if(this.cache.outPointer < this.cache.datas.length) {
                if(this.cache.reverse) {
                    readData();
                } else {
                    if(this.cache.outPointer < this.cache.inPointer)
                        readData();
                    else
                        this.cache.wait();
                }
            } else {
                this.cache.reverse = false;
                this.cache.outPointer = 0;
                if(this.cache.inPointer > 0)
                    readData();
                else
                    this.cache.wait();
            }
        }
    }

    /** 读出数据 */
    public void readData() {
        int pos = this.cache.outPointer++;
        int data = this.cache.datas[pos];
        System.out.println("[Reader#" + id + "]  read in [ " + pos + ", " + data + " ]");
        synchronized (this.cache) {
            this.cache.notifyAll();
        }
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            System.out.println("[Reader#" + id + "]  is interrupted...");
        }
    }
}

运行结果

[Writer #0 ]  write in [ 0, 0 ]
[Reader #4 ]  read in [ 0, 0 ]
[Writer #4 ]  write in [ 1, 1 ]
[Writer #4 ]  write in [ 2, 2 ]
[Writer #3 ]  write in [ 3, 3 ]
[Writer #2 ]  write in [ 4, 4 ]
[Writer #2 ]  write in [ 5, 5 ]
[Writer #2 ]  write in [ 6, 6 ]
[Writer #2 ]  write in [ 7, 7 ]
[Writer #2 ]  write in [ 8, 8 ]
[Writer #2 ]  write in [ 9, 9 ]
[Writer #1 ]  write in [ 0, 10 ]
[Reader #0 ]  read in [ 1, 1 ]
[Reader #0 ]  read in [ 2, 2 ]
[Reader #0 ]  read in [ 3, 3 ]
[Reader #1 ]  read in [ 4, 4 ]
[Reader #1 ]  read in [ 5, 5 ]
[Reader #1 ]  read in [ 6, 6 ]
[Reader #2 ]  read in [ 7, 7 ]
[Reader #2 ]  read in [ 8, 8 ]
[Reader #2 ]  read in [ 9, 9 ]
[Writer #0 ]  write in [ 1, 11 ]
[Reader #2 ]  read in [ 0, 10 ]
[Reader #4 ]  read in [ 1, 11 ]
[Writer #4 ]  write in [ 2, 12 ]
[Writer #4 ]  write in [ 3, 13 ]
[Writer #4 ]  write in [ 4, 14 ]
[Writer #4 ]  write in [ 5, 15 ]
[Writer #4 ]  write in [ 6, 16 ]
[Writer #4 ]  write in [ 7, 17 ]
... CTRL+C ...

猜你喜欢

转载自blog.csdn.net/goldlone/article/details/81144495
今日推荐