Source Code Cross Referenced for StoreService.java in » Web-Server » xsocket » distributedcache » Java Source Code / Java Documentation — Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Server » xsocket » distributedcache 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package distributedcache;
002:
003:        import java.io.ByteArrayInputStream;
004:        import java.io.ByteArrayOutputStream;
005:        import java.io.IOException;
006:        import java.io.ObjectInputStream;
007:        import java.io.ObjectOutputStream;
008:        import java.io.Serializable;
009:        import java.net.InetAddress;
010:        import java.nio.BufferUnderflowException;
011:        import java.nio.ByteBuffer;
012:        import java.util.ArrayList;
013:        import java.util.Collections;
014:        import java.util.HashMap;
015:        import java.util.HashSet;
016:        import java.util.List;
017:        import java.util.Map;
018:        import java.util.Set;
019:        import java.util.Timer;
020:        import java.util.TimerTask;
021:        import java.util.logging.Level;
022:        import java.util.logging.Logger;
023:        import java.util.zip.DataFormatException;
024:        import java.util.zip.Deflater;
025:        import java.util.zip.Inflater;
026:
027:        import net.sf.ehcache.Cache;
028:        import net.sf.ehcache.CacheManager;
029:        import net.sf.ehcache.Element;
030:
031:        import org.xsocket.IDataSink;
032:        import org.xsocket.IDataSource;
033:        import org.xsocket.ILifeCycle;
034:        import org.xsocket.Resource;
035:        import org.xsocket.group.Address;
036:        import org.xsocket.group.GroupEndpoint;
037:        import org.xsocket.group.IGroupEndpoint;
038:        import org.xsocket.group.IGroupEndpointHandler;
039:        import org.xsocket.group.ObjectMessage;
040:        import org.xsocket.stream.BlockingConnectionPool;
041:        import org.xsocket.stream.IBlockingConnection;
042:        import org.xsocket.stream.IDataHandler;
043:        import org.xsocket.stream.INonBlockingConnection;
044:        import org.xsocket.stream.IServerContext;
045:        import org.xsocket.stream.StreamUtils;
046:
047:        import distributedcache.MultiRpcCaller.Request;
048:        import distributedcache.MultiRpcCaller.Response;
049:
050:        public final class StoreService implements  IDataHandler, ILifeCycle {
051:
052:            private static final Logger LOG = Logger
053:                    .getLogger(StoreService.class.getName());
054:
055:            private static final byte FALSE = 00;
056:            private static final byte TRUE = 01;
057:
058:            private static final byte CMD_PUT = 01;
059:            private static final byte CMD_GET = 02;
060:            private static final byte CMD_REMOVE = 03;
061:
062:            private static final String ENCODING = "Cp1252";
063:            private static final Serializer SERIALIZER = new Serializer();
064:
065:            private Cache cache = null;
066:
067:            private Address serviceAddress = null;
068:
069:            private GroupEndpoint<ObjectMessage> groupEndpoint = null;
070:
071:            @Resource
072:            private IServerContext ctx = null;
073:
074:            private StoreServiceRegistry registry = null;
075:            private Set<Integer> localSupported = new HashSet<Integer>();
076:            private Map<Integer, Address> remoteSupported = new HashMap<Integer, Address>();
077:
078:            private Map<Address, Integer> load = Collections
079:                    .synchronizedMap(new HashMap<Address, Integer>());
080:
081:            private static BlockingConnectionPool connectionPool = new BlockingConnectionPool(
082:                    3L * 60L * 1000L);
083:
084:            private MultiRpcCaller multiRpcCaller = null;
085:
086:            private static final Timer TIMER = new Timer(true);
087:            private final LoadNotifierTimerTask loadNotifierTimerTask = new LoadNotifierTimerTask();
088:
089:            public StoreService(InetAddress groupAddress, int groupPort)
090:                    throws IOException {
091:                registry = StoreServiceRegistry.getInstance(new Address(
092:                        groupAddress, groupPort));
093:                groupEndpoint = new GroupEndpoint<ObjectMessage>(groupAddress,
094:                        groupPort, new GroupMessageHandler());
095:                multiRpcCaller = new MultiRpcCaller(groupEndpoint);
096:            }
097:
098:            public void onInit() {
099:                cache = new Cache(ctx.getLocaleAddress().getAddress() + ":"
100:                        + ctx.getLocalePort(), 100, true, true, 0, 0);
101:                CacheManager.getInstance().addCache(cache);
102:
103:                serviceAddress = new Address(ctx.getLocaleAddress(), ctx
104:                        .getLocalePort());
105:                registry.registerStoreServiceAddress(serviceAddress);
106:
107:                //load.put(serviceAddress, 0);
108:                load.put(serviceAddress, 30);
109:                TIMER.schedule(loadNotifierTimerTask, 100, 500);
110:            }
111:
112:            public void onDestroy() {
113:                loadNotifierTimerTask.cancel();
114:
115:                registry.deregisterStoreServiceAddress(serviceAddress);
116:
117:                CacheManager.getInstance().removeCache(
118:                        ctx.getLocaleAddress().getAddress() + ":"
119:                                + ctx.getLocalePort());
120:                cache.dispose();
121:
122:                try {
123:                    groupEndpoint.close();
124:                } catch (Exception e) {
125:                    e.printStackTrace();
126:                }
127:            }
128:
129:            synchronized Address assignService(int keyHashCode)
130:                    throws IOException {
131:                Address address = serviceAddress;
132:
133:                if (!load.isEmpty()) {
134:                    for (Address addr : load.keySet()) {
135:                        if (load.get(addr) < load.get(address)) {
136:                            address = addr;
137:                        }
138:                    }
139:                }
140:
141:                address = sendAssignNotification(address, keyHashCode);
142:                LOG.info("key " + keyHashCode
143:                        + " has been assigned to service " + address);
144:
145:                registerAddressToKeyHashCode(address, keyHashCode);
146:
147:                return address;
148:            }
149:
150:            private Address sendAssignNotification(Address address,
151:                    int keyHashCode) throws IOException {
152:                List<Response> responses = multiRpcCaller.call(new AssignMsg(
153:                        keyHashCode, address));
154:                for (Response response : responses) {
155:                    if (response instanceof  AlreadyAssignedMsg) {
156:                        AlreadyAssignedMsg alreadyAssignedMsg = (AlreadyAssignedMsg) response;
157:                        remoteSupported.put(keyHashCode, alreadyAssignedMsg
158:                                .getAddress());
159:                        return alreadyAssignedMsg.getAddress();
160:                    }
161:                }
162:
163:                return address;
164:            }
165:
166:            private void registerAddressToKeyHashCode(Address address,
167:                    int keyHashCode) {
168:                if (address.equals(serviceAddress)) {
169:                    localSupported.add(keyHashCode);
170:                } else {
171:                    remoteSupported.put(keyHashCode, address);
172:                }
173:            }
174:
175:            String getServiceAddress() {
176:                return serviceAddress.toString();
177:            }
178:
179:            List<String> getLocalSupported() {
180:                ArrayList<String> result = new ArrayList<String>();
181:                for (Integer hashCode : localSupported) {
182:                    result.add(Integer.toString(hashCode));
183:                }
184:
185:                return result;
186:            }
187:
188:            List<String> getRemoteSupported() {
189:                ArrayList<String> result = new ArrayList<String>();
190:                for (Integer hashCode : remoteSupported.keySet()) {
191:                    result.add(Integer.toString(hashCode) + " ("
192:                            + remoteSupported.get(hashCode) + ")");
193:                }
194:
195:                return result;
196:            }
197:
198:            List<String> getLoadList() {
199:                List<String> result = new ArrayList<String>();
200:                for (Address address : load.keySet()) {
201:                    result.add(address.toString() + "=" + load.get(address));
202:                }
203:
204:                return result;
205:            }
206:
207:            int getLoad() {
208:                int load = 0;
209:                if (cache.getSize() != 0) {
210:                    load = (int) ((cache.getSize() * 100) / cache
211:                            .getMaxElementsInMemory());
212:                }
213:                return load;
214:            }
215:
216:            synchronized Address getServiceAddress(int keyHashcode)
217:                    throws IOException {
218:                if (localSupported.contains(keyHashcode)) {
219:                    return serviceAddress;
220:
221:                } else if (remoteSupported.containsKey(keyHashcode)) {
222:                    return remoteSupported.get(keyHashcode);
223:
224:                } else {
225:                    LOG.info("non assignment for key " + keyHashcode
226:                            + " assign service");
227:                    return assignService(keyHashcode);
228:                }
229:            }
230:
231:            public static Address callPut(Address address, int keyHashCode,
232:                    String key, Serializable value) throws IOException {
233:
234:                IBlockingConnection connection = connectionPool
235:                        .getBlockingConnection(address.getAddress()
236:                                .getHostName(), address.getPort());
237:
238:                connection.markWritePosition(); // mark current position
239:                connection.write((int) 0); // write "emtpy" length field
240:
241:                int written = connection.write(CMD_PUT);
242:
243:                written += connection.write(keyHashCode);
244:
245:                byte[] serializedKey = key.getBytes(ENCODING);
246:                written += connection.write(serializedKey.length);
247:                written += connection.write(serializedKey);
248:
249:                byte[] serializedValue = SERIALIZER.serialize(value);
250:                written += new Record(serializedValue).writeTo(connection);
251:
252:                connection.resetToWriteMark(); // return to length field position
253:                connection.write(written); // and update it
254:
255:                connection.flush(); // flush (marker will be removed implicit) 
256:
257:                connection.readInt();
258:                Address srvAddress = Address.readFrom(connection);
259:                connection.close();
260:
261:                return srvAddress;
262:            }
263:
264:            private void put(INonBlockingConnection connection)
265:                    throws IOException {
266:                int keylength = connection.readInt();
267:                byte[] serializedKey = connection.readBytesByLength(keylength);
268:                String key = new String(serializedKey, ENCODING);
269:
270:                byte[] data = Record.readFrom(connection).getData();
271:                Serializable value = SERIALIZER.deserialize(data);
272:
273:                cache.put(new Element(key, value));
274:
275:                if (LOG.isLoggable(Level.INFO)) {
276:                    LOG.info("[" + serviceAddress + "] element " + key + "="
277:                            + value + " inserted");
278:                }
279:
280:                connection.markWritePosition();
281:                connection.write((int) 0);
282:
283:                int written = serviceAddress.writeTo(connection);
284:
285:                connection.resetToWriteMark();
286:                connection.write(written);
287:            }
288:
289:            public static IGetResult callGet(Address address, int keyHashCode,
290:                    String key) throws IOException {
291:
292:                IBlockingConnection connection = connectionPool
293:                        .getBlockingConnection(address.getAddress()
294:                                .getHostName(), address.getPort());
295:
296:                connection.markWritePosition(); // mark current position
297:                connection.write((int) 0); // write "emtpy" length field
298:
299:                int written = connection.write(CMD_GET);
300:
301:                written += connection.write(keyHashCode);
302:
303:                byte[] serializedKey = key.getBytes(ENCODING);
304:                written += connection.write(serializedKey.length);
305:                written += connection.write(serializedKey);
306:
307:                connection.resetToWriteMark(); // return to length field position
308:                connection.write(written); // and update it
309:
310:                connection.flush(); // flush (marker will be removed implicit) 
311:
312:                connection.readInt();
313:
314:                final Address srvAddress = Address.readFrom(connection);
315:
316:                Record record = Record.readFrom(connection);
317:                connection.close();
318:
319:                Serializable ser = null;
320:                if (record.getData() != null) {
321:                    ser = SERIALIZER.deserialize(record.getData());
322:                }
323:                final Serializable value = ser;
324:
325:                return new IGetResult() {
326:                    public Serializable getValue() {
327:                        return value;
328:                    }
329:
330:                    public Address getServiceAddress() {
331:                        return srvAddress;
332:                    }
333:                };
334:            }
335:
336:            public interface IGetResult {
337:                public Serializable getValue();
338:
339:                public Address getServiceAddress();
340:            }
341:
342:            private void get(INonBlockingConnection connection)
343:                    throws IOException {
344:                int keylength = connection.readInt();
345:                byte[] serializedKey = connection.readBytesByLength(keylength);
346:                String key = new String(serializedKey, ENCODING);
347:
348:                Element element = cache.get(key);
349:                if (LOG.isLoggable(Level.INFO)) {
350:                    LOG.info("element " + element + " requested by key " + key);
351:                }
352:
353:                connection.markWritePosition();
354:                connection.write((int) 0);
355:
356:                int written = serviceAddress.writeTo(connection);
357:
358:                byte[] serializedData = null;
359:                if (element != null) {
360:                    serializedData = SERIALIZER
361:                            .serialize((Serializable) element.getObjectValue());
362:                }
363:
364:                written += new Record(serializedData).writeTo(connection);
365:
366:                connection.resetToWriteMark();
367:                connection.write(written);
368:            }
369:
370:            public static IRemoveResult callRemove(Address address,
371:                    int keyHashCode, String key) throws IOException {
372:
373:                IBlockingConnection connection = connectionPool
374:                        .getBlockingConnection(address.getAddress()
375:                                .getHostName(), address.getPort());
376:
377:                connection.markWritePosition(); // mark current position
378:                connection.write((int) 0); // write "emtpy" length field
379:
380:                int written = connection.write(CMD_REMOVE);
381:
382:                written += connection.write(keyHashCode);
383:
384:                byte[] serializedKey = key.getBytes(ENCODING);
385:                written += connection.write(serializedKey.length);
386:                written += connection.write(serializedKey);
387:
388:                connection.resetToWriteMark(); // return to length field position
389:                connection.write(written); // and update it
390:
391:                connection.flush(); // flush (marker will be removed implicit) 
392:
393:                connection.readInt();
394:
395:                final Address srvAddress = Address.readFrom(connection);
396:
397:                Record record = Record.readFrom(connection);
398:                connection.close();
399:
400:                Serializable ser = null;
401:                if (record.getData() != null) {
402:                    ser = SERIALIZER.deserialize(record.getData());
403:                }
404:
405:                final Serializable value = ser;
406:
407:                return new IRemoveResult() {
408:                    public Serializable getValue() {
409:                        return value;
410:                    }
411:
412:                    public Address getServiceAddress() {
413:                        return srvAddress;
414:                    }
415:                };
416:            }
417:
418:            public interface IRemoveResult {
419:                public Serializable getValue();
420:
421:                public Address getServiceAddress();
422:            }
423:
424:            private void remove(INonBlockingConnection connection)
425:                    throws IOException {
426:                int keylength = connection.readInt();
427:                byte[] serializedKey = connection.readBytesByLength(keylength);
428:                String key = new String(serializedKey, ENCODING);
429:
430:                Element element = cache.get(key);
431:                boolean isRemoved = cache.remove(key);
432:                if (LOG.isLoggable(Level.INFO)) {
433:                    LOG.info("element removed is " + isRemoved);
434:                }
435:
436:                connection.markWritePosition();
437:                connection.write((int) 0);
438:
439:                int written = serviceAddress.writeTo(connection);
440:
441:                byte[] serializedData = null;
442:                if (isRemoved) {
443:                    serializedData = SERIALIZER
444:                            .serialize((Serializable) element.getObjectValue());
445:                }
446:
447:                written += new Record(serializedData).writeTo(connection);
448:
449:                connection.resetToWriteMark();
450:                connection.write(written);
451:            }
452:
453:            public boolean onData(INonBlockingConnection connection)
454:                    throws IOException, BufferUnderflowException {
455:
456:                int length = StreamUtils
457:                        .validateSufficientDatasizeByIntLengthField(connection);
458:
459:                byte cmd = connection.readByte();
460:                int keyHashCode = connection.readInt();
461:                Address targetAddress = getServiceAddress(keyHashCode);
462:
463:                if (targetAddress.equals(serviceAddress)) {
464:                    switch (cmd) {
465:                    case CMD_PUT:
466:                        put(connection);
467:                        break;
468:
469:                    case CMD_GET:
470:                        get(connection);
471:                        break;
472:
473:                    case CMD_REMOVE:
474:                        remove(connection);
475:                        break;
476:
477:                    default:
478:                        if (LOG.isLoggable(Level.FINE)) {
479:                            LOG
480:                                    .fine("receive datagram with unknown cmd "
481:                                            + cmd);
482:                        }
483:
484:                        break;
485:                    }
486:
487:                } else {
488:                    forwardRequest(connection, targetAddress, length, cmd,
489:                            keyHashCode);
490:                }
491:
492:                return true;
493:            }
494:
495:            private void forwardRequest(INonBlockingConnection connection,
496:                    Address address, int length, byte cmd, int keyHashCode)
497:                    throws IOException {
498:                LOG.info("forwarding request (cmd=" + cmd + ", hashkey="
499:                        + keyHashCode + ") to " + address);
500:
501:                // forward request 
502:                IBlockingConnection con = connectionPool.getBlockingConnection(
503:                        address.getAddress().getHostName(), address.getPort());
504:                con.write(length);
505:                con.write(cmd);
506:                con.write(keyHashCode);
507:                con.write(connection.readByteBufferByLength(length - 1 - 4));
508:                con.flush();
509:
510:                // return response
511:                int lengthResponse = con.readInt();
512:                connection.write(lengthResponse);
513:                connection.write(con.readByteBufferByLength(lengthResponse));
514:                con.close();
515:            }
516:
517:            private final class LoadNotifierTimerTask extends TimerTask {
518:                @Override
519:                public void run() {
520:                    try {
521:                        int currentLoad = getLoad();
522:                        load.put(serviceAddress, currentLoad);
523:
524:                        ObjectMessage msg = groupEndpoint
525:                                .createObjectMessage((Serializable) new LoadNotificationMsg(
526:                                        currentLoad, serviceAddress));
527:                        groupEndpoint.send(msg);
528:
529:                    } catch (IOException e) {
530:                        e.printStackTrace();
531:                    }
532:                }
533:            }
534:
535:            private final class GroupMessageHandler implements 
536:                    IGroupEndpointHandler<ObjectMessage> {
537:                public boolean onMessage(IGroupEndpoint<ObjectMessage> endpoint)
538:                        throws IOException {
539:                    ObjectMessage message = endpoint.receiveMessage();
540:                    boolean handled = multiRpcCaller.handleResponse(message);
541:
542:                    if (!handled) {
543:                        if (message.getObject() instanceof  AssignMsg) {
544:                            AssignMsg assignMsg = (AssignMsg) message
545:                                    .getObject();
546:
547:                            if (localSupported
548:                                    .contains(assignMsg.getHashCode())) {
549:                                LOG
550:                                        .info("hashCode="
551:                                                + assignMsg.getHashCode()
552:                                                + " is already supported by local address ("
553:                                                + serviceAddress
554:                                                + ") sending NOK");
555:                                ObjectMessage response = endpoint
556:                                        .createObjectMessage(new AlreadyAssignedMsg(
557:                                                message.getId(), serviceAddress));
558:                                response.addDestinationAddress(message
559:                                        .getSourceAddress());
560:                                endpoint.send(response);
561:
562:                            } else {
563:                                LOG.info("register service "
564:                                        + assignMsg.getAddress()
565:                                        + " for keyHash "
566:                                        + assignMsg.getHashCode());
567:
568:                                registerAddressToKeyHashCode(assignMsg
569:                                        .getAddress(), assignMsg.getHashCode());
570:                                ObjectMessage response = endpoint
571:                                        .createObjectMessage(new OkMsg(message
572:                                                .getId()));
573:                                response.addDestinationAddress(message
574:                                        .getSourceAddress());
575:                                endpoint.send(response);
576:                            }
577:
578:                        } else if (message.getObject() instanceof  LoadNotificationMsg) {
579:                            LoadNotificationMsg loadNotificationMessage = (LoadNotificationMsg) message
580:                                    .getObject();
581:                            load.put(loadNotificationMessage.getAddress(),
582:                                    loadNotificationMessage.getLoad());
583:                        }
584:                    }
585:
586:                    return true;
587:                }
588:            }
589:
590:            private static final class Serializer {
591:
592:                byte[] serialize(Serializable object) {
593:                    try {
594:                        ByteArrayOutputStream os = new ByteArrayOutputStream();
595:                        new ObjectOutputStream(os).writeObject(object);
596:
597:                        return os.toByteArray();
598:                    } catch (Exception e) {
599:                        e.printStackTrace();
600:                        return null;
601:                    }
602:                }
603:
604:                Serializable deserialize(byte[] bytes) {
605:                    try {
606:                        ObjectInputStream ois = new ObjectInputStream(
607:                                new ByteArrayInputStream(bytes));
608:                        return (Serializable) ois.readObject();
609:                    } catch (Exception e) {
610:                        e.printStackTrace();
611:                        return null;
612:                    }
613:                }
614:            }
615:
616:            private static final class AssignMsg implements  Request {
617:                private int hashCode = 0;
618:                private Address address = null;
619:
620:                AssignMsg(int hashCode, Address address) {
621:                    this .hashCode = hashCode;
622:                    this .address = address;
623:                }
624:
625:                public int getHashCode() {
626:                    return hashCode;
627:                }
628:
629:                public Address getAddress() {
630:                    return address;
631:                }
632:            }
633:
634:            private static final class LoadNotificationMsg implements 
635:                    Serializable {
636:                private Address address = null;
637:                private int load = 0;
638:
639:                public LoadNotificationMsg(int load, Address address) {
640:                    this .load = load;
641:                    this .address = address;
642:                }
643:
644:                public int getLoad() {
645:                    return load;
646:                }
647:
648:                public Address getAddress() {
649:                    return address;
650:                }
651:            }
652:
653:            private static final class AlreadyAssignedMsg implements  Response {
654:                private long correlatedMsgId = 0;
655:                private Address address = null;
656:
657:                public AlreadyAssignedMsg(long correlatedMsgId, Address address) {
658:                    this .correlatedMsgId = correlatedMsgId;
659:                    this .address = address;
660:                }
661:
662:                public long getRequestMsgId() {
663:                    return correlatedMsgId;
664:                }
665:
666:                public Address getAddress() {
667:                    return address;
668:                }
669:            }
670:
671:            private static final class OkMsg implements  Response {
672:                private long correlatedMsgId = 0;
673:
674:                OkMsg(long correlatedMsgId) {
675:                    this .correlatedMsgId = correlatedMsgId;
676:                }
677:
678:                public long getRequestMsgId() {
679:                    return correlatedMsgId;
680:                }
681:            }
682:
683:            private static final class Record {
684:
685:                private static final int COMPRESS_THRESHOLD = 50;
686:
687:                private byte[] data = null;
688:
689:                Record(byte[] data) {
690:                    this .data = data;
691:                }
692:
693:                public byte[] getData() {
694:                    return data;
695:                }
696:
697:                public static Record readFrom(IDataSource dataSource)
698:                        throws IOException {
699:
700:                    boolean isCompressed = (dataSource.readByte() == TRUE);
701:
702:                    byte[] data = null;
703:                    int length = dataSource.readInt();
704:                    if (length > 0) {
705:                        data = dataSource.readBytesByLength(length);
706:                    }
707:
708:                    if (isCompressed) {
709:                        data = decompress(data);
710:                    }
711:
712:                    return new Record(data);
713:                }
714:
715:                public int writeTo(IDataSink dataSink) throws IOException {
716:                    int written = 0;
717:
718:                    if (data != null) {
719:                        byte[] dataToWrite = data;
720:
721:                        boolean isCompressed = false;
722:                        if (dataToWrite.length > COMPRESS_THRESHOLD) {
723:                            int originalLength = dataToWrite.length;
724:                            dataToWrite = compress(dataToWrite);
725:                            isCompressed = true;
726:
727:                            LOG
728:                                    .info("data has been compressed ("
729:                                            + (100 - ((dataToWrite.length * 100) / originalLength))
730:                                            + "%)");
731:                        }
732:
733:                        if (isCompressed) {
734:                            written += dataSink.write(TRUE);
735:                        } else {
736:                            written += dataSink.write(FALSE);
737:                        }
738:                        written += dataSink.write(dataToWrite.length);
739:                        written += dataSink.write(dataToWrite);
740:
741:                    } else {
742:                        written += dataSink.write(FALSE);
743:                        written += dataSink.write(0);
744:                    }
745:
746:                    return written;
747:                }
748:
749:                private static byte[] compress(byte[] data) {
750:                    Deflater compressor = new Deflater();
751:                    compressor.setLevel(Deflater.BEST_COMPRESSION);
752:
753:                    compressor.setInput(data);
754:                    compressor.finish();
755:
756:                    ByteArrayOutputStream bos = new ByteArrayOutputStream(
757:                            data.length);
758:
759:                    byte[] buf = new byte[1024];
760:                    while (!compressor.finished()) {
761:                        int count = compressor.deflate(buf);
762:                        bos.write(buf, 0, count);
763:                    }
764:                    try {
765:                        bos.close();
766:                    } catch (IOException e) {
767:                    }
768:
769:                    return bos.toByteArray();
770:                }
771:
772:                private static byte[] decompress(byte[] data) {
773:                    Inflater decompressor = new Inflater();
774:                    decompressor.setInput(data);
775:
776:                    ByteArrayOutputStream bos = new ByteArrayOutputStream(
777:                            data.length);
778:
779:                    byte[] buf = new byte[1024];
780:                    while (!decompressor.finished()) {
781:                        try {
782:                            int count = decompressor.inflate(buf);
783:                            bos.write(buf, 0, count);
784:                        } catch (DataFormatException e) {
785:                        }
786:                    }
787:                    try {
788:                        bos.close();
789:                    } catch (IOException e) {
790:                    }
791:
792:                    return bos.toByteArray();
793:                }
794:            }
795:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.