001: package ri.cache.transport;
002:
003: import java.net.MalformedURLException;
004: import java.rmi.*;
005: import java.rmi.server.UnicastRemoteObject;
006: import java.util.*;
007:
008: import ri.cache.transport.events.CacheEvent;
009: import ri.cache.transport.events.CacheRequest;
010: import ri.cache.transport.events.CacheResponse;
011:
012: /**
013: * RmiTransportFactory
014: *
015: * @author Brian Goetz
016: */
017: public class RmiTransportFactory implements TransportEndpointFactory {
018:
019: public static final String HOST_NAME = "HOST_NAME";
020: public static final String HOST_PORT = "HOST_PORT";
021: public static final String ENTITY_NAME = "ENTITY_NAME";
022:
023: private static RmiTransportFactory instance = new RmiTransportFactory();
024:
025: private RmiTransportFactory() {
026: }
027:
028: public synchronized static RmiTransportFactory getFactory() {
029: return instance;
030: }
031:
032: private String getNameString(Map parameters)
033: throws TransportException {
034: String hostName = (String) parameters.get(HOST_NAME);
035: String hostPort = (String) parameters.get(HOST_PORT);
036: String entityName = (String) parameters.get(ENTITY_NAME);
037: if (entityName == null)
038: throw new TransportException(
039: "ENTITY_NAME parameter required for RmiTransportFactory.getEndpoint");
040: StringBuffer sb = new StringBuffer("//");
041: sb.append(hostName != null ? hostName : "localhost");
042: if (hostPort != null) {
043: sb.append(":");
044: sb.append(hostPort);
045: }
046: sb.append("/");
047: sb.append(entityName);
048:
049: return sb.toString();
050: }
051:
052: public TransportEndpoint getEndpoint(Map parameters)
053: throws TransportException {
054: return new RmiTransportEndpoint(getNameString(parameters));
055: }
056:
057: public void registerListener(Map parameters,
058: TransportListener listener) throws TransportException {
059: String name = getNameString(parameters);
060: try {
061: RemoteListenerAdapter remoteListener = new RemoteListenerAdapter(
062: listener);
063: Naming.bind(name, remoteListener);
064: } catch (AlreadyBoundException e) {
065: throw new TransportException(
066: "RMI listener already registered: " + name, e);
067: } catch (MalformedURLException e) {
068: throw new TransportException("Invalid RMI name: " + name, e);
069: } catch (RemoteException e) {
070: throw new TransportException(
071: "RemoteException binding RMI listener: " + name, e);
072: }
073: }
074:
075: public void unregisterListener(Map parameters,
076: TransportListener listener) throws TransportException {
077:
078: }
079:
080: public interface RemoteTransportListener extends Remote {
081: public void onCacheEvent(CacheEvent e) throws RemoteException;
082:
083: public CacheResponse onCacheRequest(CacheRequest req)
084: throws RemoteException;
085: }
086:
087: public static class RemoteListenerAdapter extends
088: UnicastRemoteObject implements RemoteTransportListener {
089:
090: private final TransportListener listener;
091:
092: public RemoteListenerAdapter(TransportListener listener)
093: throws RemoteException {
094: this .listener = listener;
095: }
096:
097: public void onCacheEvent(CacheEvent e) throws RemoteException {
098: listener.onCacheEvent(e);
099: }
100:
101: public CacheResponse onCacheRequest(CacheRequest req)
102: throws RemoteException {
103: return listener.onCacheRequest(req);
104: }
105: }
106:
107: private static class RmiTransportEndpoint implements
108: TransportEndpoint {
109: private final String nameString;
110: private volatile RemoteTransportListener stub;
111:
112: public RmiTransportEndpoint(String nameString) {
113: this .nameString = nameString;
114: }
115:
116: private void findStub() throws TransportException {
117: try {
118: if (stub == null)
119: stub = (RemoteTransportListener) Naming
120: .lookup(nameString);
121: } catch (NotBoundException e) {
122: throw new TransportException(
123: "RMI listener not registered: " + nameString, e);
124: } catch (MalformedURLException e) {
125: throw new TransportException("Invalid RMI name: "
126: + nameString, e);
127: } catch (RemoteException e) {
128: throw new TransportException(
129: "RemoteException looking up RMI listener: "
130: + nameString, e);
131: }
132: }
133:
134: public void sendCacheEvent(CacheEvent event)
135: throws TransportException {
136: findStub();
137: try {
138: stub.onCacheEvent(event);
139: } catch (RemoteException e) {
140: throw new TransportException(
141: "RemoteException invoking RMI listener: "
142: + nameString, e);
143: }
144: }
145:
146: public CacheResponse sendCacheRequest(CacheRequest req)
147: throws TransportException {
148: findStub();
149: try {
150: return stub.onCacheRequest(req);
151: } catch (RemoteException e) {
152: throw new TransportException(
153: "RemoteException invoking RMI listener: "
154: + nameString, e);
155: }
156: }
157: }
158: }
|