Java多线程间的通信问题扩展

到底什么是线程间的通信?

线程间需要通过一些协调性的通信,实现共同完成同一件任务,简单说就是,你想做这件事,我也想做这件事,我们两个相互交流,共同做这件事,而共同任务的同步性的实现,就必须通过同步锁,每一个对象实例都有自己的一把锁,当一个线程想要对这个任务进行处理的时候,就必须获得这把锁。线程之间锁的释放与获取,是通过Object类中的wait()/notify()方法实现的。wait()方法是将当前拥有锁的线程至于等待状态让其释放锁,而notify()方法是唤醒其他线程使其具备执行资格,过来拿这个锁,拿到这个锁后才具备了执行权。

需要注意的是,一定要确保wait()方法是出于同步区域中,才会拥有锁,而同样至于同步区域的Thread.sleep()方法,只会让线程在指定的时间段内睡眠,而不会释放锁


线程间的通信主要通过两种方式:

1.共享内存

初次学习的时候,已经写过,写个简单点的

public class Main{
	public static void main(String[] args){
		MemoryThread MyThread = new MemoryThread();
		MyThread.getThread().start();
		MyThread.getThread().start();
	}
}
class MemoryThread {
	private int data;
	public MemoryThread() {
		// TODO Auto-generated constructor stub
		data = 0;
	}
	public class Inside implements Runnable{

		public void run() {
			while(true){
				synchronized (this) {
					System.out.println(Thread.currentThread().getName()+"--:--"+(++data));
				}
			}	
		}
	}
	public Thread getThread(){
		return new Thread(new Inside());
	}
}


ps:共享内存这中通信方式,存在不稳定性,有时候会出现问题


2.管道流

通过管道流实现线程间的通信,主要是将管道流连接到线程。

class Producer implements Runnable{

	private PipedOutputStream pout = null;
	public Producer(PipedOutputStream pout) {
		
		this.pout = pout;
	}
	public void run() {
		int x = 1;
		// TODO Auto-generated method stub
		synchronized (this) {
			
			while(true){	
				synchronized (this) {
						try {
							System.out.println("Producer set "+x);
						pout.write(x);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}			
		}	
	}
}
class Customer implements Runnable{

	private PipedInputStream pin = null;
	public Customer(PipedInputStream pin) {
		
		this.pin = pin;
	}
	public void run() {
		
			while(true){
				synchronized(this){
				try {
					System.out.println("Customer get "+pin.read());
				} catch (Exception e) {
					// TODO: handle exception
				}
				
			}
		}
		
	}	
}
public class Main{
	
	public static void main(String[] args) {
		
		PipedInputStream pin = new PipedInputStream();
		PipedOutputStream pot = new PipedOutputStream();
		try{
			pot.connect(pin);
		}catch(Exception e){
			e.printStackTrace();
			
		}
		new Thread(new Producer(pot)).start();
		new Thread(new Customer(pin)).start();
		
	}
}

ps:因为管道流的read方法是阻塞式的,数据的同步和并发会出现问题


由线程间的通信问题进而可以提升到多生产者多消费者问题

多生产者多消费者问题解决方法一般分为两种:

1.采用一种机制实现生产者/消费者间的同步(常用,效率高)

2.采用管道流的方式

解决同步问题,一般都是synchronized函数或代码块+while+notifyAll的方式,接口Lock的出现,有了更加灵活的操作,可以支持多个监听器,以此来实现锁更广泛的操作


此处直接演示Lock的方式


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by kevy on 14-12-01.
 */

/**
 * 描述:仓库存储着货物,每个货物都对应一个编号
 * 仓库货满时,生产者不能存放货物。 仓库为空时,消费者不能拿走货物
 * 面向对象的思想考虑问题:涉及的对象,生产者,消费者,货物,仓库
 * 生产者,消费者对应着同一个仓库,仓库对应着不同的货物
 */
class Goods {
    private String name;
    private int num;
    Goods(String name,int num){
        this.name = name;
        this.num = num;
    }
    public String getName() {
        return name;
    }
    public int getNum() {
        return num;
    }
}
class StoreHouse{
    public final static int SIZE = 10;
    private Goods[] goodses = new Goods[SIZE];//定义仓库储存大小

    private Lock lock = new ReentrantLock();//创建锁
    private Condition Pro_lock = lock.newCondition();//创建生产者监视器
    private Condition Cus_lock = lock.newCondition();//创建消费者监视器
    private int putnum = 0,count = 0,takenum = 0;

    public void set(Goods good){
        lock.lock();
        try{
            while(count==SIZE){
                try{
                    Pro_lock.await();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            Thread.sleep(100);//延时100
            goodses[putnum] = good;
            System.out.println(Thread.currentThread().getName()+"++++生产++++"+
                    good.getName()+good.getNum());
            if(++putnum==SIZE) putnum = 0;
            count++;
            Cus_lock.signal();

           }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void out(){
        lock.lock();
        try{
            while(count==0){
                try{
                    Cus_lock.await();
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
            Thread.sleep(100);//延时100
            Goods item = goodses[takenum];
            if(++takenum==SIZE) takenum = 0;
            --count;
            System.out.println(Thread.currentThread().getName()+"----消费----"+
                item.getName()+item.getNum());
            Pro_lock.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}
class Producer implements Runnable{
    private StoreHouse storeHouse = null;
    private String Goodname = null;
    Producer(StoreHouse storeHouse,String goodname){
        this.storeHouse = storeHouse;
        this.Goodname = goodname;
    }

    public void run() {
        int NUM = 1;
        while(true){
            storeHouse.set(new Goods(Goodname,NUM++));
        }
    }
}
class Customer implements Runnable{
    private StoreHouse storeHouse = null;
    Customer(StoreHouse storeHouse){
        this.storeHouse = storeHouse;
    }

    public void run() {
        while(true){
            storeHouse.out();
        }
    }
}
public class Main {
    public static StoreHouse storeHouse;
    public static void main(String[] args){

        storeHouse = new StoreHouse();
        Runnable p = new Producer(storeHouse,"卫生纸");
        Runnable p1 = new Producer(storeHouse,"Java技术-核心卷I");

        Runnable c = new Customer(storeHouse);
        Runnable c1 = new Customer(storeHouse);

        Thread proThread1 = new Thread(p);//生产者线程
        Thread proThread2 = new Thread(p1);
        Thread cusThread1 = new Thread(c);//消费者线程
        Thread cusThread2 = new Thread(c1);

        proThread1.start();  cusThread1.start();
        proThread2.start();  cusThread2.start();

    }
}








郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。