001: /* ----- BEGIN LICENSE BLOCK -----
002: * Version: MPL 1.1
003: *
004: * The contents of this file are subject to the Mozilla Public License Version
005: * 1.1 (the "License"); you may not use this file except in compliance with
006: * the License. You may obtain a copy of the License at
007: * http://www.mozilla.org/MPL/
008: *
009: * Software distributed under the License is distributed on an "AS IS" basis,
010: * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
011: * for the specific language governing rights and limitations under the
012: * License.
013: *
014: * The Original Code is the DataShare server.
015: *
016: * The Initial Developer of the Original Code is
017: * Ball Aerospace & Technologies Corp, Fairborn, Ohio
018: * Portions created by the Initial Developer are Copyright (C) 2001
019: * the Initial Developer. All Rights Reserved.
020: *
021: * Contributor(s): Charles Wood <cwood@ball.com>
022: *
023: * ----- END LICENSE BLOCK ----- */
024: /* RCS $Id: DataReceiverAdapter.java,v 1.4 2002/02/20 14:12:57 lizellaman Exp $
025: * $Log: DataReceiverAdapter.java,v $
026: * Revision 1.4 2002/02/20 14:12:57 lizellaman
027: * changes to improve history retrieval
028: *
029: * Revision 1.3 2002/02/04 13:51:39 lizellaman
030: * Remove all references to past product names (or)
031: * Add PublicAPI for creating Rendezvous Sessions
032: *
033: * Revision 1.2 2002/01/29 20:50:17 lizellaman
034: * Added LoggingInterface, modified the PropertiesInterface implementation
035: *
036: * Revision 1.1.1.1 2001/10/23 13:37:15 lizellaman
037: * initial sourceforge release
038: *
039: */
040:
041: package org.datashare;
042:
043: import java.util.Hashtable;
044: import java.util.Enumeration;
045: import java.util.Vector;
046:
047: import org.datashare.FifoQueue;
048: import org.datashare.FifoConsumer;
049: import org.datashare.objects.DataShareObject;
050: import org.datashare.objects.ActivateConnectionObject;
051: import org.datashare.objects.ChannelDescription;
052: import org.datashare.objects.RequestHistory;
053: import org.datashare.objects.HistoryFinishedObject;
054:
055: /**
056: * Implements the standard behavior we expect for DataShare connections on the
057: * server side, including taking care of activating the connection (initiated by the client
058: * sending the ActivateConnectionObject) and automatically sending any received data to
059: * the specified recipients. This Adapter is only suitable for the DataShare function
060: * Channels, not the commandStatus connection. There is one instance of this class
061: * for every function/channel, not one for every consumer.
062: *
063: * @date March 01, 2001
064: * @author Charles Wood
065: * @version 1.0
066: */
067: public class DataReceiverAdapter implements DataReceiverInterface {
068: /**
069: * table of SocketServers, indexed by clientKey (cannot have two connections from same client on
070: * same connection). This is populated by displaying the consumer.
071: */
072: public Hashtable allMySocketHandlers = new Hashtable();
073:
074: /**
075: * the databaseKey for this instance's Channel, used only if data is persisted by the Server
076: */
077: private String channelDatabaseKey = null;
078:
079: /**
080: * provides the sequence for the Data objects, incremented as they are created, first data
081: * object has sequence number of zero
082: */
083: protected int sequence = 0;
084:
085: private SocketAdapter sockets[] = new SocketAdapter[0]; // lots of remote clients, comes from allMySocketHandlers
086: private Vector allSockets = new Vector(); // populated by call to .newConnection()
087:
088: private SocketServerInterface threadedServer = null; // our socket server, set externally
089: private TreeViewServerInterface server;
090: private SessionInfo sessionInfo;
091: private ChannelInfo channelInfo;
092: private boolean neverCreateBean;
093: private Hashtable historyThreads = new Hashtable();
094:
/**
 * Creates an adapter bound to the given server. The session info, channel info and socket
 * server still have to be supplied through their setters before the adapter is used.
 *
 * @param server the server that owns this channel adapter
 */
public DataReceiverAdapter(TreeViewServerInterface server) {
    this.server = server;
}
098:
099: /**
100: * must be set after constructor called
101: */
102: public void setSessionInfo(SessionInfo sessionInfo) {
103: this .sessionInfo = sessionInfo;
104: }
105:
106: /**
107: * must be set after constructor called
108: */
109: public void setChannelInfo(ChannelInfo channelInfo) {
110: this .channelInfo = channelInfo;
111: neverCreateBean = !server.getPersistData()
112: || !channelInfo.getSaveDataForThisChannel();
113: }
114:
115: /**
116: * must be set after constructor called
117: */
118: public void setSocketServer(SocketServerInterface threadedServer) {
119: this .threadedServer = threadedServer;
120: }
121:
122: public SocketServerInterface getSocketServer() {
123: return threadedServer;
124: }
125:
126: // if any data is received, check to see if it is an activation object or a RequestHistory object,
127: // else send it to specified others if the channel is active
128: public synchronized void clientDataReceived(DataShareObject dso,
129: SocketAdapter ts) {
130: boolean forwardableData = false;
131: boolean createBean = false; // used when decided by individual circumstance
132:
133: SessionUtilities.getLoggingInterface().debugMsg(
134: SessionUtilities.getLoggingInterface().DEBUG,
135: SessionUtilities.getLoggingInterface().NETWORK,
136: "1-Rcvd " + (dso.isControlObject ? "control" : "data")
137: + " object from " + ts.getClientKey() + " on "
138: + ts.getKeyValue());
139:
140: if (dso.isControlObject) // server only needs to look inside control objects
141: {
142: try {
143: Object object = SessionUtilities
144: .retrieveObject(dso.objectBytes);
145: if (object instanceof ActivateConnectionObject) {
146: ActivateConnectionObject aco = (ActivateConnectionObject) object;
147: if (ts.getActive()) {
148: SessionUtilities
149: .getLoggingInterface()
150: .debugMsg(
151: SessionUtilities
152: .getLoggingInterface().DEBUG,
153: SessionUtilities
154: .getLoggingInterface().NETWORK,
155: "Consumer requested activation on channel that is already active");
156: }
157: connectionActivated(ts, aco);
158: } else if (object instanceof RequestHistory) {
159: RequestHistory rh = (RequestHistory) object;
160: handleHistory(rh, ts); // asking for history or cancelling history
161: }
162: } catch (Exception e) {
163: e.printStackTrace();
164: }
165: } else // not control object
166: {
167: try {
168: if (ts.getActive()) {
169: if (!neverCreateBean)
170: createBean = true;
171: if (ts.getType() != ChannelDescription.MULTICAST)
172: forwardableData = true;
173: } else {
174: SessionUtilities
175: .getLoggingInterface()
176: .debugMsg(
177: SessionUtilities
178: .getLoggingInterface().WARNING,
179: SessionUtilities
180: .getLoggingInterface().NETWORK,
181: "DataReceiverAdapter received non-control object over not-activated Channel");
182: }
183: } catch (Exception e) {
184: e.printStackTrace();
185: }
186: }
187:
188: if (createBean) {
189: try {
190: Hashtable props = new Hashtable();
191: // firsttime we need to get the channel key
192: if (channelDatabaseKey == null)
193: setChannelDatabaseKey(channelInfo.getDatabaseID());
194:
195: props.put("channelKey", channelDatabaseKey); // need to make ADSKey for ADS
196: props.put("userObject", SessionUtilities
197: .convertObjectToByteArray(dso));
198: props.put("sequence", new Integer(sequence));
199: // no callback for Data (use null)
200: server.saveDataToDatabase("DSData3Home", props,
201: (PersistDataCallbackInterface) null);
202: sequence++;
203: // for Data objects, we don't try to get a reference to them now
204: } catch (Exception e) {
205: e.printStackTrace();
206: }
207: }
208:
209: if (forwardableData) {
210: try {
211: switch (dso.type) {
212: case DataShareObject.SENDTOALL:
213: // send data to all sockets...
214: for (int x = 0; x < sockets.length; x++) {
215: if (sockets[x].getActive()) {
216: SessionUtilities
217: .getLoggingInterface()
218: .debugMsg(
219: SessionUtilities
220: .getLoggingInterface().DEBUG,
221: SessionUtilities
222: .getLoggingInterface().NETWORK,
223: "2-Sending object to "
224: + sockets[x]
225: .getClientKey());
226: sockets[x].sendData(dso);
227: }
228: }
229: break;
230: case DataShareObject.SENDTOCLIENT:
231: // send data to one socket...
232: for (int x = 0; x < sockets.length; x++) {
233: if (sockets[x].getActive()
234: && sockets[x].getClientKey().equals(
235: dso.destinationClientKey)) {
236: SessionUtilities
237: .getLoggingInterface()
238: .debugMsg(
239: SessionUtilities
240: .getLoggingInterface().DEBUG,
241: SessionUtilities
242: .getLoggingInterface().NETWORK,
243: "2-Sending object to "
244: + sockets[x]
245: .getClientKey());
246: sockets[x].sendData(dso);
247: break;
248: }
249: }
250: break;
251: case DataShareObject.SENDTOOTHERS:
252: // send data to all but one socket...
253: for (int x = 0; x < sockets.length; x++) {
254: if (sockets[x].getActive()
255: && !sockets[x].getClientKey().equals(
256: dso.sendingClientKey)) {
257: SessionUtilities
258: .getLoggingInterface()
259: .debugMsg(
260: SessionUtilities
261: .getLoggingInterface().DEBUG,
262: SessionUtilities
263: .getLoggingInterface().NETWORK,
264: "2-Sending object to "
265: + sockets[x]
266: .getClientKey());
267: sockets[x].sendData(dso);
268: }
269: }
270: break;
271: default:
272: SessionUtilities
273: .getLoggingInterface()
274: .debugMsg(
275: SessionUtilities
276: .getLoggingInterface().WARNING,
277: SessionUtilities
278: .getLoggingInterface().NETWORK,
279: "DataReceiverAdapter received unknown type field value-> "
280: + dso.type);
281: }
282: } catch (Exception e) {
283: e.printStackTrace();
284: }
285: }
286: }
287:
288: /**
289: * tries to find the socket for a particular consumer, returns null if not successful
290: */
291: public SocketAdapter findConsumerSocket(ConsumerInfo ci) {
292: SocketAdapter sa = null;
293: for (int x = 0; x < sockets.length; x++) {
294: if (sockets[x].getClientKey().equals(ci.getKeyValue())) {
295: sa = sockets[x];
296: break;
297: }
298: }
299: return sa;
300: }
301:
302: /**
303: * This is called through the DataReceiverInterface when the connection has been lost/closed
304: * so that the connection may be removed from our tables and other clients notified
305: */
306: public void connectionLost(SocketAdapter ts) {
307: SessionUtilities.getLoggingInterface().debugMsg(
308: SessionUtilities.getLoggingInterface().DEBUG,
309: SessionUtilities.getLoggingInterface().NETWORK,
310: "Lost the connection to " + ts.getKeyValue());
311: // find this ts in our hashtable and remove it and tell others...
312: synchronized (allMySocketHandlers) {
313: SessionUtilities.getLoggingInterface().debugMsg(
314: SessionUtilities.getLoggingInterface().DEBUG,
315: SessionUtilities.getLoggingInterface().NETWORK,
316: "connectionLost(), removing consumer "
317: + ts.getClientKey()
318: + " from DataReceiverAdapter "
319: + this .threadedServer.getKeyValue());
320:
321: HistoryThread ht = (HistoryThread) historyThreads.get(ts
322: .getKeyValue());
323: if (ht != null)
324: ht.cancel();
325:
326: allMySocketHandlers.remove(ts.getClientKey());
327: allSockets.remove(ts);
328: convertAllMySocketHandlersToSockets();
329: server.removeConsumer(ts.getClientKey(), sessionInfo
330: .getKeyValue(), channelInfo.getKeyValue());
331: }
332: }
333:
334: /**
335: * This is called locally when the Multicast connection has been lost/closed
336: * so that the connection may be removed from our tables and other clients notified.
337: * This is different from other types of connections in that we have only one connection
338: * that is shared among all Multicast clients.
339: */
340: private void connectionLost(String clientKey) {
341: SessionUtilities.getLoggingInterface().debugMsg(
342: SessionUtilities.getLoggingInterface().DEBUG,
343: SessionUtilities.getLoggingInterface().NETWORK,
344: "Lost the Multicast connection to " + clientKey);
345: // find this ts in our hashtable and remove it and tell others...
346: synchronized (allMySocketHandlers) {
347: SessionUtilities.getLoggingInterface().debugMsg(
348: SessionUtilities.getLoggingInterface().DEBUG,
349: SessionUtilities.getLoggingInterface().NETWORK,
350: "connectionLost (multicast), removing consumer "
351: + clientKey + " from DataReceiverAdapter "
352: + this .threadedServer.getKeyValue());
353: SocketAdapter ts = (SocketAdapter) allMySocketHandlers
354: .remove(clientKey);
355: if (ts != null)
356: allSockets.remove(ts);
357: convertAllMySocketHandlersToSockets();
358: server.removeConsumer(clientKey, sessionInfo.getKeyValue(),
359: channelInfo.getKeyValue());
360: SessionUtilities
361: .getLoggingInterface()
362: .debugMsg(
363: SessionUtilities.getLoggingInterface().DEBUG,
364: SessionUtilities.getLoggingInterface().NETWORK,
365: "***** client should have been told to shutdown the function for this socket!!");
366: }
367: }
368:
369: public void newConnection(SocketAdapter ts) {
370: SessionUtilities.getLoggingInterface().debugMsg(
371: SessionUtilities.getLoggingInterface().DEBUG,
372: SessionUtilities.getLoggingInterface().NETWORK,
373: "New connection - " + ts.getKeyValue());
374: allSockets.add(ts);
375: }
376:
377: /**
378: * attempts to close all connections and stop all Threads associated with our Channel.
379: * This should only be called prior to exiting as it is not guaranteed to be the most
380: * graceful way to shutdown.
381: */
382: public void
383: stopAllConnectionsAndThreads()
384: {
385: getSocketServer().close(); // close our socket server so no more connections are made
386: // shutdown all clients getting History
387: for( Enumeration enum = historyThreads.elements(); enum.hasMoreElements(); )
388: {
389: HistoryThread historyThread = (HistoryThread)enum.nextElement();
390: historyThread.cancel();
391: historyThread.queue.reset();
392: }
393: this .removeAllConsumerConnections(); // get rid of all activated consumers
394: // now just in case we forgot any clients...
395: for( Enumeration enum = allSockets.elements(); enum.hasMoreElements(); )
396: {
397: SocketAdapter sa = (SocketAdapter)enum.nextElement();
398: sa.close();
399: }
400: }
401:
402: /**
403: * called when a client 'activates' this ThreadedSocket
404: * for Multicast, we will have multiple copies of same socket
405: */
406: public synchronized void connectionActivated(SocketAdapter ts,
407: ActivateConnectionObject aco) {
408: SessionUtilities.getLoggingInterface().debugMsg(
409: SessionUtilities.getLoggingInterface().DEBUG,
410: SessionUtilities.getLoggingInterface().CLIENT,
411: "Consumer " + aco.clientKeyValue
412: + " requested us to activate Connection "
413: + ts.getKeyValue());
414:
415: // test to see if we know about this client...
416: if (server.isClientRegistered(aco.clientKeyValue)) {
417: // yes, we know about this client
418: // need to set clientClass here so we know class of client for this data connection
419: String clientClass = server.getClientInfo(
420: aco.clientKeyValue).getClientClass();
421: ts.setClientClass(clientClass);
422:
423: // activate connection
424: ts.setActive(true);
425:
426: // display the consumer, if channel has history, they may already be displayed
427: displayConsumer(ts, aco.clientKeyValue);
428: } else {
429: // no, we do not know about this client
430: SessionUtilities.getLoggingInterface().debugMsg(
431: SessionUtilities.getLoggingInterface().DEBUG,
432: SessionUtilities.getLoggingInterface().CLIENT,
433: "Ignoring ActivateCommand, non-current consumer-> "
434: + aco.clientKeyValue);
435: }
436: }
437:
438: /**
439: * puts the user into the tree for this channel/function, and sets the SocketAdapter's owner's name
440: */
441: private void displayConsumer(SocketAdapter ts, String clientKeyValue) {
442: // save the client key in the Session
443: ts.setClientKey(clientKeyValue);
444:
445: // let server send another update and make the clients overwrite this consumer entry (activate may change for consumer)
446: if (allMySocketHandlers.containsKey(clientKeyValue)) {
447: SessionUtilities.getLoggingInterface().debugMsg(
448: SessionUtilities.getLoggingInterface().DEBUG,
449: SessionUtilities.getLoggingInterface().CLIENT,
450: "DRA.displayClient() already has client "
451: + clientKeyValue + " in channel "
452: + ts.getKeyValue());
453: } else {
454: SessionUtilities.getLoggingInterface().debugMsg(
455: SessionUtilities.getLoggingInterface().DEBUG,
456: SessionUtilities.getLoggingInterface().CLIENT,
457: "DRA.displayClient() adding consumer "
458: + clientKeyValue + " to channel "
459: + ts.getKeyValue());
460: allMySocketHandlers.put(clientKeyValue, ts);
461: convertAllMySocketHandlersToSockets();
462: }
463: server.addConsumer(clientKeyValue, sessionInfo.getKeyValue(),
464: channelInfo.getKeyValue(), ts.getActive());
465: }
466:
467: /**
468: * called when we loose our ServerSocket, need to figure out what to do about it...
469: */
470: public void lostServerSocket(String keyValue) {
471: SessionUtilities.getLoggingInterface().debugMsg(
472: SessionUtilities.getLoggingInterface().DEBUG,
473: SessionUtilities.getLoggingInterface().NETWORK,
474: "DataRecieverAdapter:Lost our socketServer - "
475: + keyValue);
476: }
477:
478: /**
479: * Take our hashtable of connections and convert it to an array
480: */
481: private void convertAllMySocketHandlersToSockets() {
482: synchronized (sockets) {
483: Object[] objects = allMySocketHandlers.values().toArray();
484: sockets = new SocketAdapter[objects.length];
485: for (int x = 0; x < sockets.length; x++)
486: sockets[x] = (SocketAdapter) objects[x];
487: }
488: }
489:
490: // /**
491: // * returns the key for this instance's Session
492: // */
493: // public String
494: // getSessionKey()
495: // {
496: // return sessionInfo.getDatabaseID();
497: // }
498:
499: /**
500: * retrieves the channelDatabaseKey for this channel
501: */
502: public String getChannelDatabaseKey() {
503: return this .channelDatabaseKey;
504: }
505:
506: /**
507: * sets the channelDatabaseKey for this channel
508: */
509: public void setChannelDatabaseKey(String channelDatabaseKey) {
510: if (this .channelDatabaseKey == null) // can only set it once
511: {
512: this .channelDatabaseKey = channelDatabaseKey;
513: }
514: }
515:
516: /**
517: * returns the next port to use for a connection
518: */
519: public int getNextPort() {
520: return server.getNextPort();
521: }
522:
523: /**
524: * returns the key for this instance's Channel
525: */
526: public String getChannelKey() {
527: return channelInfo.getDatabaseID();
528: }
529:
530: /**
531: * call this when a consumer needs to be removed...takes care to close the connection
532: */
533: public void removeConsumerConnection(String clientKeyValue) {
534: SocketAdapter this SA = null;
535: if (allMySocketHandlers != null) {
536: try {
537: if (allMySocketHandlers.containsKey(clientKeyValue)) {
538: SessionUtilities
539: .getLoggingInterface()
540: .debugMsg(
541: SessionUtilities
542: .getLoggingInterface().DEBUG,
543: SessionUtilities
544: .getLoggingInterface().CLIENT,
545: "Removing consumer "
546: + clientKeyValue
547: + " from DataReceiverAdapter "
548: + this .threadedServer
549: .getKeyValue());
550: this SA = (SocketAdapter) allMySocketHandlers
551: .remove(clientKeyValue);
552: allSockets.remove(this SA);
553: if (this SA.getType() == ChannelDescription.MULTICAST) {
554: // for multicast, only one socket server and one socket, don't close it
555: connectionLost(clientKeyValue);
556: } else {
557: // close connection and it will notify us
558: this SA.close();
559: }
560: }
561: } catch (Exception e) {
562: SessionUtilities
563: .getLoggingInterface()
564: .debugMsg(
565: SessionUtilities.getLoggingInterface().ERROR,
566: SessionUtilities.getLoggingInterface().CLIENT,
567: "Problem removing client or closing connection...");
568: e.printStackTrace();
569: }
570: }
571: }
572:
573: /**
574: * call this when all consumers need to be removed...takes care to close the connection,
575: */
576: public void removeAllConsumerConnections()
577: {
578: SocketAdapter this SA = null;
579: if(allMySocketHandlers != null)
580: {
581: for(Enumeration enum = allMySocketHandlers.elements(); enum.hasMoreElements();)
582: {
583: this SA = (SocketAdapter)enum.nextElement();
584: SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG,
585: SessionUtilities.getLoggingInterface().CLIENT,
586: "Removing consumer " + this SA.getClientKey());
587: SocketAdapter ts = (SocketAdapter)allMySocketHandlers.remove(this SA.getClientKey());
588: if(ts != null)
589: allSockets.remove(this SA);
590: if(this SA.getType() == ChannelDescription.MULTICAST)
591: {
592: // for multicast, only one socket server and one socket, don't close it
593: connectionLost(this SA.getClientKey());
594: }
595: else
596: {
597: // close connection and it will notify us
598: this SA.close();
599: }
600: }
601: }
602: }
603:
604: /**
605: * this method handles RequestHistory objects, and either sends the history objects or
606: * cancels the history that is being sent
607: */
608: private void handleHistory(RequestHistory rh, SocketAdapter ts) {
609: if (rh.typeOfRequest == RequestHistory.DATA) {
610: HistoryThread historyThread = new HistoryThread(
611: this ,
612: ts,
613: server.getPersistenceInterface(),
614: channelInfo.getConnectionDescriptor().channelDescription);
615: historyThreads.put(ts.getKeyValue(), historyThread);
616: ts.setClientKey(rh.userName); // this will be written over later with the activation, but lets us see who is getting history for now
617: historyThread.start();
618: } else if (rh.typeOfRequest == RequestHistory.CANCEL) {
619: HistoryThread historyThread = (HistoryThread) historyThreads
620: .remove(ts.getKeyValue());
621: historyThread.cancel();
622: }
623: }
624: }
625:
626: /**
627: * Sends the EJBs for this channel to the client
628: */
629: class HistoryThread extends Thread implements FifoConsumer {
630: boolean finished = false;
631: boolean canceled = false;
632: DataReceiverAdapter dra;
633: SocketAdapter ts;
634: String threadName;
635: int sequenceNumber = 0;
636: int incrementSize = 100; // how many to retrieve at one time, will be set to value passed in by ChannelDescription
637: boolean retrieveMultipleEJBs = false;
638: int delayCount = 0;
639: ChannelDescription channelDescription;
640: FifoQueue queue = new FifoQueue();
641: RetreiveDataBeans getDataBeans = new RetreiveDataBeans();
642: PersistenceInterface persistenceInterface = null;
643: // these attributes used when 'data pacing' is in effect (use original data times to help pace data)
644: long lastDataSentSystemTime = 0l; // when we sent the last chunk of data
645: long previousDataTime = 0l; // original time of last chuck of data sent
646: int preSendThisMany = 20; // send this many packets to pre-charge our buffers before actual pacing begins
647: int sentWithOutPacing = 0; // how many we have sent without pacing, compared to above value to determine if we should pace yet
648:
/**
 * Prepares (but does not start) a thread that sends the stored history of a channel to
 * one connection. The thread is named after the connection's key.
 *
 * @param parent the channel adapter requesting the history
 * @param sa     the connection the history is sent to
 * @param pi     where the stored data is retrieved from
 * @param cd     description of the channel, including the history chunk size and delay
 */
public HistoryThread(DataReceiverAdapter parent, SocketAdapter sa,
        PersistenceInterface pi, ChannelDescription cd) {
    this.dra = parent;
    this.ts = sa;
    this.persistenceInterface = pi;
    this.channelDescription = cd;
    this.threadName = "DataShare.HistoryThead-" + sa.getKeyValue();
    setName(this.threadName);
}
658:
659: /**
660: * only called if user cancels
661: */
662: public void cancel() {
663: canceled = true;
664: this .setName("Stopped--" + threadName);
665: finished = true;
666: getDataBeans.cancelHistory();
667: queue.reset();
668: this .sendMoreData(); // just to release the Thread so it can exit
669: }
670:
671: public void run() {
672: this .setPriority(Thread.currentThread().getPriority() + 1);
673: DataShareObject dso = null;
674: DataShareObject[] dsoArray = null;
675: incrementSize = channelDescription.historyCountInc; // how many to retrieve at one time
676: if (incrementSize > 0)
677: retrieveMultipleEJBs = true;
678: delayCount = channelDescription.historyDelay;
679:
680: if (SessionUtilities.getVerbose()) {
681: String delayType;
682: if (delayCount > 0)
683: delayType = ", with a delay between packets of "
684: + delayCount + "msec";
685: else if (delayCount < 0)
686: delayType = ", with the data timing similar to original data ('paced')";
687: else
688: delayType = " with no delay between data packets";
689:
690: SessionUtilities.getLoggingInterface().debugMsg(
691: SessionUtilities.getLoggingInterface().DEBUG,
692: SessionUtilities.getLoggingInterface().DATABASE,
693: threadName + " is retrieving/sending "
694: + incrementSize + " EJBs per call"
695: + delayType);
696: }
697:
698: // true only if retrieving multiple EJBs per call
699: if (retrieveMultipleEJBs) {
700: queue.setConsumer(this ); // the queue will now call our newFifoDataAvailable from it's Thread whenever data is available
701: getDataBeans.retrieveAllData(persistenceInterface, dra
702: .getChannelDatabaseKey());
703: }
704:
705: while (!finished) {
706: try {
707: if (retrieveMultipleEJBs) // if getting multiple EJBs at a time, use the threading of FifoQueue
708: {
709: dsoArray = getDataBeans.getNextData(incrementSize);
710:
711: // dsoArray of null means no more data available, zero length means none right now
712: if (dsoArray == null) {
713: SessionUtilities
714: .getLoggingInterface()
715: .debugMsg(
716: SessionUtilities
717: .getLoggingInterface().DEBUG,
718: SessionUtilities
719: .getLoggingInterface().DATABASE,
720: "No more history available for Channel "
721: + channelDescription.channelName);
722: finished = true;
723: break;
724: }
725:
726: if (dsoArray.length > 0) {
727: //sequenceNumber += incrementSize;
728: //sequenceNumber += dsoArray.length; // increment sequence number by what was returned
729: for (int x = 0; x < dsoArray.length; x++) {
730: try {
731: queue.write(dsoArray[x]);
732: } catch (Exception ee) {
733: ee.printStackTrace();
734: }
735: }
736: this .waitToGetMoreData(); // wait for thread to ask for more data
737: }
738: } else // retreive EJBs one at a time and send them one at a time
739: {
740: dso = RetreiveDataBeans.getData(
741: persistenceInterface, dra
742: .getChannelDatabaseKey(),
743: sequenceNumber++);
744: if (dso != null) {
745: ts.sendData(dso);
746: yield();
747: } else
748: finished = true;
749: }
750: } catch (Exception e) {
751: e.printStackTrace();
752: finished = true;
753: }
754: } // end of while(!finished)
755:
756: try {
757: // tell client that no more history will be sent
758: HistoryFinishedObject hfo = new HistoryFinishedObject(dra
759: .getChannelDatabaseKey());
760: dso = new DataShareObject(SessionUtilities
761: .convertObjectToByteArray(hfo),
762: DataShareObject.SENDTOALL, ts.getClientKey());
763: SessionUtilities.getLoggingInterface().debugMsg(
764: SessionUtilities.getLoggingInterface().DEBUG,
765: SessionUtilities.getLoggingInterface().DATABASE,
766: "Sending HistoryFinishedObject to client "
767: + ts.getClientKey() + " for Channel "
768: + channelDescription.channelName);
769:
770: // send history finished; if this is an unreliable channel, send a few more end of history objects...
771: int sendThisManyHistoryFinishedObjects = 1;
772: if (ts.getType() == ChannelDescription.UDP
773: || ts.getType() == ChannelDescription.MULTICAST)
774: sendThisManyHistoryFinishedObjects = 3;
775: for (int x = 0; x < sendThisManyHistoryFinishedObjects; x++) {
776: if (retrieveMultipleEJBs)
777: queue.write(dso); // put HistoryFinishedObject at end of queue
778: else
779: ts.sendData(dso); // send HistoryFinishedObject normally
780: }
781: } catch (Exception e) {
782: SessionUtilities.getLoggingInterface().debugMsg(
783: SessionUtilities.getLoggingInterface().ERROR,
784: SessionUtilities.getLoggingInterface().DATABASE,
785: "Problem sending HistoryFinishedObject to client "
786: + ts.getClientKey() + " for Channel "
787: + channelDescription.channelName);
788: e.printStackTrace();
789: }
790:
791: // if xmitting from queue thread, must wait until queue is empty or we cancel before shutting down
792: // this thread, or the HistoryFinishedObject will not get sent to client...
793: while (retrieveMultipleEJBs && queue.size() > 0 && !canceled) {
794: SessionUtilities.getLoggingInterface().debugMsg(
795: SessionUtilities.getLoggingInterface().DEBUG,
796: SessionUtilities.getLoggingInterface().DATABASE,
797: "Thread named " + this .getName()
798: + " will exit when " + queue.size()
799: + " more beans are sent");
800: SessionUtilities.delay(1000);
801: }
802:
803: SessionUtilities.getLoggingInterface().debugMsg(
804: SessionUtilities.getLoggingInterface().DEBUG,
805: SessionUtilities.getLoggingInterface().DATABASE,
806: "Thread named " + this .getName() + " has stopped");
807: }
808:
809: // releases the blocked thread so it can exit or look for more data
810: public synchronized void sendMoreData() {
811: SessionUtilities
812: .getLoggingInterface()
813: .debugMsg(
814: SessionUtilities.getLoggingInterface().DEBUG,
815: SessionUtilities.getLoggingInterface().DATABASE,
816: "More History data has been requested by history xmitter thread...");
817: notifyAll();
818: }
819:
820: public synchronized void waitToGetMoreData() {
821: try {
822: wait();
823: } catch (InterruptedException e) {
824: }
825: }
826:
827: /**
828: * called when data is available from the FIFO, used only when getting multiple EJBs at a time.
829: * This method is called from the FifoQueue thread so any delay here will affect it only.
830: */
831: public void newFifoDataAvailable(Object object) {
832:
833: if (delayCount >= 0) // send data after a fixed delay time
834: {
835: if (object != null)
836: ts.sendData((DataShareObject) object);
837: if (delayCount > 0) {
838: SessionUtilities
839: .getLoggingInterface()
840: .debugMsg(
841: SessionUtilities.getLoggingInterface().DEBUG,
842: SessionUtilities.getLoggingInterface().DATABASE,
843: "delaying " + delayCount
844: + " in newFifoDataAvailable...");
845: SessionUtilities.delay(delayCount);
846: }
847: } else if (delayCount < 0) // send data paced by original data (sort of...)
848: {
849: paceTheData((DataShareObject) object); // will calculate a delay, if needed
850: }
851:
852: if (queue.size() == incrementSize / 2 || queue.size() == 0) // wait until half our queue has been sent (test for zero if last chunk less than half)
853: sendMoreData(); // cause the next chunk of EJBs to be put into our queue
854: }
855:
856: /**
857: * calculates how long to delay before returning so that the next data can be sent, assumes
858: * that data was just sent and that data will be sent again soon after the return. Note that if the
859: * original data had packets seperated by long delays, this algorithm will remove all the delays.
860: * The goal is to send the data with small delays with about the same pacing as it had originally,
861: * but to remove the long delays altogether. Long delay is currently defined below.
862: */
863: private void paceTheData(DataShareObject dso) {
864: long originalDataDelay = 0;
865: long originalDataTime = 0;
866: long systemDelta = 0;
867: long dataSentSystemTime = 0;
868: long longDelay = 2000l; // 2 seconds is a long delay
869:
870: if (dso != null) {
871: originalDataTime = dso.getCreationDate().getTime();
872: originalDataDelay = originalDataTime - previousDataTime; // delta time between original datum
873: systemDelta = System.currentTimeMillis()
874: - lastDataSentSystemTime; // milliseconds since we sent last data
875:
876: if (originalDataDelay > longDelay || // if there was a long delay in original data,
877: systemDelta >= originalDataDelay) // or we have already waited long enough
878: {
879: // then send data immediately
880: ts.sendData(dso);
881: sentWithOutPacing++;
882: dataSentSystemTime = System.currentTimeMillis();
883: } else // we may be delaying before sending data...
884: {
885: int delayTime = new Long(originalDataDelay
886: - systemDelta).intValue();
887: if (sentWithOutPacing++ > preSendThisMany)
888: SessionUtilities.delay(delayTime);
889: ts.sendData(dso);
890: dataSentSystemTime = System.currentTimeMillis();
891: }
892:
893: SessionUtilities
894: .getLoggingInterface()
895: .debugMsg(
896: SessionUtilities.getLoggingInterface().DEBUG,
897: SessionUtilities.getLoggingInterface().DATABASE,
898: "paceTheData: originalDataTime = "
899: + originalDataTime
900: + ", data sent at "
901: + lastDataSentSystemTime
902: + ", dataDelta = "
903: + originalDataDelay
904: + ", our delay = "
905: + (dataSentSystemTime - lastDataSentSystemTime));
906:
907: previousDataTime = originalDataTime; // save it for the next time
908: lastDataSentSystemTime = dataSentSystemTime; // save it for next time too
909: } else
910: SessionUtilities.getLoggingInterface().debugMsg(
911: SessionUtilities.getLoggingInterface().DEBUG,
912: SessionUtilities.getLoggingInterface().DATABASE,
913: "paceTheData received a null DataShareObject");
914: }
915:
916: }
|