001: ///////////////////////////////////////////////////////////////////////////////
002: //
003: // Copyright (C) 2003-@year@ by Thomas M. Hazel, MyOODB (www.myoodb.org)
004: //
005: // All Rights Reserved
006: //
007: // This program is free software; you can redistribute it and/or modify
008: // it under the terms of the GNU General Public License and GNU Library
009: // General Public License as published by the Free Software Foundation;
010: // either version 2, or (at your option) any later version.
011: //
012: // This program is distributed in the hope that it will be useful,
013: // but WITHOUT ANY WARRANTY; without even the implied warranty of
014: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: // GNU General Public License and GNU Library General Public License
016: // for more details.
017: //
018: // You should have received a copy of the GNU General Public License
019: // and GNU Library General Public License along with this program; if
020: // not, write to the Free Software Foundation, 675 Mass Ave, Cambridge,
021: // MA 02139, USA.
022: //
023: ///////////////////////////////////////////////////////////////////////////////
024: package org.myoodb.core;
025:
026: import java.io.*;
027: import java.net.*;
028: import java.util.*;
029:
030: import org.myoodb.util.*;
031: import org.myoodb.core.command.*;
032:
033: public class AbstractDatagramSocket extends AbstractConnection
034: implements java.lang.Runnable {
035: private static final org.myoodb.util.Logger LOGGER = org.myoodb.util.Logger
036: .getLogger(AbstractDatagramSocket.class);
037:
038: private static volatile int s_requestRetryTime = 100;
039: private static volatile int s_requestRetryStepTime = 0;
040: private static volatile int s_responseRetryTime = 1500;
041: private static volatile int s_responseRetryStepTime = 0;
042: private static volatile int s_tunnelKeepAliveTime = 250;
043:
044: private static final java.util.HashMap SERVER_SOCKETS = new java.util.HashMap();
045:
046: protected static final java.util.HashMap CONNECTIONS = new java.util.HashMap();
047: protected static final java.util.HashMap RECEIVE_BUFFERS = new java.util.HashMap();
048:
049: private Thread m_keepAliveThread;
050:
051: protected int m_requestVal;
052: protected long m_sessionIdentifier;
053: protected volatile boolean m_abandon;
054: protected volatile boolean m_realtime;
055: protected volatile long m_lastSentTime;
056: protected volatile long m_requestIdentifier;
057: protected volatile long m_lastRequestIdentifier;
058:
059: protected int m_connectTimeout;
060: protected DatagramSocket m_socket;
061: protected InetSocketAddress m_server;
062:
063: protected LinkedList m_inList;
064: protected byte[] m_outBuffer;
065:
066: private static void startPacketReceiver(
067: final DatagramSocket socket, final AbstractDatabase db) {
068: Runnable packetReceive = new Runnable() {
069: public void run() {
070: byte[] inBuffer = new byte[getBufferSize()];
071:
072: try {
073: while (socket.isConnected() == true) {
074: DatagramPacket packet = new DatagramPacket(
075: inBuffer, 0, inBuffer.length);
076: socket.receive(packet);
077:
078: FastByteArrayInputStream byteStream = new FastByteArrayInputStream(
079: inBuffer, 0, packet.getLength());
080: ObjectInputStream in = getObjectInputStream(
081: byteStream, db);
082:
083: Object[] data = new Object[5];
084: data[0] = in.readShort();
085: data[1] = in.readLong();
086: data[2] = in.readLong();
087:
088: if ((Short) data[0] >= TunnelManager.Protocol.USER_DATA) {
089: data[3] = in.readLong();
090:
091: try {
092: data[4] = in.readObject();
093: } catch (ClassNotFoundException e) {
094: data[4] = e;
095: }
096: }
097:
098: notifyReceiver(data);
099: }
100: } catch (IOException e) {
101: LOGGER.error(null, e);
102: }
103: }
104: };
105:
106: Thread thread = new Thread(packetReceive);
107: thread.setPriority(MyOodbManager.ACCEPT_THREAD_PRIORITY + 1);
108: thread.setDaemon(true);
109: thread.setName("MyOodb Udp Receive Thread: "
110: + socket.getLocalAddress());
111: thread.start();
112: }
113:
114: private static void stopPacketReceiver(final DatagramSocket socket) {
115: socket.disconnect();
116: }
117:
118: protected static void notifyReceiver(Object[] data) {
119: LinkedList inList = null;
120: synchronized (RECEIVE_BUFFERS) {
121: inList = (LinkedList) RECEIVE_BUFFERS.get(data[1]);
122: if (inList == null) {
123: return;
124: }
125: }
126:
127: synchronized (inList) {
128: inList.add(data);
129: inList.notify();
130: }
131: }
132:
133: protected static void setInputObjectStreamContext(
134: ObjectInputStream in, Object[] data) {
135: synchronized (CONNECTIONS) {
136: AbstractDatagramSocket socket = (AbstractDatagramSocket) CONNECTIONS
137: .get(data[1]);
138: if (socket != null) {
139: ((org.myoodb.util.FastObjectInputStream) in)
140: .setContext(socket.getDatabase());
141: }
142: }
143: }
144:
145: public static void addDatagramSocket(String host,
146: DatagramSocket socket) throws UnknownHostException {
147: synchronized (SERVER_SOCKETS) {
148: SERVER_SOCKETS.put(InetAddress.getByName(host)
149: .getHostAddress(),
150: new Object[] { socket, (long) 0 });
151: }
152: }
153:
154: public static DatagramSocket removeDatagramSocket(String host)
155: throws UnknownHostException {
156: DatagramSocket socket = null;
157:
158: synchronized (SERVER_SOCKETS) {
159: Object[] tuple = (Object[]) SERVER_SOCKETS
160: .remove(InetAddress.getByName(host)
161: .getHostAddress());
162: if (tuple != null) {
163: socket = (DatagramSocket) tuple[0];
164: stopPacketReceiver(socket);
165: }
166: }
167:
168: return socket;
169: }
170:
171: public static DatagramSocket getDatagramSocket(String host)
172: throws UnknownHostException {
173: DatagramSocket socket = null;
174:
175: synchronized (SERVER_SOCKETS) {
176: Object[] tuple = (Object[]) SERVER_SOCKETS.get(InetAddress
177: .getByName(host).getHostAddress());
178: if (tuple != null) {
179: socket = (DatagramSocket) tuple[0];
180: }
181: }
182:
183: return socket;
184: }
185:
186: public static void setRequestRetryTime(int time) {
187: s_requestRetryTime = time;
188: }
189:
190: public static int getRequestRetryTime() {
191: return s_requestRetryTime;
192: }
193:
194: public static void setRequestStepTime(int time) {
195: s_requestRetryStepTime = time;
196: }
197:
198: public static int getRequestStepTime() {
199: return s_requestRetryStepTime;
200: }
201:
202: public static void setResponseRetryTime(int time) {
203: s_responseRetryTime = time;
204: }
205:
206: public static int getResponseRetryTime() {
207: return s_responseRetryTime;
208: }
209:
210: public static void setResponseStepTime(int time) {
211: s_responseRetryStepTime = time;
212: }
213:
214: public static int getResponseStepTime() {
215: return s_responseRetryStepTime;
216: }
217:
218: public static void setTunnelKeepAliveTime(int time) {
219: s_tunnelKeepAliveTime = time;
220: }
221:
222: public static int getTunnelKeepAliveTime() {
223: return s_tunnelKeepAliveTime;
224: }
225:
226: public static void setConnectionTimeout(int time) {
227: org.myoodb.core.TunnelManager.setTunnelTimeout(time);
228: }
229:
230: public static int getConnectionTimeout() {
231: return org.myoodb.core.TunnelManager.getTunnelTimeout();
232: }
233:
234: public AbstractDatagramSocket(AbstractDatabase db, Long id,
235: String host, int port, int timeout, boolean secure,
236: boolean passThrough) throws UnknownHostException,
237: SocketException {
238: super (db, id);
239:
240: m_connectTimeout = timeout;
241:
242: if (passThrough == false) {
243: m_requestVal = TunnelManager.Protocol.USER_DATA;
244: } else {
245: m_requestVal = TunnelManager.Protocol.USER_DATA_TUNNEL;
246: }
247:
248: init();
249:
250: m_socket = assignDatagramSocket(host, port);
251:
252: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == true) {
253: synchronized (CONNECTIONS) {
254: CONNECTIONS.put(m_identifier, this );
255: }
256: }
257: }
258:
259: private void start() {
260: if (m_keepAliveThread == null) {
261: m_keepAliveThread = new Thread(this );
262: m_keepAliveThread.setDaemon(true);
263: m_keepAliveThread.setName("MyOodb Udp CleanUp Thread: "
264: + m_socket);
265: m_keepAliveThread.start();
266: }
267: }
268:
269: private boolean internalIsConnected() throws IOException {
270: if (m_socket == null) {
271: throw new IOException("connection has been closed");
272: }
273:
274: return (m_socket != null);
275: }
276:
277: private Object[] getData(int timeout) throws IOException {
278: if (m_inList.size() == 0) {
279: do {
280: try {
281: if (timeout != -1) {
282: synchronized (m_inList) {
283: m_inList.wait(timeout);
284: }
285:
286: if ((m_inList != null)
287: && (m_inList.size() == 0)) {
288: throw new SocketTimeoutException(
289: "Read timed out");
290: }
291: } else {
292: synchronized (m_inList) {
293: m_inList.wait();
294: }
295: }
296:
297: break;
298: } catch (InterruptedException e) {
299: // nothing to do
300: }
301: } while ((m_inList != null) && (m_inList.size() == 0));
302:
303: if (m_socket == null) {
304: throw new IOException("Socket Closed");
305: }
306: }
307:
308: synchronized (m_inList) {
309: return (Object[]) m_inList.removeFirst();
310: }
311: }
312:
313: private DatagramSocket assignDatagramSocket(String host, int port)
314: throws SocketException {
315: Object[] tuple = null;
316:
317: synchronized (SERVER_SOCKETS) {
318: m_server = new InetSocketAddress(host, port);
319:
320: tuple = (Object[]) SERVER_SOCKETS.get(m_server.getAddress()
321: .getHostAddress());
322:
323: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == false) {
324: if (tuple == null) {
325: tuple = new Object[] { new DatagramSocket(),
326: (int) 0 };
327: SERVER_SOCKETS.put(m_server.getAddress()
328: .getHostAddress(), tuple);
329: }
330:
331: if ((Integer) tuple[1] == 0) {
332: int refcount = (Integer) tuple[1];
333: tuple[1] = (int) refcount + 1;
334:
335: DatagramSocket socket = (DatagramSocket) tuple[0];
336: socket.disconnect();
337: socket.connect(m_server);
338:
339: socket.setSoTimeout(0);
340: int IPTOS_RELIABILITY = 0x04;
341: socket.setTrafficClass(IPTOS_RELIABILITY);
342: socket.setSendBufferSize(getBufferSize());
343: socket.setReceiveBufferSize(getBufferSize());
344:
345: startPacketReceiver(socket, m_db);
346: }
347: } else if (tuple == null) {
348: throw new org.myoodb.exception.PermissionException(
349: "No peer registered: " + m_server);
350: }
351: }
352:
353: return (DatagramSocket) tuple[0];
354: }
355:
356: private void unassignDatagramSocket() {
357: synchronized (SERVER_SOCKETS) {
358: InetSocketAddress server = m_server;
359: if (server != null) {
360: m_server = null;
361:
362: Object[] tuple = (Object[]) SERVER_SOCKETS.get(server
363: .getAddress().getHostAddress());
364:
365: if (org.myoodb.MyOodbDatabase
366: .getGameConfigurationFlag() == false) {
367: if (tuple != null) {
368: int refcount = (Integer) tuple[1];
369: tuple[1] = (int) refcount - 1;
370:
371: if ((Integer) tuple[1] == 0) {
372: DatagramSocket socket = (DatagramSocket) tuple[0];
373: stopPacketReceiver(socket);
374: }
375: }
376: }
377: }
378: }
379: }
380:
381: protected void init() throws SocketException {
382: m_keepAliveThread = null;
383:
384: m_abandon = false;
385: m_realtime = false;
386: m_requestIdentifier = -1;
387: m_sessionIdentifier = -1;
388: m_lastRequestIdentifier = -1;
389:
390: m_inList = new LinkedList();
391: m_outBuffer = new byte[getBufferSize()];
392:
393: synchronized (RECEIVE_BUFFERS) {
394: RECEIVE_BUFFERS.put(m_identifier, m_inList);
395: }
396:
397: start();
398: }
399:
400: protected SocketAddress getRemoteSocketAddress() {
401: return m_server;
402: }
403:
404: protected void sendAcknowledge(SocketAddress address,
405: byte[] outBuffer) throws java.io.IOException {
406: m_requestIdentifier++;
407:
408: if (m_abandon == false) {
409: FastByteArrayOutputStream response = new FastByteArrayOutputStream(
410: outBuffer);
411: ObjectOutputStream out = getObjectOutputStream(response);
412: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == true) {
413: out.writeBoolean(true /* server msg */);
414: }
415: out.writeShort(TunnelManager.Protocol.VERIFY_DATA);
416: out.writeLong(m_identifier);
417: out.writeLong(m_requestIdentifier);
418: out.writeLong(m_sessionIdentifier);
419: out.flush();
420:
421: m_socket.send(new DatagramPacket(outBuffer, 0, response
422: .size(), address));
423: m_lastSentTime = System.currentTimeMillis();
424: }
425:
426: m_lastRequestIdentifier = m_requestIdentifier;
427: }
428:
429: public void run() {
430: byte[] outBuffer = new byte[1024];
431:
432: while (m_keepAliveThread != null) {
433: try {
434: Thread.sleep(getTunnelKeepAliveTime());
435:
436: if (m_sessionIdentifier == -1) {
437: continue;
438: } else if ((System.currentTimeMillis() - m_lastSentTime) < getTunnelKeepAliveTime()) {
439: continue;
440: }
441:
442: FastByteArrayOutputStream byteStream = new FastByteArrayOutputStream(
443: outBuffer);
444: ObjectOutputStream out = getObjectOutputStream(byteStream);
445:
446: if (org.myoodb.MyOodbDatabase
447: .getGameConfigurationFlag() == true) {
448: out.writeBoolean(true /* server msg */);
449: }
450: out.writeShort(TunnelManager.Protocol.KEEP_ALIVE);
451: out.writeLong(m_identifier);
452: out.writeLong(m_lastRequestIdentifier);
453: out.writeLong(m_sessionIdentifier);
454: out.flush();
455:
456: m_socket.send(new DatagramPacket(outBuffer, 0,
457: byteStream.size(), m_server));
458: } catch (Exception e1) {
459: try {
460: Thread.sleep(1000);
461: } catch (InterruptedException e2) {
462: // nothing to do
463: }
464: }
465: }
466: }
467:
468: public void send(Object obj) throws IOException {
469: send(obj, -1);
470: }
471:
472: public void send(Object obj, int timeout) throws IOException {
473: if (obj instanceof LoginCommand) {
474: timeout = m_connectTimeout;
475: } else if (obj instanceof InvokeMethodCommand) {
476: m_abandon = (((InvokeMethodCommand) obj).getAccessLevel() == org.myoodb.MyOodbAccess.ABANDON);
477: m_realtime = (((InvokeMethodCommand) obj).getAccessLevel() == org.myoodb.MyOodbAccess.REALTIME);
478: } else {
479: m_abandon = false;
480: m_realtime = false;
481: }
482:
483: FastByteArrayOutputStream byteStream = new FastByteArrayOutputStream(
484: m_outBuffer);
485: ObjectOutputStream out = getObjectOutputStream(byteStream);
486:
487: try {
488: m_requestIdentifier++;
489: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == true) {
490: out.writeBoolean(true /* server msg */);
491: }
492: out.writeShort(m_requestVal);
493: out.writeLong(m_identifier);
494: out.writeLong(m_requestIdentifier);
495: out.writeLong(m_sessionIdentifier);
496: out.writeObject(obj);
497: out.flush();
498:
499: boolean sendRequest = true;
500:
501: int total = 0;
502: int waitTime = getRequestRetryTime();
503: while (internalIsConnected() == true) {
504: if (sendRequest == true) {
505: m_socket.send(new DatagramPacket(m_outBuffer, 0,
506: byteStream.size(), m_server));
507: m_lastSentTime = System.currentTimeMillis();
508:
509: if (m_abandon == true) {
510: break;
511: }
512: }
513:
514: try {
515: Object[] data = getData(waitTime);
516: if ((Long) data[2] == m_requestIdentifier) {
517: if ((Short) data[0] >= TunnelManager.Protocol.USER_DATA) {
518: synchronized (m_inList) {
519: m_inList.add(0, data);
520: }
521: }
522:
523: break;
524: } else if (LOGGER.isDebugEnabled() == true) {
525: LOGGER.debug("udp request invalid: "
526: + m_identifier + ", "
527: + m_requestIdentifier + ", " + data[2]);
528: }
529:
530: sendRequest = false;
531: } catch (SocketTimeoutException e) {
532: total += getRequestRetryTime();
533:
534: if ((m_realtime == false)
535: && (waitTime < getConnectionTimeout())) {
536: waitTime += getRequestStepTime();
537: }
538:
539: if (obj instanceof LogoutCommand) {
540: close();
541: break;
542: } else if ((timeout != -1) && (total >= timeout)) {
543: if (obj instanceof LoginCommand) {
544: throw new ConnectException(
545: "Connect timed out");
546: } else {
547: sendAcknowledge(m_server, m_outBuffer);
548:
549: throw new org.myoodb.exception.TimeoutException(
550: "Receive timed out");
551: }
552: } else if (LOGGER.isDebugEnabled() == true) {
553: LOGGER.debug("udp request retry: "
554: + m_identifier + ", "
555: + m_requestIdentifier);
556: }
557:
558: sendRequest = true;
559: }
560: }
561: } catch (IllegalArgumentException e) {
562: LOGGER.error(e.getMessage() + ": " + m_outBuffer.length
563: + ", " + byteStream.size(), e);
564:
565: throw e;
566: }
567: }
568:
569: public Object receive(int timeout) throws IOException,
570: ClassNotFoundException,
571: org.myoodb.exception.TimeoutException {
572: Object result = null;
573:
574: if ((m_abandon == true) && (timeout == -1)) {
575: timeout = s_responseRetryTime;
576: }
577:
578: try {
579: while (internalIsConnected() == true) {
580: Object[] data = getData(timeout);
581: if ((Long) data[2] == m_requestIdentifier) {
582: if ((Short) data[0] >= TunnelManager.Protocol.USER_DATA) {
583: m_sessionIdentifier = (Long) data[3];
584: m_requestIdentifier++;
585: m_lastRequestIdentifier = m_requestIdentifier;
586: result = data[4];
587: break;
588: }
589: }
590: }
591: } catch (IOException e) {
592: if ((timeout != -1)
593: && (e instanceof SocketTimeoutException)) {
594: sendAcknowledge(m_server, m_outBuffer);
595:
596: if (m_abandon == false) {
597: throw new org.myoodb.exception.TimeoutException(
598: "Receive timed out");
599: }
600: } else {
601: throw e;
602: }
603: }
604:
605: return result;
606: }
607:
608: public void close() throws IOException {
609: m_socket = null;
610: m_keepAliveThread = null;
611: m_sessionIdentifier = -1;
612:
613: unassignDatagramSocket();
614:
615: long identifier = m_identifier;
616: m_identifier = (long) -1;
617:
618: LinkedList inList = m_inList;
619: m_inList = null;
620:
621: if (identifier != -1) {
622: synchronized (RECEIVE_BUFFERS) {
623: RECEIVE_BUFFERS.remove(identifier);
624: }
625:
626: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == true) {
627: synchronized (CONNECTIONS) {
628: CONNECTIONS.remove(identifier);
629: }
630: }
631: }
632:
633: if (inList != null) {
634: synchronized (inList) {
635: inList.notify();
636: }
637: }
638: }
639:
640: public boolean isConnected() {
641: return (m_socket != null);
642: }
643:
644: public DatagramSocket getDatagramSocket() {
645: return m_socket;
646: }
647: }
|