001: /*
002: * CoadunationLib: The coaduntion implementation library.
003: * Copyright (C) 2007 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * ChangeLog.java
020: */
021:
022: // package path
023: package com.rift.coad.util.change;
024:
025: // java imports
026: import java.io.Serializable;
027: import java.io.File;
028: import java.io.InputStream;
029: import java.io.IOException;
030: import java.io.FileInputStream;
031: import java.io.FileOutputStream;
032: import java.io.ObjectInputStream;
033: import java.io.ObjectOutputStream;
034: import java.io.ObjectStreamClass;
035: import java.util.ArrayList;
036: import java.util.HashMap;
037: import java.util.Iterator;
038: import java.util.List;
039: import java.util.Map;
040: import java.util.Queue;
041: import java.util.concurrent.ConcurrentHashMap;
042: import java.util.concurrent.ConcurrentLinkedQueue;
043: import javax.naming.Context;
044: import javax.naming.InitialContext;
045: import javax.transaction.UserTransaction;
046: import javax.transaction.xa.XAException;
047: import javax.transaction.xa.XAResource;
048: import javax.transaction.xa.Xid;
049:
050: // logging import
051: import org.apache.log4j.Logger;
052:
053: // coadunation imports
054: import com.rift.coad.lib.configuration.Configuration;
055: import com.rift.coad.lib.configuration.ConfigurationException;
056: import com.rift.coad.lib.configuration.ConfigurationFactory;
057: import com.rift.coad.lib.thread.CoadunationThread;
058: import com.rift.coad.lib.thread.ThreadStateMonitor;
059: import com.rift.coad.util.transaction.TransactionManager;
060: import com.rift.coad.util.transaction.UserTransactionWrapper;
061:
062: /**
063: * This object is responsible for applying changes to the database the message
064: * objects.
065: *
066: * @author Brett Chaldecott
067: */
068: public class ChangeLog implements XAResource {
069:
070: /**
071: * This class overrides the resolve to use the class loader on the
072: * thread to find the specified class.
073: */
074: public static class ClassLoaderObjectInputStream extends
075: ObjectInputStream {
076: /**
077: * This default constructor of the class loader object input stream.
078: *
079: * @exception IOException
080: */
081: public ClassLoaderObjectInputStream() throws IOException {
082: super ();
083: }
084:
085: /**
086: * This default constructor of the class loader object input stream.
087: *
088: * @param in The input stream for this object.
089: * @exception IOException
090: */
091: public ClassLoaderObjectInputStream(InputStream in)
092: throws IOException {
093: super (in);
094: }
095:
096: /**
097: * This method returns the class definition for the requested object.
098: *
099: * @return The class definition for the requested object.
100: * @param desc The description of the object.
101: * @exception IOException
102: * @exception ClassNotFoundException
103: */
104: protected Class resolveClass(ObjectStreamClass desc)
105: throws IOException, ClassNotFoundException {
106: try {
107: return Class.forName(desc.getName());
108: } catch (Exception ex) {
109: return Thread.currentThread().getContextClassLoader()
110: .loadClass(desc.getName());
111: }
112: }
113:
114: }
115:
116: /**
117: * This object is responsible for processing entries in the change log.
118: */
119: public class ChangeLogProcessor extends CoadunationThread {
120:
121: // private member variables
122: private ThreadStateMonitor state = new ThreadStateMonitor();
123: private Context context = null;
124: private UserTransactionWrapper utw = null;
125: private boolean process = false;
126:
127: /**
128: * The contructor of the change log processor.
129: *
130: * @exception Exception
131: */
132: public ChangeLogProcessor() throws Exception {
133: try {
134: utw = new UserTransactionWrapper();
135: } catch (Exception ex) {
136: throw new ChangeException(
137: "Failed to init the change log processor : "
138: + ex.getMessage(), ex);
139: }
140: }
141:
142: /**
143: * This method replaces the run method in the BasicThread.
144: *
145: * @exception Exception
146: */
147: public void process() throws Exception {
148: synchronized (this ) {
149: if (process == false) {
150: try {
151: wait();
152: } catch (Exception ex) {
153: log.error("Wait threw and exception : "
154: + ex.getMessage(), ex);
155: }
156: }
157: }
158: while (!state.isTerminated()) {
159: ChangeEntry change = null;
160: synchronized (changes) {
161: change = (ChangeEntry) changes.poll();
162: if (change == null) {
163: try {
164: changes.wait(500);
165: } catch (Exception ex) {
166: log.error("Failed to wait : "
167: + ex.getMessage(), ex);
168: }
169: continue;
170: }
171: }
172: while (true) {
173: try {
174: utw.begin();
175: change.applyChanges();
176: utw.commit();
177: break;
178: } catch (Exception ex) {
179: log.error("Failed to apply the changes : "
180: + ex.getMessage(), ex);
181: } finally {
182: utw.release();
183: }
184: try {
185: Thread.sleep(1000);
186: } catch (Exception ex2) {
187: log.error("Failed to back off : "
188: + ex2.getMessage(), ex2);
189: }
190: }
191: }
192: }
193:
194: /**
195: * This method will be implemented by child objects to terminate the
196: * processing of this thread.
197: */
198: public void terminate() {
199: state.terminate(true);
200: synchronized (this ) {
201: notify();
202: }
203: }
204:
205: /**
206: * This method starts the processing
207: */
208: public synchronized void startProcessing() {
209: process = true;
210: notify();
211: }
212: }
213:
214: /**
215: * This object tracks the changes to do with a transaction.
216: */
217: public static class ChangeEntry implements Serializable {
218:
219: // private member variables
220: private List changes = new ArrayList();
221:
222: /**
223: * The constructor of
224: */
225: public ChangeEntry() {
226:
227: }
228:
229: /**
230: * This method adds a change to the list of changes for this change
231: * change entry.
232: *
233: * @param change The object representing the change object.
234: */
235: public void addChange(Change change) {
236: changes.add(change);
237: }
238:
239: /**
240: * This method applys the list of changes.
241: *
242: * @exception ChangeException
243: */
244: public void applyChanges() throws ChangeException {
245: for (Iterator iter = changes.iterator(); iter.hasNext();) {
246: Change change = (Change) iter.next();
247: change.applyChanges();
248: }
249: }
250: }
251:
252: // class constants
253: private final static String USERNAME = "changelog_username";
254: private final static String DATA_DIR = "changelog_data_dir";
255: private final static String DATA_FILE = "changelog.dmp";
256:
257: // the logger reference
258: protected Logger log = Logger.getLogger(ChangeLog.class.getName());
259:
260: // class singleton
261: private static Map singletons = new HashMap();
262:
263: // class member variables
264: private ThreadStateMonitor state = new ThreadStateMonitor();
265: private Map changesMap = new ConcurrentHashMap();
266: private ThreadLocal currentChange = new ThreadLocal();
267: private ChangeLogProcessor processor = null;
268: private Queue changes = new ConcurrentLinkedQueue();
269: private String dataDirectory = null;
270: private UserTransactionWrapper utw = null;
271:
272: /**
273: * Creates a new instance of MessageChangeLog
274: *
275: * @param username The name of the user that this object will run as.
276: */
277: private ChangeLog(Class configInfo) throws ChangeException {
278: try {
279: utw = new UserTransactionWrapper();
280: Configuration configuration = ConfigurationFactory
281: .getInstance().getConfig(configInfo);
282: dataDirectory = configuration.getString(DATA_DIR);
283: loadData();
284: applyChanges();
285: processor = new ChangeLogProcessor();
286: processor.start(configuration.getString(USERNAME));
287: } catch (Exception ex) {
288: log.error("Failed to instanciate the change "
289: + "log object : " + ex.getMessage(), ex);
290: throw new ChangeException(
291: "Failed to instanciate the change "
292: + "log object : " + ex.getMessage(), ex);
293: }
294: }
295:
296: /**
297: * This method is responsible for instanciating the MessageChangeLog.
298: *
299: * @param username The name of the user to this object will run as.
300: * @exception ChangeException
301: */
302: public synchronized static void init(Class configInfo)
303: throws ChangeException {
304: synchronized (singletons) {
305: ClassLoader loader = Thread.currentThread()
306: .getContextClassLoader();
307: if (!singletons.containsKey(loader)) {
308: ChangeLog changeLog = new ChangeLog(configInfo);
309: singletons.put(loader, changeLog);
310: }
311: }
312: }
313:
314: /**
315: * This method returns a reference to the singelton instance.
316: *
317: * @return A reference to the singleton instance.
318: * @throws ChangeException
319: */
320: public static ChangeLog getInstance() throws ChangeException {
321: synchronized (singletons) {
322: ClassLoader loader = Thread.currentThread()
323: .getContextClassLoader();
324: ChangeLog changeLog = (ChangeLog) singletons.get(loader);
325: if (changeLog == null) {
326: throw new ChangeException(
327: "The change log has not been instanciated.");
328: }
329: return changeLog;
330: }
331: }
332:
333: /**
334: * The singleton method used to terminate the change log.
335: */
336: public static void terminate() throws ChangeException {
337: ChangeLog changeLog = null;
338: synchronized (singletons) {
339: ClassLoader loader = Thread.currentThread()
340: .getContextClassLoader();
341: changeLog = (ChangeLog) singletons.get(loader);
342: if (changeLog == null) {
343: throw new ChangeException(
344: "The change log has not been instanciated.");
345: }
346: singletons.remove(loader);
347: }
348: changeLog.terminateChangeLog();
349:
350: }
351:
352: /**
353: * This method is called to terminate the change log.
354: */
355: protected void terminateChangeLog() {
356: try {
357: state.terminate(false);
358: processor.terminate();
359: processor.join();
360: storeData();
361: } catch (Exception ex) {
362: log.error("Failed to terminate the change log object : "
363: + ex.getMessage(), ex);
364: }
365: }
366:
367: /**
368: * This method starts the change log processing the changes.
369: */
370: public void start() throws ChangeException {
371: processor.startProcessing();
372: }
373:
374: /**
375: * This method adds an object to the list of changes.
376: *
377: * @param change The object containing the changes to apply.
378: * @exception ChangeException
379: */
380: public void addChange(Change change) throws ChangeException {
381: if (state.isTerminated()) {
382: log
383: .error("The change log has been terminated cannot accept "
384: + "anymore changes.");
385: throw new ChangeException(
386: "The change log has been terminated cannot accept "
387: + "anymore changes.");
388: }
389: try {
390: TransactionManager.getInstance().bindResource(this , false);
391: ChangeEntry changeEntry = (ChangeEntry) currentChange.get();
392: changeEntry.addChange(change);
393: } catch (Exception ex) {
394: log.error("Failed to add the change to the list :"
395: + ex.getMessage(), ex);
396: throw new ChangeException(
397: "Failed to add the change to the list :"
398: + ex.getMessage(), ex);
399: }
400: }
401:
402: /**
403: * This method is called to commit the specified transaction.
404: *
405: * @param xid The id of the transaction to commit.
406: * @param onePhase If true a one phase commit should be used.
407: * @exception XAException
408: */
409: public void commit(Xid xid, boolean b) throws XAException {
410: synchronized (changes) {
411: changes.add(changesMap.remove(xid));
412: changes.notify();
413: }
414: }
415:
416: /**
417: * The resource manager has dissociated this object from the transaction.
418: *
419: * @param xid The id of the transaction that is getting ended.
420: * @param flags The flags associated with this operation.
421: * @exception XAException
422: */
423: public void end(Xid xid, int i) throws XAException {
424: }
425:
426: /**
427: * The transaction has been completed and must be forgotten.
428: *
429: * @param xid The id of the transaction to forget.
430: * @exception XAException
431: */
432: public void forget(Xid xid) throws XAException {
433: changesMap.remove(xid);
434: }
435:
436: /**
437: * This method returns the transaction timeout for this object.
438: *
439: * @return The int containing the transaction timeout.
440: * @exception XAException
441: */
442: public int getTransactionTimeout() throws XAException {
443: return -1;
444: }
445:
446: /**
447: * This method returns true if this object is the resource manager getting
448: * queried.
449: *
450: * @return TRUE if this is the resource manager, FALSE if not.
451: * @param xaResource The resource to perform the check against.
452: * @exception XAException
453: */
454: public boolean isSameRM(XAResource xAResource) throws XAException {
455: return this == xAResource;
456: }
457:
458: /**
459: * This is called before a transaction is committed.
460: *
461: * @return The results of the transaction.
462: * @param xid The id of the transaction to check against.
463: * @exception XAException
464: */
465: public int prepare(Xid xid) throws XAException {
466: return XAResource.XA_OK;
467: }
468:
469: /**
470: * This method returns the list of transaction branches for this resource
471: * manager.
472: *
473: * @return The list of resource branches.
474: * @param flags The flags
475: * @exception XAException
476: */
477: public Xid[] recover(int i) throws XAException {
478: return null;
479: }
480:
481: /**
482: * This method is called to roll back the specified transaction.
483: *
484: * @param xid The id of the transaction to roll back.
485: * @exception XAException
486: */
487: public void rollback(Xid xid) throws XAException {
488: changesMap.remove(xid);
489: }
490:
491: /**
492: * This method sets the transaction timeout for this resource manager.
493: *
494: * @return TRUE if the transaction timeout can be set successfully.
495: * @param transactionTimeout The new transaction timeout value.
496: * @exception XAException
497: */
498: public boolean setTransactionTimeout(int i) throws XAException {
499: return true;
500: }
501:
502: /**
503: * This method is called to start a transaction on a resource manager.
504: *
505: * @param xid The id of the new transaction.
506: * @param flags The flags associated with the transaction.
507: * @exception XAException
508: */
509: public void start(Xid xid, int i) throws XAException {
510: if (changesMap.containsKey(xid)) {
511: currentChange.set(changesMap.get(xid));
512: } else {
513: ChangeEntry changeEntry = new ChangeEntry();
514: changesMap.put(xid, changeEntry);
515: currentChange.set(changeEntry);
516: }
517: }
518:
519: /**
520: * This method loads the data
521: */
522: private void loadData() throws ChangeException {
523: try {
524: File dataFile = new File(dataDirectory, DATA_FILE);
525: if (!dataFile.exists()) {
526: return;
527: }
528: FileInputStream in = new FileInputStream(dataFile);
529: ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(
530: in);
531: changes = (ConcurrentLinkedQueue) ois.readObject();
532: ois.close();
533: in.close();
534: } catch (Exception ex) {
535: log.error("Failed to load the data : " + ex.getMessage(),
536: ex);
537: throw new ChangeException("Failed to load the data : "
538: + ex.getMessage(), ex);
539: }
540: }
541:
542: /**
543: * This method stores the data
544: */
545: private void storeData() throws ChangeException {
546: try {
547: File dataFile = new File(dataDirectory, DATA_FILE);
548: if (changes.size() == 0) {
549: // no data to delete the file if one exists
550: if (dataFile.exists()) {
551: dataFile.delete();
552: }
553: return;
554: }
555: FileOutputStream out = new FileOutputStream(dataFile);
556: ObjectOutputStream oos = new ObjectOutputStream(out);
557: oos.writeObject(changes);
558: oos.close();
559: out.close();
560: } catch (Exception ex) {
561: log.error("Failed to store the data : " + ex.getMessage(),
562: ex);
563: throw new ChangeException("Failed to store the data : "
564: + ex.getMessage(), ex);
565: }
566: }
567:
568: /**
569: * This method is responsible for applying all the changes.
570: */
571: private void applyChanges() throws ChangeException {
572: log.info("Applying changes from change log");
573: while (changes.size() > 0) {
574: ChangeEntry change = (ChangeEntry) changes.poll();
575: while (true) {
576: try {
577: utw.begin();
578: change.applyChanges();
579: utw.commit();
580: break;
581: } catch (Exception ex) {
582: log.error("Failed to apply the changes : "
583: + ex.getMessage(), ex);
584: } finally {
585: utw.release();
586: }
587: try {
588: Thread.sleep(1000);
589: } catch (Exception ex2) {
590: log.error("Failed to back off : "
591: + ex2.getMessage(), ex2);
592: }
593: }
594: }
595: log.info("After applying changes from change log");
596: }
597: }
|