001: /*
002: * Copyright 2001-2006 C:1 Financial Services GmbH
003: *
004: * This software is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License Version 2.1, as published by the Free Software Foundation.
007: *
008: * This software is distributed in the hope that it will be useful,
009: * but WITHOUT ANY WARRANTY; without even the implied warranty of
010: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
011: * Lesser General Public License for more details.
012: *
013: * You should have received a copy of the GNU Lesser General Public
014: * License along with this library; if not, write to the Free Software
015: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
016: */
017:
018: package de.finix.contelligent.client.remote;
019:
020: import java.io.BufferedReader;
021: import java.io.ByteArrayInputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.InputStreamReader;
025: import java.net.ServerSocket;
026: import java.net.Socket;
027: import java.net.SocketException;
028: import java.util.logging.Level;
029: import java.util.logging.Logger;
030:
031: import javax.xml.parsers.DocumentBuilder;
032: import javax.xml.parsers.DocumentBuilderFactory;
033: import javax.xml.parsers.ParserConfigurationException;
034:
035: import org.w3c.dom.Document;
036: import org.w3c.dom.Element;
037: import org.w3c.dom.NodeList;
038: import org.xml.sax.SAXException;
039:
040: import de.finix.contelligent.client.base.ContelligentConstants;
041: import de.finix.contelligent.client.base.Session;
042: import de.finix.contelligent.client.event.ServerEvent;
043: import de.finix.contelligent.client.i18n.Resources;
044: import de.finix.contelligent.client.util.xml.XMLUtil;
045:
046: /**
047: * Listen for events PUSHed by server.
048: */
049: class UpdateThread extends Thread implements ContelligentConstants {
050:
051: private ServerSocket serverSocket;
052:
053: private static Logger logger = Logger.getLogger(UpdateThread.class
054: .getName());
055:
056: private static DocumentBuilderFactory factory = DocumentBuilderFactory
057: .newInstance();
058:
059: private static UpdateThread instance = null;
060:
061: private boolean IAmAlive = true;
062:
063: // Have we been contacted, yet? If so, we are reachable. Of course,
064: // initially this is false.
065: private boolean isReachable = false;
066:
067: protected final static String MESSAGE_SEPARATOR = "---- END ----";
068:
069: private final static int MAX_WAIT_FOR_THREAD_TO_DIE_MILLIS = 2000;
070:
071: /**
072: * Creates a new update thread listening for events sent by server. If there
073: * is an instance listening it will be dropped and a new one will be
074: * created.
075: *
076: * @param port
077: * the port update thread listens on or <code>0</code> for a
078: * system assigned port
079: */
080: public static UpdateThread initInstance(int port) throws Exception {
081: // if there is an update thread running, we need to terminate it
082: if (instance != null) {
083: if (!dropInstance())
084: throw new RuntimeException(
085: "Failed to stop old update thread!");
086: }
087: // if there is no instance, we need one
088: // (even when we try to drop instance with dropInstance it can still be
089: // there)
090: if (instance == null) {
091: instance = new UpdateThread(port);
092: if (port == 0) {
093: logger.log(Level.INFO,
094: "Created new update thread on system assigned port "
095: + instance.getActualPort());
096: } else {
097: logger.log(Level.INFO,
098: "Created new update thread on user specified port "
099: + port);
100: }
101: }
102: return instance;
103: }
104:
105: /**
106: * Gets the instance initialized by {@link #initInstance}. If it has not
107: * been initialized by {@link #initInstance}, <code>null</code> will be
108: * returned.
109: */
110: public static UpdateThread getInstance() {
111: return instance;
112: }
113:
114: /**
115: * Tries to drop instance of this thread.
116: *
117: * @return <code>true</code> when accepting socket has been closed, thread
118: * killed and instance dropped, <code>false</code> when socket
119: * could not be closed and thread stilli is alive - in this case
120: * instance will not be dropped.
121: */
122: public static boolean dropInstance() {
123: if (instance == null)
124: return true;
125: logger.log(Level.INFO, "Dropping instance listening on port "
126: + instance.getActualPort());
127: instance.IAmAlive = false;
128: try {
129: // when calling close while socket is in accept causes a
130: // SocketException
131: instance.serverSocket.close();
132: } catch (IOException ioe) {
133: logger.log(Level.WARNING, "Error while dropping instance",
134: ioe);
135: }
136: try {
137: instance.join(MAX_WAIT_FOR_THREAD_TO_DIE_MILLIS);
138: } catch (InterruptedException ie) {
139: logger.log(Level.WARNING,
140: "Interrupt while joining current thread", ie);
141: }
142: if (instance.isAlive()) {
143: // XXX this is fatal and should throw an adequate exception
144: logger.log(Level.SEVERE,
145: "Could not stop update-thread at port "
146: + instance.getActualPort());
147: return false;
148: } else {
149: instance = null;
150: return true;
151: }
152: }
153:
154: /**
155: * @return effective port that may have been assigned by system when
156: * requested port was 0
157: */
158: public int getActualPort() {
159: return serverSocket.getLocalPort();
160: }
161:
162: /** Have we been contacted, yet? If so we are reachable. */
163: public boolean isReachable() {
164: return isReachable;
165: }
166:
167: public void run() {
168: while (IAmAlive) {
169: try {
170: Socket socket = serverSocket.accept();
171: // if we receive something we are reachable
172: isReachable = true;
173: logger.log(Level.FINE, "Got connection " + socket);
174:
175: socketClosed: {
176: while (true) {
177: StringBuffer buf = new StringBuffer();
178: BufferedReader reader = new BufferedReader(
179: new InputStreamReader((socket
180: .getInputStream())));
181: while (true) {
182: String line = reader.readLine();
183: if (line == null) {
184: if (buf.length() != 0)
185: parseResult(buf);
186: socket.close();
187: break socketClosed;
188: }
189: if (line.equals(MESSAGE_SEPARATOR)) {
190: parseResult(buf);
191: break;
192: }
193: buf.append(line).append("\n");
194: }
195: }
196: }
197: } catch (java.nio.channels.ClosedByInterruptException e) {
198: // ok, so we are dead
199: IAmAlive = false;
200: logger.log(Level.FINE, "Accept interrupted", e);
201: } catch (SocketException se) {
202: // ok, so we are dead
203: IAmAlive = false;
204: logger.log(Level.FINE, "Accept interrupted", se);
205: } catch (Exception e) {
206: logger.log(Level.FINE,
207: "Ignoring exception in update thread", e);
208: }
209: }
210: try {
211: serverSocket.close();
212: } catch (IOException ioe) {
213: logger.log(Level.WARNING,
214: "Server socket could not be closed", ioe);
215: }
216: logger.log(Level.INFO, "Update-Thread killed");
217: }
218:
219: private UpdateThread(int port) throws Exception {
220: super ("Update-Thread, port " + port);
221: try {
222: serverSocket = new ServerSocket(port);
223: setDaemon(true);
224: } catch (Exception e) {
225: throw new Exception(Resources
226: .getLocalString("no_create_socket")
227: + ": " + e.getMessage());
228: }
229: }
230:
231: private void parseResult(StringBuffer buf) throws IOException,
232: SAXException, ParserConfigurationException {
233: // skip empty messages as they can not be well formed
234: logger.log(Level.FINEST, "Buffer " + buf);
235:
236: if (buf.length() == 0) {
237: logger.log(Level.WARNING,
238: "Received empty message from server");
239: return;
240: }
241: InputStream in = new ByteArrayInputStream(buf.toString()
242: .getBytes());
243: DocumentBuilder builder = factory.newDocumentBuilder();
244: Document document = builder.parse(in);
245: // check session found e.g.:
246: // <Session>609b73f82658b3b912d43c223523d404d8</Session>
247: Element sessionElement = (Element) document
248: .getElementsByTagName("Session").item(0);
249: String contelligentSession = Session.getInstance()
250: .getContelligentSessionHandle();
251: // XXX checking our sessions for null is a hack, but necessay to receive
252: // lock events sent concurrent to login action
253: if (sessionElement != null && contelligentSession != null) {
254: String session = XMLUtil.getContent(sessionElement);
255: if (!Session.getInstance()
256: .bogusEqualsContelligentSessionHandle(session)) {
257: logger.log(Level.WARNING, "\nReceived Eventlist:\n"
258: + buf
259: + "\nis not for current session, skipping it!");
260: return;
261: }
262: }
263:
264: NodeList events = document
265: .getElementsByTagName(UpdateManager.EVENT);
266:
267: Session.getInstance().fireServerEvent(
268: new ServerEvent(UpdateThread.this , buf.toString(),
269: ServerEvent.PUSH_EVENT));
270:
271: if (events.getLength() > 0) {
272: logger.log(Level.FINEST, "Event: " + events);
273: UpdateManager.handleComponentEvents(events,
274: UpdateThread.this);
275: }
276: }
277: }
|