001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.ha.deploy;
019:
020: import java.io.File;
021: import java.io.IOException;
022: import java.io.FileInputStream;
023: import java.io.FileOutputStream;
024: import java.io.FileNotFoundException;
025:
026: /**
027: * This factory is used to read files and write files by splitting them up into
028: * smaller messages. So that entire files don't have to be read into memory.
029: * <BR>
030: * The factory can be used as a reader or writer but not both at the same time.
031: * When done reading or writing the factory will close the input or output
032: * streams and mark the factory as closed. It is not possible to use it after
033: * that. <BR>
034: * To force a cleanup, call cleanup() from the calling object. <BR>
035: * This class is not thread safe.
036: *
037: * @author Filip Hanik
038: * @version 1.0
039: */
040: public class FileMessageFactory {
041: /*--Static Variables----------------------------------------*/
042: public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
043: .getLog(FileMessageFactory.class);
044:
045: /**
046: * The number of bytes that we read from file
047: */
048: public static final int READ_SIZE = 1024 * 10; //10kb
049:
050: /**
051: * The file that we are reading/writing
052: */
053: protected File file = null;
054:
055: /**
056: * True means that we are writing with this factory. False means that we are
057: * reading with this factory
058: */
059: protected boolean openForWrite;
060:
061: /**
062: * Once the factory is used, it can not be reused.
063: */
064: protected boolean closed = false;
065:
066: /**
067: * When openForWrite=false, the input stream is held by this variable
068: */
069: protected FileInputStream in;
070:
071: /**
072: * When openForWrite=true, the output stream is held by this variable
073: */
074: protected FileOutputStream out;
075:
076: /**
077: * The number of messages we have read or written
078: */
079: protected int nrOfMessagesProcessed = 0;
080:
081: /**
082: * The total size of the file
083: */
084: protected long size = 0;
085:
086: /**
087: * The total number of packets that we split this file into
088: */
089: protected long totalNrOfMessages = 0;
090:
091: /**
092: * The bytes that we hold the data in, not thread safe.
093: */
094: protected byte[] data = new byte[READ_SIZE];
095:
096: /**
097: * Private constructor, either instantiates a factory to read or write. <BR>
098: * When openForWrite==true, then a the file, f, will be created and an
099: * output stream is opened to write to it. <BR>
100: * When openForWrite==false, an input stream is opened, the file has to
101: * exist.
102: *
103: * @param f
104: * File - the file to be read/written
105: * @param openForWrite
106: * boolean - true means we are writing to the file, false means
107: * we are reading from the file
108: * @throws FileNotFoundException -
109: * if the file to be read doesn't exist
110: * @throws IOException -
111: * if the system fails to open input/output streams to the file
112: * or if it fails to create the file to be written to.
113: */
114: private FileMessageFactory(File f, boolean openForWrite)
115: throws FileNotFoundException, IOException {
116: this .file = f;
117: this .openForWrite = openForWrite;
118: if (log.isDebugEnabled())
119: log.debug("open file " + f + " write " + openForWrite);
120: if (openForWrite) {
121: if (!file.exists())
122: file.createNewFile();
123: out = new FileOutputStream(f);
124: } else {
125: size = file.length();
126: totalNrOfMessages = (size / READ_SIZE) + 1;
127: in = new FileInputStream(f);
128: }//end if
129:
130: }
131:
132: /**
133: * Creates a factory to read or write from a file. When opening for read,
134: * the readMessage can be invoked, and when opening for write the
135: * writeMessage can be invoked.
136: *
137: * @param f
138: * File - the file to be read or written
139: * @param openForWrite
140: * boolean - true, means we are writing to the file, false means
141: * we are reading from it
142: * @throws FileNotFoundException -
143: * if the file to be read doesn't exist
144: * @throws IOException -
145: * if it fails to create the file that is to be written
146: * @return FileMessageFactory
147: */
148: public static FileMessageFactory getInstance(File f,
149: boolean openForWrite) throws FileNotFoundException,
150: IOException {
151: return new FileMessageFactory(f, openForWrite);
152: }
153:
154: /**
155: * Reads file data into the file message and sets the size, totalLength,
156: * totalNrOfMsgs and the message number <BR>
157: * If EOF is reached, the factory returns null, and closes itself, otherwise
158: * the same message is returned as was passed in. This makes sure that not
159: * more memory is ever used. To remember, neither the file message or the
160: * factory are thread safe. dont hand off the message to one thread and read
161: * the same with another.
162: *
163: * @param f
164: * FileMessage - the message to be populated with file data
165: * @throws IllegalArgumentException -
166: * if the factory is for writing or is closed
167: * @throws IOException -
168: * if a file read exception occurs
169: * @return FileMessage - returns the same message passed in as a parameter,
170: * or null if EOF
171: */
172: public FileMessage readMessage(FileMessage f)
173: throws IllegalArgumentException, IOException {
174: checkState(false);
175: int length = in.read(data);
176: if (length == -1) {
177: cleanup();
178: return null;
179: } else {
180: f.setData(data, length);
181: f.setTotalLength(size);
182: f.setTotalNrOfMsgs(totalNrOfMessages);
183: f.setMessageNumber(++nrOfMessagesProcessed);
184: return f;
185: }//end if
186: }
187:
188: /**
189: * Writes a message to file. If (msg.getMessageNumber() ==
190: * msg.getTotalNrOfMsgs()) the output stream will be closed after writing.
191: *
192: * @param msg
193: * FileMessage - message containing data to be written
194: * @throws IllegalArgumentException -
195: * if the factory is opened for read or closed
196: * @throws IOException -
197: * if a file write error occurs
198: * @return returns true if the file is complete and outputstream is closed,
199: * false otherwise.
200: */
201: public boolean writeMessage(FileMessage msg)
202: throws IllegalArgumentException, IOException {
203: if (!openForWrite)
204: throw new IllegalArgumentException(
205: "Can't write message, this factory is reading.");
206: if (log.isDebugEnabled())
207: log.debug("Message " + msg + " data " + msg.getData()
208: + " data length " + msg.getDataLength() + " out "
209: + out);
210: if (out != null) {
211: out.write(msg.getData(), 0, msg.getDataLength());
212: nrOfMessagesProcessed++;
213: out.flush();
214: if (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) {
215: out.close();
216: cleanup();
217: return true;
218: }//end if
219: } else {
220: if (log.isWarnEnabled())
221: log
222: .warn("Receive Message again -- Sender ActTimeout to short [ path: "
223: + msg.getContextPath()
224: + " war: "
225: + msg.getFileName()
226: + " data: "
227: + msg.getData()
228: + " data length: "
229: + msg.getDataLength() + " ]");
230: }
231: return false;
232: }//writeMessage
233:
234: /**
235: * Closes the factory, its streams and sets all its references to null
236: */
237: public void cleanup() {
238: if (in != null)
239: try {
240: in.close();
241: } catch (Exception ignore) {
242: }
243: if (out != null)
244: try {
245: out.close();
246: } catch (Exception ignore) {
247: }
248: in = null;
249: out = null;
250: size = 0;
251: closed = true;
252: data = null;
253: nrOfMessagesProcessed = 0;
254: totalNrOfMessages = 0;
255: }
256:
257: /**
258: * Check to make sure the factory is able to perform the function it is
259: * asked to do. Invoked by readMessage/writeMessage before those methods
260: * proceed.
261: *
262: * @param openForWrite
263: * boolean
264: * @throws IllegalArgumentException
265: */
266: protected void checkState(boolean openForWrite)
267: throws IllegalArgumentException {
268: if (this .openForWrite != openForWrite) {
269: cleanup();
270: if (openForWrite)
271: throw new IllegalArgumentException(
272: "Can't write message, this factory is reading.");
273: else
274: throw new IllegalArgumentException(
275: "Can't read message, this factory is writing.");
276: }
277: if (this .closed) {
278: cleanup();
279: throw new IllegalArgumentException(
280: "Factory has been closed.");
281: }
282: }
283:
284: /**
285: * Example usage.
286: *
287: * @param args
288: * String[], args[0] - read from filename, args[1] write to
289: * filename
290: * @throws Exception
291: */
292: public static void main(String[] args) throws Exception {
293:
294: System.out
295: .println("Usage: FileMessageFactory fileToBeRead fileToBeWritten");
296: System.out
297: .println("Usage: This will make a copy of the file on the local file system");
298: FileMessageFactory read = getInstance(new File(args[0]), false);
299: FileMessageFactory write = getInstance(new File(args[1]), true);
300: FileMessage msg = new FileMessage(null, args[0], args[0]);
301: msg = read.readMessage(msg);
302: System.out.println("Expecting to write "
303: + msg.getTotalNrOfMsgs() + " messages.");
304: int cnt = 0;
305: while (msg != null) {
306: write.writeMessage(msg);
307: cnt++;
308: msg = read.readMessage(msg);
309: }//while
310: System.out.println("Actually wrote " + cnt + " messages.");
311: }///main
312:
313: public File getFile() {
314: return file;
315: }
316:
317: }
|