001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol.tcm;
005:
006: import com.tc.logging.TCLogger;
007: import com.tc.logging.TCLogging;
008: import com.tc.net.MaxConnectionsExceededException;
009: import com.tc.net.protocol.NetworkStackID;
010: import com.tc.net.protocol.TCNetworkMessage;
011: import com.tc.net.protocol.transport.MessageTransport;
012: import com.tc.object.msg.DSOMessageBase;
013: import com.tc.object.session.SessionID;
014: import com.tc.object.session.SessionProvider;
015: import com.tc.util.TCTimeoutException;
016:
017: import java.io.IOException;
018: import java.net.UnknownHostException;
019:
020: /**
021: * @author orion
022: */
023:
024: public class ClientMessageChannelImpl extends AbstractMessageChannel
025: implements ClientMessageChannel {
026: private static final TCLogger logger = TCLogging
027: .getLogger(ClientMessageChannel.class);
028: private final TCMessageFactory msgFactory;
029: private int connectAttemptCount;
030: private int connectCount;
031: private ChannelID channelID;
032: private final ChannelIDProviderImpl cidProvider;
033: private final SessionProvider sessionProvider;
034: private SessionID channelSessionID = SessionID.NULL_ID;
035:
036: protected ClientMessageChannelImpl(TCMessageFactory msgFactory,
037: TCMessageRouter router, SessionProvider sessionProvider) {
038: super (router, logger, msgFactory);
039: this .msgFactory = msgFactory;
040: this .cidProvider = new ChannelIDProviderImpl();
041: this .sessionProvider = sessionProvider;
042: }
043:
044: public NetworkStackID open() throws TCTimeoutException,
045: UnknownHostException, IOException,
046: MaxConnectionsExceededException {
047: final ChannelStatus status = getStatus();
048:
049: synchronized (status) {
050: if (status.isOpen()) {
051: throw new IllegalStateException("Channel already open");
052: }
053: NetworkStackID id = this .sendLayer.open();
054: getStatus().open();
055: this .channelID = new ChannelID(id.toLong());
056: this .cidProvider.setChannelID(this .channelID);
057: this .channelSessionID = sessionProvider.getSessionID();
058: return id;
059: }
060: }
061:
062: public void addClassMapping(TCMessageType type, Class msgClass) {
063: msgFactory.addClassMapping(type, msgClass);
064: }
065:
066: public ChannelID getChannelID() {
067: final ChannelStatus status = getStatus();
068: synchronized (status) {
069: if (!status.isOpen()) {
070: throw new IllegalStateException(
071: "Attempt to get the channel ID of an unopened channel.");
072: } else {
073: return this .channelID;
074: }
075: }
076: }
077:
078: public int getConnectCount() {
079: return connectCount;
080: }
081:
082: public int getConnectAttemptCount() {
083: return this .connectAttemptCount;
084: }
085:
086: /*
087: * Session message filter.
088: * To drop old session msgs when session changed.
089: */
090: public void send(final TCNetworkMessage message) {
091: if (channelSessionID == ((DSOMessageBase) message)
092: .getLocalSessionID()) {
093: super .send(message);
094: } else {
095: logger.info("Drop old message: "
096: + ((DSOMessageBase) message).getMessageType()
097: + " Expected " + channelSessionID + " but got "
098: + ((DSOMessageBase) message).getLocalSessionID());
099: }
100: }
101:
102: public void notifyTransportConnected(MessageTransport transport) {
103: super .notifyTransportConnected(transport);
104: connectCount++;
105: }
106:
107: public void notifyTransportDisconnected(MessageTransport transport) {
108: // Move channel to new session
109: channelSessionID = sessionProvider.nextSessionID();
110: logger
111: .info("ClientMessageChannel moves to "
112: + channelSessionID);
113: this .fireTransportDisconnectedEvent();
114: }
115:
116: public void notifyTransportConnectAttempt(MessageTransport transport) {
117: super .notifyTransportConnectAttempt(transport);
118: connectAttemptCount++;
119: }
120:
121: public void notifyTransportClosed(MessageTransport transport) {
122: //
123: }
124:
125: public ChannelIDProvider getChannelIDProvider() {
126: return cidProvider;
127: }
128:
129: private static class ChannelIDProviderImpl implements
130: ChannelIDProvider {
131:
132: private ChannelID channelID = ChannelID.NULL_ID;
133:
134: private synchronized void setChannelID(ChannelID channelID) {
135: this .channelID = channelID;
136: }
137:
138: public synchronized ChannelID getChannelID() {
139: return this.channelID;
140: }
141:
142: }
143:
144: }
|