001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.http;
023:
024: import java.util.ArrayList;
025: import java.util.HashMap;
026: import java.util.Iterator;
027: import java.util.Map;
028:
029: import org.jboss.logging.Logger;
030:
031: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
032:
033: /**
034: * Stores requests on behalf of clients. This could of course, be done
035: * with a JMS queue, but I decided this would be a lighter weight solution.
036: *
037: * @author Nathan Phelps (nathan@jboss.org)
038: * @version $Revision: 57198 $
039: * @created January 15, 2003
040: */
041: public class HTTPClientILStorageQueue {
042:
043: private static Logger log = Logger
044: .getLogger(HTTPClientILStorageQueue.class);
045: private static HTTPClientILStorageQueue instance = null;
046: private Map map = new HashMap();
047: private Object queueLock = new Object();
048:
049: /* The static Id variable provides a mechanism for us to identify a particular
050: * ClientIL's requests in the map. This value is retrieved by the ClientILService
051: * init method, which sends an HTTPIL request to the servlet, which in turn
052: * asks this singleton instance for the next Id. If you are wondering why
053: * we need to do this, why we just can't use the *actual* connection Id,
054: * please see my comments in ClientILService.
055: */
056: private static long id = 100; //Start at 100 so we don't get confused with the Connection Ids.
057: private static Object idLock = new Object();
058:
059: private HTTPClientILStorageQueue() {
060: if (log.isTraceEnabled()) {
061: log.trace("created");
062: }
063: }
064:
065: public static synchronized HTTPClientILStorageQueue getInstance() {
066: if (log.isTraceEnabled()) {
067: log.trace("getInstance()");
068: }
069: if (instance == null) {
070: instance = new HTTPClientILStorageQueue();
071: }
072: return instance;
073: }
074:
075: public void put(HTTPILRequest request, String clientIlId)
076: throws InterruptedException {
077: if (log.isTraceEnabled()) {
078: log.trace("put(HTTPILRequest " + request.toString()
079: + ", String " + clientIlId + ")");
080: }
081: if (clientIlId == null) {
082: log
083: .warn("A request was put in a storage queue for a null ClientIl.");
084:
085: return;
086: }
087: synchronized (this .queueLock) {
088: if (this .map.containsKey(clientIlId)) {
089: if (log.isDebugEnabled()) {
090: log
091: .debug("ClientIL #"
092: + clientIlId
093: + " has existing storage queue, adding request to it.");
094: }
095: LinkedQueue queue = (LinkedQueue) this .map
096: .get(clientIlId);
097: queue.put(request);
098: } else {
099: if (log.isDebugEnabled()) {
100: log
101: .debug("ClientIL #"
102: + clientIlId
103: + " doesn't have a storage queue. Creating one and adding the request.");
104: }
105: LinkedQueue queue = new LinkedQueue();
106: queue.put(request);
107: this .map.put(clientIlId, queue);
108: }
109: }
110: }
111:
112: public HTTPILRequest[] get(String clientIlId, long timeout) {
113: if (log.isTraceEnabled()) {
114: log.trace("get(String " + clientIlId + ")");
115: }
116:
117: if (clientIlId == null) {
118: log.warn("A get was issued with a null clientIL Id.");
119: }
120:
121: LinkedQueue queue;
122: synchronized (queueLock) {
123: queue = (LinkedQueue) this .map.get(clientIlId);
124: if (queue == null) {
125: if (log.isDebugEnabled()) {
126: log
127: .debug("ClientIL #"
128: + clientIlId
129: + " doesn't have a storage queue. Creating new one.");
130: }
131: queue = new LinkedQueue();
132: this .map.put(clientIlId, queue); // create a new queue for this client
133: }
134: }
135: ArrayList messageList = new ArrayList();
136: try {
137: if (log.isDebugEnabled()) {
138: log.debug("Polling the queue for "
139: + String.valueOf(timeout)
140: + " milliseconds on behalf of clientIL #"
141: + clientIlId + ".");
142: }
143: Object object = queue.poll(timeout);
144: if (object != null) {
145: if (log.isDebugEnabled()) {
146: log
147: .debug("Poll returned a HTTPILRequest, adding it to our list of requests to deliver to clientIL #"
148: + clientIlId + ".");
149: }
150: messageList.add(object);
151: while ((object = queue.poll(0)) != null) {
152: if (log.isDebugEnabled()) {
153: log
154: .debug("We had a request, so we're are going to see if there are any more for us, but we're not going to block this time.");
155: }
156: messageList.add(object);
157: if (log.isDebugEnabled()) {
158: log.debug("Added request.");
159: }
160: }
161: }
162: } catch (InterruptedException exception) {
163: log
164: .debug("An interruptedException was triggered. We'll just deliver what we have to the client and try again next time.");
165: }
166: if (log.isDebugEnabled())
167: log.debug("Returning " + String.valueOf(messageList.size())
168: + " requests to clientIL #" + clientIlId + ".");
169: return this .createArrayFromList(messageList); // this could be empty, and if so, its OK.
170: }
171:
172: public void purgeEntry(String clientIlId) {
173: if (log.isTraceEnabled()) {
174: log.trace("purgeEntry(String " + clientIlId + ")");
175: }
176: Object entry;
177: synchronized (this .queueLock) {
178: entry = this .map.remove(clientIlId);
179: }
180: if (entry != null && log.isDebugEnabled()) {
181: log.debug("Purged storage queue entry for ClientIL #"
182: + clientIlId + ".");
183: }
184:
185: }
186:
187: public String getID() {
188: if (log.isTraceEnabled()) {
189: log.trace("getID()");
190: }
191: synchronized (idLock) {
192: return String.valueOf(++id);
193: }
194: }
195:
196: private HTTPILRequest[] createArrayFromList(ArrayList list) {
197: if (log.isTraceEnabled()) {
198: log.trace("createArrayFromList(ArrayList length="
199: + String.valueOf(list.size()) + ")");
200: }
201: HTTPILRequest[] requests = new HTTPILRequest[list.size()];
202: Iterator itemList = list.iterator();
203: int i = 0;
204: while (itemList.hasNext()) {
205: requests[i] = (HTTPILRequest) itemList.next();
206: i++;
207: }
208: return requests;
209: }
210: }
|