Source Code Cross Referenced for ClusterFileTransfer.java in  » EJB-Server-JBoss-4.2.1 » cluster » org » jboss » ha » framework » server » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » cluster » org.jboss.ha.framework.server 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * JBoss, Home of Professional Open Source.
003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004:         * as indicated by the @author tags. See the copyright.txt file in the
005:         * distribution for a full listing of individual contributors.
006:         *
007:         * This is free software; you can redistribute it and/or modify it
008:         * under the terms of the GNU Lesser General Public License as
009:         * published by the Free Software Foundation; either version 2.1 of
010:         * the License, or (at your option) any later version.
011:         *
012:         * This software is distributed in the hope that it will be useful,
013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015:         * Lesser General Public License for more details.
016:         *
017:         * You should have received a copy of the GNU Lesser General Public
018:         * License along with this software; if not, write to the Free
019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021:         */
022:        package org.jboss.ha.framework.server;
023:
024:        import org.jboss.ha.framework.interfaces.HAPartition;
025:        import org.jboss.ha.framework.interfaces.ClusterNode;
026:        import org.jboss.system.server.ServerConfigLocator;
027:        import org.jboss.logging.Logger;
028:        import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener;
029:
030:        import java.util.*;
031:        import java.io.*;
032:
033:        /**
034:         * Handles transferring files on the cluster.  Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE bytes per
035:         * Cluster call).
036:         *
037:         * @author <a href="mailto:smarlow@novell.com">Scott Marlow</a>.
038:         * @version $Revision: 62037 $
039:         */
040:        public class ClusterFileTransfer implements  AsynchHAMembershipListener {
041:
042:            // Specify max file transfer buffer size that we read and write at a time.
043:            // This influences the number of times that we will invoke disk read/write file
044:            // operations versus how much memory we will consume for a file transfer.
045:            private static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
046:
047:            // collection of in-progress file push operations
048:            private Map mPushsInProcess = Collections
049:                    .synchronizedMap(new HashMap());
050:
051:            // collection of in-progress file pull operations
052:            private Map mPullsInProcess = Collections
053:                    .synchronizedMap(new HashMap());
054:
055:            private HAPartition mPartition;
056:
057:            private static final File TEMP_DIRECTORY = ServerConfigLocator
058:                    .locate().getServerTempDir();
059:
060:            // Mapping between parent folder name and target destination folder
061:            // the search key is the parent folder name and value is the java.io.File.
062:            // We don't synchronize on the mParentFolders as we assume its safe to read it.
063:            private Map mParentFolders = null;
064:
065:            private static final String SERVICE_NAME = ClusterFileTransfer.class
066:                    .getName()
067:                    + "Service";
068:
069:            private static final Logger log = Logger
070:                    .getLogger(ClusterFileTransfer.class.getName());
071:
072:            /**
073:             * Constructor needs the cluster partition and the mapping of server folder names to the java.io.File instance
074:             * representing the physical folder.
075:             *
076:             * @param partition represents the cluster.
077:             * @param destinationDirectoryMap is the mapping between server folder name and physical folder representation.
078:             */
079:            public ClusterFileTransfer(HAPartition partition,
080:                    Map destinationDirectoryMap) {
081:                this .mPartition = partition;
082:                this .mPartition.registerRPCHandler(SERVICE_NAME, this );
083:                this .mPartition.registerMembershipListener(this );
084:                mParentFolders = destinationDirectoryMap;
085:            }
086:
087:            /**
088:             * Get specified file from the cluster.
089:             *
090:             * @param file identifies the file to get from the cluster.
091:             * @param parentName is the parent folder name for the file on both source and destination nodes.
092:             * @throws ClusterFileTransferException
093:             */
094:            public void pull(File file, String parentName)
095:                    throws ClusterFileTransferException {
096:                String myNodeName = this .mPartition.getNodeName();
097:                ClusterNode myNodeAddress = this .mPartition.getClusterNode();
098:                FileOutputStream output = null;
099:                try {
100:                    log.info("Start pull of file " + file.getName()
101:                            + " from cluster.");
102:                    ArrayList response = mPartition
103:                            .callMethodOnCoordinatorNode(SERVICE_NAME,
104:                                    "remotePullOpenFile", new Object[] { file,
105:                                            myNodeName, myNodeAddress,
106:                                            parentName }, new Class[] {
107:                                            java.io.File.class,
108:                                            java.lang.String.class,
109:                                            ClusterNode.class,
110:                                            java.lang.String.class }, true);
111:
112:                    if (response == null || response.size() < 1) {
113:                        throw new ClusterFileTransferException(
114:                                "Did not receive response from remote machine trying to open file '"
115:                                        + file
116:                                        + "'.  Check remote machine error log.");
117:                    }
118:
119:                    FileContentChunk fileChunk = (FileContentChunk) response
120:                            .get(0);
121:                    if (null == fileChunk) {
122:                        throw new ClusterFileTransferException(
123:                                "An error occured on remote machine trying to open file '"
124:                                        + file
125:                                        + "'.  Check remote machine error log.");
126:                    }
127:
128:                    File tempFile = new File(ClusterFileTransfer
129:                            .getServerTempDir(), file.getName());
130:                    output = new FileOutputStream(tempFile);
131:
132:                    // get the remote file modification time and change our local copy to have the same time.
133:                    long lastModification = fileChunk.lastModified();
134:                    while (fileChunk.mByteCount > 0) {
135:                        output.write(fileChunk.mChunk, 0, fileChunk.mByteCount);
136:                        response = mPartition.callMethodOnCoordinatorNode(
137:                                SERVICE_NAME, "remotePullReadFile",
138:                                new Object[] { file, myNodeName }, new Class[] {
139:                                        java.io.File.class,
140:                                        java.lang.String.class }, true);
141:                        if (response.size() < 1) {
142:                            if (!tempFile.delete())
143:                                throw new ClusterFileTransferException(
144:                                        "An error occured on remote machine trying to read file '"
145:                                                + file
146:                                                + "'.  Is remote still running?  Also, we couldn't delete temp file "
147:                                                + tempFile.getName());
148:                            throw new ClusterFileTransferException(
149:                                    "An error occured on remote machine trying to read file '"
150:                                            + file
151:                                            + "'.  Is remote still running?");
152:                        }
153:                        fileChunk = (FileContentChunk) response.get(0);
154:                        if (null == fileChunk) {
155:                            if (!tempFile.delete())
156:                                throw new ClusterFileTransferException(
157:                                        "An error occured on remote machine trying to read file '"
158:                                                + file
159:                                                + "'.  Check remote machine error log.  Also, we couldn't delete temp file "
160:                                                + tempFile.getName());
161:                            throw new ClusterFileTransferException(
162:                                    "An error occured on remote machine trying to read file '"
163:                                            + file
164:                                            + "'.  Check remote machine error log.");
165:                        }
166:                    }
167:                    output.close();
168:                    output = null;
169:                    File target = new File(getParentFile(parentName), file
170:                            .getName());
171:                    if (target.exists()) {
172:                        if (!target.delete())
173:                            throw new ClusterFileTransferException(
174:                                    "The destination file "
175:                                            + target
176:                                            + " couldn't be deleted, the updated application will not be copied to this node");
177:
178:                    }
179:                    tempFile.setLastModified(lastModification);
180:                    if (!localMove(tempFile, target)) {
181:                        throw new ClusterFileTransferException(
182:                                "Could not move " + tempFile + " to " + target);
183:                    }
184:                    log.info("Finished cluster pull of file " + file.getName()
185:                            + " to " + target.getName());
186:                } catch (IOException e) {
187:                    throw new ClusterFileTransferException(e);
188:                } catch (ClusterFileTransferException e) {
189:                    throw e;
190:                } catch (Exception e) {
191:                    throw new ClusterFileTransferException(e);
192:                } finally {
193:                    if (output != null) {
194:                        try {
195:                            output.close();
196:                        } catch (IOException e) {
197:                            logException(e);
198:                        } // we are already in the middle of a throw if output isn't null.
199:                    }
200:                }
201:            }
202:
203:            /**
204:             * This is remotely called by {@link #pull(File , String )} to open the file on the machine that
205:             * the file is being copied from.
206:             *
207:             * @param file is the file to pull.
208:             * @param originNodeName is the cluster node that is requesting the file.
209:             * @param parentName     is the parent folder name for the file on both source and destination nodes.
210:             * @return FileContentChunk containing the first part of the file read after opening it.
211:             */
212:            public FileContentChunk remotePullOpenFile(File file,
213:                    String originNodeName, ClusterNode originNode,
214:                    String parentName) {
215:                try {
216:                    File target = new File(getParentFile(parentName), file
217:                            .getName());
218:                    FileContentChunk fileChunk = new FileContentChunk(target,
219:                            originNodeName, originNode);
220:                    FilePullOperation filePullOperation = new FilePullOperation(
221:                            fileChunk);
222:                    // save the operation for the next call to remoteReadFile
223:                    this .mPullsInProcess.put(CompositeKey(originNodeName, file
224:                            .getName()), filePullOperation);
225:                    filePullOperation.openInputFile();
226:                    fileChunk.readNext(filePullOperation.getInputStream());
227:                    return fileChunk;
228:                } catch (IOException e) {
229:                    logException(e);
230:                } catch (Exception e) {
231:                    logException(e);
232:                }
233:                return null;
234:            }
235:
236:            /**
237:             * This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being
238:             * copied from.
239:             *
240:             * @param file is the file to pull.
241:             * @param originNodeName is the cluster node that is requesting the file.
242:             * @return FileContentChunk containing the next part of the file read.
243:             */
244:            public FileContentChunk remotePullReadFile(File file,
245:                    String originNodeName) {
246:                try {
247:                    FilePullOperation filePullOperation = (FilePullOperation) this .mPullsInProcess
248:                            .get(CompositeKey(originNodeName, file.getName()));
249:                    filePullOperation.getFileChunk().readNext(
250:                            filePullOperation.getInputStream());
251:                    if (filePullOperation.getFileChunk().mByteCount < 1) {
252:                        // last call to read, so clean up
253:                        filePullOperation.getInputStream().close();
254:                        this .mPullsInProcess.remove(CompositeKey(
255:                                originNodeName, file.getName()));
256:                    }
257:                    return filePullOperation.getFileChunk();
258:                } catch (IOException e) {
259:                    logException(e);
260:                }
261:                return null;
262:            }
263:
264:            /**
265:             * Send specified file to cluster.
266:             *
267:             * @param file is the file to send.
268:             * @param leaveInTempFolder is true if the file should be left in the server temp folder.
269:             * @throws ClusterFileTransferException
270:             */
271:            public void push(File file, String parentName,
272:                    boolean leaveInTempFolder)
273:                    throws ClusterFileTransferException {
274:                File target = new File(getParentFile(parentName), file
275:                        .getName());
276:
277:                log.info("Start push of file " + file.getName()
278:                        + " to cluster.");
279:                // check if trying to send explored archive (cannot send subdirectories)
280:                if (target.isDirectory()) {
281:                    // let the user know why we are skipping this file and return.
282:                    logMessage("You cannot send the contents of directories, consider archiving folder containing"
283:                            + target.getName() + " instead.");
284:                    return;
285:                }
286:                ClusterNode myNodeAddress = this .mPartition.getClusterNode();
287:                FileContentChunk fileChunk = new FileContentChunk(target,
288:                        this .mPartition.getNodeName(), myNodeAddress);
289:                try {
290:                    InputStream input = fileChunk.openInputFile();
291:                    while (fileChunk.readNext(input) >= 0) {
292:                        mPartition.callMethodOnCluster(SERVICE_NAME,
293:                                "remotePushWriteFile", new Object[] {
294:                                        fileChunk, parentName }, new Class[] {
295:                                        fileChunk.getClass(),
296:                                        java.lang.String.class }, true);
297:                    }
298:                    // tell remote(s) to close the output file
299:                    mPartition
300:                            .callMethodOnCluster(SERVICE_NAME,
301:                                    "remotePushCloseFile", new Object[] {
302:                                            fileChunk,
303:                                            new Boolean(leaveInTempFolder),
304:                                            parentName }, new Class[] {
305:                                            fileChunk.getClass(),
306:                                            Boolean.class,
307:                                            java.lang.String.class }, true);
308:                    input.close();
309:                    log.info("Finished push of file " + file.getName()
310:                            + " to cluster.");
311:                } catch (FileNotFoundException e) {
312:                    throw new ClusterFileTransferException(e);
313:                } catch (IOException e) {
314:                    throw new ClusterFileTransferException(e);
315:                } catch (Exception e) {
316:                    throw new ClusterFileTransferException(e);
317:                }
318:            }
319:
320:            /**
321:             * Remote method for writing file a fragment at a time.
322:             *
323:             * @param fileChunk
324:             */
325:            public void remotePushWriteFile(FileContentChunk fileChunk,
326:                    String parentName) {
327:                try {
328:                    String key = CompositeKey(fileChunk
329:                            .getOriginatingNodeName(), fileChunk
330:                            .getDestinationFile().getName());
331:                    FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess
332:                            .get(key);
333:
334:                    // handle first call to write
335:                    if (filePushOperation == null) {
336:                        if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK) {
337:                            // we joined the cluster after the file transfer started
338:                            logMessage("Ignoring file transfer of '"
339:                                    + fileChunk.getDestinationFile().getName()
340:                                    + "' from "
341:                                    + fileChunk.getOriginatingNodeName()
342:                                    + ", we missed the start of it.");
343:                            return;
344:                        }
345:                        filePushOperation = new FilePushOperation(fileChunk
346:                                .getOriginatingNodeName(), fileChunk
347:                                .getOriginatingNode());
348:                        File tempFile = new File(ClusterFileTransfer
349:                                .getServerTempDir(), fileChunk
350:                                .getDestinationFile().getName());
351:                        filePushOperation.openOutputFile(tempFile);
352:                        mPushsInProcess.put(key, filePushOperation);
353:                    }
354:                    filePushOperation.getOutputStream().write(fileChunk.mChunk,
355:                            0, fileChunk.mByteCount);
356:                } catch (FileNotFoundException e) {
357:                    logException(e);
358:                } catch (IOException e) {
359:                    logException(e);
360:                }
361:            }
362:
363:            /**
364:             * Remote method for closing the file just transmitted.
365:             *
366:             * @param fileChunk
367:             * @param leaveInTempFolder is true if we should leave the file in the server temp folder
368:             */
369:            public void remotePushCloseFile(FileContentChunk fileChunk,
370:                    Boolean leaveInTempFolder, String parentName) {
371:                try {
372:                    FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess
373:                            .remove(CompositeKey(fileChunk
374:                                    .getOriginatingNodeName(), fileChunk
375:                                    .getDestinationFile().getName()));
376:
377:                    if ((filePushOperation != null)
378:                            && (filePushOperation.getOutputStream() != null)) {
379:                        filePushOperation.getOutputStream().close();
380:                        if (!leaveInTempFolder.booleanValue()) {
381:                            File tempFile = new File(ClusterFileTransfer
382:                                    .getServerTempDir(), fileChunk
383:                                    .getDestinationFile().getName());
384:                            File target = new File(getParentFile(parentName),
385:                                    fileChunk.getDestinationFile().getName());
386:                            if (target.exists())
387:                                if (!target.delete())
388:                                    logMessage("Could not delete target file "
389:                                            + target);
390:
391:                            tempFile.setLastModified(fileChunk.lastModified());
392:                            if (!localMove(tempFile, target)) {
393:                                logMessage("Could not move " + tempFile
394:                                        + " to " + target);
395:                            }
396:                        }
397:                    }
398:                } catch (IOException e) {
399:                    logException(e);
400:                }
401:            }
402:
403:            /** Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener
404:             *
405:             * @param deadMembers A list of nodes that have died since the previous view
406:             * @param newMembers A list of nodes that have joined the partition since the previous view
407:             * @param allMembers A list of nodes that built the current view
408:             */
409:            public void membershipChanged(Vector deadMembers,
410:                    Vector newMembers, Vector allMembers) {
411:                // Are there any deadMembers contained in mPushsInProcess or in mPullsInProcess.
412:                // If so, cancel operations for them.
413:                // If contained in mPushsInProcess, then we can stop waiting for the rest of the file transfer.
414:                // If contained in mPullsInProcess, then we can stop supplying for the rest of the file transfer.
415:
416:                if (mPushsInProcess.size() > 0) {
417:                    synchronized (mPushsInProcess) {
418:                        Collection values = mPushsInProcess.values();
419:                        Iterator iter = values.iterator();
420:                        while (iter.hasNext()) {
421:                            FilePushOperation push = (FilePushOperation) iter
422:                                    .next();
423:                            if (deadMembers.contains(push.getOriginatingNode())) {
424:                                // cancel the operation and remove the operation from mPushsInProcess
425:                                push.cancel();
426:                                iter.remove();
427:                            }
428:                        }
429:                    }
430:                }
431:
432:                if (mPullsInProcess.size() > 0) {
433:                    synchronized (mPullsInProcess) {
434:                        Collection values = mPullsInProcess.values();
435:                        Iterator iter = values.iterator();
436:                        while (iter.hasNext()) {
437:                            FilePullOperation pull = (FilePullOperation) iter
438:                                    .next();
439:                            if (deadMembers.contains(pull.getFileChunk()
440:                                    .getOriginatingNode())) {
441:                                // cancel the operation and remove the operation from mPullsInProcess
442:                                pull.cancel();
443:                                iter.remove();
444:                            }
445:                        }
446:                    }
447:                }
448:            }
449:
450:            private static File getServerTempDir() {
451:                return TEMP_DIRECTORY;
452:            }
453:
454:            private File getParentFile(String parentName) {
455:                return (File) mParentFolders.get(parentName);
456:            }
457:
458:            private String CompositeKey(String originNodeName, String fileName) {
459:                return originNodeName + "#" + fileName;
460:            }
461:
462:            private static void logMessage(String message) {
463:                log.info(message);
464:            }
465:
466:            private static void logException(Throwable e) {
467:                //e.printStackTrace();
468:                log.error(e);
469:            }
470:
471:            /**
472:             * Represents file push operation.
473:             */
474:            private static class FilePushOperation {
475:
476:                public FilePushOperation(String originNodeName,
477:                        ClusterNode originNode) {
478:                    mOriginNodeName = originNodeName;
479:                    mOriginNode = originNode;
480:                }
481:
482:                public void openOutputFile(File file)
483:                        throws FileNotFoundException {
484:                    mOutput = new FileOutputStream(file);
485:                    mOutputFile = file;
486:                }
487:
488:                /**
489:                 * Cancel the file push operation.  To be called locally on each machine that is
490:                 * receiving the file.
491:                 */
492:                public void cancel() {
493:                    ClusterFileTransfer
494:                            .logMessage("Canceling receive of file "
495:                                    + mOutputFile
496:                                    + " as remote server "
497:                                    + mOriginNodeName
498:                                    + " left the cluster.  Partial results will be deleted.");
499:                    try {
500:                        // close the output stream and delete the file.
501:                        mOutput.close();
502:                        if (!mOutputFile.delete())
503:                            logMessage("Could not delete output file "
504:                                    + mOutputFile);
505:                    } catch (IOException e) {
506:                        logException(e);
507:                    }
508:                }
509:
510:                /**
511:                 * Get the IPAddress of the cluster node that is pushing file to the cluster.
512:                 * @return IPAddress
513:                 */
514:                public ClusterNode getOriginatingNode() {
515:                    return mOriginNode;
516:                }
517:
518:                public OutputStream getOutputStream() {
519:                    return mOutput;
520:                }
521:
522:                private OutputStream mOutput;
523:                private String mOriginNodeName;
524:                private ClusterNode mOriginNode;
525:                private File mOutputFile;
526:            }
527:
528:            /**
529:             * Represents file pull operation.
530:             */
531:            private static class FilePullOperation {
532:                public FilePullOperation(FileContentChunk fileChunk) {
533:                    mFileChunk = fileChunk;
534:                }
535:
536:                public void openInputFile() throws FileNotFoundException {
537:                    mInput = mFileChunk.openInputFile();
538:                }
539:
540:                public InputStream getInputStream() {
541:                    return mInput;
542:                }
543:
544:                /**
545:                 * Cancel the file pull operation.  To be called locally on the machine that is supplying the file.
546:                 */
547:                public void cancel() {
548:                    logMessage("Canceling send of file "
549:                            + mFileChunk.getDestinationFile()
550:                            + " as remote server "
551:                            + mFileChunk.getOriginatingNodeName()
552:                            + " left the cluster.");
553:                    try {
554:                        mInput.close();
555:                    } catch (IOException e) {
556:                        logException(e);
557:                    }
558:                }
559:
560:                public FileContentChunk getFileChunk() {
561:                    return mFileChunk;
562:                }
563:
564:                private FileContentChunk mFileChunk;
565:                private InputStream mInput;
566:            }
567:
568:            /**
569:             * For representing filetransfer state on the wire.
570:             * The inputStream or OutputStream is expected to be maintained by the sender/receiver.
571:             */
572:            private static class FileContentChunk implements  Serializable {
573:
574:                public FileContentChunk(File file, String originNodeName,
575:                        ClusterNode originNode) {
576:                    this .mDestinationFile = file;
577:                    this .mLastModified = file.lastModified();
578:                    this .mOriginNode = originNode;
579:                    this .mOriginNodeName = originNodeName;
580:                    mChunkNumber = 0;
581:                    long size = file.length();
582:                    if (size > MAX_CHUNK_BUFFER_SIZE)
583:                        size = MAX_CHUNK_BUFFER_SIZE;
584:                    else if (size <= 0)
585:                        size = 1;
586:                    mChunk = new byte[(int) size]; // set amount transferred at a time
587:                    mByteCount = 0;
588:                }
589:
590:                /**
591:                 * Get the name of the cluster node that started the file transfer operation
592:                 *
593:                 * @return node name
594:                 */
595:                public String getOriginatingNodeName() {
596:                    return this .mOriginNodeName;
597:                }
598:
599:                /**
600:                 * Get the address of the cluster node that started the file transfer operation.
601:                 * @return ClusterNode
602:                 */
603:                public ClusterNode getOriginatingNode() {
604:                    return mOriginNode;
605:                }
606:
607:                public File getDestinationFile() {
608:                    return this .mDestinationFile;
609:                }
610:
611:                /**
612:                 * Open input file
613:                 *
614:                 * @throws FileNotFoundException
615:                 */
616:                public InputStream openInputFile() throws FileNotFoundException {
617:                    return new FileInputStream(this .mDestinationFile);
618:                }
619:
620:                /**
621:                 * Open output file
622:                 *
623:                 * @return
624:                 * @throws FileNotFoundException
625:                 */
626:                public OutputStream openOutputFile()
627:                        throws FileNotFoundException {
628:                    File lFile = new File(ClusterFileTransfer
629:                            .getServerTempDir(), this .mDestinationFile
630:                            .getName());
631:                    FileOutputStream output = new FileOutputStream(lFile);
632:                    return output;
633:                }
634:
635:                /**
636:                 * @return number of bytes read
637:                 * @throws IOException
638:                 */
639:                public int readNext(InputStream input) throws IOException {
640:                    this .mChunkNumber++;
641:                    this .mByteCount = input.read(this .mChunk);
642:                    return this .mByteCount;
643:                }
644:
645:                public long lastModified() {
646:                    return mLastModified;
647:                }
648:
649:                static final long serialVersionUID = 3546447481674749363L;
650:                private File mDestinationFile;
651:                private long mLastModified;
652:                private String mOriginNodeName;
653:                private ClusterNode mOriginNode;
654:                private int mChunkNumber;
655:                private static final int FIRST_CHUNK = 1;
656:                private byte[] mChunk;
657:                private int mByteCount;
658:            }
659:
660:            public static boolean localMove(File source, File destination)
661:                    throws FileNotFoundException, IOException {
662:                if (source.renameTo(destination)) // if we can simply rename the file
663:                    return true; // return success
664:                // otherwise, copy source to destination
665:                OutputStream out = new FileOutputStream(destination);
666:                InputStream in = new FileInputStream(source);
667:                byte buffer[] = new byte[32 * 1024];
668:                int bytesRead = 0;
669:                while (bytesRead > -1) { // until we hit end of source file
670:                    bytesRead = in.read(buffer);
671:                    if (bytesRead > 0) {
672:                        out.write(buffer, 0, bytesRead);
673:                    }
674:                }
675:                in.close();
676:                out.close();
677:                if (!source.delete())
678:                    logMessage("Could not delete file " + source);
679:                return true;
680:            }
681:
682:            /**
683:             * Exception wrapper class
684:             */
685:            public static class ClusterFileTransferException extends Exception {
686:                public ClusterFileTransferException(String message) {
687:                    super (message);
688:                }
689:
690:                public ClusterFileTransferException(String message,
691:                        Throwable cause) {
692:                    super (message, cause);
693:                }
694:
695:                public ClusterFileTransferException(Throwable cause) {
696:                    super(cause);
697:                }
698:            }
699:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.