Java 线程:生产者-消费者与队列、任务间使用管道进行输入/输出 —— 讲解示例(摘自《Thinking in Java》第 4 版)
package org.rui.thread.block2; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import org.rui.thread.LiftOff; /** * 生产者-消费者与队列 * * @author lenovo * */ class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> b) { rockets = b; } //添加一个任务到队列 public void add(LiftOff lo) { //将指定元素插入此队列中(如果立即可行且不会违反容量限制), try { rockets.put(lo); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { try { while (!Thread.interrupted()) { // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 LiftOff rocket = rockets.take(); rocket.run(); } } catch (InterruptedException e) { System.out.println("中断退出"); } System.out.println("x exiting liftOffRunner"); } } public class TestBlockingQueues { static void getkey() { try { // compensate for windows/linux difference in the // 回车键产生的结果 new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) { e.printStackTrace(); } } static void getkey(String message) { System.out.println(message); getkey(); } static void tets(String msg, BlockingQueue<LiftOff> queue) { System.out.println(msg); LiftOffRunner runner = new LiftOffRunner(queue); //启动一个线程 Thread t = new Thread(runner); t.start(); for (int i = 0; i < 5; i++) { //加入任务到LiftOffRunner队列中 runner.add(new LiftOff(5)); } //输入控制台 getkey("press 'enter' (" + msg + ")"); t.interrupt(); System.out.println(" 完了 " + msg + "test"); } public static void main(String[] args) { tets("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());// unlimited // size tets("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));// fied // size tets("SynchronousQueue", new SynchronousQueue<LiftOff>());// size of 1 } }
package org.rui.thread.block2; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * 吐司BlockingQueue * @author lenovo * */ class Toast { public enum Status { DRY/* 干的 */, BUTTERED/* 涂黄油 */, JAMMED// 果酱 } private Status status = Status.DRY; private final int id; public Toast(int idn) { id = idn; } public void butter() { status = Status.BUTTERED; } public void jam() { status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } public String toString() { return "Toast " + id + ":" + status; } } /** * 吐司队列 * * @author lenovo * */ class ToastQueue extends LinkedBlockingQueue<Toast> { } class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0; private Random rand = new Random(47); public Toaster(ToastQueue tq) { toastQueue = tq; } @Override public void run() { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500)); // 制作 toast Toast t = new Toast(count++); System.out.println(t); // insert into queue toastQueue.put(t); } } catch (InterruptedException e) { System.out.println("Toaster interrupted"); } System.out.println("toaster off"); } } // apply butter to toast class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer(ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } @Override public void run() { try { while (!Thread.interrupted()) { // blocks until next piece of toast is available 块,直到下一块面包 Toast t = dryQueue.take(); t.butter(); System.out.println(t); butteredQueue.put(t); } } catch (InterruptedException e) { System.out.println("涂黄油 interrupted"); } System.out.println("涂黄油 off"); } } // apply jam to buttered toast class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer(ToastQueue butteredQueue, 
ToastQueue finishedQueue) { this.butteredQueue = butteredQueue; this.finishedQueue = finishedQueue; } @Override public void run() { try { while (!Thread.interrupted()) { // blocks until next piece of toast is available 块,直到下一块面包 Toast t = butteredQueue.take(); t.jam(); System.out.println(t); finishedQueue.put(t); } } catch (InterruptedException e) { System.out.println("涂果酱 interrupted"); } System.out.println("涂果酱 off"); } } // ////使用烤面包 consume the toast class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; public Eater(ToastQueue finished) { finishedQueue = finished; } @Override public void run() { try { while (!Thread.interrupted()) { Toast t = finishedQueue.take(); // verify that the toast is coming in order 确认面包来了 // and that all pieces are getting jammed ,所有碎片越来越挤 if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { System.out.println("===>>>>error" + t); System.exit(1); } else { System.out.println("吃!" + t); } } } catch (InterruptedException e) { System.out.println("食者 interrupted"); } System.out.println(" 食者 off"); } } /** * main * * @author lenovo * */ public class ToastOMatic { public static void main(String[] args) throws InterruptedException { ToastQueue dryQueue = new ToastQueue(); ToastQueue butteredQueue = new ToastQueue(); ToastQueue finishedQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue));//烤面包 exec.execute(new Butterer(dryQueue, butteredQueue));//涂黄油 exec.execute(new Jammer(butteredQueue, finishedQueue));//上果酱 exec.execute(new Eater(finishedQueue));//吃 TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } } /**output: Toast 0:DRY Toast 0:BUTTERED Toast 0:JAMMED 吃!Toast 0:JAMMED Toast 1:DRY Toast 1:BUTTERED Toast 1:JAMMED 吃!Toast 1:JAMMED Toast 2:DRY Toast 2:BUTTERED Toast 2:JAMMED 吃!Toast 2:JAMMED ... ... 
Toast 10:DRY Toast 10:BUTTERED Toast 10:JAMMED 吃!Toast 10:JAMMED Toast 11:DRY Toast 11:BUTTERED Toast 11:JAMMED 吃!Toast 11:JAMMED Toast 12:DRY Toast 12:BUTTERED Toast 12:JAMMED 吃!Toast 12:JAMMED Toast 13:DRY Toast 13:BUTTERED Toast 13:JAMMED 吃!Toast 13:JAMMED Toast 14:DRY Toast 14:BUTTERED Toast 14:JAMMED 吃!Toast 14:JAMMED 食者 interrupted Toaster interrupted 食者 off 涂果酱 interrupted 涂果酱 off 涂黄油 interrupted 涂黄油 off toaster off */
package org.rui.thread.block2; import java.io.IOException; import java.io.PipedReader; import java.io.PipedWriter; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 任务间使用管道进行输入、输出 * * @author lenovo * */ class Sender implements Runnable { private Random rand = new Random(47); private PipedWriter out = new PipedWriter(); public PipedWriter getPipedWriter() { return out; } @Override public void run() { try { while (true) { for (char c = 'A'; c <= 'z'; c++) { out.write(c); TimeUnit.MILLISECONDS.sleep(rand.nextInt(500)); } } } catch (IOException e) { System.out.println(e + " sender write Exception"); } catch (InterruptedException e) { System.out.println(e + " sender sleep interrupted"); } } } class Receiver implements Runnable { private PipedReader in; public Receiver(Sender sender) throws IOException { in = new PipedReader(sender.getPipedWriter()); } @Override public void run() { try { while (true) { // blocks until characters are there System.out.println("Read:" + (char) in.read() + ","); } } catch (IOException e) { System.out.println(e+"receiver read execption"); } } } public class PipedIO { // 接收器 Receiver public static void main(String[] args) throws IOException, InterruptedException { Sender sender = new Sender(); Receiver receiver = new Receiver(sender); ExecutorService exec=Executors.newCachedThreadPool(); exec.execute(sender); exec.execute(receiver); TimeUnit.SECONDS.sleep(4); exec.shutdownNow(); } } /**outpt: Read:A, Read:B, Read:C, Read:D, Read:E, Read:F, Read:G, Read:H, Read:I, Read:J, Read:K, Read:L, Read:M, Read:N, Read:O, Read:P, java.lang.InterruptedException: sleep interrupted sender sleep interrupted Read:Q, java.io.IOException: Write end deadreceiver read execption */
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。