001: /*
002: * Copyright 2001-2006 C:1 Financial Services GmbH
003: *
004: * This software is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License Version 2.1, as published by the Free Software Foundation.
007: *
008: * This software is distributed in the hope that it will be useful,
009: * but WITHOUT ANY WARRANTY; without even the implied warranty of
010: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
011: * Lesser General Public License for more details.
012: *
013: * You should have received a copy of the GNU Lesser General Public
014: * License along with this library; if not, write to the Free Software
015: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
016: */
017:
018: package de.finix.contelligent.clientsupport;
019:
020: import java.io.IOException;
021: import java.io.OutputStreamWriter;
022: import java.net.Socket;
023: import java.net.SocketException;
024: import java.net.UnknownHostException;
025: import java.security.AccessController;
026: import java.security.PrivilegedAction;
027: import java.util.ArrayList;
028: import java.util.HashMap;
029: import java.util.Iterator;
030: import java.util.LinkedList;
031: import java.util.List;
032: import java.util.Map;
033:
034: import javax.servlet.http.HttpSessionBindingEvent;
035: import javax.servlet.http.HttpSessionBindingListener;
036:
037: import de.finix.contelligent.Session;
038: import de.finix.contelligent.core.ContelligentImpl;
039: import de.finix.contelligent.event.ContelligentEvent;
040: import de.finix.contelligent.event.EventQueue;
041: import de.finix.contelligent.event.EventQueueListener;
042: import de.finix.contelligent.logging.LoggingService;
043:
044: /**
045: * Takes care of notifying clients of interesting events in their contexts
046: */
047: public class ClientNotificator implements EventQueueListener,
048: HttpSessionBindingListener {
049:
050: final static org.apache.log4j.Logger log = LoggingService
051: .getLogger(ClientNotificator.class);
052:
053: public final static String SESSION_KEY = "_clientNotificator_";
054:
055: private final static String MESSAGE_SEPARATOR = "---- END ----";
056:
057: private final static int MAX_WAIT_FOR_CONNECTION_SECS = 5;
058:
059: private String clientHostName;
060:
061: private int clientPort;
062:
063: private Socket clientSocket;
064:
065: private boolean connectionOK = false;
066:
067: // do we need to create a new socket on every send?
068: private boolean reuseSocket = false;
069:
070: private LinkedList eventBuffer = new LinkedList();
071:
072: private Session session;
073:
074: public ClientNotificator(String clientHostName, int clientPort,
075: boolean reuseSocket, Session session) {
076: this .clientHostName = clientHostName;
077: if (clientPort != -1
078: && ContelligentImpl.getInstance().isAllowPush()) {
079: this .reuseSocket = reuseSocket;
080: this .clientPort = clientPort;
081: connectionOK = testConnection();
082: }
083: setSession(session);
084: }
085:
086: public void stopNow() {
087: try {
088: closeSocket(true);
089: } catch (IOException ioe) {
090: // dont't care
091: }
092: }
093:
094: public void setSession(Session session) {
095: this .session = session;
096: }
097:
098: public Session getSession() {
099: return session;
100: }
101:
102: public String getClientHostName() {
103: return clientHostName;
104: }
105:
106: public void onEvents(List eventList) {
107: // decide whether we can PUSH the events directly to the listening
108: // client
109: // or have to store them for later use, for example to be PULLed by
110: // PullComponentSystemEventsAction
111: if (connectionOK) {
112: pushEventsToClient(eventList);
113: } else {
114: synchronized (eventBuffer) {
115: eventBuffer.addAll(eventList);
116: }
117: }
118: }
119:
120: public boolean filterApplies(ContelligentEvent event) {
121: if (!event.isClientEvent()) {
122: return false;
123: }
124: if (event.isLockEvent()) {
125: return true;
126: }
127: if ((event.getOrigin() != null)
128: && !(session.getComponentManager().getName()
129: .startsWith(event.getOrigin()))) {
130: return false;
131: }
132: return true;
133: }
134:
135: /**
136: * Returns a list of events. Note that no filtering was done on those events
137: * because the manager-name might has been changed since the addition of the
138: * events and hence we must {@link #filterApplies apply the select} at the
139: * time the events are pulled by the client.
140: */
141: public List getBufferedEvents() {
142: synchronized (eventBuffer) {
143: return (List) eventBuffer.clone();
144: }
145: }
146:
147: public void removeEventsFromBuffer(List events) {
148: synchronized (eventBuffer) {
149: eventBuffer.removeAll(events);
150: }
151: }
152:
153: private PushThread pushMsgToClient(String msg) {
154: PushThread pushThread = new PushThread(msg);
155: pushThread.setDaemon(true);
156: pushThread.start();
157: return pushThread;
158: }
159:
160: private PushThread pushEventsToClient(List eventList) {
161: StringBuffer buf = new StringBuffer(eventList.size() * 64);
162: buf.append("<?xml version='1.0' encoding='UTF-8' ?>\n").append(
163: "<EventList>\n");
164: buf.append("<Session>").append(session.getId()).append(
165: "</Session>\n");
166:
167: Iterator events = eventList.iterator();
168: while (events.hasNext()) {
169: ContelligentEvent event = (ContelligentEvent) events.next();
170: if (filterApplies(event)) {
171: event.toXML(buf);
172: }
173: }
174: buf.append("</EventList>\n");
175: return pushMsgToClient(buf.toString());
176: }
177:
178: /** Determines if a connection can be established to the client. */
179: private boolean testConnection() {
180: PushThread pushThread = pushMsgToClient("");
181: try {
182: pushThread.join(MAX_WAIT_FOR_CONNECTION_SECS * 1000);
183: } catch (InterruptedException ie) {
184: // oops
185: }
186: if (pushThread.isAlive()) {
187: // connection did not work, so stop it
188: try {
189: closeSocket(true);
190: } catch (IOException ioe) {
191: // oops
192: }
193: return false;
194: }
195: return pushThread.wasSuccessful();
196: }
197:
198: private Socket getSocket() throws UnknownHostException,
199: IOException, SocketException {
200: return getSocket(false);
201: }
202:
203: private Socket getSocket(boolean forceNew)
204: throws UnknownHostException, IOException, SocketException {
205: if (!forceNew && reuseSocket && clientSocket != null) {
206: return clientSocket;
207: }
208: clientSocket = new Socket(clientHostName, clientPort);
209: return clientSocket;
210: }
211:
212: private void closeSocket() throws IOException {
213: closeSocket(false);
214: }
215:
216: private void closeSocket(boolean force) throws IOException {
217: if (clientSocket != null && (force || !reuseSocket)) {
218: clientSocket.close();
219: clientSocket = null;
220: }
221: }
222:
223: private void blockinglySendOverSocket(Socket socket, String msg)
224: throws IOException {
225: synchronized (socket) {
226: // FIXME: we should use a specific encoding, the client should too
227: // when reading
228: OutputStreamWriter ow = new OutputStreamWriter(socket
229: .getOutputStream());
230: ow.write(msg);
231: // if we reuse this socket make clear to receiver that this message
232: // ends here:
233: if (reuseSocket) {
234: ow.write("\n" + MESSAGE_SEPARATOR + "\n");
235: ow.flush();
236: } else {
237: ow.close();
238: }
239: }
240: }
241:
242: public void valueBound(HttpSessionBindingEvent bindingEvent) {
243: EventQueue.getInstance().addListener(this );
244: }
245:
246: public void valueUnbound(HttpSessionBindingEvent bindingEvent) {
247: EventQueue.getInstance().removeListener(this );
248: String sessionId = session.getId();
249: Map info = new HashMap();
250: info.put("session", sessionId);
251: ContelligentEvent event = new ContelligentEvent(
252: ContelligentEvent.SESSION_TIMEOUT_EVENT, null, info);
253: EventQueue.getInstance().addEvent(event);
254: // we are already removed as event listener, so send the event to myself
255: // do not change the order of addEvent() and removeListener(), otherwise
256: // there
257: // might be cases when the logout event is not sent to the corresponding
258: // client
259: List list = new ArrayList(1);
260: list.add(event);
261: onEvents(list);
262: }
263:
264: private class PushThread extends Thread {
265:
266: private String msg;
267:
268: private boolean successful = false;
269:
270: public PushThread(String msg) {
271: super ("Push-Thread");
272: this .msg = msg;
273: }
274:
275: public void run() {
276: AccessController.doPrivileged(new PrivilegedAction() {
277: public Object run() {
278:
279: try {
280: successful = false;
281: blockinglySendOverSocket(getSocket(), msg);
282: closeSocket();
283: successful = true;
284: } catch (UnknownHostException uhe) {
285: log.error(
286: "UnknownHostException: Connection can not be established: "
287: + uhe, uhe);
288: } catch (SocketException se) {
289: log.error(
290: "SocketException: Connection seems to be closed by other thread: "
291: + se, se);
292: } catch (IOException e) {
293: log.error(
294: "IOException: Connection can not be established: "
295: + e, e);
296: } catch (Throwable t) {
297: log.error(
298: "Unexpected error: Connection can not be established: "
299: + t, t);
300: }
301:
302: return null;
303: }
304: });
305: }
306:
307: public boolean wasSuccessful() {
308: return successful;
309: }
310: }
311: }
|