001: package distributedcache;
002:
003: import java.io.IOException;
004: import java.util.Collections;
005: import java.util.HashMap;
006: import java.util.HashSet;
007: import java.util.Map;
008: import java.util.Set;
009: import java.util.Timer;
010: import java.util.TimerTask;
011: import java.util.logging.Level;
012: import java.util.logging.Logger;
013:
014: import org.xsocket.DataConverter;
015: import org.xsocket.datagram.IDatagramHandler;
016: import org.xsocket.datagram.IEndpoint;
017: import org.xsocket.datagram.MulticastEndpoint;
018: import org.xsocket.datagram.UserDatagram;
019: import org.xsocket.group.Address;
020:
021: public final class StoreServiceRegistry {
022:
023: private static final Logger LOG = Logger
024: .getLogger(StoreServiceRegistry.class.getName());
025:
026: private static final int PACKET_SIZE = 128;
027:
028: private static final byte MAGIC_BYTE = 78;
029: private static final byte REFRESH_CMD = 0;
030: private static final byte REGISTER_CMD = 1;
031: private static final byte DEREGISTER_CMD = 2;
032:
033: private static final Map<Address, StoreServiceRegistry> INSTANCES = new HashMap<Address, StoreServiceRegistry>();
034:
035: private static final long PUPLISH_PERIOD = 10 * 1000;
036: private static final Timer TIMER = new Timer(true);
037: private TimerTask publishTimerTask = null;
038:
039: private final Set<Address> localAddresses = Collections
040: .synchronizedSet(new HashSet<Address>());
041: private final Set<Address> addresses = Collections
042: .synchronizedSet(new HashSet<Address>());
043: private IEndpoint endpoint = null;
044:
045: static {
046: Runtime.getRuntime().addShutdownHook(new Thread() {
047: @Override
048: public void run() {
049: for (StoreServiceRegistry registry : INSTANCES.values()) {
050: registry.close();
051: }
052: }
053: });
054:
055: TimerTask task = new TimerTask() {
056: @Override
057: public void run() {
058: Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
059: }
060: };
061: TIMER.schedule(task, 0);
062: }
063:
064: StoreServiceRegistry(Address locatorGroupAddress)
065: throws IOException {
066: long start = System.currentTimeMillis();
067:
068: endpoint = new MulticastEndpoint(locatorGroupAddress
069: .getAddress(), locatorGroupAddress.getPort(),
070: PACKET_SIZE, new NotificationHandler());
071:
072: sendRefreshRequest(500);
073:
074: publishTimerTask = new TimerTask() {
075: @Override
076: public void run() {
077: publishLocalRegistered();
078: }
079: };
080: TIMER
081: .schedule(publishTimerTask, PUPLISH_PERIOD,
082: PUPLISH_PERIOD);
083:
084: if (LOG.isLoggable(Level.INFO)) {
085: LOG.info("["
086: + endpoint.getId()
087: + "] StoreServiceRegistry (group "
088: + endpoint.getLocalSocketAddress()
089: + ") initializes ("
090: + addresses.size()
091: + " services registered, inittime="
092: + DataConverter.toFormatedDuration(System
093: .currentTimeMillis()
094: - start) + ")");
095: }
096: }
097:
098: private void sendRefreshRequest(long waittime) {
099: try {
100: UserDatagram datagram = new UserDatagram(PACKET_SIZE);
101: datagram.write(MAGIC_BYTE);
102: datagram.write(REFRESH_CMD);
103:
104: endpoint.send(datagram);
105:
106: if (waittime > 0) {
107: long start = System.currentTimeMillis();
108: do {
109: if (addresses.size() > 0) {
110: return;
111: } else {
112: try {
113: Thread.sleep(50);
114: } catch (InterruptedException ignore) {
115: }
116: }
117: } while (System.currentTimeMillis() < (start + waittime));
118: }
119:
120: } catch (IOException ioe) {
121: if (LOG.isLoggable(Level.FINE)) {
122: LOG
123: .fine("error occured by sending refresh request. Reason "
124: + ioe.toString());
125: }
126: }
127: }
128:
129: @SuppressWarnings("unchecked")
130: private void publishLocalRegistered() {
131: Set<Address> copy = new HashSet<Address>();
132: copy.addAll(localAddresses);
133: for (Address address : copy) {
134: sendRegistered(address);
135: }
136: }
137:
138: public static synchronized StoreServiceRegistry getInstance(
139: Address locatorGroupAddress) throws IOException {
140: StoreServiceRegistry registry = INSTANCES
141: .get(locatorGroupAddress);
142: if (registry == null) {
143: registry = new StoreServiceRegistry(locatorGroupAddress);
144: INSTANCES.put(locatorGroupAddress, registry);
145: }
146:
147: return registry;
148: }
149:
150: private void close() {
151: if (publishTimerTask != null) {
152: publishTimerTask.cancel();
153: }
154:
155: try {
156: endpoint.close();
157: } catch (IOException ignore) {
158: }
159: }
160:
161: public void registerStoreServiceAddress(Address address) {
162: register(address);
163: sendRegistered(address);
164:
165: localAddresses.add(address);
166: }
167:
168: private void register(Address address) {
169: if (!addresses.contains(address)) {
170: addresses.add(address);
171:
172: if (LOG.isLoggable(Level.INFO)) {
173: LOG.info("[" + endpoint.getId() + "](group "
174: + endpoint.getLocalSocketAddress()
175: + ") group service address "
176: + address.toString() + " registered");
177: }
178: }
179: }
180:
181: private void sendRegistered(Address address) {
182: try {
183: UserDatagram datagram = new UserDatagram(PACKET_SIZE);
184: datagram.write(MAGIC_BYTE);
185: datagram.write(REGISTER_CMD);
186: address.writeTo(datagram);
187:
188: endpoint.send(datagram);
189: } catch (IOException ioe) {
190: if (LOG.isLoggable(Level.FINE)) {
191: LOG
192: .fine("["
193: + endpoint.getId()
194: + "] error occured by sending deregistered notification for address "
195: + address);
196: }
197: }
198: }
199:
200: public void deregisterStoreServiceAddress(Address address) {
201: remove(address);
202: sendDeregistered(address);
203:
204: localAddresses.remove(address);
205: }
206:
207: @SuppressWarnings("unchecked")
208: private void remove(Address address) {
209: boolean isRemoved = addresses.remove(address);
210: localAddresses.remove(address);
211:
212: if (isRemoved) {
213: if (LOG.isLoggable(Level.INFO)) {
214: LOG.info("[" + endpoint.getId() + "] (group "
215: + endpoint.getLocalSocketAddress()
216: + ") group service address "
217: + address.toString() + " deregistered");
218: }
219: }
220: }
221:
222: private void sendDeregistered(Address address) {
223: try {
224: UserDatagram datagram = new UserDatagram(PACKET_SIZE);
225: datagram.write(MAGIC_BYTE);
226: datagram.write(DEREGISTER_CMD);
227: address.writeTo(datagram);
228:
229: endpoint.send(datagram);
230: } catch (IOException ioe) {
231: if (LOG.isLoggable(Level.FINE)) {
232: LOG
233: .fine("["
234: + endpoint.getId()
235: + "] error occured by sending registered notification for address "
236: + address);
237: }
238: }
239: }
240:
241: @SuppressWarnings("unchecked")
242: public Set<Address> getStoreServiceAddresses() {
243: Set<Address> result = new HashSet<Address>();
244: result.addAll(addresses);
245:
246: return result;
247: }
248:
249: @Override
250: public String toString() {
251: StringBuilder sb = new StringBuilder();
252: for (Address address : addresses) {
253: if (localAddresses.contains(address)) {
254: sb.append(address.toString() + " (local)\r\n");
255: } else {
256: sb.append(address.toString() + " (remote)\r\n");
257: }
258: }
259:
260: return sb.toString();
261: }
262:
263: private final class NotificationHandler implements IDatagramHandler {
264: @SuppressWarnings("unchecked")
265: public boolean onDatagram(IEndpoint localEndpoint)
266: throws IOException {
267:
268: UserDatagram datagram = localEndpoint.receive();
269: if (datagram != null) {
270: byte magicByte = datagram.readByte();
271: if (magicByte == MAGIC_BYTE) {
272: byte cmd = datagram.readByte();
273:
274: switch (cmd) {
275:
276: case REFRESH_CMD:
277: publishLocalRegistered();
278: break;
279:
280: case REGISTER_CMD:
281: register(Address.readFrom(datagram));
282: break;
283:
284: case DEREGISTER_CMD:
285: remove(Address.readFrom(datagram));
286: break;
287:
288: default:
289: if (LOG.isLoggable(Level.FINE)) {
290: LOG
291: .fine("["
292: + endpoint.getId()
293: + "] receive datagram with unknown cmd "
294: + cmd);
295: }
296: break;
297: }
298: }
299: }
300:
301: return true;
302: }
303: }
304: }
|