001: /*
002: * SSHTools - Java SSH2 API
003: *
004: * Copyright (C) 2002-2003 Lee David Painter and Contributors.
005: *
006: * Contributions made by:
007: *
008: * Brett Smith
009: * Richard Pernavas
010: * Erwin Bolwidt
011: *
012: * This program is free software; you can redistribute it and/or
013: * modify it under the terms of the GNU General Public License
014: * as published by the Free Software Foundation; either version 2
015: * of the License, or (at your option) any later version.
016: *
017: * This program is distributed in the hope that it will be useful,
018: * but WITHOUT ANY WARRANTY; without even the implied warranty of
019: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
020: * GNU General Public License for more details.
021: *
022: * You should have received a copy of the GNU General Public License
023: * along with this program; if not, write to the Free Software
024: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
025: */
026: package com.sshtools.j2ssh.transport;
027:
028: import com.sshtools.j2ssh.io.ByteArrayReader;
029:
030: import org.apache.commons.logging.Log;
031: import org.apache.commons.logging.LogFactory;
032:
033: import java.util.ArrayList;
034: import java.util.HashMap;
035: import java.util.Iterator;
036: import java.util.List;
037: import java.util.Map;
038: import java.util.Vector;
039:
040: /**
041: * <p>
042: * This class implements a message store that can be used to provide a blocking
043: * mechanism for transport protocol messages.
044: * </p>
045: *
046: * @author Lee David Painter
047: * @version $Revision: 1.42 $
048: *
049: * @since 0.2.0
050: */
051: public final class SshMessageStore {
052: private static Log log = LogFactory.getLog(SshMessageStore.class);
053:
054: // List to hold messages as they are received
055: private List messages = new ArrayList();
056: private Map register = new HashMap();
057: private boolean isClosed = false;
058: private int[] singleIdFilter = new int[1];
059: private int interrupt = 5000;
060: private Vector listeners = new Vector();
061:
062: /**
063: * <p>
064: * Contructs the message store.
065: * </p>
066: *
067: * @since 0.2.0
068: */
069: public SshMessageStore() {
070: }
071:
072: /**
073: * <p>
074: * Evaluate whether the message store is closed.
075: * </p>
076: *
077: * @return
078: *
079: * @since 0.2.0
080: */
081: public boolean isClosed() {
082: return isClosed;
083: }
084:
085: public void addMessageListener(SshMessageListener listener) {
086: synchronized (listeners) {
087: listeners.add(listener);
088: }
089: }
090:
091: /**
092: * <p>
093: * Get a message from the store. This method will block until a message
094: * with an id matching the supplied filter arrives, or the message store
095: * closes. The message is removed from the store.
096: * </p>
097: *
098: * @param messageIdFilter an array of message ids that are acceptable
099: *
100: * @return the next available message
101: *
102: * @throws MessageStoreEOFException if the message store is closed
103: * @throws InterruptedException if the thread was interrupted
104: *
105: * @since 0.2.0
106: */
107: public synchronized SshMessage getMessage(int[] messageIdFilter)
108: throws MessageStoreEOFException, InterruptedException {
109: try {
110: return getMessage(messageIdFilter, 0);
111: } catch (MessageNotAvailableException e) {
112: // This should never happen but throw just in case
113: throw new MessageStoreEOFException();
114: }
115: }
116:
117: /**
118: * <p>
119: * Get a message from the store. This method will block until a message
120: * with an id matching the supplied filter arrives, the specified timeout
121: * is reached or the message store closes. The message is removed from the
122: * store.
123: * </p>
124: *
125: * @param messageIdFilter an array of message ids that are acceptable.
126: * @param timeout the maximum number of milliseconds to block before
127: * returning.
128: *
129: * @return the next available message
130: *
131: * @throws MessageStoreEOFException if the message store is closed
132: * @throws MessageNotAvailableException if the message is not available
133: * after a timeout
134: * @throws InterruptedException if the thread is interrupted
135: *
136: * @since 0.2.0
137: */
138: public synchronized SshMessage getMessage(int[] messageIdFilter,
139: int timeout) throws MessageStoreEOFException,
140: MessageNotAvailableException, InterruptedException {
141: if ((messages.size() <= 0) && isClosed) {
142: throw new MessageStoreEOFException();
143: }
144:
145: if (messageIdFilter == null) {
146: return nextMessage();
147: }
148:
149: SshMessage msg;
150: boolean firstPass = true;
151:
152: if (timeout < 0) {
153: timeout = 0;
154: }
155:
156: while ((messages.size() > 0) || !isClosed) {
157: // lookup the message
158: msg = lookupMessage(messageIdFilter, true);
159:
160: if (msg != null) {
161: return msg;
162: } else {
163: // If this is the second time and there's no message, then throw
164: if (!firstPass && (timeout > 0)) {
165: throw new MessageNotAvailableException();
166: }
167: }
168:
169: // Now wait
170: if (!isClosed) {
171: wait((timeout == 0) ? interrupt : timeout);
172: }
173:
174: firstPass = false;
175: }
176:
177: throw new MessageStoreEOFException();
178: }
179:
180: /**
181: * <p>
182: * Get a message from the store. This method will block until a message
183: * with an id matching the supplied id arrives, or the message store
184: * closes. The message is removed from the store.
185: * </p>
186: *
187: * @param messageId the id of the message requried
188: *
189: * @return the next available message with the id supplied
190: *
191: * @throws MessageStoreEOFException if the message store closed
192: * @throws InterruptedException if the thread is interrupted
193: *
194: * @since 0.2.0
195: */
196: public synchronized SshMessage getMessage(int messageId)
197: throws MessageStoreEOFException, InterruptedException {
198: try {
199: return getMessage(messageId, 0);
200: } catch (MessageNotAvailableException e) {
201: // This should never happen by throw jsut in case
202: throw new MessageStoreEOFException();
203: }
204: }
205:
206: /**
207: * <p>
208: * Get a message from the store. This method will block until a message
209: * with an id matching the supplied id arrives,the specified timeout is
210: * reached or the message store closes. The message will be removed from
211: * the store.
212: * </p>
213: *
214: * @param messageId the id of the message requried
215: * @param timeout the maximum number of milliseconds to block before
216: * returning.
217: *
218: * @return the next available message with the id supplied
219: *
220: * @throws MessageStoreEOFException if the message store closed
221: * @throws InterruptedException if the thread is interrupted
222: * @throws InterruptedException
223: *
224: * @since 0.2.0
225: */
226: public synchronized SshMessage getMessage(int messageId, int timeout)
227: throws MessageStoreEOFException,
228: MessageNotAvailableException, InterruptedException {
229: singleIdFilter[0] = messageId;
230:
231: return getMessage(singleIdFilter, timeout);
232: }
233:
234: /**
235: * <p>
236: * Evaluate whether the store has any messages.
237: * </p>
238: *
239: * @return true if messages exist, otherwise false
240: *
241: * @since 0.2.0
242: */
243: public boolean hasMessages() {
244: return messages.size() > 0;
245: }
246:
247: /**
248: * <p>
249: * Returns the number of messages contained within this message store.
250: * </p>
251: *
252: * @return the number of messages
253: *
254: * @since 0.2.0
255: */
256: public int size() {
257: return messages.size();
258: }
259:
260: /**
261: * <p>
262: * Determines if the message id is a registered message of this store.
263: * </p>
264: *
265: * @param messageId the message id
266: *
267: * @return true if the message id is registered, otherwise false
268: *
269: * @since 0.2.0
270: */
271: public boolean isRegisteredMessage(Integer messageId) {
272: return register.containsKey(messageId);
273: }
274:
275: /**
276: * <p>
277: * Adds a raw message to the store and processes the data into a registered
278: * message.
279: * </p>
280: *
281: * @param msgdata the raw message data to process
282: *
283: * @throws MessageNotRegisteredException if the message id of the raw data
284: * is not a registered message
285: * @throws InvalidMessageException if the message is invalid
286: *
287: * @since 0.2.0
288: */
289: public void addMessage(byte[] msgdata)
290: throws MessageNotRegisteredException,
291: InvalidMessageException {
292: Integer messageId = new Integer(msgdata[5]);
293:
294: if (!isRegisteredMessage(messageId)) {
295: throw new MessageNotRegisteredException(messageId);
296: }
297:
298: Class cls = (Class) register.get(SshMessage
299: .getMessageId(msgdata));
300:
301: try {
302: SshMessage msg = (SshMessage) cls.newInstance();
303: msg.fromByteArray(new ByteArrayReader(msgdata));
304: addMessage(msg);
305: } catch (IllegalAccessException iae) {
306: throw new InvalidMessageException(
307: "Illegal access for implementation class "
308: + cls.getName());
309: } catch (InstantiationException ie) {
310: throw new InvalidMessageException(
311: "Instantiation failed for class " + cls.getName());
312: }
313: }
314:
315: /**
316: * <p>
317: * Add a formed message to the store.
318: * </p>
319: *
320: * @param msg the message to add to the store
321: *
322: * @throws MessageNotRegisteredException if the message type is not
323: * registered with the store
324: *
325: * @since 0.2.0
326: */
327: public synchronized void addMessage(SshMessage msg)
328: throws MessageNotRegisteredException {
329: // Add the message
330: messages.add(messages.size(), msg);
331:
332: synchronized (listeners) {
333: if (listeners.size() > 0) {
334: for (Iterator it = listeners.iterator(); it.hasNext();) {
335: ((SshMessageListener) it.next())
336: .messageReceived(msg);
337: }
338: }
339: }
340:
341: // Notify the threads
342: notifyAll();
343: }
344:
345: /**
346: * <p>
347: * Closes the store. This will cause any blocking operations on the message
348: * store to return.
349: * </p>
350: *
351: * @since 0.2.0
352: */
353: public synchronized void close() {
354: isClosed = true;
355:
356: // We need to notify all anyway as if there are messages still available
357: // it should not affect the waiting threads as they are waiting for their
358: // own messages to be received because non were avaialable in the first place
359: //if (messages.size()<=0) {
360: notifyAll();
361:
362: //}
363: }
364:
365: /**
366: * <p>
367: * Get the next message in the store or wait until a new message arrives.
368: * The message is removed from the store.
369: * </p>
370: *
371: * @return the next available message.
372: *
373: * @throws MessageStoreEOFException if the message store is closed
374: * @throws InterruptedException if the thread is interrupted
375: *
376: * @since 0.2.0
377: */
378: public synchronized SshMessage nextMessage()
379: throws MessageStoreEOFException, InterruptedException {
380: if ((messages.size() <= 0) && isClosed) {
381: throw new MessageStoreEOFException();
382: }
383:
384: // If there are no messages available then wait untill there are.
385: while ((messages.size() <= 0) && !isClosed) {
386: wait(interrupt);
387: }
388:
389: if (messages.size() > 0) {
390: return (SshMessage) messages.remove(0);
391: } else {
392: throw new MessageStoreEOFException();
393: }
394: }
395:
396: /**
397: *
398: */
399: public synchronized void breakWaiting() {
400: notifyAll();
401: }
402:
403: /**
404: * <p>
405: * Get a message from the store without removing or blocking if the message
406: * does not exist.
407: * </p>
408: *
409: * @param messageIdFilter the id of the message requried
410: *
411: * @return the next available message with the id supplied
412: *
413: * @throws MessageStoreEOFException if the message store closed
414: * @throws MessageNotAvailableException if the message is not available
415: * @throws InterruptedException if the thread is interrupted
416: *
417: * @since 0.2.0
418: */
419: public synchronized SshMessage peekMessage(int[] messageIdFilter)
420: throws MessageStoreEOFException,
421: MessageNotAvailableException, InterruptedException {
422: return peekMessage(messageIdFilter, 0);
423: }
424:
425: /**
426: * <p>
427: * Get a message from the store without removing it; only blocking for the
428: * number of milliseconds specified in the timeout field. If timeout is
429: * zero, the method will not block.
430: * </p>
431: *
432: * @param messageIdFilter an array of acceptable message ids
433: * @param timeout the number of milliseconds to wait
434: *
435: * @return the next available message of the acceptable message ids
436: *
437: * @throws MessageStoreEOFException if the message store is closed
438: * @throws MessageNotAvailableException if the message is not available
439: * @throws InterruptedException if the thread is interrupted
440: *
441: * @since 0.2.0
442: */
443: public synchronized SshMessage peekMessage(int[] messageIdFilter,
444: int timeout) throws MessageStoreEOFException,
445: MessageNotAvailableException, InterruptedException {
446: SshMessage msg;
447:
448: // Do a straight lookup
449: msg = lookupMessage(messageIdFilter, false);
450:
451: if (msg != null) {
452: return msg;
453: }
454:
455: // If were willing to wait the wait and look again
456: if (timeout > 0) {
457: if (log.isDebugEnabled()) {
458: log.debug("No message so waiting for "
459: + String.valueOf(timeout) + " milliseconds");
460: }
461:
462: wait(timeout);
463: msg = lookupMessage(messageIdFilter, false);
464:
465: if (msg != null) {
466: return msg;
467: }
468: }
469:
470: // Nothing even after a wait so throw the relevant exception
471: if (isClosed) {
472: throw new MessageStoreEOFException();
473: } else {
474: throw new MessageNotAvailableException();
475: }
476: }
477:
478: private SshMessage lookupMessage(int[] messageIdFilter,
479: boolean remove) {
480: SshMessage msg;
481:
482: for (int x = 0; x < messages.size(); x++) {
483: msg = (SshMessage) messages.get(x);
484:
485: // Determine whether its one of the filtered messages
486: for (int i = 0; i < messageIdFilter.length; i++) {
487: if (msg.getMessageId() == messageIdFilter[i]) {
488: if (remove) {
489: messages.remove(msg);
490: }
491:
492: return msg;
493: }
494: }
495: }
496:
497: return null;
498: }
499:
500: /**
501: * <p>
502: * Get a message from the store without removing it.
503: * </p>
504: *
505: * @param messageId the acceptable message id
506: *
507: * @return the next available message.
508: *
509: * @throws MessageStoreEOFException if the message store is closed.
510: * @throws MessageNotAvailableException if the message is not available.
511: * @throws InterruptedException if the thread is interrupted
512: *
513: * @since 0.2.0
514: */
515: public synchronized SshMessage peekMessage(int messageId)
516: throws MessageStoreEOFException,
517: MessageNotAvailableException, InterruptedException {
518: return peekMessage(messageId, 0);
519: }
520:
521: /**
522: * <p>
523: * Removes a message from the message store.
524: * </p>
525: *
526: * @param msg the message to remove
527: *
528: * @since 0.2.0
529: */
530: public synchronized void removeMessage(SshMessage msg) {
531: messages.remove(msg);
532: }
533:
534: /**
535: * <p>
536: * Get a message from the store without removing it, only blocking for the
537: * number of milliseconds specified in the timeout field.
538: * </p>
539: *
540: * @param messageId the acceptable message id
541: * @param timeout the timeout setting in milliseconds
542: *
543: * @return the next available message
544: *
545: * @throws MessageStoreEOFException if the message store is closed
546: * @throws MessageNotAvailableException if the message is not available
547: * @throws InterruptedException if the thread is interrupted
548: *
549: * @since 0.2.0
550: */
551: public synchronized SshMessage peekMessage(int messageId,
552: int timeout) throws MessageStoreEOFException,
553: MessageNotAvailableException, InterruptedException {
554: singleIdFilter[0] = messageId;
555:
556: return peekMessage(singleIdFilter, timeout);
557: }
558:
559: /**
560: * <p>
561: * Register a message implementation with the store.
562: * </p>
563: *
564: * @param messageId the id of the message
565: * @param implementor the class of the implementation
566: *
567: * @since 0.2.0
568: */
569: public void registerMessage(int messageId, Class implementor) {
570: Integer id = new Integer(messageId);
571: register.put(id, implementor);
572: }
573:
574: /**
575: * <p>
576: * Returns an Object array (Integers) of the registered message ids.
577: * </p>
578: *
579: * @return the registered message id array
580: *
581: * @since 0.2.0
582: */
583: public Object[] getRegisteredMessageIds() {
584: return register.keySet().toArray();
585: }
586:
587: /**
588: * <p>
589: * Create a formed message from raw message data.
590: * </p>
591: *
592: * @param msgdata the raw message data
593: *
594: * @return the formed message
595: *
596: * @throws MessageNotRegisteredException if the message is not a registered
597: * message
598: * @throws InvalidMessageException if the message is invalid
599: *
600: * @since 0.2.0
601: */
602: public SshMessage createMessage(byte[] msgdata)
603: throws MessageNotRegisteredException,
604: InvalidMessageException {
605: Integer messageId = SshMessage.getMessageId(msgdata);
606:
607: if (!isRegisteredMessage(messageId)) {
608: throw new MessageNotRegisteredException(messageId);
609: }
610:
611: Class cls = (Class) register.get(SshMessage
612: .getMessageId(msgdata));
613:
614: try {
615: SshMessage msg = (SshMessage) cls.newInstance();
616: msg.fromByteArray(new ByteArrayReader(msgdata));
617:
618: return msg;
619: } catch (IllegalAccessException iae) {
620: throw new InvalidMessageException(
621: "Illegal access for implementation class "
622: + cls.getName());
623: } catch (InstantiationException ie) {
624: throw new InvalidMessageException(
625: "Instantiation failed for class " + cls.getName());
626: }
627: }
628: }
|