001: package com.coldcore.coloradoftp.plugin.intellipack.connection;
002:
003: import com.coldcore.coloradoftp.connection.impl.GenericDataConnection;
004: import com.coldcore.coloradoftp.factory.ObjectFactory;
005: import com.coldcore.coloradoftp.plugin.intellipack.util.Util;
006: import org.apache.log4j.Logger;
007:
008: import java.nio.channels.SocketChannel;
009:
010: /**
011: * Data connection with configurable speed (bytes per second) and timeout (seconds).
012: *
013: * This class devides data connection into 2 groups: LOCAL and REMOTE. Because of this
014: * this class can work with machines that have 1 or 2 network cards. One card is
015: * considered for local connections and other is for remote connections. Throughtput
016: * of each card is configured individualy.
017: */
018: public class IntelDataConnection extends GenericDataConnection {
019:
020: private static Logger log = Logger
021: .getLogger(IntelDataConnection.class);
022: protected long totalSpeedLocal;
023: protected long totalSpeedRemote;
024: protected String localIpRegexp;
025: protected long lastSecondTime;
026: protected long bytesLastSec;
027: protected boolean local;
028: protected int timeout;
029: protected long lastActiveTime;
030: protected Speedometer speedometer;
031:
032: public IntelDataConnection(int bufferSize) {
033: super (bufferSize);
034:
035: localIpRegexp = "192.168.*";
036: lastActiveTime = System.currentTimeMillis();
037: lastSecondTime = System.currentTimeMillis();
038: }
039:
040: /** Get channel max speed for connections from local hosts
041: * @return Bytes per second
042: */
043: public long getTotalSpeedLocal() {
044: return totalSpeedLocal;
045: }
046:
047: /** Set channel max speed for connections from local hosts
048: * @param totalSpeedLocal Bytes per second
049: */
050: public void setTotalSpeedLocal(long totalSpeedLocal) {
051: if (totalSpeedLocal < 0)
052: throw new IllegalArgumentException("Negative argument");
053: this .totalSpeedLocal = totalSpeedLocal;
054: }
055:
056: /** Get channel max speed for connections not from local hosts
057: * @return Bytes per second
058: */
059: public long getTotalSpeedRemote() {
060: return totalSpeedRemote;
061: }
062:
063: /** Set channel max speed for connections not from local hosts
064: * @param totalSpeedRemote Bytes per second
065: */
066: public void setTotalSpeedRemote(long totalSpeedRemote) {
067: if (totalSpeedRemote < 0)
068: throw new IllegalArgumentException("Negative argument");
069: this .totalSpeedRemote = totalSpeedRemote;
070: }
071:
072: /** Get regular expression of IP to determine connections from local hosts
073: * @return Regular expression
074: */
075: public String getLocalIpRegexp() {
076: return localIpRegexp;
077: }
078:
079: /** Set regular expression of IP to determine connections from local hosts
080: * @param localIpRegexp Regular expression
081: */
082: public void setLocalIpRegexp(String localIpRegexp) {
083: if (localIpRegexp == null)
084: throw new IllegalArgumentException("Invalid argument");
085: this .localIpRegexp = localIpRegexp;
086: }
087:
088: /** Get connection inactive interval
089: * @return Seconds
090: */
091: public int getTimeout() {
092: return timeout;
093: }
094:
095: /** Set connection inactive interval
096: * @param timeout Seconds
097: */
098: public void setTimeout(int timeout) {
099: if (timeout < 0)
100: throw new IllegalArgumentException("Negative argument");
101: this .timeout = timeout;
102: }
103:
104: public void initialize(SocketChannel channel) {
105: super .initialize(channel);
106:
107: String ip = channel.socket().getInetAddress().getHostAddress();
108: local = Util.checkRegExp(ip, localIpRegexp);
109: IntelDataConnection.log.debug("Local IP? " + local);
110:
111: speedometer = (Speedometer) ObjectFactory
112: .getObject(local ? "dataSpeedometer.local"
113: : "dataSpeedometer.remote");
114: }
115:
116: protected void read() throws Exception {
117: if (isOverSpeed()) {
118: Thread.sleep(sleep);
119: return;
120: }
121:
122: long bytes = bytesRead;
123: super .read();
124: long diff = bytesRead - bytes;
125:
126: if (diff > 0) {
127: speedometer.add(diff); //Update speed
128: lastActiveTime = System.currentTimeMillis(); //Update last active time
129: }
130: }
131:
132: protected void write() throws Exception {
133: if (isOverSpeed()) {
134: Thread.sleep(sleep);
135: return;
136: }
137:
138: long bytes = bytesWrote;
139: super .write();
140: long diff = bytesWrote - bytes;
141:
142: if (diff > 0) {
143: speedometer.add(diff); //Update speed
144: lastActiveTime = System.currentTimeMillis(); //Update last active time
145: }
146:
147: }
148:
149: /** Test if total channel speed barrier is broken
150: * @return TRUE if it is broken or FALSE otherwise
151: */
152: protected boolean isOverSpeed() {
153: long currentTime = System.currentTimeMillis();
154: long transferred = bytesRead + bytesWrote;
155:
156: if (currentTime > lastSecondTime + 1000L) {
157: bytesLastSec = transferred;
158: lastSecondTime = currentTime;
159: }
160:
161: long speedLimit = local ? totalSpeedLocal : totalSpeedRemote;
162:
163: return speedometer.getBytesThisSecond() > speedLimit
164: && speedLimit > 0;
165: }
166:
167: public void service() throws Exception {
168: //Timeout test
169: long currentTime = System.currentTimeMillis();
170: if (currentTime > lastActiveTime + timeout * 1000L
171: && timeout > 0 && userAborted) {
172: IntelDataConnection.log
173: .warn("Connection timeout, aborting data transfer");
174: abort();
175: }
176:
177: //Service...
178: super.service();
179: }
180: }
|