001: package ri.cache.transport;
002:
003: import java.util.*;
004: import java.util.concurrent.ConcurrentHashMap;
005:
006: import ri.cache.transport.events.CacheEvent;
007: import ri.cache.transport.events.CacheRequest;
008: import ri.cache.transport.events.CacheResponse;
009:
010: /**
011: * LocalTransportFactory
012: *
013: * @author Brian Goetz
014: */
015: public class LocalTransportFactory implements TransportEndpointFactory {
016: public static final String ENDPOINT_NAME = "ENDPOINT_NAME";
017:
018: private Map endpointMap = new ConcurrentHashMap();
019: private static LocalTransportFactory instance = new LocalTransportFactory();
020:
021: private LocalTransportFactory() {
022: }
023:
024: public synchronized static LocalTransportFactory getFactory() {
025: return instance;
026: }
027:
028: public TransportEndpoint getEndpoint(Map parameters)
029: throws TransportException {
030: String name = (String) parameters.get(ENDPOINT_NAME);
031: if (name == null)
032: throw new TransportException(
033: "ENDPOINT_NAME parameter required for LocalTransportFactory.getEndpoint");
034: return new LocalTransportEndpoint(name);
035: }
036:
037: public void registerListener(Map parameters,
038: TransportListener listener) throws TransportException {
039: String name = (String) parameters.get(ENDPOINT_NAME);
040: if (name == null)
041: throw new TransportException(
042: "ENDPOINT_NAME parameter required for LocalTransportFactory.getEndpoint");
043: synchronized (this ) {
044: TransportListener existingListener = (TransportListener) endpointMap
045: .get(name);
046: if (existingListener != null)
047: throw new TransportException(
048: "Endpoint already registered; name=" + name);
049: endpointMap.put(name, listener);
050: }
051: }
052:
053: public void unregisterListener(Map parameters,
054: TransportListener listener) throws TransportException {
055: String name = (String) parameters.get(ENDPOINT_NAME);
056: if (name == null)
057: throw new TransportException(
058: "ENDPOINT_NAME parameter required for LocalTransportFactory.getEndpoint");
059: synchronized (this ) {
060: TransportListener existingListener = (TransportListener) endpointMap
061: .get(name);
062: if (existingListener == null)
063: throw new TransportException(
064: "Endpoint not registered; name=" + name);
065: if (existingListener != listener)
066: throw new TransportException(
067: "Endpoint registered with different listener; name="
068: + name);
069: endpointMap.remove(name);
070: }
071: }
072:
073: public class LocalTransportEndpoint implements TransportEndpoint {
074: private final String name;
075:
076: public LocalTransportEndpoint(String name) {
077: this .name = name;
078: // We don't look up the listener here, since it might not yet be registered (in the case of a peer cache,
079: // the won't both be registered simultaneously), so we look up the named listener when we want to send it
080: // an event or request
081: }
082:
083: private TransportListener getListener()
084: throws TransportException {
085: TransportListener listener = (TransportListener) endpointMap
086: .get(name);
087: if (listener == null)
088: throw new TransportException(
089: "Unable to find local transport listener; name="
090: + name);
091: return listener;
092: }
093:
094: public void sendCacheEvent(CacheEvent e)
095: throws TransportException {
096: TransportListener listener = getListener();
097: listener.onCacheEvent(e);
098: }
099:
100: public CacheResponse sendCacheRequest(CacheRequest e)
101: throws TransportException {
102: TransportListener listener = getListener();
103: return listener.onCacheRequest(e);
104: }
105: }
106: }
|