001: // $Id: DataConverter.java 1546 2007-07-23 06:07:56Z grro $
002:
003: /*
004: * Copyright (c) xcache.org, 2007. All rights reserved.
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
021: * The latest copy of this software may be found on http://www.xcache.org/
022: */
023: package org.xcache;
024:
025: import java.io.IOException;
026: import java.io.InputStreamReader;
027: import java.io.LineNumberReader;
028: import java.net.InetAddress;
029: import java.nio.BufferUnderflowException;
030: import java.util.ArrayList;
031: import java.util.HashMap;
032: import java.util.List;
033: import java.util.Map;
034: import java.util.concurrent.Executor;
035: import java.util.logging.Level;
036: import java.util.logging.Logger;
037:
038: import net.sf.jsr107cache.Cache;
039:
040: import org.xsocket.ClosedConnectionException;
041: import org.xsocket.MaxReadSizeExceededException;
042: import org.xsocket.stream.IBlockingConnection;
043: import org.xsocket.stream.IConnectHandler;
044: import org.xsocket.stream.IConnection;
045: import org.xsocket.stream.IDataHandler;
046: import org.xsocket.stream.INonBlockingConnection;
047: import org.xsocket.stream.IServer;
048: import org.xsocket.stream.IServerListener;
049: import org.xsocket.stream.Server;
050: import org.xsocket.stream.StreamUtils;
051: import org.xsocket.stream.IConnection.FlushMode;
052:
053: /**
054: *
055: *
056: * @author grro@xcache.org
057: */
058: public final class CacheServer implements IServer {
059:
060: private static final Logger LOG = Logger
061: .getLogger(CacheServer.class.getName());
062:
063: static final int IDLE_TIMEOUT_SEC = 6 * 60 * 60; // 6 hours
064: static final int CONNECTION_TIMEOUT_SEC = Integer.MAX_VALUE; // no timeout
065:
066: static final byte CMD_PUT = 88;
067: static final byte CMD_GET = 89;
068:
069: static final byte CMD_ADD_SEGMENT = 92;
070: static final byte CMD_TRANSFER_SEGMENT = 112;
071: static final byte CMD_RECEIVE_SEGMENT = 113;
072:
073: static final byte RESULT_OK_WITHOUT_RETURNVALUE = 55;
074: static final byte RESULT_OK_WITH_RETURNVALUE = 77;
075:
076: private static String implementationVersion = null;
077:
078: private final Map<Integer, Segment> segmentMap = new HashMap<Integer, Segment>();
079:
080: private ICacheFactory cacheFactory = null;
081: private Server server = null;
082:
083: CacheServer(ICacheFactory cacheFactory) throws IOException {
084: this (cacheFactory, null);
085: }
086:
087: CacheServer(ICacheFactory cacheFactory, Executor workerpool)
088: throws IOException {
089: this .cacheFactory = cacheFactory;
090:
091: if (workerpool != null) {
092: server = new Server(new Handler(), workerpool);
093: } else {
094: server = new Server(new Handler());
095: }
096:
097: server.setConnectionTimeoutSec(CONNECTION_TIMEOUT_SEC);
098: server.setIdleTimeoutSec(IDLE_TIMEOUT_SEC);
099:
100: server.setServerName("CacheServer "
101: + getImplementationVersion());
102: }
103:
104: Server getUnderlyingServer() {
105: return server;
106: }
107:
108: int getCacheSize() {
109: int size = 0;
110: for (Segment segment : segmentMap.values()) {
111: if (!segment.isForwardSegment()) {
112: size += segment.getCache().size();
113: }
114: }
115: return size;
116: }
117:
118: public void run() {
119: server.run();
120: }
121:
122: public boolean isOpen() {
123: return server.isOpen();
124: }
125:
126: public void close() throws IOException {
127: server.close();
128: }
129:
130: public int getLocalPort() {
131: return server.getLocalPort();
132: }
133:
134: public InetAddress getLocalAddress() {
135: return server.getLocalAddress();
136: }
137:
138: public Executor getWorkerpool() {
139: return server.getWorkerpool();
140: }
141:
142: public Object getOption(String name) throws IOException {
143: return server.getOption(name);
144: }
145:
146: public Map<String, Class> getOptions() {
147: return server.getOptions();
148: }
149:
150: public void setConnectionTimeoutSec(int timeoutSec) {
151: server.setConnectionTimeoutSec(timeoutSec);
152: }
153:
154: public int getConnectionTimeoutSec() {
155: return server.getConnectionTimeoutSec();
156: }
157:
158: public void setIdleTimeoutSec(int timeoutInSec) {
159: server.setIdleTimeoutSec(timeoutInSec);
160: }
161:
162: public int getIdleTimeoutSec() {
163: return server.getIdleTimeoutSec();
164: }
165:
166: public void addListener(IServerListener listener) {
167: server.addListener(listener);
168: }
169:
170: public boolean removeListener(IServerListener listener) {
171: return server.removeListener(listener);
172: }
173:
174: String[] getSegments() {
175: List<String> info = new ArrayList<String>();
176:
177: for (Integer segmentId : segmentMap.keySet()) {
178: Segment segment = segmentMap.get(segmentId);
179: if (segment.isForwardSegment()) {
180: info.add("(" + segment + " forwarding to "
181: + segment.getForwardAddress() + ")");
182: } else {
183: info.add(segment + " (size="
184: + segment.getCache().size() + ")");
185: }
186: }
187:
188: return info.toArray(new String[info.size()]);
189: }
190:
191: int getHits() {
192: int hits = 0;
193:
194: for (Segment segment : segmentMap.values()) {
195: if (!segment.isForwardSegment()) {
196: hits += segment.getCache().getCacheStatistics()
197: .getCacheHits();
198: }
199: }
200: return hits;
201: }
202:
203: String getImplementationVersion() {
204: if (implementationVersion == null) {
205: try {
206: LineNumberReader lnr = new LineNumberReader(
207: new InputStreamReader(this .getClass()
208: .getResourceAsStream(
209: "/org/xcache/version.txt")));
210: String line = null;
211: do {
212: line = lnr.readLine();
213: if (line != null) {
214: if (line.startsWith("Implementation-Version=")) {
215: implementationVersion = line.substring(
216: "Implementation-Version=".length(),
217: line.length()).trim();
218: break;
219: }
220: }
221: } while (line != null);
222:
223: lnr.close();
224: } catch (Exception ignore) {
225: }
226: }
227:
228: return implementationVersion;
229: }
230:
231: protected void onCmd(byte cmd, INonBlockingConnection connection)
232: throws IOException {
233: switch (cmd) {
234:
235: case CMD_PUT:
236: onPutCmd(connection);
237: break;
238:
239: case CMD_GET:
240: onGetCmd(connection);
241: break;
242:
243: // TODO remove me
244: case CMD_ADD_SEGMENT:
245: int segmentId = connection.readInt();
246: segmentMap.put(segmentId, new Segment(segmentId,
247: cacheFactory));
248: writeResult(null, connection);
249: break;
250:
251: case CMD_TRANSFER_SEGMENT:
252: onTransferSegment(connection);
253: break;
254:
255: case CMD_RECEIVE_SEGMENT:
256: onReceiveSegment(connection);
257: break;
258:
259: default:
260: if (LOG.isLoggable(Level.FINE)) {
261: LOG.info("got unknown command " + cmd);
262: }
263: break;
264: }
265: }
266:
267: private void onPutCmd(INonBlockingConnection connection)
268: throws IOException, ClosedConnectionException {
269: int segmentId = connection.readInt();
270: Item keyItem = Item.readFrom(connection);
271: Item valueItem = Item.readUnresolvedFrom(connection);
272: Item oldValueItem = null;
273:
274: Segment segment = segmentMap.get(segmentId);
275: synchronized (segment) {
276: Cache cache = segment.getCache();
277: oldValueItem = (Item) cache.put(keyItem.getValue(),
278: valueItem);
279: }
280:
281: writeResult(oldValueItem, connection);
282: }
283:
284: private void onGetCmd(INonBlockingConnection connection)
285: throws IOException, ClosedConnectionException {
286: int segmentId = connection.readInt();
287: Item keyItem = Item.readFrom(connection);
288: Item valueItem = null;
289:
290: Segment segment = segmentMap.get(segmentId);
291: synchronized (segment) {
292: Cache cache = segment.getCache();
293: valueItem = (Item) cache.get(keyItem.getValue());
294: }
295:
296: writeResult(valueItem, connection);
297: }
298:
299: static final void callTransferSegment(Address sourceService,
300: Address targetService, int segment) throws IOException {
301: IBlockingConnection connection = null;
302: try {
303: connection = ConnectionPool.getInstance().getConnection(
304: sourceService);
305:
306: connection.markWritePosition();
307: connection.write((int) 0); // emtpy length field
308: int length = 0;
309:
310: length += connection
311: .write(CacheServer.CMD_TRANSFER_SEGMENT);
312: length += connection.write(segment);
313: length += targetService.writeTo(connection);
314:
315: connection.resetToWriteMark();
316: connection.write(length);
317:
318: connection.flush();
319:
320: connection.readInt(); // first position is length
321: byte response = connection.readByte();
322: if (response != CacheServer.RESULT_OK_WITHOUT_RETURNVALUE) {
323: throw new IOException(
324: "couldn't initiate transfer. reason");
325: }
326:
327: connection.close();
328: } catch (IOException e) {
329: if (connection != null) {
330: try {
331: ConnectionPool.getInstance().destroyConnection(
332: connection);
333: } catch (Exception ignore) {
334: }
335: }
336:
337: throw e;
338: }
339: }
340:
341: private void onTransferSegment(INonBlockingConnection connection)
342: throws IOException, ClosedConnectionException {
343:
344: int segmentId = connection.readInt();
345: Address targetService = Address.readFrom(connection);
346:
347: if (targetService.equals(new Address(server.getLocalAddress(),
348: server.getLocalPort()))) {
349: if (LOG.isLoggable(Level.FINE)) {
350: LOG.fine("transfer to self. ignore transfer command");
351: }
352: return;
353: }
354:
355: Segment segment = segmentMap.get(segmentId);
356: synchronized (segment) {
357: segment.transferTo(targetService);
358: }
359:
360: writeResult(null, connection);
361: }
362:
363: private void onReceiveSegment(INonBlockingConnection connection)
364: throws IOException, ClosedConnectionException {
365: int segmentId = connection.readInt();
366:
367: synchronized (segmentMap) {
368: if (segmentMap.containsKey(segmentId)) {
369: LOG.warning("segment " + segmentId
370: + " already exist. overridig it");
371: } else {
372: Segment segment = new Segment(segmentId, cacheFactory);
373: segmentMap.put(segmentId, segment);
374: }
375: }
376:
377: Segment segment = segmentMap.get(segmentId);
378: synchronized (segment) {
379: segment.readFrom(connection);
380: }
381:
382: writeResult(null, connection);
383: }
384:
385: private void writeResult(Item data, IConnection connection)
386: throws IOException {
387:
388: connection.markWritePosition();
389: connection.write((int) 0);
390: int length = 0;
391:
392: if (data == null) {
393: length += connection.write(RESULT_OK_WITHOUT_RETURNVALUE);
394:
395: } else {
396: length += connection.write(RESULT_OK_WITH_RETURNVALUE);
397: length += data.writeTo(connection);
398: }
399:
400: connection.resetToWriteMark();
401: connection.write(length);
402:
403: connection.flush();
404: }
405:
406: private final class Handler implements IConnectHandler,
407: IDataHandler {
408:
409: public boolean onConnect(INonBlockingConnection connection)
410: throws IOException {
411: connection.setAutoflush(false);
412: connection.setFlushmode(FlushMode.ASYNC);
413: connection.setOption(IConnection.TCP_NODELAY, true);
414:
415: return true;
416: }
417:
418: public boolean onData(INonBlockingConnection connection)
419: throws IOException, BufferUnderflowException,
420: MaxReadSizeExceededException {
421:
422: StreamUtils
423: .validateSufficientDatasizeByIntLengthField(connection);
424:
425: byte cmd = connection.readByte();
426: onCmd(cmd, connection);
427:
428: return true;
429: }
430: }
431: }
|