001: package distributedcache;
002:
003: import java.io.ByteArrayInputStream;
004: import java.io.ByteArrayOutputStream;
005: import java.io.IOException;
006: import java.io.ObjectInputStream;
007: import java.io.ObjectOutputStream;
008: import java.io.Serializable;
009: import java.net.InetAddress;
010: import java.nio.BufferUnderflowException;
011: import java.nio.ByteBuffer;
012: import java.util.ArrayList;
013: import java.util.Collections;
014: import java.util.HashMap;
015: import java.util.HashSet;
016: import java.util.List;
017: import java.util.Map;
018: import java.util.Set;
019: import java.util.Timer;
020: import java.util.TimerTask;
021: import java.util.logging.Level;
022: import java.util.logging.Logger;
023: import java.util.zip.DataFormatException;
024: import java.util.zip.Deflater;
025: import java.util.zip.Inflater;
026:
027: import net.sf.ehcache.Cache;
028: import net.sf.ehcache.CacheManager;
029: import net.sf.ehcache.Element;
030:
031: import org.xsocket.IDataSink;
032: import org.xsocket.IDataSource;
033: import org.xsocket.ILifeCycle;
034: import org.xsocket.Resource;
035: import org.xsocket.group.Address;
036: import org.xsocket.group.GroupEndpoint;
037: import org.xsocket.group.IGroupEndpoint;
038: import org.xsocket.group.IGroupEndpointHandler;
039: import org.xsocket.group.ObjectMessage;
040: import org.xsocket.stream.BlockingConnectionPool;
041: import org.xsocket.stream.IBlockingConnection;
042: import org.xsocket.stream.IDataHandler;
043: import org.xsocket.stream.INonBlockingConnection;
044: import org.xsocket.stream.IServerContext;
045: import org.xsocket.stream.StreamUtils;
046:
047: import distributedcache.MultiRpcCaller.Request;
048: import distributedcache.MultiRpcCaller.Response;
049:
050: public final class StoreService implements IDataHandler, ILifeCycle {
051:
052: private static final Logger LOG = Logger
053: .getLogger(StoreService.class.getName());
054:
055: private static final byte FALSE = 00;
056: private static final byte TRUE = 01;
057:
058: private static final byte CMD_PUT = 01;
059: private static final byte CMD_GET = 02;
060: private static final byte CMD_REMOVE = 03;
061:
062: private static final String ENCODING = "Cp1252";
063: private static final Serializer SERIALIZER = new Serializer();
064:
065: private Cache cache = null;
066:
067: private Address serviceAddress = null;
068:
069: private GroupEndpoint<ObjectMessage> groupEndpoint = null;
070:
071: @Resource
072: private IServerContext ctx = null;
073:
074: private StoreServiceRegistry registry = null;
075: private Set<Integer> localSupported = new HashSet<Integer>();
076: private Map<Integer, Address> remoteSupported = new HashMap<Integer, Address>();
077:
078: private Map<Address, Integer> load = Collections
079: .synchronizedMap(new HashMap<Address, Integer>());
080:
081: private static BlockingConnectionPool connectionPool = new BlockingConnectionPool(
082: 3L * 60L * 1000L);
083:
084: private MultiRpcCaller multiRpcCaller = null;
085:
086: private static final Timer TIMER = new Timer(true);
087: private final LoadNotifierTimerTask loadNotifierTimerTask = new LoadNotifierTimerTask();
088:
089: public StoreService(InetAddress groupAddress, int groupPort)
090: throws IOException {
091: registry = StoreServiceRegistry.getInstance(new Address(
092: groupAddress, groupPort));
093: groupEndpoint = new GroupEndpoint<ObjectMessage>(groupAddress,
094: groupPort, new GroupMessageHandler());
095: multiRpcCaller = new MultiRpcCaller(groupEndpoint);
096: }
097:
098: public void onInit() {
099: cache = new Cache(ctx.getLocaleAddress().getAddress() + ":"
100: + ctx.getLocalePort(), 100, true, true, 0, 0);
101: CacheManager.getInstance().addCache(cache);
102:
103: serviceAddress = new Address(ctx.getLocaleAddress(), ctx
104: .getLocalePort());
105: registry.registerStoreServiceAddress(serviceAddress);
106:
107: //load.put(serviceAddress, 0);
108: load.put(serviceAddress, 30);
109: TIMER.schedule(loadNotifierTimerTask, 100, 500);
110: }
111:
112: public void onDestroy() {
113: loadNotifierTimerTask.cancel();
114:
115: registry.deregisterStoreServiceAddress(serviceAddress);
116:
117: CacheManager.getInstance().removeCache(
118: ctx.getLocaleAddress().getAddress() + ":"
119: + ctx.getLocalePort());
120: cache.dispose();
121:
122: try {
123: groupEndpoint.close();
124: } catch (Exception e) {
125: e.printStackTrace();
126: }
127: }
128:
129: synchronized Address assignService(int keyHashCode)
130: throws IOException {
131: Address address = serviceAddress;
132:
133: if (!load.isEmpty()) {
134: for (Address addr : load.keySet()) {
135: if (load.get(addr) < load.get(address)) {
136: address = addr;
137: }
138: }
139: }
140:
141: address = sendAssignNotification(address, keyHashCode);
142: LOG.info("key " + keyHashCode
143: + " has been assigned to service " + address);
144:
145: registerAddressToKeyHashCode(address, keyHashCode);
146:
147: return address;
148: }
149:
150: private Address sendAssignNotification(Address address,
151: int keyHashCode) throws IOException {
152: List<Response> responses = multiRpcCaller.call(new AssignMsg(
153: keyHashCode, address));
154: for (Response response : responses) {
155: if (response instanceof AlreadyAssignedMsg) {
156: AlreadyAssignedMsg alreadyAssignedMsg = (AlreadyAssignedMsg) response;
157: remoteSupported.put(keyHashCode, alreadyAssignedMsg
158: .getAddress());
159: return alreadyAssignedMsg.getAddress();
160: }
161: }
162:
163: return address;
164: }
165:
166: private void registerAddressToKeyHashCode(Address address,
167: int keyHashCode) {
168: if (address.equals(serviceAddress)) {
169: localSupported.add(keyHashCode);
170: } else {
171: remoteSupported.put(keyHashCode, address);
172: }
173: }
174:
175: String getServiceAddress() {
176: return serviceAddress.toString();
177: }
178:
179: List<String> getLocalSupported() {
180: ArrayList<String> result = new ArrayList<String>();
181: for (Integer hashCode : localSupported) {
182: result.add(Integer.toString(hashCode));
183: }
184:
185: return result;
186: }
187:
188: List<String> getRemoteSupported() {
189: ArrayList<String> result = new ArrayList<String>();
190: for (Integer hashCode : remoteSupported.keySet()) {
191: result.add(Integer.toString(hashCode) + " ("
192: + remoteSupported.get(hashCode) + ")");
193: }
194:
195: return result;
196: }
197:
198: List<String> getLoadList() {
199: List<String> result = new ArrayList<String>();
200: for (Address address : load.keySet()) {
201: result.add(address.toString() + "=" + load.get(address));
202: }
203:
204: return result;
205: }
206:
207: int getLoad() {
208: int load = 0;
209: if (cache.getSize() != 0) {
210: load = (int) ((cache.getSize() * 100) / cache
211: .getMaxElementsInMemory());
212: }
213: return load;
214: }
215:
216: synchronized Address getServiceAddress(int keyHashcode)
217: throws IOException {
218: if (localSupported.contains(keyHashcode)) {
219: return serviceAddress;
220:
221: } else if (remoteSupported.containsKey(keyHashcode)) {
222: return remoteSupported.get(keyHashcode);
223:
224: } else {
225: LOG.info("non assignment for key " + keyHashcode
226: + " assign service");
227: return assignService(keyHashcode);
228: }
229: }
230:
231: public static Address callPut(Address address, int keyHashCode,
232: String key, Serializable value) throws IOException {
233:
234: IBlockingConnection connection = connectionPool
235: .getBlockingConnection(address.getAddress()
236: .getHostName(), address.getPort());
237:
238: connection.markWritePosition(); // mark current position
239: connection.write((int) 0); // write "emtpy" length field
240:
241: int written = connection.write(CMD_PUT);
242:
243: written += connection.write(keyHashCode);
244:
245: byte[] serializedKey = key.getBytes(ENCODING);
246: written += connection.write(serializedKey.length);
247: written += connection.write(serializedKey);
248:
249: byte[] serializedValue = SERIALIZER.serialize(value);
250: written += new Record(serializedValue).writeTo(connection);
251:
252: connection.resetToWriteMark(); // return to length field position
253: connection.write(written); // and update it
254:
255: connection.flush(); // flush (marker will be removed implicit)
256:
257: connection.readInt();
258: Address srvAddress = Address.readFrom(connection);
259: connection.close();
260:
261: return srvAddress;
262: }
263:
264: private void put(INonBlockingConnection connection)
265: throws IOException {
266: int keylength = connection.readInt();
267: byte[] serializedKey = connection.readBytesByLength(keylength);
268: String key = new String(serializedKey, ENCODING);
269:
270: byte[] data = Record.readFrom(connection).getData();
271: Serializable value = SERIALIZER.deserialize(data);
272:
273: cache.put(new Element(key, value));
274:
275: if (LOG.isLoggable(Level.INFO)) {
276: LOG.info("[" + serviceAddress + "] element " + key + "="
277: + value + " inserted");
278: }
279:
280: connection.markWritePosition();
281: connection.write((int) 0);
282:
283: int written = serviceAddress.writeTo(connection);
284:
285: connection.resetToWriteMark();
286: connection.write(written);
287: }
288:
289: public static IGetResult callGet(Address address, int keyHashCode,
290: String key) throws IOException {
291:
292: IBlockingConnection connection = connectionPool
293: .getBlockingConnection(address.getAddress()
294: .getHostName(), address.getPort());
295:
296: connection.markWritePosition(); // mark current position
297: connection.write((int) 0); // write "emtpy" length field
298:
299: int written = connection.write(CMD_GET);
300:
301: written += connection.write(keyHashCode);
302:
303: byte[] serializedKey = key.getBytes(ENCODING);
304: written += connection.write(serializedKey.length);
305: written += connection.write(serializedKey);
306:
307: connection.resetToWriteMark(); // return to length field position
308: connection.write(written); // and update it
309:
310: connection.flush(); // flush (marker will be removed implicit)
311:
312: connection.readInt();
313:
314: final Address srvAddress = Address.readFrom(connection);
315:
316: Record record = Record.readFrom(connection);
317: connection.close();
318:
319: Serializable ser = null;
320: if (record.getData() != null) {
321: ser = SERIALIZER.deserialize(record.getData());
322: }
323: final Serializable value = ser;
324:
325: return new IGetResult() {
326: public Serializable getValue() {
327: return value;
328: }
329:
330: public Address getServiceAddress() {
331: return srvAddress;
332: }
333: };
334: }
335:
336: public interface IGetResult {
337: public Serializable getValue();
338:
339: public Address getServiceAddress();
340: }
341:
342: private void get(INonBlockingConnection connection)
343: throws IOException {
344: int keylength = connection.readInt();
345: byte[] serializedKey = connection.readBytesByLength(keylength);
346: String key = new String(serializedKey, ENCODING);
347:
348: Element element = cache.get(key);
349: if (LOG.isLoggable(Level.INFO)) {
350: LOG.info("element " + element + " requested by key " + key);
351: }
352:
353: connection.markWritePosition();
354: connection.write((int) 0);
355:
356: int written = serviceAddress.writeTo(connection);
357:
358: byte[] serializedData = null;
359: if (element != null) {
360: serializedData = SERIALIZER
361: .serialize((Serializable) element.getObjectValue());
362: }
363:
364: written += new Record(serializedData).writeTo(connection);
365:
366: connection.resetToWriteMark();
367: connection.write(written);
368: }
369:
370: public static IRemoveResult callRemove(Address address,
371: int keyHashCode, String key) throws IOException {
372:
373: IBlockingConnection connection = connectionPool
374: .getBlockingConnection(address.getAddress()
375: .getHostName(), address.getPort());
376:
377: connection.markWritePosition(); // mark current position
378: connection.write((int) 0); // write "emtpy" length field
379:
380: int written = connection.write(CMD_REMOVE);
381:
382: written += connection.write(keyHashCode);
383:
384: byte[] serializedKey = key.getBytes(ENCODING);
385: written += connection.write(serializedKey.length);
386: written += connection.write(serializedKey);
387:
388: connection.resetToWriteMark(); // return to length field position
389: connection.write(written); // and update it
390:
391: connection.flush(); // flush (marker will be removed implicit)
392:
393: connection.readInt();
394:
395: final Address srvAddress = Address.readFrom(connection);
396:
397: Record record = Record.readFrom(connection);
398: connection.close();
399:
400: Serializable ser = null;
401: if (record.getData() != null) {
402: ser = SERIALIZER.deserialize(record.getData());
403: }
404:
405: final Serializable value = ser;
406:
407: return new IRemoveResult() {
408: public Serializable getValue() {
409: return value;
410: }
411:
412: public Address getServiceAddress() {
413: return srvAddress;
414: }
415: };
416: }
417:
418: public interface IRemoveResult {
419: public Serializable getValue();
420:
421: public Address getServiceAddress();
422: }
423:
424: private void remove(INonBlockingConnection connection)
425: throws IOException {
426: int keylength = connection.readInt();
427: byte[] serializedKey = connection.readBytesByLength(keylength);
428: String key = new String(serializedKey, ENCODING);
429:
430: Element element = cache.get(key);
431: boolean isRemoved = cache.remove(key);
432: if (LOG.isLoggable(Level.INFO)) {
433: LOG.info("element removed is " + isRemoved);
434: }
435:
436: connection.markWritePosition();
437: connection.write((int) 0);
438:
439: int written = serviceAddress.writeTo(connection);
440:
441: byte[] serializedData = null;
442: if (isRemoved) {
443: serializedData = SERIALIZER
444: .serialize((Serializable) element.getObjectValue());
445: }
446:
447: written += new Record(serializedData).writeTo(connection);
448:
449: connection.resetToWriteMark();
450: connection.write(written);
451: }
452:
453: public boolean onData(INonBlockingConnection connection)
454: throws IOException, BufferUnderflowException {
455:
456: int length = StreamUtils
457: .validateSufficientDatasizeByIntLengthField(connection);
458:
459: byte cmd = connection.readByte();
460: int keyHashCode = connection.readInt();
461: Address targetAddress = getServiceAddress(keyHashCode);
462:
463: if (targetAddress.equals(serviceAddress)) {
464: switch (cmd) {
465: case CMD_PUT:
466: put(connection);
467: break;
468:
469: case CMD_GET:
470: get(connection);
471: break;
472:
473: case CMD_REMOVE:
474: remove(connection);
475: break;
476:
477: default:
478: if (LOG.isLoggable(Level.FINE)) {
479: LOG
480: .fine("receive datagram with unknown cmd "
481: + cmd);
482: }
483:
484: break;
485: }
486:
487: } else {
488: forwardRequest(connection, targetAddress, length, cmd,
489: keyHashCode);
490: }
491:
492: return true;
493: }
494:
495: private void forwardRequest(INonBlockingConnection connection,
496: Address address, int length, byte cmd, int keyHashCode)
497: throws IOException {
498: LOG.info("forwarding request (cmd=" + cmd + ", hashkey="
499: + keyHashCode + ") to " + address);
500:
501: // forward request
502: IBlockingConnection con = connectionPool.getBlockingConnection(
503: address.getAddress().getHostName(), address.getPort());
504: con.write(length);
505: con.write(cmd);
506: con.write(keyHashCode);
507: con.write(connection.readByteBufferByLength(length - 1 - 4));
508: con.flush();
509:
510: // return response
511: int lengthResponse = con.readInt();
512: connection.write(lengthResponse);
513: connection.write(con.readByteBufferByLength(lengthResponse));
514: con.close();
515: }
516:
517: private final class LoadNotifierTimerTask extends TimerTask {
518: @Override
519: public void run() {
520: try {
521: int currentLoad = getLoad();
522: load.put(serviceAddress, currentLoad);
523:
524: ObjectMessage msg = groupEndpoint
525: .createObjectMessage((Serializable) new LoadNotificationMsg(
526: currentLoad, serviceAddress));
527: groupEndpoint.send(msg);
528:
529: } catch (IOException e) {
530: e.printStackTrace();
531: }
532: }
533: }
534:
535: private final class GroupMessageHandler implements
536: IGroupEndpointHandler<ObjectMessage> {
537: public boolean onMessage(IGroupEndpoint<ObjectMessage> endpoint)
538: throws IOException {
539: ObjectMessage message = endpoint.receiveMessage();
540: boolean handled = multiRpcCaller.handleResponse(message);
541:
542: if (!handled) {
543: if (message.getObject() instanceof AssignMsg) {
544: AssignMsg assignMsg = (AssignMsg) message
545: .getObject();
546:
547: if (localSupported
548: .contains(assignMsg.getHashCode())) {
549: LOG
550: .info("hashCode="
551: + assignMsg.getHashCode()
552: + " is already supported by local address ("
553: + serviceAddress
554: + ") sending NOK");
555: ObjectMessage response = endpoint
556: .createObjectMessage(new AlreadyAssignedMsg(
557: message.getId(), serviceAddress));
558: response.addDestinationAddress(message
559: .getSourceAddress());
560: endpoint.send(response);
561:
562: } else {
563: LOG.info("register service "
564: + assignMsg.getAddress()
565: + " for keyHash "
566: + assignMsg.getHashCode());
567:
568: registerAddressToKeyHashCode(assignMsg
569: .getAddress(), assignMsg.getHashCode());
570: ObjectMessage response = endpoint
571: .createObjectMessage(new OkMsg(message
572: .getId()));
573: response.addDestinationAddress(message
574: .getSourceAddress());
575: endpoint.send(response);
576: }
577:
578: } else if (message.getObject() instanceof LoadNotificationMsg) {
579: LoadNotificationMsg loadNotificationMessage = (LoadNotificationMsg) message
580: .getObject();
581: load.put(loadNotificationMessage.getAddress(),
582: loadNotificationMessage.getLoad());
583: }
584: }
585:
586: return true;
587: }
588: }
589:
590: private static final class Serializer {
591:
592: byte[] serialize(Serializable object) {
593: try {
594: ByteArrayOutputStream os = new ByteArrayOutputStream();
595: new ObjectOutputStream(os).writeObject(object);
596:
597: return os.toByteArray();
598: } catch (Exception e) {
599: e.printStackTrace();
600: return null;
601: }
602: }
603:
604: Serializable deserialize(byte[] bytes) {
605: try {
606: ObjectInputStream ois = new ObjectInputStream(
607: new ByteArrayInputStream(bytes));
608: return (Serializable) ois.readObject();
609: } catch (Exception e) {
610: e.printStackTrace();
611: return null;
612: }
613: }
614: }
615:
616: private static final class AssignMsg implements Request {
617: private int hashCode = 0;
618: private Address address = null;
619:
620: AssignMsg(int hashCode, Address address) {
621: this .hashCode = hashCode;
622: this .address = address;
623: }
624:
625: public int getHashCode() {
626: return hashCode;
627: }
628:
629: public Address getAddress() {
630: return address;
631: }
632: }
633:
634: private static final class LoadNotificationMsg implements
635: Serializable {
636: private Address address = null;
637: private int load = 0;
638:
639: public LoadNotificationMsg(int load, Address address) {
640: this .load = load;
641: this .address = address;
642: }
643:
644: public int getLoad() {
645: return load;
646: }
647:
648: public Address getAddress() {
649: return address;
650: }
651: }
652:
653: private static final class AlreadyAssignedMsg implements Response {
654: private long correlatedMsgId = 0;
655: private Address address = null;
656:
657: public AlreadyAssignedMsg(long correlatedMsgId, Address address) {
658: this .correlatedMsgId = correlatedMsgId;
659: this .address = address;
660: }
661:
662: public long getRequestMsgId() {
663: return correlatedMsgId;
664: }
665:
666: public Address getAddress() {
667: return address;
668: }
669: }
670:
671: private static final class OkMsg implements Response {
672: private long correlatedMsgId = 0;
673:
674: OkMsg(long correlatedMsgId) {
675: this .correlatedMsgId = correlatedMsgId;
676: }
677:
678: public long getRequestMsgId() {
679: return correlatedMsgId;
680: }
681: }
682:
683: private static final class Record {
684:
685: private static final int COMPRESS_THRESHOLD = 50;
686:
687: private byte[] data = null;
688:
689: Record(byte[] data) {
690: this .data = data;
691: }
692:
693: public byte[] getData() {
694: return data;
695: }
696:
697: public static Record readFrom(IDataSource dataSource)
698: throws IOException {
699:
700: boolean isCompressed = (dataSource.readByte() == TRUE);
701:
702: byte[] data = null;
703: int length = dataSource.readInt();
704: if (length > 0) {
705: data = dataSource.readBytesByLength(length);
706: }
707:
708: if (isCompressed) {
709: data = decompress(data);
710: }
711:
712: return new Record(data);
713: }
714:
715: public int writeTo(IDataSink dataSink) throws IOException {
716: int written = 0;
717:
718: if (data != null) {
719: byte[] dataToWrite = data;
720:
721: boolean isCompressed = false;
722: if (dataToWrite.length > COMPRESS_THRESHOLD) {
723: int originalLength = dataToWrite.length;
724: dataToWrite = compress(dataToWrite);
725: isCompressed = true;
726:
727: LOG
728: .info("data has been compressed ("
729: + (100 - ((dataToWrite.length * 100) / originalLength))
730: + "%)");
731: }
732:
733: if (isCompressed) {
734: written += dataSink.write(TRUE);
735: } else {
736: written += dataSink.write(FALSE);
737: }
738: written += dataSink.write(dataToWrite.length);
739: written += dataSink.write(dataToWrite);
740:
741: } else {
742: written += dataSink.write(FALSE);
743: written += dataSink.write(0);
744: }
745:
746: return written;
747: }
748:
749: private static byte[] compress(byte[] data) {
750: Deflater compressor = new Deflater();
751: compressor.setLevel(Deflater.BEST_COMPRESSION);
752:
753: compressor.setInput(data);
754: compressor.finish();
755:
756: ByteArrayOutputStream bos = new ByteArrayOutputStream(
757: data.length);
758:
759: byte[] buf = new byte[1024];
760: while (!compressor.finished()) {
761: int count = compressor.deflate(buf);
762: bos.write(buf, 0, count);
763: }
764: try {
765: bos.close();
766: } catch (IOException e) {
767: }
768:
769: return bos.toByteArray();
770: }
771:
772: private static byte[] decompress(byte[] data) {
773: Inflater decompressor = new Inflater();
774: decompressor.setInput(data);
775:
776: ByteArrayOutputStream bos = new ByteArrayOutputStream(
777: data.length);
778:
779: byte[] buf = new byte[1024];
780: while (!decompressor.finished()) {
781: try {
782: int count = decompressor.inflate(buf);
783: bos.write(buf, 0, count);
784: } catch (DataFormatException e) {
785: }
786: }
787: try {
788: bos.close();
789: } catch (IOException e) {
790: }
791:
792: return bos.toByteArray();
793: }
794: }
795: }
|