Thursday, April 4, 2013

Blocking Queue Implementation

/**
 * A bounded, FIFO blocking queue.
 *
 * <p>{@link #put(Object)} blocks while the queue holds as many elements as the
 * current limit allows; {@link #take()} blocks while the queue is empty.
 *
 * <p>Thread-safety: every access to the backing list is guarded by a single
 * lock, and two conditions created once from that lock are used to hand off
 * between producers and consumers. (Creating a fresh {@code Condition} for
 * every await/signal, or guarding one list with two independent locks, would
 * leave waiting threads asleep forever and the list unprotected.)
 *
 * @param <T> element type
 */
public class MyBlockingQueue<T> {

    /** Backing storage; only touched while {@link #lock} is held. */
    private final Queue<T> queue;

    /**
     * Capacity holder supplied by the caller. It is re-read on every check in
     * {@code put}, so the caller may adjust it through the same
     * {@code AtomicInteger}. Note that raising it does not by itself wake
     * producers that are already blocked; they are woken by the next take.
     */
    private final AtomicInteger limit;

    /** Single lock protecting {@link #queue} for both producers and consumers. */
    private final Lock lock = new ReentrantLock();

    // Conditions are created exactly once so that await() and signal() operate
    // on the same object. Fully qualified to avoid needing an extra import.
    /** Signalled after an element is removed, i.e. when space may be available. */
    private final java.util.concurrent.locks.Condition notFull = lock.newCondition();

    /** Signalled after an element is added, i.e. when an element may be available. */
    private final java.util.concurrent.locks.Condition notEmpty = lock.newCondition();

    /**
     * Creates an empty queue.
     *
     * @param limit holder of the maximum number of queued elements
     */
    public MyBlockingQueue(AtomicInteger limit) {
        this.queue = new LinkedList<T>();
        this.limit = limit;
    }

    /**
     * Appends {@code item}, waiting for space if the queue is at its limit.
     *
     * @param item element to append
     * @return always {@code true}
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for space
     */
    public boolean put(T item) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // Loop guards against spurious wakeups; '>=' also covers a limit
            // that was lowered below the current size.
            while (queue.size() >= limit.get()) {
                notFull.await();
            }
            queue.add(item);
            // Signal only after the element is actually present.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }

        return true;
    }

    /**
     * Removes and returns the oldest element, waiting if the queue is empty.
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for an element
     */
    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T head = queue.poll();
            // A slot has been freed; let one blocked producer proceed.
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }
}

No comments: