001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.memorydatastore.server.handler;
005:
006: import com.tc.async.api.AbstractEventHandler;
007: import com.tc.async.api.EventContext;
008: import com.tc.async.api.EventHandlerException;
009: import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage;
010: import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage;
011: import com.tc.memorydatastore.server.MemoryDataStore;
012: import com.tc.net.protocol.tcm.MessageChannel;
013: import com.tc.net.protocol.tcm.TCMessageType;
014: import com.tc.object.lockmanager.api.ThreadID;
015: import com.tc.util.Assert;
016:
017: import java.util.Collection;
018:
019: public class MemoryDataStoreRequestHandler extends AbstractEventHandler {
020: private final static String DATA_STORE_NAME_ATTACHMENT_KEY = "DataStoreName";
021:
022: private final MemoryDataStore store = new MemoryDataStore();
023:
024: private long numOfRequestsProcessed = 0;
025: private long totalTimeProcessed = 0;
026:
027: public void handleEvent(EventContext context)
028: throws EventHandlerException {
029: long startTime = System.currentTimeMillis();
030:
031: MemoryDataStoreRequestMessage message = (MemoryDataStoreRequestMessage) context;
032: MemoryDataStoreRequestMessage dataStoreRequestMessage = (MemoryDataStoreRequestMessage) message;
033:
034: MessageChannel channel = message.getChannel();
035:
036: serviceRequest(channel, dataStoreRequestMessage);
037:
038: long endTime = System.currentTimeMillis();
039:
040: synchronized (this ) {
041: totalTimeProcessed += (endTime - startTime);
042: numOfRequestsProcessed++;
043: if (numOfRequestsProcessed % 10000 == 0) {
044: System.err
045: .println(numOfRequestsProcessed
046: + " requests processed with average processing time: "
047: + (totalTimeProcessed * 1.0 / numOfRequestsProcessed)
048: + "ms");
049: }
050: }
051: }
052:
053: private void serviceRequest(MessageChannel channel,
054: MemoryDataStoreRequestMessage requestMessage) {
055: int type = requestMessage.getType();
056: String dataStoreName = getDataStoreName(channel, requestMessage);
057:
058: byte[] key = requestMessage.getKey();
059: byte[] value = requestMessage.getValue();
060:
061: switch (type) {
062: case MemoryDataStoreRequestMessage.PUT:
063: Assert.assertNotNull(key);
064: Assert.assertNotNull(value);
065:
066: //System.err.println("MemoryDataStore -- type: " + type + ", key: " + new String(key) + ", value: "
067: // + new String(value));
068:
069: store.put(dataStoreName, key, value);
070: sendPutResponseMessage(channel, requestMessage
071: .getThreadID(), true);
072: break;
073: case MemoryDataStoreRequestMessage.GET:
074: Assert.assertNotNull(key);
075:
076: if (requestMessage.isGetAll()) {
077: Collection values = store.getAll(dataStoreName, key);
078: //System.err.println("MemoryDataStore -- type: " + type + ", size of returned collection: " + values.size());
079: sendGetAllResponseMessage(channel, requestMessage
080: .getThreadID(), values, true);
081: } else {
082: value = store.get(dataStoreName, key);
083: //System.err.println("MemoryDataStore -- type: " + type + ", return value: "
084: // + (value != null ? new String(value) : null));
085: sendGetResponseMessage(channel, requestMessage
086: .getThreadID(), value, true);
087: }
088: break;
089: case MemoryDataStoreRequestMessage.REMOVE:
090: Assert.assertNotNull(key);
091:
092: if (requestMessage.isRemoveAll()) {
093: int numOfRemove = store.removeAll(dataStoreName, key);
094: //System.err.println("MemoryDataStore -- type: " + type + ", return num of remove: " + numOfRemove);
095: sendRemoveAllResponseMessage(channel, requestMessage
096: .getThreadID(), numOfRemove, true);
097: } else {
098: value = store.remove(dataStoreName, key);
099: //System.err.println("MemoryDataStore -- type: " + type + ", return value: "
100: // + (value != null ? new String(value) : null));
101: sendRemoveResponseMessage(channel, requestMessage
102: .getThreadID(), value, true);
103: }
104: break;
105: }
106:
107: }
108:
109: private String getDataStoreName(MessageChannel channel,
110: MemoryDataStoreRequestMessage requestMessage) {
111: String dataStoreName = (String) channel
112: .getAttachment(DATA_STORE_NAME_ATTACHMENT_KEY);
113: if (dataStoreName == null) {
114: dataStoreName = requestMessage.getDataStoreName();
115: channel.addAttachment(DATA_STORE_NAME_ATTACHMENT_KEY,
116: dataStoreName, false);
117: }
118: return dataStoreName;
119: }
120:
121: private void sendRemoveResponseMessage(MessageChannel channel,
122: ThreadID threadID, byte[] value,
123: boolean requestCompletedStatus) {
124: MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel
125: .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE);
126: response.initializeRemoveResponse(threadID, value,
127: requestCompletedStatus);
128: response.send();
129: }
130:
131: private void sendRemoveAllResponseMessage(MessageChannel channel,
132: ThreadID threadID, int numOfRemove,
133: boolean requestCompletedStatus) {
134: MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel
135: .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE);
136: response.initializeRemoveAllResponse(threadID, numOfRemove,
137: requestCompletedStatus);
138: response.send();
139: }
140:
141: private void sendGetResponseMessage(MessageChannel channel,
142: ThreadID threadID, byte[] value,
143: boolean requestCompletedStatus) {
144: MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel
145: .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE);
146: response.initializeGetResponse(threadID, value,
147: requestCompletedStatus);
148: response.send();
149: }
150:
151: private void sendGetAllResponseMessage(MessageChannel channel,
152: ThreadID threadID, Collection values,
153: boolean requestCompletedStatus) {
154: MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel
155: .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE);
156: response.initializeGetAllResponse(threadID, values,
157: requestCompletedStatus);
158: response.send();
159: }
160:
161: private void sendPutResponseMessage(MessageChannel channel,
162: ThreadID threadID, boolean requestCompletedStatus) {
163: MemoryDataStoreResponseMessage response = (MemoryDataStoreResponseMessage) channel
164: .createMessage(TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE);
165: response
166: .initializePutResponse(threadID, requestCompletedStatus);
167: response.send();
168: }
169:
170: }
|