001: /* ----- BEGIN LICENSE BLOCK -----
002: * Version: MPL 1.1
003: *
004: * The contents of this file are subject to the Mozilla Public License Version
005: * 1.1 (the "License"); you may not use this file except in compliance with
006: * the License. You may obtain a copy of the License at
007: * http://www.mozilla.org/MPL/
008: *
009: * Software distributed under the License is distributed on an "AS IS" basis,
010: * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
011: * for the specific language governing rights and limitations under the
012: * License.
013: *
014: * The Original Code was the Rendezvous client.
015: * The Code is the DataShare server.
016: *
017: * The Initial Developer of the Original Code is
018: * Ball Aerospace & Technologies Corp, Fairborn, Ohio
019: * Portions created by the Initial Developer are Copyright (C) 2001
020: * the Initial Developer. All Rights Reserved.
021: *
022: * Contributor(s): Charles Wood <cwood@ball.com>
023: *
024: * ----- END LICENSE BLOCK ----- */
025: /* RCS $Id: DataShareConnection.java,v 1.3 2002/01/29 20:55:59 lizellaman Exp $
026: * $Log: DataShareConnection.java,v $
027: * Revision 1.3 2002/01/29 20:55:59 lizellaman
028: * Added LoggingInterface, modified the PropertiesInterface implementation
029: *
030: * Revision 1.2 2002/01/20 23:31:21 lizellaman
031: * add code to handle 'keepalive' packets for if/when DataShare server decides that a connection has been idle too long
032: *
033: * Revision 1.1 2002/01/03 03:21:36 lizellaman
034: * existing file, moved to client package
035: *
036: * Revision 1.1.1.1 2001/10/23 13:43:48 lizellaman
037: * initial sourceforge release
038: *
039: */
040:
041: package org.datashare.client;
042:
043: import java.net.Socket;
044: import java.net.DatagramSocket;
045: import java.net.MulticastSocket;
046: import java.net.InetAddress;
047: import java.io.ObjectOutputStream;
048: import java.io.ObjectInputStream;
049: import java.io.ByteArrayInputStream;
050: import java.io.ByteArrayOutputStream;
051: import java.io.BufferedInputStream;
052: import java.io.BufferedOutputStream;
053: import java.io.InputStream;
054: import java.io.OutputStream;
055: import java.io.IOException;
056: import java.io.StreamCorruptedException;
057: import java.io.UTFDataFormatException;
058: import java.net.DatagramPacket;
059:
060: import org.datashare.objects.DataShareObject;
061: import org.datashare.objects.DataShareConnectionDescriptor;
062: import org.datashare.objects.ChannelDescription;
063: import org.datashare.objects.ActivateConnectionObject;
064: import org.datashare.SessionUtilities;
065:
066: /**
067: * This is the client side implementation of our network connection (formerly called channels)
068: * We use this to send data, and it is provided the object that will be given any received data
069: * and the notification if something bad happens to this socket...
070: */
071: public class DataShareConnection implements Runnable {
072: Socket tcpSocket = null;
073: DatagramSocket udpSocket = null;
074: MulticastSocket multicastSocket = null;
075: ObjectOutputStream oos = null;
076: ObjectInputStream ois = null;
077: private boolean running = true; // set to false to stop thread
078: private boolean closeAllCalled = false;
079: public DataShareConnectionDescriptor dscd = null;
080: ClientDataReceiverInterface cdri = null;
081: DatagramPacket sndPacket = null; // used when sending UDP stuff
082: DatagramPacket rcvPacket = null; // used when receiving UDP stuff
083: protected int type;
084: private boolean completed = false;
085: public String keyValue;
086: private int sndBuffSize = -1;
087: private int rcvBuffSize = -1;
088: private byte[] byteArrayInstance = null;
089:
090: public DataShareConnection(DataShareConnectionDescriptor dscd1,
091: ClientDataReceiverInterface cdri) throws Exception {
092: type = dscd1.type;
093: dscd = dscd1;
094: this .cdri = cdri;
095:
096: switch (type) {
097: case ChannelDescription.TCP: {
098: System.out
099: .println("Creating a TCP DataShareConnection instance for "
100: + dscd.serverIP + ":" + dscd.serverPort);
101: try {
102: if (dscd.completelySpecified) // should never get here if we are really on client side, but just in case...
103: tcpSocket = new Socket(dscd.serverIP,
104: dscd.serverPort, dscd.clientIP,
105: dscd.clientPort);
106: else {
107: tcpSocket = new Socket(dscd.serverIP,
108: dscd.serverPort);
109: dscd.clientIP = tcpSocket.getLocalAddress();
110: dscd.clientPort = tcpSocket.getLocalPort();
111: dscd.completelySpecified = true;
112: }
113: oos = new ObjectOutputStream(tcpSocket
114: .getOutputStream());
115: ois = new ObjectInputStream(tcpSocket.getInputStream());
116: Thread me = new Thread(this , "DataShareConnectionFor"
117: + dscd.channelDescription.channelName
118: + "OnServerPort" + dscd.serverPort);
119: me.start();
120: keyValue = "TCP-" + dscd.serverIP.getHostAddress()
121: + ":" + dscd.serverPort + "-"
122: + dscd.clientIP.getHostAddress() + ":"
123: + dscd.clientPort;
124: while (!completed) {
125: //System.out.println("waiting for connection thread to start...");
126: SessionUtilities.delay(100);
127: }
128: } catch (Exception e) {
129: System.out
130: .println("Problems creating our tcpSocket...");
131: //e.printStackTrace();
132: System.out.println(e);
133: //throw new Exception(e.toString());
134: throw e;
135: }
136: break;
137: }
138: case ChannelDescription.UDP: {
139: System.out
140: .println("Creating a UDP DataShareConnection instance for "
141: + dscd.serverIP + ":" + dscd.serverPort);
142: try {
143: udpSocket = new DatagramSocket();
144: udpSocket.setReceiveBufferSize(15000);
145: udpSocket.setSendBufferSize(10000);
146: rcvBuffSize = udpSocket.getReceiveBufferSize();
147: if (SessionUtilities.getVerbose())
148: System.out
149: .println("UDP socket max receive size is "
150: + rcvBuffSize);
151: sndBuffSize = udpSocket.getSendBufferSize();
152: if (SessionUtilities.getVerbose())
153: System.out.println("UDP socket max send size is "
154: + sndBuffSize);
155: dscd.clientIP = InetAddress.getLocalHost();
156: //dscd.clientIP = udpSocket.getLocalAddress(); doesn't seem to work
157: dscd.clientPort = udpSocket.getLocalPort();
158: dscd.completelySpecified = true;
159: Thread me = new Thread(this , "DataShareConnectionFor"
160: + dscd.channelDescription.channelName);
161: me.start();
162: keyValue = "UDP-" + dscd.serverIP.getHostAddress()
163: + ":" + dscd.serverPort + "-"
164: + dscd.clientIP.getHostAddress() + ":"
165: + dscd.clientPort;
166: while (!completed) {
167: //System.out.println("waiting for connection thread to start...");
168: SessionUtilities.delay(100);
169: }
170: } catch (Exception e2) {
171: System.out
172: .println("Problems creating our udpSocket...");
173: e2.printStackTrace();
174: throw new Exception(e2.toString());
175: }
176: break;
177: }
178: case ChannelDescription.MULTICAST: {
179: System.out
180: .println("Creating a Multicast DataShareConnection instance for "
181: + dscd.serverIP + ":" + dscd.serverPort);
182: try {
183: multicastSocket = new MulticastSocket(dscd.serverPort); // send and receive over same port
184: multicastSocket.setReceiveBufferSize(15000);
185: multicastSocket.setSendBufferSize(10000);
186: multicastSocket.joinGroup(dscd.serverIP); // server IP for Multicast is really multicast
187: if (SessionUtilities.getVerbose()) {
188: rcvBuffSize = multicastSocket
189: .getReceiveBufferSize();
190: sndBuffSize = multicastSocket.getSendBufferSize();
191: if (SessionUtilities.getVerbose()) {
192: System.out
193: .println("Multicast socket max receive size is "
194: + rcvBuffSize);
195: System.out
196: .println("Multicast socket max send size is "
197: + sndBuffSize);
198: }
199: }
200: dscd.clientIP = dscd.serverIP;
201: //dscd.clientIP = multicastSocket.getLocalAddress(); doesn't seem to work
202: dscd.clientPort = multicastSocket.getLocalPort();
203: dscd.completelySpecified = true;
204: Thread me = new Thread(this , "DataShareConnectionFor"
205: + dscd.channelDescription.channelName);
206: me.start();
207: keyValue = "Multicast-"
208: + dscd.serverIP.getHostAddress() + ":"
209: + dscd.serverPort + "-"
210: + dscd.clientIP.getHostAddress() + ":"
211: + dscd.clientPort;
212: while (!completed) {
213: //System.out.println("waiting for connection thread to start...");
214: SessionUtilities.delay(100);
215: }
216: } catch (Exception e2) {
217: System.out
218: .println("Problems creating our multcastSocket...");
219: e2.printStackTrace();
220: throw new Exception(e2.toString());
221: }
222: break;
223: }
224: default: {
225: System.out
226: .println("DataShareConnection...unknown channel type!!!");
227: }
228: }
229: if (SessionUtilities.getVerbose())
230: System.out.println("DataShareConnection ready for "
231: + dscd.channelDescription.channelName);
232: }
233:
234: public int getType() {
235: return type;
236: }
237:
238: public void sendToOthers(Object object) {
239: sendToOthers(object, false);
240: }
241:
242: public void sendToAll(Object object) {
243: sendToAll(object, false);
244: }
245:
246: public void sendToClient(Object object, String clientUniqueName) {
247: sendToClient(object, clientUniqueName, false);
248: }
249:
250: public void sendToOthers(Object object, boolean isControl) {
251: try {
252: DataShareObject dsObject = new DataShareObject(
253: SessionUtilities.convertObjectToByteArray(object),
254: DataShareObject.SENDTOOTHERS, dscd.clientKeyValue);
255: dsObject.isControlObject = isControl;
256: sendDSObject(dsObject);
257: } catch (Exception e) {
258: e.printStackTrace();
259: }
260: }
261:
262: public void sendToAll(Object object, boolean isControl) {
263: try {
264: DataShareObject dsObject = new DataShareObject(
265: SessionUtilities.convertObjectToByteArray(object),
266: DataShareObject.SENDTOALL, dscd.clientKeyValue);
267: dsObject.isControlObject = isControl;
268: sendDSObject(dsObject);
269: } catch (Exception e) {
270: e.printStackTrace();
271: }
272: }
273:
274: public void sendToClient(Object object, String clientUniqueName,
275: boolean isControl) {
276: try {
277: DataShareObject dsObject = new DataShareObject(
278: SessionUtilities.convertObjectToByteArray(object),
279: dscd.clientKeyValue, clientUniqueName);
280: dsObject.isControlObject = isControl;
281: sendDSObject(dsObject);
282: } catch (Exception e) {
283: e.printStackTrace();
284: }
285: }
286:
287: private synchronized void sendDSObject(DataShareObject dsObject) {
288: switch (type) {
289: case ChannelDescription.TCP: {
290: try {
291: if (running) {
292: if (SessionUtilities.getVerbose())
293: System.out.println("Sending "
294: + dsObject.objectBytes.length
295: + " bytes over TCP socket "
296: + dscd.clientPort + "->ServerPort"
297: + dscd.serverPort);
298: oos.writeObject(dsObject);
299: oos.flush();
300: }
301: } catch (IOException ioe) {
302: if (!closeAllCalled) {
303: ioe.printStackTrace();
304: closeAll();
305: }
306: }
307: break;
308: }
309: case ChannelDescription.UDP: {
310: try {
311: if (running) {
312: byte[] sndBytes = SessionUtilities
313: .convertObjectToByteArray(dsObject);
314: sndPacket = new DatagramPacket(sndBytes,
315: sndBytes.length, dscd.serverIP,
316: dscd.serverPort);
317: if (SessionUtilities.getVerbose())
318: System.out.println("Sending " + sndBytes.length
319: + " bytes over UDP socket "
320: + dscd.clientPort + "->ServerPort"
321: + dscd.serverPort);
322: udpSocket.send(sndPacket);
323: }
324: } catch (IOException ioe2) {
325: if (!closeAllCalled) {
326: ioe2.printStackTrace();
327: closeAll();
328: }
329: }
330: break;
331: }
332: case ChannelDescription.MULTICAST: {
333: try {
334: if (running) {
335: byte[] sndBytes = SessionUtilities
336: .convertObjectToByteArray(dsObject);
337: sndPacket = new DatagramPacket(sndBytes,
338: sndBytes.length, dscd.serverIP,
339: dscd.serverPort);
340: if (SessionUtilities.getVerbose())
341: System.out.println("Sending " + sndBytes.length
342: + " bytes over Multicast socket "
343: + dscd.clientPort + "->ServerPort"
344: + dscd.serverPort);
345: multicastSocket.send(sndPacket);
346: }
347: } catch (IOException ioe2) {
348: if (!closeAllCalled) {
349: ioe2.printStackTrace();
350: closeAll();
351: }
352: }
353: break;
354: }
355: default:
356: System.out
357: .println("*** Trying to send to an unknown type of Connection ***");
358: break;
359: }
360: }
361:
362: /**
363: * returns -1 if invalid, otherwise returns the max size of the UDP socket receive buffer
364: */
365: public int getRcvBuffSize() {
366: return rcvBuffSize;
367: }
368:
369: /**
370: * returns -1 if invalid, otherwise returns the max size of the UDP socket send buffer
371: */
372: public int getSndBuffSize() {
373: return sndBuffSize;
374: }
375:
376: // loop for any received data...
377: public void run() {
378: if (SessionUtilities.getVerbose())
379: System.out.println("waiting for data on "
380: + dscd.channelDescription.channelName);
381: System.out.flush();
382: Object object;
383: int objectCount = 0;
384: //Thread.currentThread().setPriority(Thread.currentThread().getPriority() + 1);
385: try {
386: completed = true; // would like to sync this with the readObject line
387: while (running) {
388: switch (type) {
389: case ChannelDescription.TCP: {
390: object = ois.readObject(); // wait for an object
391: if (SessionUtilities.getVerbose())
392: System.out.println("received "
393: + dscd.channelDescription.channelName
394: + " object #" + ++objectCount);
395: try {
396: DataShareObject dso = (DataShareObject) object;
397: // if it is a keepalive packet, don't send to application
398: if (dso.type != DataShareObject.KEEPALIVE)
399: cdri.dataReceived(dso);
400: } catch (ClassCastException cce) {
401: System.out
402: .println("Not DataShareObject for object #"
403: + objectCount
404: + ", data ignored");
405: }
406: break;
407: }
408: case ChannelDescription.UDP: {
409: try {
410: rcvPacket = new DatagramPacket(new byte[9998],
411: 9998);
412: udpSocket.receive(rcvPacket);
413: // System.out.println("received " + dscd.channelDescription.channelName + " object #" + ++objectCount + " (" + rcvPacket.getLength() + ") bytes");
414: object = (Object) SessionUtilities
415: .retrieveObject(rcvPacket.getData());
416: try {
417: DataShareObject dso = (DataShareObject) object;
418: /////////// next two lines are for printing out only
419: Object o = SessionUtilities
420: .retrieveObject(dso.objectBytes);
421: if (SessionUtilities.getVerbose())
422: System.out.println("DSO info: "
423: + dso.objectBytes.length
424: + " bytes in "
425: + o.getClass().toString());
426: ///////////
427: cdri.dataReceived(dso);
428: } catch (ClassCastException cce) {
429: System.out
430: .println("Not DataShareObject for object #"
431: + objectCount
432: + ", data ignored");
433: cce.printStackTrace();
434: }
435: } catch (StreamCorruptedException sce) {
436: System.out.println("Discarding UDP packet-> "
437: + sce.getMessage());
438: } catch (UTFDataFormatException udfe) {
439: System.out.println("Discarding UDP packet-> "
440: + udfe.getMessage());
441: }
442: break;
443: }
444: case ChannelDescription.MULTICAST: {
445: try {
446: rcvPacket = new DatagramPacket(new byte[9998],
447: 9998);
448: multicastSocket.receive(rcvPacket);
449: // System.out.println("received " + dscd.channelDescription.channelName + " object #" + ++objectCount + " (" + rcvPacket.getLength() + ") bytes");
450: object = (Object) SessionUtilities
451: .retrieveObject(rcvPacket.getData());
452: try {
453: DataShareObject dso = (DataShareObject) object;
454: /////////// next two lines are for printing out only
455: Object o = SessionUtilities
456: .retrieveObject(dso.objectBytes);
457: if (SessionUtilities.getVerbose())
458: System.out.println("DSO info: "
459: + dso.objectBytes.length
460: + " bytes in "
461: + o.getClass().toString());
462: ///////////
463: // for Multicast, we get any objects we send, and we don't want to receive this one...
464: // if(!o.getClass().isInstance(new ActivateConnectionObject()))
465: if (!dso.isControlObject)
466: cdri.dataReceived(dso);
467: } catch (ClassCastException cce) {
468: System.out
469: .println("Not DataShareObject for object #"
470: + objectCount
471: + ", data ignored");
472: cce.printStackTrace();
473: }
474: } catch (StreamCorruptedException sce) {
475: System.out
476: .println("Discarding Multicast packet-> "
477: + sce.getMessage());
478: } catch (UTFDataFormatException udfe) {
479: System.out
480: .println("Discarding Multicast packet-> "
481: + udfe.getMessage());
482: }
483: break;
484: }
485: }
486: }
487: } catch (Exception e) {
488: if (!closeAllCalled) {
489: System.out.println("DataShareConnection trouble for "
490: + this .keyValue);
491: e.printStackTrace();
492: closeAll();
493: }
494: }
495: }
496:
497: /**
498: * this will close all streams and connections, ignoring any errors
499: */
500: public void closeAll() {
501: if (!closeAllCalled) {
502: closeAllCalled = true;
503: running = false; // set to false to stop thread
504: if (tcpSocket != null) {
505: try {
506: tcpSocket.close();
507: tcpSocket = null;
508: } catch (Exception e) {
509: }
510: } else if (udpSocket != null) {
511: try {
512: udpSocket.close();
513: udpSocket = null;
514: } catch (Exception e) {
515: }
516: } else if (multicastSocket != null) {
517: try {
518: multicastSocket.leaveGroup(dscd.serverIP);
519: multicastSocket.close();
520: multicastSocket = null;
521: } catch (Exception e) {
522: }
523: }
524: cdri.connectionLost(this);
525: }
526: }
527:
528: }
|