001: /*
002: * $Id: QueueInfo.java 8077 2007-08-27 20:15:25Z aperepel $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.util.queue;
012:
013: import java.util.LinkedList;
014:
015: /**
016: * Stores information about a Queue
017: */
018: public class QueueInfo {
019: protected LinkedList list;
020: protected String name;
021: protected QueueConfiguration config;
022:
023: public boolean equals(Object obj) {
024: return (obj instanceof QueueInfo && name
025: .equals(((QueueInfo) obj).name));
026: }
027:
028: public int hashCode() {
029: return name.hashCode();
030: }
031:
032: public void putNow(Object o) {
033: synchronized (list) {
034: list.addLast(o);
035: list.notifyAll();
036: }
037: }
038:
039: public boolean offer(Object o, int room, long timeout)
040: throws InterruptedException {
041: if (Thread.interrupted()) {
042: throw new InterruptedException();
043: }
044: synchronized (list) {
045: if (config.capacity > 0) {
046: if (config.capacity <= room) {
047: throw new IllegalStateException(
048: "Can not add more objects than the capacity in one time");
049: }
050: long l1 = timeout > 0L ? System.currentTimeMillis()
051: : 0L;
052: long l2 = timeout;
053: while (list.size() >= config.capacity - room) {
054: if (l2 <= 0L) {
055: return false;
056: }
057: list.wait(l2);
058: l2 = timeout - (System.currentTimeMillis() - l1);
059: }
060: }
061: if (o != null) {
062: list.addLast(o);
063: }
064: list.notifyAll();
065: return true;
066: }
067: }
068:
069: public Object poll(long timeout) throws InterruptedException {
070: if (Thread.interrupted()) {
071: throw new InterruptedException();
072: }
073: synchronized (list) {
074: long l1 = timeout > 0L ? System.currentTimeMillis() : 0L;
075: long l2 = timeout;
076: while (list.isEmpty()) {
077: if (l2 <= 0L) {
078: return null;
079: }
080: list.wait(l2);
081: l2 = timeout - (System.currentTimeMillis() - l1);
082: }
083: Object o = list.removeFirst();
084: list.notifyAll();
085: return o;
086: }
087: }
088:
089: public Object peek() throws InterruptedException {
090: if (Thread.interrupted()) {
091: throw new InterruptedException();
092: }
093: synchronized (list) {
094: if (list.isEmpty()) {
095: return null;
096: } else {
097: return list.getFirst();
098: }
099: }
100: }
101:
102: }
|