001: /* OperationThread.java
002:
003: {{IS_NOTE
004: Purpose:
005:
006: Description:
007:
008: History:
009: Sep 29, 2007 9:21:36 AM 2007, Created by Dennis.Chen
010: }}IS_NOTE
011:
012: Copyright (C) 2007 Potix Corporation. All Rights Reserved.
013:
014: {{IS_RIGHT
015: This program is distributed under GPL Version 2.0 in the hope that
016: it will be useful, but WITHOUT ANY WARRANTY.
017: }}IS_RIGHT
018: */
019: package org.zkoss.zkex.zul.impl;
020:
021: import org.zkoss.lang.D;
022: import org.zkoss.util.logging.Log;
023: import org.zkoss.zk.ui.Desktop;
024: import org.zkoss.zk.ui.DesktopUnavailableException;
025: import org.zkoss.zk.ui.Executions;
026:
027: /**
028: * This class is for model sharer developer only, you rarely need to use this class.<br/>
029: *
030: * OperationThread has only one instance in each desktop, it store it-self in the desktop by setAttribute.
031: * It create and monitor the {@link OperationQueue}, if there are any operation in queue, it will consume it. <br/>
032: *
033: * @author Dennis.Chen
034: * @since 3.0.0
035: */
036: public class OperationThread extends Thread {
037:
038: private static final String DESKTOP_KEY = "zkex:opthread";
039:
040: private boolean _running = true;
041:
042: private OperationQueue _queue;
043:
044: private Desktop _desktop;
045:
046: private long _activateTimeout = 10000;
047:
048: private long _waitTimeout = 10000;
049:
050: private int _maxFailCount = 4;
051:
052: private static final Log log = Log.lookup(OperationThread.class);
053:
054: /* package */OperationThread(Desktop desktop) {
055: _desktop = desktop;
056: _queue = new OperationQueue();
057: this .setName("OPThread-" + desktop.getId());
058: }
059:
060: /* package */OperationQueue getQueue() {
061: return _queue;
062: }
063:
064: /**
065: * Get the {@link OperationQueue} of {@linkplain OperationThread},
066: * It is check is there any {@linkplain OperationThread} exist in desktop.
067: * If no, create one ,start it and store in desktop, then return thread's operation queue.
068: * If yes, return operation queue directly.
069: *
070: * There is only one {@linkplain OperationThread} in each desktop.
071: * @param desktop the associated desktop
072: * @return a queue which associate to desktop
073: */
074: public static OperationQueue getQueue(Desktop desktop) {
075: if (desktop == null)
076: throw new NullPointerException("desktop is null");
077: synchronized (desktop) {
078: if (!desktop.isAlive()) {
079: throw new IllegalStateException("desktop not alive:"
080: + desktop);
081: }
082: OperationThread t = (OperationThread) desktop
083: .getAttribute(DESKTOP_KEY);
084: if (t == null) {
085: t = new OperationThread(desktop);
086: if (D.ON && log.debugable()) {
087: log.debug("staring a Operation Thread for desktop:"
088: + desktop + ",name=" + t.getName());
089: }
090: desktop.setAttribute(DESKTOP_KEY, t);
091: t.start();
092:
093: }
094: return t.getQueue();
095: }
096: }
097:
098: /**
099: * Terminate a {@linkplain OperationThread} which is stored in desktop and clear it.
100: *
101: * @param desktop the associated desktop
102: */
103: public static void destroyWith(Desktop desktop) {
104: if (desktop == null) {
105: throw new NullPointerException("desktop is null");
106: }
107: if (D.ON && log.debugable()) {
108: log.debug("destory a Operation Thread for desktop:"
109: + desktop);
110: }
111: synchronized (desktop) {
112: if (desktop.isAlive()) {// destroy desktop if it still alive.
113: OperationThread t = (OperationThread) desktop
114: .getAttribute(DESKTOP_KEY);
115: desktop.removeAttribute(DESKTOP_KEY);
116: if (t != null && t.isRunning()) {
117: t.terminate();
118: }
119:
120: }
121: }
122: }
123:
124: /**
125: * Is this thread still running
126: * @return true is thread still running
127: */
128: public boolean isRunning() {
129: return _running;
130: }
131:
132: /**
133: * Terminate this thread. thread will be stopped after last run trip.
134: */
135: public void terminate() {
136: _running = false;
137: synchronized (_queue) {
138: _queue.notifyAll();
139: }
140: }
141:
142: public void run() {
143: try {
144: int failCount = 0;
145: while (_running) {
146: if (!_desktop.isAlive()) {
147: throw new DesktopUnavailableException(
148: "Desktop is not alive" + _desktop);
149: }
150: if (_queue.hasElement()) {
151: boolean r = false;
152: try {
153: r = Executions.activate(_desktop,
154: _activateTimeout);
155: } catch (InterruptedException e) {
156: }
157: if (!r) {
158: failCount++;
159: if (failCount >= _maxFailCount) {
160: throw new DesktopUnavailableException(
161: "Fail to Activate Desktop:"
162: + _desktop + " after "
163: + failCount + " times");
164: }
165: continue;
166: }
167: failCount = 0;
168:
169: // since we have activated desktop,
170: // we execute all operation in queue
171: Operation op = null;
172: try {
173: while (_queue.hasElement()) {
174: op = _queue.next();
175: op.execute(_desktop);
176: }
177: } catch (Exception x) {
178: // if exception, notify fail.
179: if (op != null) {
180: op.failToExecute(_desktop);
181: }
182: throw x;
183: } finally {
184: Executions.deactivate(_desktop);
185: }
186: }
187: try {
188: // if queue is still empty, and still running then we wait.
189: // queue will be notify when someone put operation to queue
190: // or terminate is invoked check OperationQueue#put
191: synchronized (_queue) {
192: if (_running && !_queue.hasElement()) {
193: //wait just a while,
194: _queue.wait(_waitTimeout);
195: }
196: }
197: } catch (InterruptedException e) {
198: }
199: }
200: } catch (DesktopUnavailableException x) {
201: // send fail to all operation that it can't be execute.
202: if (D.ON && log.debugable()) {
203: log.debug("Desktop not available:" + x.getMessage());
204: }
205: while (_queue.hasElement()) {
206: Operation op = _queue.next();
207: op.failToExecute(_desktop);
208: }
209: } catch (Exception x) {
210: log.warning(x);
211: while (_queue.hasElement()) {
212: Operation op = _queue.next();
213: op.failToExecute(_desktop);
214: }
215: } finally {
216: _running = false;
217: OperationThread.destroyWith(_desktop);
218: //notify all listener thread is destroyed.
219: _queue.fireQueueUnavailable(_desktop);
220: _queue.clearListener();
221: _desktop = null;
222: _queue = null;
223: if (D.ON && log.debugable()) {
224: log.debug("end of a Operation Thread, name="
225: + this.getName());
226: }
227: }
228: }
229: }
|