001: /**
002: * $RCSfile$
003: * $Revision: 1530 $
004: * $Date: 2005-06-17 18:38:27 -0300 (Fri, 17 Jun 2005) $
005: *
006: * Copyright (C) 2007 Jive Software. All rights reserved.
007: *
008: * This software is published under the terms of the GNU Public License (GPL),
009: * a copy of which is included in this distribution.
010: */package org.jivesoftware.openfire.server;
011:
012: import org.dom4j.Element;
013: import org.dom4j.io.XMPPPacketReader;
014: import org.jivesoftware.openfire.session.OutgoingServerSession;
015: import org.jivesoftware.util.Log;
016:
017: import java.io.IOException;
018: import java.util.concurrent.BlockingQueue;
019: import java.util.concurrent.LinkedBlockingQueue;
020: import java.util.concurrent.TimeUnit;
021:
022: /**
023: * An OutgoingServerSocketReader is responsible for reading and queueing the DOM Element sent by
024: * a remote server. Since the DOM Elements are received using the outgoing connection only special
025: * stanzas may be sent by the remote server (eg. db:result stanzas for answering if the
026: * Authoritative Server verified the key sent by this server).<p>
027: *
028: * This class is also responsible for closing the outgoing connection if the remote server sent
029: * an end of the stream element.
030: *
031: * @author Gaston Dombiak
032: */
033: public class OutgoingServerSocketReader {
034:
035: private OutgoingServerSession session;
036: private boolean open = true;
037: private XMPPPacketReader reader = null;
038: /**
039: * Queue that holds the elements read by the XMPPPacketReader.
040: */
041: private BlockingQueue<Element> elements = new LinkedBlockingQueue<Element>();
042:
043: public OutgoingServerSocketReader(XMPPPacketReader reader) {
044: this .reader = reader;
045: init();
046: }
047:
048: /**
049: * Returns the OutgoingServerSession for which this reader is working for or <tt>null</tt> if
050: * a OutgoingServerSession was not created yet. While the OutgoingServerSession is being
051: * created it is possible to have a reader with no session.
052: *
053: * @return the OutgoingServerSession for which this reader is working for or <tt>null</tt> if
054: * a OutgoingServerSession was not created yet.
055: */
056: public OutgoingServerSession getSession() {
057: return session;
058: }
059:
060: /**
061: * Sets the OutgoingServerSession for which this reader is working for.
062: *
063: * @param session the OutgoingServerSession for which this reader is working for
064: */
065: public void setSession(OutgoingServerSession session) {
066: this .session = session;
067: }
068:
069: /**
070: * Retrieves and removes the first received element that was stored in the queue, waiting
071: * if necessary up to the specified wait time if no elements are present on this queue.
072: *
073: * @param timeout how long to wait before giving up, in units of <tt>unit</tt>.
074: * @param unit a <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
075: * @return the head of this queue, or <tt>null</tt> if the specified waiting time elapses
076: * before an element is present.
077: * @throws InterruptedException if interrupted while waiting.
078: */
079: public Element getElement(long timeout, TimeUnit unit)
080: throws InterruptedException {
081: return elements.poll(timeout, unit);
082: }
083:
084: private void init() {
085: // Create a thread that will read and store DOM Elements.
086: Thread thread = new Thread("Outgoing Server Reader") {
087: public void run() {
088: while (open) {
089: Element doc;
090: try {
091: doc = reader.parseDocument().getRootElement();
092:
093: if (doc == null) {
094: // Stop reading the stream since the remote server has sent an end of
095: // stream element and probably closed the connection.
096: closeSession();
097: } else {
098: elements.add(doc);
099: }
100: } catch (IOException e) {
101: String message = "Finishing Outgoing Server Reader. ";
102: if (session != null) {
103: message = message + "Closing session: "
104: + session.toString();
105: } else {
106: message = message + "No session to close.";
107: }
108: Log.debug("OutgoingServerSocketReader: "
109: + message, e);
110: closeSession();
111: } catch (Exception e) {
112: String message = "Finishing Outgoing Server Reader. ";
113: if (session != null) {
114: message = message + "Closing session: "
115: + session.toString();
116: } else {
117: message = message + "No session to close.";
118: }
119: Log.error(message, e);
120: closeSession();
121: }
122: }
123: }
124: };
125: thread.setDaemon(true);
126: thread.start();
127: }
128:
129: private void closeSession() {
130: open = false;
131: if (session != null) {
132: session.close();
133: }
134: }
135: }
|