001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol.tcm;
005:
006: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
007: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
008: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
009:
010: import com.tc.async.api.Sink;
011: import com.tc.bytes.TCByteBuffer;
012: import com.tc.logging.TCLogger;
013: import com.tc.net.MaxConnectionsExceededException;
014: import com.tc.net.TCSocketAddress;
015: import com.tc.net.protocol.NetworkLayer;
016: import com.tc.net.protocol.NetworkStackID;
017: import com.tc.net.protocol.TCNetworkMessage;
018: import com.tc.net.protocol.transport.MessageTransport;
019: import com.tc.util.Assert;
020: import com.tc.util.TCTimeoutException;
021:
022: import java.io.IOException;
023: import java.net.UnknownHostException;
024: import java.util.Iterator;
025: import java.util.Map;
026: import java.util.Set;
027:
028: /**
029: * @author teck
030: */
031: abstract class AbstractMessageChannel implements MessageChannel,
032: MessageChannelInternal {
033:
034: private final Map attachments = new ConcurrentReaderHashMap();
035: private final Object attachmentLock = new Object();
036: private final Set listeners = new CopyOnWriteArraySet();
037: private final ChannelStatus status = new ChannelStatus();
038: private final SynchronizedRef remoteAddr = new SynchronizedRef(null);
039: private final SynchronizedRef localAddr = new SynchronizedRef(null);
040: private final TCMessageFactory msgFactory;
041: private final TCMessageRouter router;
042: private final TCMessageParser parser;
043: private final TCLogger logger;
044:
045: protected NetworkLayer sendLayer;
046:
047: AbstractMessageChannel(TCMessageRouter router, TCLogger logger,
048: TCMessageFactory msgFactory) {
049: this .router = router;
050: this .logger = logger;
051: this .msgFactory = msgFactory;
052: this .parser = new TCMessageParser(this .msgFactory);
053: }
054:
055: public void addAttachment(String key, Object value, boolean replace) {
056: synchronized (attachmentLock) {
057: boolean exists = attachments.containsKey(key);
058: if (replace || !exists) {
059: attachments.put(key, value);
060: }
061: }
062: }
063:
064: public Object removeAttachment(String key) {
065: return this .attachments.remove(key);
066: }
067:
068: public Object getAttachment(String key) {
069: return this .attachments.get(key);
070: }
071:
072: public boolean isOpen() {
073: return this .status.isOpen();
074: }
075:
076: public boolean isClosed() {
077: return this .status.isClosed();
078: }
079:
080: public void addListener(ChannelEventListener listener) {
081: if (listener == null) {
082: return;
083: }
084:
085: listeners.add(listener);
086: }
087:
088: public TCMessage createMessage(TCMessageType type) {
089: TCMessage rv = this .msgFactory.createMessage(this , type);
090: // TODO: set default channel specific information in the TC message header
091:
092: return rv;
093: }
094:
095: public void routeMessageType(TCMessageType messageType,
096: TCMessageSink dest) {
097: router.routeMessageType(messageType, dest);
098: }
099:
100: public void unrouteMessageType(TCMessageType messageType) {
101: router.unrouteMessageType(messageType);
102: }
103:
104: public abstract NetworkStackID open()
105: throws MaxConnectionsExceededException, TCTimeoutException,
106: UnknownHostException, IOException;
107:
108: /**
109: * Routes a TCMessage to a sink. The hydrate sink will do the hydrate() work
110: */
111: public void routeMessageType(TCMessageType messageType,
112: Sink destSink, Sink hydrateSink) {
113: routeMessageType(messageType, new TCMessageSinkToSedaSink(
114: destSink, hydrateSink));
115: }
116:
117: public void close() {
118: synchronized (status) {
119: if (!status.isClosed()) {
120: Assert.assertNotNull(this .sendLayer);
121: this .sendLayer.close();
122: }
123: status.close();
124: }
125: }
126:
127: public final boolean isConnected() {
128: return this .sendLayer != null && this .sendLayer.isConnected();
129: }
130:
131: public final void setSendLayer(NetworkLayer layer) {
132: this .sendLayer = layer;
133: }
134:
135: public final void setReceiveLayer(NetworkLayer layer) {
136: throw new UnsupportedOperationException();
137: }
138:
139: public void send(final TCNetworkMessage message) {
140: if (logger.isDebugEnabled()) {
141: final Runnable logMsg = new Runnable() {
142: public void run() {
143: logger.debug("Message Sent: " + message.toString());
144: }
145: };
146:
147: final Runnable existingCallback = message.getSentCallback();
148: final Runnable newCallback;
149:
150: if (existingCallback != null) {
151: newCallback = new Runnable() {
152: public void run() {
153: try {
154: existingCallback.run();
155: } catch (Exception e) {
156: logger.error(e);
157: } finally {
158: logMsg.run();
159: }
160: }
161: };
162: } else {
163: newCallback = logMsg;
164: }
165:
166: message.setSentCallback(newCallback);
167: }
168:
169: this .sendLayer.send(message);
170: }
171:
172: public final void receive(TCByteBuffer[] msgData) {
173: this .router.putMessage(parser.parseMessage(this , msgData));
174: }
175:
176: protected final ChannelStatus getStatus() {
177: return status;
178: }
179:
180: public void notifyTransportDisconnected(MessageTransport transport) {
181: this .remoteAddr.set(null);
182: this .localAddr.set(null);
183: fireTransportDisconnectedEvent();
184: }
185:
186: protected void fireTransportDisconnectedEvent() {
187: fireEvent(new ChannelEventImpl(
188: ChannelEventType.TRANSPORT_DISCONNECTED_EVENT,
189: AbstractMessageChannel.this ));
190: }
191:
192: public void notifyTransportConnected(MessageTransport transport) {
193: this .remoteAddr.set(transport.getRemoteAddress());
194: this .localAddr.set(transport.getLocalAddress());
195: fireEvent(new ChannelEventImpl(
196: ChannelEventType.TRANSPORT_CONNECTED_EVENT,
197: AbstractMessageChannel.this ));
198: }
199:
200: public void notifyTransportConnectAttempt(MessageTransport transport) {
201: return;
202: }
203:
204: public void notifyTransportClosed(MessageTransport transport) {
205: // yeah, we know. We closed it.
206: return;
207: }
208:
209: public TCSocketAddress getLocalAddress() {
210: return (TCSocketAddress) this .localAddr.get();
211: }
212:
213: public TCSocketAddress getRemoteAddress() {
214: return (TCSocketAddress) this .remoteAddr.get();
215: }
216:
217: private void fireEvent(ChannelEventImpl event) {
218: for (Iterator i = listeners.iterator(); i.hasNext();) {
219: ((ChannelEventListener) i.next()).notifyChannelEvent(event);
220: }
221: }
222:
223: class ChannelStatus {
224: private ChannelState state;
225:
226: public ChannelStatus() {
227: this .state = ChannelState.CLOSED;
228: }
229:
230: // this method non-public on purpose. Only the channel should change it's own status
231: synchronized void close() {
232: changeState(ChannelState.CLOSED);
233: fireEvent(new ChannelEventImpl(
234: ChannelEventType.CHANNEL_CLOSED_EVENT,
235: AbstractMessageChannel.this ));
236: }
237:
238: // this method non-public on purpose. Only the channel should change it's own status
239: synchronized void open() {
240: changeState(ChannelState.OPEN);
241: fireEvent(new ChannelEventImpl(
242: ChannelEventType.CHANNEL_OPENED_EVENT,
243: AbstractMessageChannel.this ));
244: }
245:
246: synchronized boolean isOpen() {
247: return ChannelState.OPEN.equals(state);
248: }
249:
250: synchronized boolean isClosed() {
251: return ChannelState.CLOSED.equals(state);
252: }
253:
254: private synchronized void changeState(ChannelState newState) {
255: state = newState;
256: }
257: }
258:
259: private static class ChannelState {
260: private static final int STATE_OPEN = 1;
261: private static final int STATE_CLOSED = 2;
262:
263: static final ChannelState OPEN = new ChannelState(STATE_OPEN);
264: static final ChannelState CLOSED = new ChannelState(
265: STATE_CLOSED);
266:
267: private final int state;
268:
269: private ChannelState(int state) {
270: this .state = state;
271: }
272:
273: public String toString() {
274: switch (state) {
275: case STATE_OPEN:
276: return "OPEN";
277: case STATE_CLOSED:
278: return "CLOSED";
279: default:
280: return "UNKNOWN";
281: }
282: }
283: }
284: }
|