001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.memorydatastore.client;
005:
006: import com.tc.exception.TCRuntimeException;
007: import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage;
008: import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage;
009: import com.tc.memorydatastore.server.MemoryDataStoreServer;
010: import com.tc.net.MaxConnectionsExceededException;
011: import com.tc.net.core.ConnectionAddressProvider;
012: import com.tc.net.core.ConnectionInfo;
013: import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
014: import com.tc.net.protocol.tcm.ClientMessageChannel;
015: import com.tc.net.protocol.tcm.CommunicationsManager;
016: import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
017: import com.tc.net.protocol.tcm.NullMessageMonitor;
018: import com.tc.net.protocol.tcm.TCMessage;
019: import com.tc.net.protocol.tcm.TCMessageType;
020: import com.tc.net.protocol.transport.NullConnectionPolicy;
021: import com.tc.object.lockmanager.api.ThreadID;
022: import com.tc.object.session.NullSessionManager;
023: import com.tc.util.Assert;
024: import com.tc.util.TCTimeoutException;
025: import com.tc.util.concurrent.ThreadUtil;
026:
027: import java.io.IOException;
028: import java.net.ConnectException;
029: import java.util.Collection;
030: import java.util.HashMap;
031: import java.util.Map;
032:
033: public class MemoryDataStoreClient implements MemoryDataMap {
034: private final static CommunicationsManager communicationsManager = new CommunicationsManagerImpl(
035: new NullMessageMonitor(),
036: new PlainNetworkStackHarnessFactory(),
037: new NullConnectionPolicy());
038:
039: private ClientMessageChannel channel;
040: private final Map pendingRequests = new HashMap();
041: private final Map waitObjectMap = new HashMap();
042: private final Map pendingResponses = new HashMap();
043: private final String storeName;
044: private final ThreadLocal threadID = new ThreadLocal();
045:
046: private long threadIDSequence;
047:
048: public MemoryDataStoreClient(String storeName, String serverHost,
049: int serverPort) {
050: super ();
051: this .storeName = storeName;
052: setupClient(serverHost, serverPort);
053: }
054:
055: public MemoryDataStoreClient(String storeName) {
056: this (storeName, "localhost", MemoryDataStoreServer.DEFAULT_PORT);
057: }
058:
059: private void setupClient(String serverHost, int serverPort) {
060:
061: this .channel = communicationsManager.createClientChannel(
062: new NullSessionManager(), -1, serverHost, serverPort,
063: 10000, new ConnectionAddressProvider(
064: new ConnectionInfo[] { new ConnectionInfo(
065: serverHost, serverPort) }));
066:
067: channel.addClassMapping(
068: TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE,
069: MemoryDataStoreResponseMessage.class);
070: channel.addClassMapping(
071: TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE,
072: MemoryDataStoreRequestMessage.class);
073: channel.routeMessageType(
074: TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE,
075: new MemoryDataStoreResponseSink(this ));
076:
077: while (true) {
078: try {
079: channel.open();
080: break;
081: } catch (TCTimeoutException tcte) {
082: ThreadUtil.reallySleep(5000);
083: } catch (ConnectException e) {
084: ThreadUtil.reallySleep(5000);
085: } catch (MaxConnectionsExceededException e) {
086: ThreadUtil.reallySleep(5000);
087: } catch (IOException ioe) {
088: ioe.printStackTrace();
089: throw new RuntimeException(ioe);
090: }
091: }
092: }
093:
094: public void close() {
095: channel.close();
096: }
097:
098: private synchronized long nextThreadID() {
099: return ++threadIDSequence;
100: }
101:
102: private ThreadID getThreadID() {
103: ThreadID rv = (ThreadID) threadID.get();
104: if (rv == null) {
105: rv = new ThreadID(nextThreadID());
106: threadID.set(rv);
107: }
108:
109: return rv;
110: }
111:
112: public void put(byte[] key, byte[] value) {
113: ThreadID thId = getThreadID();
114: MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
115: .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
116: request.initializePut(thId, this .storeName, key, value);
117: request.send();
118: // MemoryDataStoreResponseMessage responseMessage =
119: // waitForResponse(threadID, request);
120: // Assert.assertTrue(responseMessage.isRequestCompletedFlag());
121: }
122:
123: public byte[] get(byte[] key) {
124: ThreadID thId = getThreadID();
125: MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
126: .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
127: request.initializeGet(thId, this .storeName, key, false);
128:
129: Object waitObject = getWaitObject(thId, request);
130:
131: request.send();
132: MemoryDataStoreResponseMessage responseMessage = waitForResponse(
133: thId, waitObject);
134: Assert.assertTrue(responseMessage.isRequestCompletedFlag());
135: return responseMessage.getValue();
136: }
137:
138: public Collection getAll(byte[] key) {
139: ThreadID thId = getThreadID();
140: MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
141: .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
142: request.initializeGet(thId, this .storeName, key, true);
143:
144: Object waitObject = getWaitObject(thId, request);
145:
146: request.send();
147: MemoryDataStoreResponseMessage responseMessage = waitForResponse(
148: thId, waitObject);
149: Assert.assertTrue(responseMessage.isRequestCompletedFlag());
150: return responseMessage.getValues();
151: }
152:
153: public Collection getAll() {
154: byte[] emptyKey = new byte[0];
155: return (getAll(emptyKey));
156: }
157:
158: public void remove(byte[] key) {
159: ThreadID thId = getThreadID();
160: MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
161: .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
162: request.initializeRemove(thId, this .storeName, key, false);
163: request.send();
164: // MemoryDataStoreResponseMessage responseMessage =
165: // waitForResponse(threadID, request);
166: // Assert.assertTrue(responseMessage.isRequestCompletedFlag());
167: // return responseMessage.getValue();
168: }
169:
170: public void removeAll(byte[] key) {
171: ThreadID thId = getThreadID();
172: MemoryDataStoreRequestMessage request = (MemoryDataStoreRequestMessage) channel
173: .createMessage(TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE);
174: request.initializeRemove(thId, this .storeName, key, true);
175: request.send();
176: // MemoryDataStoreResponseMessage responseMessage =
177: // waitForResponse(threadID, request);
178: // Assert.assertTrue(responseMessage.isRequestCompletedFlag());
179: // return responseMessage.getNumOfRemove();
180: }
181:
182: void notifyResponse(ThreadID thId,
183: MemoryDataStoreResponseMessage response) {
184: Object waitObject = null;
185: synchronized (this ) {
186: waitObject = this .waitObjectMap.get(thId);
187: Object pendingRequest = this .pendingRequests.remove(thId);
188: this .pendingResponses.put(thId, response);
189: Assert.assertNotNull(waitObject);
190: Assert.assertNotNull(pendingRequest);
191: }
192: synchronized (waitObject) {
193: waitObject.notifyAll();
194: }
195: }
196:
197: private MemoryDataStoreResponseMessage waitForResponse(
198: ThreadID thId, Object waitObject) {
199: synchronized (waitObject) {
200: while (hasPendingRequest(thId)) {
201: try {
202: waitObject.wait();
203: } catch (InterruptedException e) {
204: throw new TCRuntimeException(e);
205: }
206: }
207: }
208: synchronized (this ) {
209: MemoryDataStoreResponseMessage responseMessage = (MemoryDataStoreResponseMessage) this .pendingResponses
210: .remove(thId);
211: Assert.assertNotNull(responseMessage);
212: return responseMessage;
213: }
214: }
215:
216: private boolean hasPendingRequest(ThreadID thId) {
217: return this .pendingRequests.get(thId) != null;
218: }
219:
220: private synchronized Object getWaitObject(ThreadID thId,
221: TCMessage message) {
222: Object waitObject = new Object();
223: this.waitObjectMap.put(thId, waitObject);
224: this.pendingRequests.put(thId, message);
225: return waitObject;
226: }
227: }
|