001: /*
002: * File: SocketThread.java
003: * Project: jMOS, com.aranova.java.jmos
004: * Revision: 0.9 - Inicial
005: * Date: 30-sep-2005 12:16:45
006: *
007: * Copyright (C) Aragón Innovación Tecnológica S.L.L.
008: * All rights reserved.
009: *
010: * This software is distributed under the terms of the Aranova License version 1.0.
011: * See the terms of the Aranova License in the documentation provided with this software.
012: */
013:
014: package com.aranova.java.jmos;
015:
016: import java.io.IOException;
017: import java.io.InputStream;
018: import java.io.OutputStream;
019: import java.net.Socket;
020: import java.util.Map;
021: import java.util.Observable;
022:
023: import javax.xml.parsers.SAXParser;
024: import javax.xml.stream.XMLStreamException;
025: import javax.xml.stream.XMLStreamWriter;
026:
027: import org.apache.commons.logging.Log;
028: import org.apache.commons.logging.LogFactory;
029: import org.xml.sax.InputSource;
030: import org.xml.sax.SAXException;
031:
032: import com.aranova.java.jmos.handler.MOSHandler;
033: import com.aranova.java.jmos.io.XMLInputStream;
034: import com.aranova.java.jmos.messages.Message;
035: import com.aranova.java.jmos.util.Common;
036:
037: /**
038: * Clase de uso interno para gestionar las socket del servidor.
039: *
040: * @author <a href="http://www.aranova.net/contactar/">Daniel Sánchez</a>
041: * @version 0.9.1
042: * @since 0.9
043: */
044: public class SocketThread extends Observable implements Runnable {
045: private static final Log _log = LogFactory
046: .getLog(SocketThread.class);
047:
048: private boolean _isShutDown;
049: private final Socket _socket;
050: private final MOSHandler _handler;
051: private final SAXParser _reader;
052: private final InputStream _inputStream;
053: private final OutputStream _outputStream;
054: private final XMLStreamWriter _writer;
055: private final XMLInputStream _XMLis;
056: private final InputSource _inputSource;
057:
058: private final Map<IServerMOS, MessageFilter[]> _listeners;
059:
060: /**
061: * Constructor del socket.
062: * @param socket
063: * @param listeners
064: * @throws Exception
065: */
066: public SocketThread(final Socket socket,
067: final Map<IServerMOS, MessageFilter[]> listeners)
068: throws Exception {
069: super ();
070: _isShutDown = false;
071: _socket = socket;
072: try {
073: _inputStream = _socket.getInputStream();
074: _outputStream = _socket.getOutputStream();
075: _writer = Common.getOutputFactory().createXMLStreamWriter(
076: _outputStream, Common.getEncoding());
077: _XMLis = new XMLInputStream(_inputStream);
078: _inputSource = new InputSource(_XMLis);
079: _inputSource.setEncoding(Common.getEncoding());
080: _handler = new MOSHandler(_XMLis);
081: _reader = Common.getParserFactory().newSAXParser();
082: _listeners = listeners;
083: } catch (SAXException e) {
084: throw e;
085: } catch (IOException e) {
086: throw e;
087: } catch (XMLStreamException e) {
088: throw e;
089: }
090: }
091:
092: /**
093: * Cierra la conezión con el cliente.
094: * @throws IOException
095: */
096: public final void close() throws IOException {
097: try {
098: _isShutDown = true;
099: _socket.close();
100: } catch (IOException e) {
101: _log.warn("Error cerrando el socket", e);
102: throw e;
103: }
104: }
105:
106: public final void run() {
107: Message message;
108: boolean error = false;
109: try {
110: while (true) {
111: try {
112: _reader.parse(_inputSource, _handler);
113: } catch (SAXException e) {
114: if (!e.getMessage().equals("createMessage")) {
115: throw e;
116: }
117: error = true;
118: }
119: if (!error) {
120: message = _handler.getMessage();
121:
122: //TODO Si el mensaje no es para mi MOSid o NCS id error NOACK o RO ACK.
123: notifyListeners(message);
124: } else {
125: error = false;
126: //TODO Write noack o roACk
127: }
128: _writer.flush();
129: _reader.reset();
130: if (_log.isTraceEnabled()) {
131: long time = System.currentTimeMillis()
132: - _handler.getStart();
133: _log.trace("Total time: " + time
134: + "ms - Parse time: " + _handler.getTime()
135: + "ms");
136: }
137: }
138: } catch (XMLStreamException e) {
139: _log.error("Error al escribir en el socket.", e);
140: } catch (SAXException e) {
141: if (_XMLis.getMarcaInicio()) {
142: _log.error("Error al parsear el XML.", e);
143: }
144: } catch (IOException e) {
145: if (!_isShutDown) {
146: _log.error("Error de IO en el socket.", e);
147: }
148: } finally {
149: try {
150: _log.info("Cerrando el socket");
151: _socket.close();
152: } catch (IOException e) {
153: _log.error("Error al cerrar el socket.", e);
154: }
155:
156: if (!_isShutDown) {
157: this .setChanged();
158: this .notifyObservers(this );
159: }
160: }
161: }
162:
163: private void notifyListeners(final Message message)
164: throws XMLStreamException {
165: for (IServerMOS listener : _listeners.keySet()) {
166: boolean filtroOK = true;
167: for (MessageFilter filter : _listeners.get(listener)) {
168: if (!filter.accept(Message.getMessages().get(
169: message.getName()).getMessage())) {
170: filtroOK = false;
171: break;
172: }
173: }
174: if (filtroOK) {
175: if (listener instanceof IServerReplier) {
176: ((IServerReplier) listener).message(message,
177: _writer);
178: } else if (listener instanceof IServerListener) {
179: ((IServerListener) listener).message(message);
180: }
181: }
182: }
183: }
184: }
|