001: package org.xcache;
002:
003: import java.io.IOException;
004: import java.net.InetAddress;
005: import java.net.InetSocketAddress;
006: import java.util.Random;
007: import java.util.logging.Level;
008: import java.util.logging.Logger;
009:
010: import org.xsocket.stream.IBlockingConnection;
011:
012: import net.sf.jsr107cache.Cache;
013:
014: public final class DynamicCacheClientManager implements
015: ICacheClientManager {
016:
017: private static final Logger LOG = Logger
018: .getLogger(DynamicCacheClientManager.class.getName());
019:
020: private static final int DEFAULT_SEGMENT_NUMBERS = 5000;
021:
022: private final ICacheServerLocator locator = new CacheServerLocator();
023:
024: private int segments = 0;
025: private Service[] services = null;
026:
027: public DynamicCacheClientManager() {
028: this (DEFAULT_SEGMENT_NUMBERS);
029: }
030:
031: public DynamicCacheClientManager(int sgm) {
032: this .segments = sgm + 1;
033: services = new Service[this .segments];
034:
035: NullService service = new NullService();
036: for (int j = 0; j < segments; j++) {
037: services[j] = service;
038: }
039: }
040:
041: public Cache getCache() {
042: return new CacheClient(locator);
043: }
044:
045: void addCacheServer(InetAddress address, int port)
046: throws IOException {
047: addCacheServer(new Address(address, port));
048: }
049:
050: synchronized void addCacheServer(Address address)
051: throws IOException {
052:
053: int i = new Random().nextInt();
054: if (i < 0) {
055: i = 0 - i;
056: }
057:
058: int randomSegment = i % (segments - 1); // ignore highest segment (this is a system segment)
059: Service srv = services[randomSegment];
060:
061: if (LOG.isLoggable(Level.FINE)) {
062: LOG.fine("splitting service " + srv.toString());
063: }
064: srv.split(address);
065: }
066:
067: private final class CacheServerLocator implements
068: ICacheServerLocator {
069:
070: public int computeSegement(Object key) {
071: int hc = key.hashCode();
072: if (hc < 0) {
073: hc = 0 - hc;
074: }
075:
076: int segment = hc % (segments - 1); // ignore highest slot (this is a system slot)
077: return segment;
078: }
079:
080: public Address getServiceAddress(int segment)
081: throws IOException {
082: return services[segment].address;
083: }
084:
085: }
086:
087: private void addSegment(Address serviceAddress, int segment)
088: throws IOException {
089:
090: IBlockingConnection connection = null;
091: try {
092: connection = ConnectionPool.getInstance().getConnection(
093: serviceAddress);
094: connection.markWritePosition();
095: connection.write((int) 0); // emtpy length field
096: int length = 0;
097:
098: length += connection.write(CacheServer.CMD_ADD_SEGMENT);
099: length += connection.write(segment);
100:
101: connection.resetToWriteMark();
102: connection.write(length);
103:
104: connection.flush();
105:
106: connection.readInt(); // first position is length
107: byte response = connection.readByte();
108: if (response != CacheServer.RESULT_OK_WITHOUT_RETURNVALUE) {
109: throw new IOException("couldn't add segment");
110: }
111:
112: connection.close();
113: } catch (IOException e) {
114: if (connection != null) {
115: try {
116: ConnectionPool.getInstance().destroyConnection(
117: connection);
118: } catch (Exception ignore) {
119: }
120: }
121:
122: throw e;
123: }
124: }
125:
126: private class Service {
127: private int lowSegment = 0;
128: private int highSegment = 0;
129: private Address address = null;
130:
131: Service(int lowSegment, int highSegment, Address address) {
132: this .lowSegment = lowSegment;
133: this .highSegment = highSegment;
134: this .address = address;
135: }
136:
137: Address getAddress() {
138: return address;
139: }
140:
141: synchronized void split(Address addressService)
142: throws IOException {
143: if (lowSegment < highSegment) {
144: int newRange = (highSegment - lowSegment) / 2;
145:
146: Service newService = new Service(lowSegment, lowSegment
147: + newRange, addressService);
148:
149: if (LOG.isLoggable(Level.FINE)) {
150: LOG.fine("initiate transfering segements to "
151: + addressService);
152: }
153: for (int i = newService.lowSegment; i < newService.highSegment; i++) {
154: CacheServer.callTransferSegment(address,
155: newService.address, i);
156: services[i] = newService;
157: }
158:
159: lowSegment = lowSegment + newRange + 1;
160:
161: if (LOG.isLoggable(Level.FINE)) {
162: LOG.fine("existing service splitted "
163: + this .toString());
164: LOG.fine("new service " + newService.toString());
165: }
166:
167: } else {
168: throw new RuntimeException(
169: "service is responsible for one slot. couldn't split");
170: }
171: }
172:
173: @Override
174: public String toString() {
175: return address.toString() + " [segments: " + lowSegment
176: + "-" + highSegment + "]";
177: }
178: }
179:
180: private final class NullService extends Service {
181:
182: public NullService() {
183: super (0, segments, null);
184: }
185:
186: void split(Address address) throws IOException {
187: Service service = new Service(0, segments - 1, address);
188: for (int i = 0; i < (segments - 1); i++) {
189: addSegment(address, i);
190: services[i] = service;
191: }
192: }
193:
194: @Override
195: public String toString() {
196: return "NullService";
197: }
198:
199: }
200:
201: private final class TransferJob implements Runnable {
202:
203: private int slot = 0;
204: private InetSocketAddress origin = null;
205: private InetSocketAddress target = null;
206:
207: TransferJob(int slot, InetSocketAddress origin,
208: InetSocketAddress target) {
209: this .slot = slot;
210: this .origin = origin;
211: this .target = target;
212: }
213:
214: public void run() {
215:
216: }
217:
218: }
219: }
|