001: /*
002: * Copyright Aduna (http://www.aduna-software.com/) (c) 2008.
003: *
004: * Licensed under the Aduna BSD-style license.
005: */
006: package org.openrdf.sail.rdbms.managers.helpers;
007:
008: import java.util.AbstractQueue;
009: import java.util.Arrays;
010: import java.util.Collection;
011: import java.util.Iterator;
012: import java.util.LinkedHashSet;
013: import java.util.concurrent.BlockingQueue;
014: import java.util.concurrent.TimeUnit;
015:
016: import org.openrdf.sail.rdbms.schema.Batch;
017:
018: /**
019: *
020: * @author James Leigh
021: */
022: public class BatchBlockingQueue extends AbstractQueue<Batch> implements
023: BlockingQueue<org.openrdf.sail.rdbms.schema.Batch> {
024:
025: private LinkedHashSet<Batch> queue;
026:
027: private int capacity;
028:
029: private int size;
030:
031: public BatchBlockingQueue(int capacity) {
032: queue = new LinkedHashSet<Batch>(capacity / 16);
033: this .capacity = capacity;
034: }
035:
036: @Override
037: public boolean remove(Object o) {
038: synchronized (queue) {
039: if (queue.remove(o)) {
040: size -= ((Batch) o).size();
041: queue.notify();
042: return true;
043: }
044: return false;
045: }
046: }
047:
048: @Override
049: public Iterator<Batch> iterator() {
050: synchronized (queue) {
051: Batch[] array = queue.toArray(new Batch[queue.size()]);
052: return Arrays.asList(array).iterator();
053: }
054: }
055:
056: @Override
057: public int size() {
058: return queue.size();
059: }
060:
061: public boolean offer(Batch e) {
062: synchronized (queue) {
063: boolean added = queue.add(e);
064: size += e.size();
065: queue.notify();
066: return added;
067: }
068: }
069:
070: public Batch peek() {
071: synchronized (queue) {
072: return queue.iterator().next();
073: }
074: }
075:
076: public Batch poll() {
077: synchronized (queue) {
078: Iterator<Batch> iter = queue.iterator();
079: if (iter.hasNext()) {
080: Batch e = iter.next();
081: iter.remove();
082: size -= e.size();
083: queue.notify();
084: return e;
085: }
086: return null;
087: }
088: }
089:
090: public int drainTo(Collection<? super Batch> c) {
091: synchronized (queue) {
092: return drainTo(c, queue.size());
093: }
094: }
095:
096: public int drainTo(Collection<? super Batch> c, int n) {
097: synchronized (queue) {
098: Iterator<Batch> iter = queue.iterator();
099: int i;
100: for (i = 0; i < n && iter.hasNext(); i++) {
101: Batch next = iter.next();
102: c.add(next);
103: iter.remove();
104: size -= next.size();
105: queue.notify();
106: }
107: return i;
108: }
109: }
110:
111: public boolean offer(Batch e, long timeout, TimeUnit unit)
112: throws InterruptedException {
113: return offer(e);
114: }
115:
116: public Batch poll(long timeout, TimeUnit unit)
117: throws InterruptedException {
118: return poll();
119: }
120:
121: public void put(Batch e) throws InterruptedException {
122: synchronized (queue) {
123: while (size >= capacity) {
124: queue.wait();
125: }
126: queue.add(e);
127: size += e.size();
128: queue.notify();
129: }
130: }
131:
132: public int remainingCapacity() {
133: return Integer.MAX_VALUE;
134: }
135:
136: public Batch take() throws InterruptedException {
137: synchronized (queue) {
138: while (queue.isEmpty()) {
139: queue.wait();
140: }
141: Iterator<Batch> iter = queue.iterator();
142: Batch e = iter.next();
143: iter.remove();
144: size -= e.size();
145: queue.notify();
146: return e;
147: }
148: }
149: }
|