001: // $Id: DataConverter.java 1546 2007-07-23 06:07:56Z grro $
002:
003: /*
004: * Copyright (c) xcache.org, 2007. All rights reserved.
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
021: * The latest copy of this software may be found on http://www.xcache.org/
022: */
023: package org.xcache;
024:
025: import java.io.IOException;
026: import java.util.Set;
027: import java.util.logging.Level;
028: import java.util.logging.Logger;
029:
030: import org.xsocket.DataConverter;
031: import org.xsocket.stream.IBlockingConnection;
032: import org.xsocket.stream.IConnection;
033:
034: import net.sf.jsr107cache.Cache;
035:
036: /**
037: *
038: *
039: * @author grro@xcache.org
040: */
041: final class Segment {
042:
043: private static final Logger LOG = Logger.getLogger(Segment.class
044: .getName());
045:
046: private Cache cache = null;
047: private int segmentId = 0;
048: private ICacheFactory cacheFactory = null;
049:
050: Segment(int segmentId, ICacheFactory cacheFactory) {
051: this .segmentId = segmentId;
052: this .cacheFactory = cacheFactory;
053:
054: setCache(cacheFactory.newCache());
055: }
056:
057: synchronized Cache getCache() {
058: return cache;
059: }
060:
061: boolean isForwardSegment() {
062: return (cache instanceof CacheClient);
063: }
064:
065: Address getForwardAddress() {
066: if (isForwardSegment()) {
067: return ((Forwarder) ((CacheClient) cache).getLocator())
068: .getTargetService();
069: } else {
070: return null;
071: }
072: }
073:
074: private void setCache(Cache cache) {
075: this .cache = cache;
076: }
077:
078: static final void callTransferSegment(Address sourceService,
079: Address targetService, int segmentId) throws IOException {
080: IBlockingConnection connection = null;
081: try {
082: connection = ConnectionPool.getInstance().getConnection(
083: sourceService);
084:
085: connection.markWritePosition();
086: connection.write((int) 0); // emtpy length field
087: int length = 0;
088:
089: length += connection
090: .write(CacheServer.CMD_TRANSFER_SEGMENT);
091: length += connection.write(segmentId);
092: length += targetService.writeTo(connection);
093:
094: connection.resetToWriteMark();
095: connection.write(length);
096:
097: connection.flush();
098:
099: connection.readInt(); // first position is length
100: byte response = connection.readByte();
101: if (response != CacheServer.RESULT_OK_WITHOUT_RETURNVALUE) {
102: throw new IOException(
103: "couldn't initiate transfer. reason");
104: }
105:
106: connection.close();
107: } catch (IOException e) {
108: if (connection != null) {
109: try {
110: ConnectionPool.getInstance().destroyConnection(
111: connection);
112: } catch (Exception ignore) {
113: }
114: }
115:
116: throw e;
117: }
118: }
119:
120: @SuppressWarnings("unchecked")
121: synchronized void transferTo(Address address) throws IOException {
122:
123: long start = System.currentTimeMillis();
124:
125: if (isForwardSegment()) {
126: callTransferSegment(getForwardAddress(), address, segmentId);
127:
128: if (LOG.isLoggable(Level.FINE)) {
129: long duration = System.currentTimeMillis() - start;
130: LOG.fine("forward segment " + segmentId
131: + " (forward address " + getForwardAddress()
132: + " has been forwarded to " + address
133: + " (duration="
134: + DataConverter.toFormatedDuration(duration)
135: + ")");
136: }
137:
138: } else {
139: IBlockingConnection con = null;
140: try {
141: con = ConnectionPool.getInstance().getConnection(
142: address);
143:
144: con.markWritePosition();
145: con.write((int) 0); // emtpy length field
146: int length = 0;
147:
148: length += con.write(CacheServer.CMD_RECEIVE_SEGMENT);
149: length += con.write(segmentId);
150:
151: Set<Object> keys = cache.keySet();
152: length += con.write(keys.size());
153:
154: for (Object key : keys) {
155: Item keyItem = new Item(key);
156: length += keyItem.writeTo(con);
157:
158: Item valueItem = (Item) cache.get(key);
159: if (valueItem == null) {
160: valueItem = new Item(null);
161: }
162: length += valueItem.writeTo(con);
163: }
164:
165: con.resetToWriteMark();
166: con.write(length);
167:
168: con.flush();
169:
170: // read response
171: con.setReceiveTimeoutMillis(10L * 1000L);
172: con.readInt(); // first position is length
173: byte response = con.readByte();
174: if (response != CacheServer.RESULT_OK_WITHOUT_RETURNVALUE) {
175: throw new RuntimeException("unexpected result");
176: }
177:
178: con.close();
179:
180: if (LOG.isLoggable(Level.FINE)) {
181: long duration = System.currentTimeMillis() - start;
182: LOG
183: .fine("segment "
184: + segmentId
185: + " (size= "
186: + cache.size()
187: + ") has been transfered to "
188: + address
189: + " update ref (duration="
190: + DataConverter
191: .toFormatedDuration(duration)
192: + ")");
193: }
194:
195: } catch (Exception e) {
196: if (con != null) {
197: try {
198: ConnectionPool.getInstance().destroyConnection(
199: con);
200: } catch (IOException ignore) {
201: }
202: ;
203: }
204: throw new IOException(
205: "error occured by transfering segment "
206: + segmentId + ". Reason: "
207: + e.toString());
208: }
209: }
210:
211: // transfer operation succeeds -> update cache with forward cache
212: setCache(new CacheClient(new Forwarder(segmentId, address)));
213: }
214:
215: synchronized void readFrom(IConnection connection)
216: throws IOException {
217:
218: Cache cache = cacheFactory.newCache();
219:
220: int countElements = connection.readInt();
221: for (int i = 0; i < countElements; i++) {
222: Item keyItem = Item.readFrom(connection);
223: Item valueItem = Item.readFrom(connection);
224:
225: if (valueItem.getValue() != null) {
226: cache.put(keyItem.getValue(), valueItem);
227: }
228: }
229:
230: setCache(cache);
231: }
232:
233: private static final class Forwarder implements ICacheServerLocator {
234:
235: private int segment = 0;
236: private Address targetService = null;
237:
238: Forwarder(int segment, Address targetService) {
239: this .segment = segment;
240: this .targetService = targetService;
241: }
242:
243: Address getTargetService() {
244: return targetService;
245: }
246:
247: public int computeSegement(Object key) {
248: return segment;
249: }
250:
251: public Address getServiceAddress(int segment)
252: throws IOException {
253: return targetService;
254: }
255: };
256: }
|