001: /*
002: * $Id: FilePersistenceStrategy.java 11343 2008-03-13 10:58:26Z tcarlson $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.util.queue;
012:
013: import org.mule.RegistryContext;
014: import org.mule.api.MuleContext;
015: import org.mule.api.context.MuleContextAware;
016: import org.mule.util.FileUtils;
017: import org.mule.util.file.DeleteException;
018:
019: import java.io.File;
020: import java.io.FileInputStream;
021: import java.io.FileNotFoundException;
022: import java.io.FileOutputStream;
023: import java.io.IOException;
024: import java.io.ObjectInputStream;
025: import java.io.ObjectOutputStream;
026: import java.util.ArrayList;
027: import java.util.List;
028:
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031: import org.safehaus.uuid.UUIDGenerator;
032:
033: public class FilePersistenceStrategy implements
034: QueuePersistenceStrategy, MuleContextAware {
035: private static final Log logger = LogFactory
036: .getLog(FilePersistenceStrategy.class);
037:
038: /** The default queueStore directory for persistence */
039: public static final String DEFAULT_QUEUE_STORE = "queuestore";
040:
041: public static final String EXTENSION = ".msg";
042:
043: private File store;
044:
045: private UUIDGenerator gen = UUIDGenerator.getInstance();
046:
047: protected MuleContext muleContext;
048:
049: public FilePersistenceStrategy() {
050: super ();
051: }
052:
053: public void setMuleContext(MuleContext context) {
054: this .muleContext = context;
055: }
056:
057: protected String getId(Object obj) {
058: String id = gen.generateRandomBasedUUID().toString();
059: return id;
060: }
061:
062: /*
063: * (non-Javadoc)
064: *
065: * @see org.mule.transaction.xa.queue.QueuePersistenceStrategy#store(java.lang.Object)
066: */
067: public Object store(String queue, Object obj) throws IOException {
068: String id = getId(obj);
069: File file = FileUtils.newFile(store, queue + File.separator
070: + id + EXTENSION);
071: file.getParentFile().mkdirs();
072: ObjectOutputStream oos = new ObjectOutputStream(
073: new FileOutputStream(file));
074: oos.writeObject(obj);
075: oos.close();
076: return id;
077: }
078:
079: /*
080: * (non-Javadoc)
081: *
082: * @see org.mule.transaction.xa.queue.QueuePersistenceStrategy#remove(java.lang.Object)
083: */
084: public void remove(String queue, Object id) throws IOException {
085: File file = FileUtils.newFile(store, queue + File.separator
086: + id + EXTENSION);
087: if (file.exists()) {
088: if (!file.delete()) {
089: throw new DeleteException(file);
090: }
091: } else {
092: throw new FileNotFoundException(file.toString());
093: }
094: }
095:
096: /*
097: * (non-Javadoc)
098: *
099: * @see org.mule.transaction.xa.queue.QueuePersistenceStrategy#load(java.lang.Object)
100: */
101: public Object load(String queue, Object id) throws IOException {
102: File file = FileUtils.newFile(store, queue + File.separator
103: + id + EXTENSION);
104: ObjectInputStream ois = null;
105: try {
106: ois = new ObjectInputStream(new FileInputStream(file));
107: Object obj = ois.readObject();
108: return obj;
109: } catch (ClassNotFoundException e) {
110: throw (IOException) new IOException(
111: "Error loading persistent object").initCause(e);
112: } finally {
113: if (ois != null) {
114: ois.close();
115: }
116: }
117: }
118:
119: /*
120: * (non-Javadoc)
121: *
122: * @see org.mule.transaction.xa.queue.QueuePersistenceStrategy#restore()
123: */
124: public List restore() throws IOException {
125: List msgs = new ArrayList();
126: if (store == null) {
127: logger
128: .warn("No store has be set on the File Persistence Strategy. Not restoring at this time");
129: return msgs;
130: }
131: try {
132: restoreFiles(store, msgs);
133: logger.debug("Restore retrieved " + msgs.size()
134: + " objects");
135: return msgs;
136: } catch (ClassNotFoundException e) {
137: throw (IOException) new IOException("Could not restore")
138: .initCause(e);
139: }
140: }
141:
142: protected void restoreFiles(File dir, List msgs)
143: throws IOException, ClassNotFoundException {
144: File[] files = dir.listFiles();
145: if (files == null) {
146: return;
147: }
148:
149: for (int i = 0; i < files.length; i++) {
150: if (files[i].isDirectory()) {
151: restoreFiles(files[i], msgs);
152: } else if (files[i].getName().endsWith(EXTENSION)) {
153: String id = files[i].getCanonicalPath();
154: id = id.substring(
155: store.getCanonicalPath().length() + 1, id
156: .length()
157: - EXTENSION.length());
158: String queue = id.substring(0, id
159: .indexOf(File.separator));
160: id = id.substring(queue.length() + 1);
161: msgs.add(new HolderImpl(queue, id));
162: }
163: }
164: }
165:
166: /*
167: * (non-Javadoc)
168: *
169: * @see org.mule.util.queue.QueuePersistenceStrategy#open()
170: */
171: public void open() throws IOException {
172: String path = RegistryContext.getConfiguration()
173: .getWorkingDirectory()
174: + File.separator + DEFAULT_QUEUE_STORE;
175: store = FileUtils.newFile(path).getCanonicalFile();
176: store.mkdirs();
177: }
178:
179: /*
180: * (non-Javadoc)
181: *
182: * @see org.mule.transaction.xa.queue.QueuePersistenceStrategy#close()
183: */
184: public void close() throws IOException {
185: // Nothing to do
186: }
187:
188: protected static class HolderImpl implements Holder {
189: private String queue;
190: private Object id;
191:
192: public HolderImpl(String queue, Object id) {
193: this .queue = queue;
194: this .id = id;
195: }
196:
197: public Object getId() {
198: return id;
199: }
200:
201: public String getQueue() {
202: return queue;
203: }
204: }
205:
206: public boolean isTransient() {
207: return false;
208: }
209: }
|