001: /*
002: #IFNDEF ALT_LICENSE
003: ThinWire(R) RIA Ajax Framework
004: Copyright (C) 2003-2007 Custom Credit Systems
005:
006: This library is free software; you can redistribute it and/or modify it under
007: the terms of the GNU Lesser General Public License as published by the Free
008: Software Foundation; either version 2.1 of the License, or (at your option) any
009: later version.
010:
011: This library is distributed in the hope that it will be useful, but WITHOUT ANY
012: WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
013: PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
014:
015: You should have received a copy of the GNU Lesser General Public License along
016: with this library; if not, write to the Free Software Foundation, Inc., 59
017: Temple Place, Suite 330, Boston, MA 02111-1307 USA
018:
019: Users who would rather have a commercial license, warranty or support should
020: contact the following company who invented, built and supports the technology:
021:
022: Custom Credit Systems, Richardson, TX 75081, USA.
023: email: info@thinwire.com ph: +1 (888) 644-6405
024: http://www.thinwire.com
025: #ENDIF
026: [ v1.2_RC2 ]
027: */
028: package thinwire.render.web;
029:
030: import java.io.IOException;
031: import java.io.Reader;
032: import java.io.Writer;
033: import java.net.SocketTimeoutException;
034: import java.util.Date;
035: import java.util.LinkedList;
036: import java.util.List;
037: import java.util.Map;
038: import java.util.logging.Level;
039: import java.util.logging.Logger;
040:
041: /**
042: * @author Joshua J. Gertzen
043: */
044: class EventProcessor extends Thread {
045: private static final char EVENT_WEB_COMPONENT = '0';
046: private static final char EVENT_GET_EVENTS = '1';
047: private static final char EVENT_SYNC_CALL = '2';
048: private static final char EVENT_RUN_TIMER = '3';
049: private static final int TIMEOUT = 1000 * 60 * 5;//5 minutes
050: private static final Level LEVEL = Level.FINER;
051: private static final Logger log = Logger
052: .getLogger(EventProcessor.class.getName());
053: private static int nextId = 0;
054:
055: static class GracefulShutdown extends Error {
056: }
057:
058: private EventProcessorPool pool;
059: private List<WebComponentEvent> queue = new LinkedList<WebComponentEvent>();
060: private StringBuilder sbParseUserAction = new StringBuilder(1024);
061: private char[] complexValueBuffer = new char[256];
062: private String syncCallResponse;
063: private Writer response;
064: private boolean waitToRespond;
065: private int updateEventsSize;
066: private boolean active;
067: private int captureCount;
068: private boolean threadCaptured;
069: private long lastActivityTime;
070:
071: WebApplication app;
072:
073: EventProcessor(EventProcessorPool pool) {
074: if (pool == null)
075: throw new IllegalArgumentException("pool == null");
076: setName("ThinWire-EventProcessorThread-" + (nextId++) + "-"
077: + this .hashCode());
078: this .pool = pool;
079: }
080:
081: boolean isInUse() {
082: return threadCaptured || active;
083: }
084:
085: public void run() {
086: if (log.isLoggable(LEVEL))
087: log.log(LEVEL, getName() + ": entering thread");
088: active = true;
089:
090: synchronized (queue) {
091: try {
092: while (true) {
093: processUserActionEvent();
094: }
095: } catch (GracefulShutdown e) {
096: if (log.isLoggable(LEVEL))
097: log.log(LEVEL, getName()
098: + ": exiting thread run method gracefully");
099: } finally {
100: //allow for graceful exit
101: pool.removeFromPool(this );
102:
103: //Not entirely necessary, but it makes me feel better ;-)
104: queue.clear();
105: queue = null;
106: sbParseUserAction = null;
107: response = null;
108: pool = null;
109: syncCallResponse = null;
110: complexValueBuffer = null;
111: }
112: }
113:
114: if (log.isLoggable(LEVEL))
115: log.log(LEVEL, getName() + ": exiting thread");
116: }
117:
118: void captureThread() {
119: int currentCaptureCount = ++captureCount;
120: if (log.isLoggable(LEVEL))
121: log.log(LEVEL, getName() + ": capture count:"
122: + captureCount);
123: threadCaptured = true;
124:
125: while (threadCaptured) {
126: processUserActionEvent();
127: if (currentCaptureCount == captureCount)
128: threadCaptured = true;
129: }
130: }
131:
132: //Must only be called by the main run loop or the capture method!
133: private void processUserActionEvent() {
134: if (queue.size() > 0) {
135: lastActivityTime = System.currentTimeMillis();
136:
137: WebComponentEvent event = queue.remove(0);
138: if (log.isLoggable(LEVEL))
139: log.log(LEVEL, getName()
140: + ": process user action event:" + event);
141: if (app.userActionListener != null)
142: app.notifyUserActionReceived(event);
143:
144: try {
145: WebComponentListener wcl = app
146: .getWebComponentListener((Integer) event
147: .getSource());
148: if (wcl != null)
149: wcl.componentChange(event);
150: } catch (Exception e) {
151: app.reportException(null, e);
152: }
153: } else if (app != null && app.timers != null
154: && app.timers.size() > 0) {
155: //Extra checks required since thread may start before being tied to an app.
156: try {
157: if (log.isLoggable(LEVEL))
158: log.log(LEVEL, getName()
159: + ": process timer task 1 of "
160: + app.timers.size());
161: app.timers.remove(0).run();
162: } catch (Exception e) {
163: app.reportException(null, e);
164: }
165: } else {
166: active = false;
167: waitToRespond = false;
168: queue.notify();
169: if (log.isLoggable(LEVEL))
170: log
171: .log(
172: LEVEL,
173: getName()
174: + ": Notified request handler thread so it returns if it is currently blocking");
175:
176: try {
177: if (threadCaptured) {
178: if (log.isLoggable(LEVEL))
179: log
180: .log(
181: LEVEL,
182: getName()
183: + ": Waiting for this captured thread to receive new user action events");
184: //This wait has the potential to deadlock if the client fails to make a request back to the server.
185: //To prevent this, a session timeout should be set in web.xml
186: queue.wait();
187: } else {
188: if (log.isLoggable(LEVEL))
189: log
190: .log(
191: LEVEL,
192: getName()
193: + ": Waiting "
194: + TIMEOUT
195: + " to be given new user action events, otherwise it will be shutdown");
196: queue.wait(TIMEOUT);
197:
198: long timePassed = System.currentTimeMillis()
199: - lastActivityTime;
200: if (log.isLoggable(LEVEL))
201: log.log(LEVEL, getName()
202: + ": time passed during wait="
203: + timePassed + ", timeout=" + TIMEOUT);
204:
205: //Bring the thread down if it's been idle for five minutes.
206: if (app == null && queue.size() == 0
207: && timePassed >= TIMEOUT) {
208: if (log.isLoggable(LEVEL))
209: log
210: .log(
211: LEVEL,
212: getName()
213: + ": triggering thread graceful shutdown");
214: throw new GracefulShutdown();
215: }
216: }
217: } catch (InterruptedException e) {
218: throw new RuntimeException(e);
219: }
220:
221: active = true;
222: }
223: }
224:
225: void releaseThread() {
226: threadCaptured = false;
227: captureCount--;
228: if (log.isLoggable(LEVEL))
229: log.log(LEVEL, getName() + ": release count:"
230: + captureCount);
231: }
232:
233: //This method is called by the servers request handler thread, not this thread.
234: void handleRequest(WebComponentEvent ev, Writer w)
235: throws IOException {
236: synchronized (queue) {
237: if (log.isLoggable(LEVEL))
238: log.log(LEVEL, getName() + ": queue user action event:"
239: + ev);
240: //ev would be null if this is called from shutdown() in an attempt to continue a dialog flush()
241: if (ev != null)
242: queue.add(ev);
243: queue.notify();
244: writeUpdateEvents(w);
245: }
246: }
247:
248: //This method is called by the servers request handler thread, not this thread.
249: void handleRequest(Reader r, Writer w) throws IOException {
250: synchronized (queue) {
251: StringBuilder sb = sbParseUserAction;
252:
253: try {
254: do {
255: char eventType = (char) r.read();
256: r.read(); //Remove ':'
257:
258: switch (eventType) {
259: case EVENT_GET_EVENTS:
260: break;
261:
262: case EVENT_WEB_COMPONENT: {
263: readSimpleValue(sb, r);
264: Integer source = Integer.valueOf(sb.toString());
265: readSimpleValue(sb, r);
266: String name = sb.toString();
267: readComplexValue(sb, r);
268: String value = sb.toString();
269: WebComponentListener wcl = app
270: .getWebComponentListener(source);
271:
272: if (wcl != null) {
273: WebComponentEvent ev = new WebComponentEvent(
274: source, name, value);
275: if (log.isLoggable(LEVEL))
276: log.log(LEVEL, getName()
277: + ": queue user action event:"
278: + ev);
279: queue.add(ev);
280: }
281:
282: break;
283: }
284:
285: case EVENT_RUN_TIMER: {
286: readSimpleValue(sb, r);
287: String timerId = sb.toString();
288: WebComponentEvent ev = ApplicationEventListener
289: .newRunTimerEvent(timerId);
290: if (log.isLoggable(LEVEL))
291: log.log(LEVEL, getName()
292: + ": queue run timer event:" + ev);
293: queue.add(ev);
294: break;
295: }
296:
297: case EVENT_SYNC_CALL: {
298: readComplexValue(sb, r);
299: syncCallResponse = sb.toString();
300: if (log.isLoggable(LEVEL))
301: log.log(LEVEL, getName()
302: + ": sync call response:"
303: + syncCallResponse);
304: break;
305: }
306: }
307: } while (r.read() == ':');
308: } catch (SocketTimeoutException e) {
309: log
310: .log(
311: Level.WARNING,
312: "Invalid action event format received from client",
313: e);
314: } finally {
315: sb.setLength(0);
316: }
317:
318: queue.notify();
319: writeUpdateEvents(w);
320: }
321: }
322:
323: //Must only be called by one of the handleRequest methods!
324: private void writeUpdateEvents(Writer w) throws IOException {
325: if (w == null)
326: return;
327:
328: try {
329: response = w;
330: waitToRespond = true;
331: updateEventsSize = 0;
332:
333: while (waitToRespond) {
334: if (log.isLoggable(LEVEL))
335: log.log(LEVEL, getName()
336: + ": waiting for events to be processed");
337: queue.wait();
338: }
339:
340: if (log.isLoggable(LEVEL))
341: log.log(LEVEL, getName()
342: + ": finishing up update events, active="
343: + active + ", updateEventsSize="
344: + updateEventsSize);
345:
346: if (active) {
347: w.write(updateEventsSize == 0 ? "[{m:\"" : ",{m:\"");
348: w.write("sendGetEvents\",a:[],n:tw_em}");
349: updateEventsSize += 36;
350: }
351:
352: } catch (InterruptedException e) {
353: //Only occurs if the request handler thread is interrupted, in which case we should
354: //try and gracefully exit;
355: } finally {
356: if (updateEventsSize > 0)
357: response.write(']');
358: response = null;
359: updateEventsSize = 0;
360: }
361: }
362:
363: private void readSimpleValue(StringBuilder sb, Reader r)
364: throws IOException {
365: sb.setLength(0);
366: int ch;
367:
368: while ((ch = r.read()) != ':') {
369: if (ch == -1)
370: throw new IllegalStateException(
371: "premature end of post event encountered["
372: + sb.toString() + "]");
373: sb.append((char) ch);
374: }
375: }
376:
377: private void readComplexValue(StringBuilder sb, Reader r)
378: throws IOException {
379: readSimpleValue(sb, r);
380: int length = Integer.parseInt(sb.toString());
381: sb.setLength(0);
382:
383: if (length > 0) {
384: int size;
385: char[] buff = complexValueBuffer;
386: int buffLen = buff.length;
387:
388: do {
389: size = length > buffLen ? buffLen : length;
390: size = r.read(buff, 0, size);
391: if (size == -1)
392: throw new IllegalStateException(
393: "premature end of complex value on action event encountered["
394: + sb.toString() + "], length="
395: + length);
396: length -= size;
397: sb.append(buff, 0, size);
398: } while (length > 0);
399: }
400: }
401:
402: String postUpdateEvent(boolean sync, Object objectId, String name,
403: Object[] args) {
404: try {
405: int size = 0;
406: response.write(updateEventsSize == 0 ? "[{m:\"" : ",{m:\"");
407: size += 5;
408: response.write(name);
409: response.write('\"');
410: size += name.length() + 1;
411:
412: if (objectId != null) {
413: if (objectId instanceof Integer) {
414: response.write(",i:");
415: String value = objectId.toString();
416: response.write(value);
417: size += 3 + value.length();
418: } else {
419: response.write(",n:");
420: String value = (String) objectId;
421: response.write(value);
422: size += 3 + value.length();
423: }
424: }
425:
426: if (args != null && args.length > 0) {
427: response.write(",a:[");
428: size += 4;
429:
430: for (int i = 0, cnt = args.length - 1; i < cnt; i++) {
431: String value = ComponentRenderer
432: .stringValueOf(args[i]);
433: response.write(value);
434: response.write(',');
435: size += value.length() + 1;
436: }
437:
438: String value = ComponentRenderer
439: .stringValueOf(args[args.length - 1]);
440: response.write(value);
441: response.write(']');
442: size += value.length() + 1;
443: } else {
444: response.write(",a:[]");
445: size += 5;
446: }
447:
448: if (sync) {
449: response.write(",s:1}");
450: size += 5;
451: } else {
452: response.write("}");
453: size += 1;
454: }
455:
456: updateEventsSize += size;
457:
458: if (waitToRespond && (sync || updateEventsSize >= 16384))
459: flush();
460:
461: } catch (IOException e) {
462: throw new RuntimeException(e);
463: }
464:
465: return sync ? syncCallResponse : null;
466: }
467:
468: void flush() {
469: waitToRespond = false;
470: queue.notify();
471:
472: try {
473: while (!waitToRespond) {
474: //This wait has the potential to deadlock if the client fails to make a request back to the server.
475: //To prevent this, a session timeout should be set in web.xml
476: queue.wait();
477: }
478: } catch (InterruptedException e) {
479: //Must throw an exception so the stack unrolls properly.
480: throw new RuntimeException(e);
481: }
482: }
483: }
|