001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ha.framework.server;
023:
024: import org.jboss.ha.framework.interfaces.HAPartition;
025: import org.jboss.ha.framework.interfaces.ClusterNode;
026: import org.jboss.system.server.ServerConfigLocator;
027: import org.jboss.logging.Logger;
028: import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener;
029:
030: import java.util.*;
031: import java.io.*;
032:
033: /**
034: * Handles transfering files on the cluster. Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE bytes per
035: * Cluster call).
036: *
037: * @author <a href="mailto:smarlow@novell.com">Scott Marlow</a>.
038: * @version $Revision: 62037 $
039: */
040: public class ClusterFileTransfer implements AsynchHAMembershipListener {
041:
// Maximum number of bytes read/written per cluster call. This balances the
// number of disk read/write and RPC round trips against how much memory a
// single transfer consumes at a time.
private static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;

// In-progress file push operations we are receiving from other nodes,
// keyed by CompositeKey(originNodeName, fileName).
private Map mPushsInProcess = Collections.synchronizedMap(new HashMap());

// In-progress file pull operations we are serving to other nodes,
// keyed by CompositeKey(originNodeName, fileName).
private Map mPullsInProcess = Collections.synchronizedMap(new HashMap());

// Cluster partition used for RPC calls and membership notifications.
private HAPartition mPartition;

// Server temp folder; in-flight files are staged here before being moved
// to their final destination folder.
private static final File TEMP_DIRECTORY = ServerConfigLocator.locate().getServerTempDir();

// Mapping between parent folder name and target destination folder:
// the search key is the parent folder name and value is the java.io.File.
// We don't synchronize on mParentFolders as we assume it is safe to read it
// (it is never modified after construction).
private Map mParentFolders = null;

// Name under which this handler is registered with the partition RPC layer.
private static final String SERVICE_NAME = ClusterFileTransfer.class.getName() + "Service";

private static final Logger log = Logger.getLogger(ClusterFileTransfer.class.getName());
071:
/**
 * Constructor needs the cluster partition and the mapping of server folder names to the
 * java.io.File instance representing the physical folder.
 * <p>
 * Side effects: registers this instance with the partition as an RPC handler under
 * {@link #SERVICE_NAME} (so other nodes can invoke the remote* methods) and as a
 * membership listener (so transfers involving dead members can be cancelled).
 *
 * @param partition represents the cluster.
 * @param destinationDirectoryMap is the mapping between server folder name and physical folder representation.
 */
public ClusterFileTransfer(HAPartition partition, Map destinationDirectoryMap) {
    this.mPartition = partition;
    this.mPartition.registerRPCHandler(SERVICE_NAME, this);
    this.mPartition.registerMembershipListener(this);
    mParentFolders = destinationDirectoryMap;
}
086:
/**
 * Get specified file from the cluster (served by the coordinator node).
 * <p>
 * The transfer is chunked: "remotePullOpenFile" opens the remote file and returns the
 * first chunk; each subsequent "remotePullReadFile" call returns the next chunk (up to
 * MAX_CHUNK_BUFFER_SIZE bytes). Content is staged in the server temp folder and only
 * moved to its destination folder once fully received.
 *
 * @param file identifies the file to get from the cluster.
 * @param parentName is the parent folder name for the file on both source and destination nodes.
 * @throws ClusterFileTransferException if the remote node fails to open or read the file,
 *         or the local temp/target files cannot be written, deleted or moved.
 */
public void pull(File file, String parentName) throws ClusterFileTransferException {
    String myNodeName = this.mPartition.getNodeName();
    ClusterNode myNodeAddress = this.mPartition.getClusterNode();
    FileOutputStream output = null;
    try {
        log.info("Start pull of file " + file.getName() + " from cluster.");
        // Ask the coordinator to open the file and hand back the first chunk.
        ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
                "remotePullOpenFile",
                new Object[] { file, myNodeName, myNodeAddress, parentName },
                new Class[] { java.io.File.class, java.lang.String.class,
                        ClusterNode.class, java.lang.String.class }, true);

        if (response == null || response.size() < 1) {
            throw new ClusterFileTransferException(
                    "Did not receive response from remote machine trying to open file '"
                            + file + "'. Check remote machine error log.");
        }

        // remotePullOpenFile returns null when it hits an exception on the remote side.
        FileContentChunk fileChunk = (FileContentChunk) response.get(0);
        if (null == fileChunk) {
            throw new ClusterFileTransferException(
                    "An error occured on remote machine trying to open file '"
                            + file + "'. Check remote machine error log.");
        }

        // Stage the content in the temp folder; move to the target only when complete.
        File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName());
        output = new FileOutputStream(tempFile);

        // get the remote file modification time and change our local copy to have the same time.
        long lastModification = fileChunk.lastModified();
        // mByteCount <= 0 marks end-of-file (the sender's read returned -1).
        while (fileChunk.mByteCount > 0) {
            output.write(fileChunk.mChunk, 0, fileChunk.mByteCount);
            response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
                    "remotePullReadFile",
                    new Object[] { file, myNodeName },
                    new Class[] { java.io.File.class, java.lang.String.class }, true);
            if (response.size() < 1) {
                // No answer: remote probably died mid-transfer; drop the partial temp file.
                if (!tempFile.delete())
                    throw new ClusterFileTransferException(
                            "An error occured on remote machine trying to read file '"
                                    + file
                                    + "'. Is remote still running? Also, we couldn't delete temp file "
                                    + tempFile.getName());
                throw new ClusterFileTransferException(
                        "An error occured on remote machine trying to read file '"
                                + file + "'. Is remote still running?");
            }
            fileChunk = (FileContentChunk) response.get(0);
            if (null == fileChunk) {
                // remotePullReadFile returns null on a remote I/O failure.
                if (!tempFile.delete())
                    throw new ClusterFileTransferException(
                            "An error occured on remote machine trying to read file '"
                                    + file
                                    + "'. Check remote machine error log. Also, we couldn't delete temp file "
                                    + tempFile.getName());
                throw new ClusterFileTransferException(
                        "An error occured on remote machine trying to read file '"
                                + file + "'. Check remote machine error log.");
            }
        }
        output.close();
        output = null; // signals the finally block that the stream is already closed
        File target = new File(getParentFile(parentName), file.getName());
        if (target.exists()) {
            if (!target.delete())
                throw new ClusterFileTransferException(
                        "The destination file "
                                + target
                                + " couldn't be deleted, the updated application will not be copied to this node");

        }
        // Preserve the remote modification time on the local copy.
        tempFile.setLastModified(lastModification);
        if (!localMove(tempFile, target)) {
            throw new ClusterFileTransferException(
                    "Could not move " + tempFile + " to " + target);
        }
        log.info("Finished cluster pull of file " + file.getName()
                + " to " + target.getName());
    } catch (IOException e) {
        throw new ClusterFileTransferException(e);
    } catch (ClusterFileTransferException e) {
        throw e; // don't re-wrap our own exception type
    } catch (Exception e) {
        throw new ClusterFileTransferException(e);
    } finally {
        if (output != null) {
            try {
                output.close();
            } catch (IOException e) {
                logException(e);
            } // we are already in the middle of a throw if output isn't null.
        }
    }
}
202:
203: /**
204: * This is remotely called by {@link #pull(File , String )} to open the file on the machine that
205: * the file is being copied from.
206: *
207: * @param file is the file to pull.
208: * @param originNodeName is the cluster node that is requesting the file.
209: * @param parentName is the parent folder name for the file on both source and destination nodes.
210: * @return FileContentChunk containing the first part of the file read after opening it.
211: */
212: public FileContentChunk remotePullOpenFile(File file,
213: String originNodeName, ClusterNode originNode,
214: String parentName) {
215: try {
216: File target = new File(getParentFile(parentName), file
217: .getName());
218: FileContentChunk fileChunk = new FileContentChunk(target,
219: originNodeName, originNode);
220: FilePullOperation filePullOperation = new FilePullOperation(
221: fileChunk);
222: // save the operation for the next call to remoteReadFile
223: this .mPullsInProcess.put(CompositeKey(originNodeName, file
224: .getName()), filePullOperation);
225: filePullOperation.openInputFile();
226: fileChunk.readNext(filePullOperation.getInputStream());
227: return fileChunk;
228: } catch (IOException e) {
229: logException(e);
230: } catch (Exception e) {
231: logException(e);
232: }
233: return null;
234: }
235:
236: /**
237: * This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being
238: * copied from.
239: *
240: * @param file is the file to pull.
241: * @param originNodeName is the cluster node that is requesting the file.
242: * @return FileContentChunk containing the next part of the file read.
243: */
244: public FileContentChunk remotePullReadFile(File file,
245: String originNodeName) {
246: try {
247: FilePullOperation filePullOperation = (FilePullOperation) this .mPullsInProcess
248: .get(CompositeKey(originNodeName, file.getName()));
249: filePullOperation.getFileChunk().readNext(
250: filePullOperation.getInputStream());
251: if (filePullOperation.getFileChunk().mByteCount < 1) {
252: // last call to read, so clean up
253: filePullOperation.getInputStream().close();
254: this .mPullsInProcess.remove(CompositeKey(
255: originNodeName, file.getName()));
256: }
257: return filePullOperation.getFileChunk();
258: } catch (IOException e) {
259: logException(e);
260: }
261: return null;
262: }
263:
264: /**
265: * Send specified file to cluster.
266: *
267: * @param file is the file to send.
268: * @param leaveInTempFolder is true if the file should be left in the server temp folder.
269: * @throws ClusterFileTransferException
270: */
271: public void push(File file, String parentName,
272: boolean leaveInTempFolder)
273: throws ClusterFileTransferException {
274: File target = new File(getParentFile(parentName), file
275: .getName());
276:
277: log.info("Start push of file " + file.getName()
278: + " to cluster.");
279: // check if trying to send explored archive (cannot send subdirectories)
280: if (target.isDirectory()) {
281: // let the user know why we are skipping this file and return.
282: logMessage("You cannot send the contents of directories, consider archiving folder containing"
283: + target.getName() + " instead.");
284: return;
285: }
286: ClusterNode myNodeAddress = this .mPartition.getClusterNode();
287: FileContentChunk fileChunk = new FileContentChunk(target,
288: this .mPartition.getNodeName(), myNodeAddress);
289: try {
290: InputStream input = fileChunk.openInputFile();
291: while (fileChunk.readNext(input) >= 0) {
292: mPartition.callMethodOnCluster(SERVICE_NAME,
293: "remotePushWriteFile", new Object[] {
294: fileChunk, parentName }, new Class[] {
295: fileChunk.getClass(),
296: java.lang.String.class }, true);
297: }
298: // tell remote(s) to close the output file
299: mPartition
300: .callMethodOnCluster(SERVICE_NAME,
301: "remotePushCloseFile", new Object[] {
302: fileChunk,
303: new Boolean(leaveInTempFolder),
304: parentName }, new Class[] {
305: fileChunk.getClass(),
306: Boolean.class,
307: java.lang.String.class }, true);
308: input.close();
309: log.info("Finished push of file " + file.getName()
310: + " to cluster.");
311: } catch (FileNotFoundException e) {
312: throw new ClusterFileTransferException(e);
313: } catch (IOException e) {
314: throw new ClusterFileTransferException(e);
315: } catch (Exception e) {
316: throw new ClusterFileTransferException(e);
317: }
318: }
319:
/**
 * Remote method for writing file a fragment at a time. Invoked on every node by
 * {@link #push(File, String, boolean)}.
 * <p>
 * The first chunk (mChunkNumber == FIRST_CHUNK) creates the staging temp file and
 * registers the operation in {@link #mPushsInProcess}; later chunks append to it.
 * A non-first chunk with no registered operation means this node joined the cluster
 * mid-transfer and the whole transfer is ignored.
 *
 * @param fileChunk the next fragment of the file being pushed.
 * @param parentName is the parent folder name (the chunk itself is staged in the temp folder).
 */
public void remotePushWriteFile(FileContentChunk fileChunk,
        String parentName) {
    try {
        String key = CompositeKey(fileChunk.getOriginatingNodeName(),
                fileChunk.getDestinationFile().getName());
        FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess
                .get(key);

        // handle first call to write
        if (filePushOperation == null) {
            if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK) {
                // we joined the cluster after the file transfer started
                logMessage("Ignoring file transfer of '"
                        + fileChunk.getDestinationFile().getName()
                        + "' from "
                        + fileChunk.getOriginatingNodeName()
                        + ", we missed the start of it.");
                return;
            }
            // First chunk: stage the incoming file in the server temp folder.
            filePushOperation = new FilePushOperation(
                    fileChunk.getOriginatingNodeName(),
                    fileChunk.getOriginatingNode());
            File tempFile = new File(ClusterFileTransfer.getServerTempDir(),
                    fileChunk.getDestinationFile().getName());
            filePushOperation.openOutputFile(tempFile);
            mPushsInProcess.put(key, filePushOperation);
        }
        filePushOperation.getOutputStream().write(fileChunk.mChunk,
                0, fileChunk.mByteCount);
    } catch (FileNotFoundException e) {
        logException(e);
    } catch (IOException e) {
        logException(e);
    }
}
362:
/**
 * Remote method for closing the file just transmitted. Invoked on every node by
 * {@link #push(File, String, boolean)} after the last chunk has been sent.
 *
 * @param fileChunk identifies the transfer (originating node plus destination file name).
 * @param leaveInTempFolder is true if we should leave the file in the server temp folder
 * @param parentName is the parent folder name used to resolve the final target folder.
 */
public void remotePushCloseFile(FileContentChunk fileChunk,
        Boolean leaveInTempFolder, String parentName) {
    try {
        // Remove the operation; null means we never saw the start of this
        // transfer (see remotePushWriteFile) and there is nothing to close.
        FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess
                .remove(CompositeKey(fileChunk.getOriginatingNodeName(),
                        fileChunk.getDestinationFile().getName()));

        if ((filePushOperation != null)
                && (filePushOperation.getOutputStream() != null)) {
            filePushOperation.getOutputStream().close();
            if (!leaveInTempFolder.booleanValue()) {
                // Move the staged temp file into its destination folder,
                // preserving the sender's modification time.
                File tempFile = new File(ClusterFileTransfer.getServerTempDir(),
                        fileChunk.getDestinationFile().getName());
                File target = new File(getParentFile(parentName),
                        fileChunk.getDestinationFile().getName());
                if (target.exists())
                    if (!target.delete())
                        logMessage("Could not delete target file " + target);

                tempFile.setLastModified(fileChunk.lastModified());
                if (!localMove(tempFile, target)) {
                    logMessage("Could not move " + tempFile + " to " + target);
                }
            }
        }
    } catch (IOException e) {
        logException(e);
    }
}
402:
403: /** Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener
404: *
405: * @param deadMembers A list of nodes that have died since the previous view
406: * @param newMembers A list of nodes that have joined the partition since the previous view
407: * @param allMembers A list of nodes that built the current view
408: */
409: public void membershipChanged(Vector deadMembers,
410: Vector newMembers, Vector allMembers) {
411: // Are there any deadMembers contained in mPushsInProcess or in mPullsInProcess.
412: // If so, cancel operations for them.
413: // If contained in mPushsInProcess, then we can stop waiting for the rest of the file transfer.
414: // If contained in mPullsInProcess, then we can stop supplying for the rest of the file transfer.
415:
416: if (mPushsInProcess.size() > 0) {
417: synchronized (mPushsInProcess) {
418: Collection values = mPushsInProcess.values();
419: Iterator iter = values.iterator();
420: while (iter.hasNext()) {
421: FilePushOperation push = (FilePushOperation) iter
422: .next();
423: if (deadMembers.contains(push.getOriginatingNode())) {
424: // cancel the operation and remove the operation from mPushsInProcess
425: push.cancel();
426: iter.remove();
427: }
428: }
429: }
430: }
431:
432: if (mPullsInProcess.size() > 0) {
433: synchronized (mPullsInProcess) {
434: Collection values = mPullsInProcess.values();
435: Iterator iter = values.iterator();
436: while (iter.hasNext()) {
437: FilePullOperation pull = (FilePullOperation) iter
438: .next();
439: if (deadMembers.contains(pull.getFileChunk()
440: .getOriginatingNode())) {
441: // cancel the operation and remove the operation from mPullsInProcess
442: pull.cancel();
443: iter.remove();
444: }
445: }
446: }
447: }
448: }
449:
/** @return the server temp folder used to stage in-flight file transfers. */
private static File getServerTempDir() {
    return TEMP_DIRECTORY;
}
453:
/**
 * Resolve a logical parent folder name to its physical folder.
 *
 * @param parentName key into the destination directory map supplied at construction.
 * @return the mapped folder, or null if the name is unknown.
 */
private File getParentFile(String parentName) {
    return (File) mParentFolders.get(parentName);
}
457:
458: private String CompositeKey(String originNodeName, String fileName) {
459: return originNodeName + "#" + fileName;
460: }
461:
/** Log an informational message on this node. */
private static void logMessage(String message) {
    log.info(message);
}
465:
466: private static void logException(Throwable e) {
467: //e.printStackTrace();
468: log.error(e);
469: }
470:
471: /**
472: * Represents file push operation.
473: */
474: private static class FilePushOperation {
475:
476: public FilePushOperation(String originNodeName,
477: ClusterNode originNode) {
478: mOriginNodeName = originNodeName;
479: mOriginNode = originNode;
480: }
481:
482: public void openOutputFile(File file)
483: throws FileNotFoundException {
484: mOutput = new FileOutputStream(file);
485: mOutputFile = file;
486: }
487:
488: /**
489: * Cancel the file push operation. To be called locally on each machine that is
490: * receiving the file.
491: */
492: public void cancel() {
493: ClusterFileTransfer
494: .logMessage("Canceling receive of file "
495: + mOutputFile
496: + " as remote server "
497: + mOriginNodeName
498: + " left the cluster. Partial results will be deleted.");
499: try {
500: // close the output stream and delete the file.
501: mOutput.close();
502: if (!mOutputFile.delete())
503: logMessage("Could not delete output file "
504: + mOutputFile);
505: } catch (IOException e) {
506: logException(e);
507: }
508: }
509:
510: /**
511: * Get the IPAddress of the cluster node that is pushing file to the cluster.
512: * @return IPAddress
513: */
514: public ClusterNode getOriginatingNode() {
515: return mOriginNode;
516: }
517:
518: public OutputStream getOutputStream() {
519: return mOutput;
520: }
521:
522: private OutputStream mOutput;
523: private String mOriginNodeName;
524: private ClusterNode mOriginNode;
525: private File mOutputFile;
526: }
527:
528: /**
529: * Represents file pull operation.
530: */
531: private static class FilePullOperation {
532: public FilePullOperation(FileContentChunk fileChunk) {
533: mFileChunk = fileChunk;
534: }
535:
536: public void openInputFile() throws FileNotFoundException {
537: mInput = mFileChunk.openInputFile();
538: }
539:
540: public InputStream getInputStream() {
541: return mInput;
542: }
543:
544: /**
545: * Cancel the file pull operation. To be called locally on the machine that is supplying the file.
546: */
547: public void cancel() {
548: logMessage("Canceling send of file "
549: + mFileChunk.getDestinationFile()
550: + " as remote server "
551: + mFileChunk.getOriginatingNodeName()
552: + " left the cluster.");
553: try {
554: mInput.close();
555: } catch (IOException e) {
556: logException(e);
557: }
558: }
559:
560: public FileContentChunk getFileChunk() {
561: return mFileChunk;
562: }
563:
564: private FileContentChunk mFileChunk;
565: private InputStream mInput;
566: }
567:
/**
 * For representing filetransfer state on the wire.
 * The inputStream or OutputStream is expected to be maintained by the sender/receiver.
 * <p>
 * NOTE(review): this class is Serializable and crosses node boundaries; field names
 * and serialVersionUID must not change without considering wire compatibility.
 */
private static class FileContentChunk implements Serializable {

    /**
     * Capture the metadata of the given file and size the chunk buffer:
     * min(file size, MAX_CHUNK_BUFFER_SIZE), but at least 1 byte so that
     * InputStream.read(byte[]) can report end-of-file as -1.
     *
     * @param file the file being transferred (its name and timestamp travel with the chunk).
     * @param originNodeName name of the node that initiated the transfer.
     * @param originNode address of the node that initiated the transfer.
     */
    public FileContentChunk(File file, String originNodeName,
            ClusterNode originNode) {
        this.mDestinationFile = file;
        this.mLastModified = file.lastModified();
        this.mOriginNode = originNode;
        this.mOriginNodeName = originNodeName;
        mChunkNumber = 0; // readNext() increments first, so the first chunk sent is FIRST_CHUNK
        long size = file.length();
        if (size > MAX_CHUNK_BUFFER_SIZE)
            size = MAX_CHUNK_BUFFER_SIZE;
        else if (size <= 0)
            size = 1;
        mChunk = new byte[(int) size]; // set amount transferred at a time
        mByteCount = 0;
    }

    /**
     * Get the name of the cluster node that started the file transfer operation
     *
     * @return node name
     */
    public String getOriginatingNodeName() {
        return this.mOriginNodeName;
    }

    /**
     * Get the address of the cluster node that started the file transfer operation.
     * @return ClusterNode
     */
    public ClusterNode getOriginatingNode() {
        return mOriginNode;
    }

    /** @return the file this chunk belongs to. */
    public File getDestinationFile() {
        return this.mDestinationFile;
    }

    /**
     * Open input file
     *
     * @return stream reading from the destination file.
     * @throws FileNotFoundException
     */
    public InputStream openInputFile() throws FileNotFoundException {
        return new FileInputStream(this.mDestinationFile);
    }

    /**
     * Open output file (staged in the server temp folder under the same name).
     *
     * @return stream writing to the staged temp file.
     * @throws FileNotFoundException
     */
    public OutputStream openOutputFile()
            throws FileNotFoundException {
        File lFile = new File(ClusterFileTransfer.getServerTempDir(),
                this.mDestinationFile.getName());
        FileOutputStream output = new FileOutputStream(lFile);
        return output;
    }

    /**
     * Read the next chunk of the file into the buffer, bumping the chunk number.
     *
     * @return number of bytes read (-1 at end-of-file, per InputStream.read)
     * @throws IOException
     */
    public int readNext(InputStream input) throws IOException {
        this.mChunkNumber++;
        this.mByteCount = input.read(this.mChunk);
        return this.mByteCount;
    }

    /** @return modification time of the source file, captured at construction. */
    public long lastModified() {
        return mLastModified;
    }

    static final long serialVersionUID = 3546447481674749363L;
    private File mDestinationFile;
    private long mLastModified;
    private String mOriginNodeName;
    private ClusterNode mOriginNode;
    // Sequence number of the chunk currently held in mChunk (1-based after the first read).
    private int mChunkNumber;
    private static final int FIRST_CHUNK = 1;
    // Chunk payload; only the first mByteCount bytes are valid.
    private byte[] mChunk;
    private int mByteCount;
}
659:
660: public static boolean localMove(File source, File destination)
661: throws FileNotFoundException, IOException {
662: if (source.renameTo(destination)) // if we can simply rename the file
663: return true; // return success
664: // otherwise, copy source to destination
665: OutputStream out = new FileOutputStream(destination);
666: InputStream in = new FileInputStream(source);
667: byte buffer[] = new byte[32 * 1024];
668: int bytesRead = 0;
669: while (bytesRead > -1) { // until we hit end of source file
670: bytesRead = in.read(buffer);
671: if (bytesRead > 0) {
672: out.write(buffer, 0, bytesRead);
673: }
674: }
675: in.close();
676: out.close();
677: if (!source.delete())
678: logMessage("Could not delete file " + source);
679: return true;
680: }
681:
682: /**
683: * Exception wrapper class
684: */
685: public static class ClusterFileTransferException extends Exception {
686: public ClusterFileTransferException(String message) {
687: super (message);
688: }
689:
690: public ClusterFileTransferException(String message,
691: Throwable cause) {
692: super (message, cause);
693: }
694:
695: public ClusterFileTransferException(Throwable cause) {
696: super(cause);
697: }
698: }
699: }
|