001: package com.tc.memorydatastore.server;
002:
003: import com.tc.async.api.ConfigurationContext;
004: import com.tc.async.api.Stage;
005: import com.tc.async.api.StageManager;
006: import com.tc.async.impl.StageManagerImpl;
007: import com.tc.exception.TCRuntimeException;
008: import com.tc.lang.TCThreadGroup;
009: import com.tc.lang.ThrowableHandler;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.memorydatastore.message.MemoryDataStoreRequestMessage;
013: import com.tc.memorydatastore.message.MemoryDataStoreResponseMessage;
014: import com.tc.memorydatastore.server.handler.MemoryDataStoreRequestHandler;
015: import com.tc.net.TCSocketAddress;
016: import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
017: import com.tc.net.protocol.tcm.CommunicationsManager;
018: import com.tc.net.protocol.tcm.CommunicationsManagerImpl;
019: import com.tc.net.protocol.tcm.HydrateHandler;
020: import com.tc.net.protocol.tcm.NetworkListener;
021: import com.tc.net.protocol.tcm.NullMessageMonitor;
022: import com.tc.net.protocol.tcm.TCMessageType;
023: import com.tc.net.protocol.transport.DefaultConnectionIdFactory;
024: import com.tc.net.protocol.transport.NullConnectionPolicy;
025: import com.tc.object.session.NullSessionManager;
026: import com.tc.properties.TCPropertiesImpl;
027: import com.tc.util.TCTimeoutException;
028:
029: import java.io.IOException;
030: import java.util.Collections;
031:
032: public class MemoryDataStoreServer {
033: public static final int DEFAULT_PORT = 9001;
034:
035: private static final String MEMORY_DATA_STORE_REQUEST_STAGE = "memory_data_store_request_stage";
036: private final static int STARTED = 1;
037: private final static int STOPPED = 2;
038:
039: private int serverPort;
040: private int state;
041: private NetworkListener lsnr;
042: private CommunicationsManager communicationManager;
043:
044: public static MemoryDataStoreServer createInstance() {
045: return new MemoryDataStoreServer(DEFAULT_PORT);
046: }
047:
048: public static MemoryDataStoreServer createInstance(int port) {
049: return new MemoryDataStoreServer(port);
050: }
051:
052: private MemoryDataStoreServer(int serverPort) {
053: super ();
054: this .serverPort = serverPort;
055: }
056:
057: public int getListenPort() {
058: return lsnr.getBindPort();
059: }
060:
061: private StageManager getStageManager() {
062: ThrowableHandler throwableHandler = new ThrowableHandler(
063: TCLogging.getLogger(MemoryDataStoreServer.class));
064: TCThreadGroup threadGroup = new TCThreadGroup(throwableHandler);
065: return new StageManagerImpl(threadGroup);
066: }
067:
068: private void setupListener(int serverPort) {
069: this .communicationManager = new CommunicationsManagerImpl(
070: new NullMessageMonitor(),
071: new PlainNetworkStackHarnessFactory(),
072: new NullConnectionPolicy());
073: this .lsnr = communicationManager.createListener(
074: new NullSessionManager(), new TCSocketAddress(
075: TCSocketAddress.WILDCARD_ADDR, serverPort),
076: true, new DefaultConnectionIdFactory());
077: }
078:
079: public void start() throws IOException {
080: StageManager stageManager = getStageManager();
081: setupListener(serverPort);
082:
083: lsnr.addClassMapping(
084: TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE,
085: MemoryDataStoreRequestMessage.class);
086: lsnr.addClassMapping(
087: TCMessageType.MEMORY_DATA_STORE_RESPONSE_MESSAGE,
088: MemoryDataStoreResponseMessage.class);
089:
090: Stage hydrateStage = stageManager.createStage(
091: "hydrate_message_stage", new HydrateHandler(), 1, 500); // temporary
092: // hardcoded
093:
094: MemoryDataStoreRequestHandler memoryDataStoreRequestHandler = new MemoryDataStoreRequestHandler();
095: Stage memoryDataStoreRequestStage = stageManager.createStage(
096: MEMORY_DATA_STORE_REQUEST_STAGE,
097: memoryDataStoreRequestHandler, 1, 1);
098: lsnr.routeMessageType(
099: TCMessageType.MEMORY_DATA_STORE_REQUEST_MESSAGE,
100: memoryDataStoreRequestStage.getSink(), hydrateStage
101: .getSink());
102:
103: stageManager.startAll(new NullContext(stageManager)); // temporary hack to
104: // start the stage
105: lsnr.start(Collections.EMPTY_SET);
106: this .state = STARTED;
107: }
108:
109: public void shutdown() throws TCTimeoutException {
110: this .lsnr.stop(5000);
111: this .communicationManager.shutdown();
112: this .state = STOPPED;
113: }
114:
115: public int getState() {
116: return this .state;
117: }
118:
119: // Temporary hack
120: private static class NullContext implements ConfigurationContext {
121:
122: private final StageManager manager;
123:
124: public NullContext(StageManager manager) {
125: this .manager = manager;
126: }
127:
128: public TCLogger getLogger(Class clazz) {
129: return TCLogging.getLogger(clazz);
130: }
131:
132: public Stage getStage(String name) {
133: return manager.getStage(name);
134: }
135:
136: }
137:
138: public static void main(String[] args) {
139: final String PropertyMemoryStorePort = "l2.memorystore.port";
140: final int memoryStorePort;
141: memoryStorePort = TCPropertiesImpl.getProperties().getInt(
142: PropertyMemoryStorePort);
143:
144: MemoryDataStoreServer server = createInstance(memoryStorePort);
145: try {
146: server.start();
147: System.out
148: .println("Memory Data Store Server started at port "
149: + server.getListenPort());
150:
151: while (server.getState() == STARTED) {
152: Thread.sleep(Long.MAX_VALUE);
153: }
154: } catch (InterruptedException e) {
155: throw new TCRuntimeException(e);
156: } catch (IOException e) {
157: throw new TCRuntimeException(e);
158: }
159: }
160: }
|