001: ///////////////////////////////////////////////////////////////////////////////
002: //
003: // Copyright (C) 2003-@year@ by Thomas M. Hazel, MyOODB (www.myoodb.org)
004: //
005: // All Rights Reserved
006: //
007: // This program is free software; you can redistribute it and/or modify
008: // it under the terms of the GNU General Public License and GNU Library
009: // General Public License as published by the Free Software Foundation;
010: // either version 2, or (at your option) any later version.
011: //
012: // This program is distributed in the hope that it will be useful,
013: // but WITHOUT ANY WARRANTY; without even the implied warranty of
014: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: // GNU General Public License and GNU Library General Public License
016: // for more details.
017: //
018: // You should have received a copy of the GNU General Public License
019: // and GNU Library General Public License along with this program; if
020: // not, write to the Free Software Foundation, 675 Mass Ave, Cambridge,
021: // MA 02139, USA.
022: //
023: ///////////////////////////////////////////////////////////////////////////////
024: package org.myoodb.core;
025:
026: import java.io.*;
027: import java.net.*;
028:
029: import org.myoodb.util.*;
030: import org.myoodb.core.command.*;
031:
032: public class MyOodbTunnelUdp {
033: private static final org.myoodb.util.Logger LOGGER = org.myoodb.util.Logger
034: .getLogger(MyOodbTunnelUdp.class);
035:
036: public static void start(final DatagramSocket socket) {
037: try {
038: start(null, socket);
039: } catch (java.net.UnknownHostException e) {
040: // ignore compile error
041: }
042: }
043:
044: public static void start(final String host,
045: final DatagramSocket socket)
046: throws java.net.UnknownHostException {
047: Runnable udpService = new Runnable() {
048: public void run() {
049: if (host != null) {
050: try {
051: AbstractDatagramSocket.addDatagramSocket(host,
052: socket);
053: } catch (UnknownHostException e) {
054: LOGGER.error(null, e);
055: }
056: }
057:
058: try {
059: MyOodbTunnelUdp tunnelUdp = new MyOodbTunnelUdp(
060: socket);
061: tunnelUdp.mainloop();
062: } catch (Exception e) {
063: if (socket.isClosed() == false) {
064: LOGGER.error(null, e);
065: }
066: }
067:
068: if (host != null) {
069: try {
070: AbstractDatagramSocket
071: .removeDatagramSocket(host);
072: } catch (UnknownHostException e) {
073: LOGGER.error(null, e);
074: }
075: }
076: }
077: };
078:
079: Thread thread = new Thread(udpService);
080: thread.setPriority(MyOodbManager.ACCEPT_THREAD_PRIORITY + 1);
081: thread.setDaemon(true);
082: thread.setName("MyOodb Udp Mainloop Thread: "
083: + socket.getLocalAddress());
084: thread.start();
085:
086: if (host != null) {
087: while (AbstractDatagramSocket.getDatagramSocket(host) == null) {
088: try {
089: Thread.sleep(100);
090: } catch (java.lang.InterruptedException e) {
091: // nothing to do
092: }
093: }
094: }
095: }
096:
097: private DatagramSocket m_socket;
098: private org.myoodb.MyOodbDatabase m_db;
099: private java.util.LinkedList m_bufferPool;
100: private java.util.concurrent.ExecutorService m_threadManager;
101:
102: public MyOodbTunnelUdp(DatagramSocket socket) throws Exception {
103: m_db = org.myoodb.core.MyOodbManager.getTheManager()
104: .getTunnelManager().getDatabase();
105:
106: m_bufferPool = new java.util.LinkedList();
107: m_threadManager = java.util.concurrent.Executors
108: .newCachedThreadPool(new TransactionThreadFactory());
109:
110: m_socket = socket;
111: m_socket.setSoTimeout(0);
112: int IPTOS_RELIABILITY = 0x04;
113: m_socket.setTrafficClass(IPTOS_RELIABILITY);
114: m_socket.setSendBufferSize(AbstractConnection.getBufferSize());
115: m_socket.setReceiveBufferSize(AbstractConnection
116: .getBufferSize());
117: }
118:
119: protected ObjectInputStream getObjectInputStream(
120: InputStream inputStream) throws java.io.IOException {
121: return new org.myoodb.util.FastObjectInputStream(
122: new BufferedInputStream(inputStream, AbstractConnection
123: .getBufferSize()), m_db);
124: }
125:
126: protected ObjectOutputStream getObjectOutputStream(
127: OutputStream outputStream) throws java.io.IOException {
128: return new org.myoodb.util.FastObjectOutputStream(
129: new BufferedOutputStream(outputStream,
130: AbstractConnection.getBufferSize()), m_db);
131: }
132:
133: private void sendAcknowledge(SocketAddress address, byte[] buffer,
134: long connectIdentifier, long requestIdentifier,
135: long sessionIdentifier) throws java.io.IOException {
136: FastByteArrayOutputStream response = new FastByteArrayOutputStream(
137: buffer);
138:
139: ObjectOutputStream out = getObjectOutputStream(response);
140: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == true) {
141: out.writeBoolean(false /* client msg */);
142: }
143: out.writeShort(TunnelManager.Protocol.VERIFY_DATA);
144: out.writeLong(connectIdentifier);
145: out.writeLong(requestIdentifier);
146: out.writeLong(sessionIdentifier);
147: out.flush();
148:
149: m_socket.send(new DatagramPacket(buffer, 0, response.size(),
150: address));
151: }
152:
153: private void prepareResponse(ObjectOutputStream out,
154: int requestType, long connectIdentifier,
155: long requestIdentifier, long sessionIdentifier,
156: Object result) throws java.io.IOException {
157: //
158: // XXX: since this is outside the context of the database we need to check for concurrency
159: //
160: java.util.ConcurrentModificationException globalError = null;
161:
162: for (int i = 0; i < org.myoodb.core.storage.ClusterStore.CONCURRENCY_RETRY; i++) {
163: try {
164: if (org.myoodb.MyOodbDatabase
165: .getGameConfigurationFlag() == true) {
166: out.writeBoolean(false /* client msg */);
167: }
168: out.writeShort(requestType);
169: out.writeLong(connectIdentifier);
170: out.writeLong(requestIdentifier);
171: out.writeLong(sessionIdentifier);
172: out.writeObject(result);
173: out.flush();
174: globalError = null;
175: break;
176: } catch (java.util.ConcurrentModificationException e) {
177: try {
178: Thread.sleep(100);
179: } catch (java.lang.InterruptedException ee) {
180: // nothing to do
181: }
182:
183: globalError = e;
184: }
185: }
186:
187: if (globalError != null) {
188: if (org.myoodb.MyOodbDatabase.getGameConfigurationFlag() == true) {
189: out.writeBoolean(false /* client msg */);
190: }
191: out.writeShort(requestType);
192: out.writeLong(connectIdentifier);
193: out.writeLong(requestIdentifier);
194: out.writeLong(sessionIdentifier);
195: out.writeObject(new org.myoodb.exception.InternalException(
196: "Concurrency error:", globalError));
197: out.flush();
198: }
199: }
200:
201: public Object[] processRequest(Object[] data, byte[] outBuffer,
202: SocketAddress address) throws java.io.IOException {
203: int gRequestVal = -1;
204: boolean gAbandon = false;
205:
206: long gConnectIdentifier = -1;
207: long gRequestIdentifier = -1;
208: Object[] gResponseTuple = null;
209: Object[] gConnectionTuple = null;
210:
211: try {
212: gRequestVal = (Short) data[0];
213: gConnectIdentifier = (Long) data[1];
214: gRequestIdentifier = (Long) data[2];
215: long sessionIdentifier = (Long) data[3];
216:
217: synchronized (TunnelManager.CONNECTIONS) {
218: gConnectionTuple = (Object[]) TunnelManager.CONNECTIONS
219: .get(sessionIdentifier);
220: }
221:
222: if (gRequestVal >= TunnelManager.Protocol.USER_DATA) {
223: AbstractCommand command = (AbstractCommand) data[4];
224:
225: if (command instanceof InvokeMethodCommand) {
226: gAbandon = (((InvokeMethodCommand) command)
227: .getAccessLevel() == org.myoodb.MyOodbAccess.ABANDON);
228: }
229:
230: org.myoodb.core.DatabaseConnection connection = null;
231: if (gConnectionTuple == null) {
232: if (sessionIdentifier != -1) {
233: throw new org.myoodb.exception.TimeoutException(
234: "Connection timed out");
235: }
236:
237: if ((command instanceof LoginCommand) == false) {
238: throw new org.myoodb.exception.PermissionException(
239: "Permission denied");
240: }
241:
242: connection = new org.myoodb.core.DatabaseConnection(
243: m_db, null, m_db.getHost(), m_db.getPort(),
244: -1, false, null);
245:
246: sessionIdentifier = MyOodbManager.getTheManager()
247: .getIdentifierManager()
248: .getNextConnectionId();
249:
250: gConnectionTuple = new Object[6];
251: gConnectionTuple[0] = connection;
252: gConnectionTuple[1] = address;
253: gConnectionTuple[2] = System.currentTimeMillis();
254: gConnectionTuple[3] = gConnectIdentifier;
255: gConnectionTuple[4] = (long) -1;
256: gConnectionTuple[5] = false;
257:
258: synchronized (TunnelManager.CONNECTIONS) {
259: TunnelManager.CONNECTIONS.put(
260: sessionIdentifier, gConnectionTuple);
261: }
262: } else {
263: if (gConnectionTuple[1].equals(address) == false) {
264: if (LOGGER.isDebugEnabled() == true) {
265: LOGGER.debug("udp unknown request: "
266: + gConnectIdentifier + ", "
267: + gRequestIdentifier + ", "
268: + address);
269: }
270:
271: return null;
272: }
273:
274: connection = (org.myoodb.core.DatabaseConnection) gConnectionTuple[0];
275: gConnectionTuple[2] = System.currentTimeMillis();
276: }
277:
278: synchronized (gConnectionTuple) {
279: if ((gRequestIdentifier != 0)
280: && (gRequestIdentifier < (Long) gConnectionTuple[4])) {
281: return null;
282: } else if ((gRequestIdentifier != 0)
283: && (gRequestIdentifier == (Long) gConnectionTuple[4])) {
284: if (gConnectionTuple[5] == (Boolean) false) {
285: if (LOGGER.isDebugEnabled() == true) {
286: LOGGER.debug("udp response ack: "
287: + gConnectIdentifier + ", "
288: + gRequestIdentifier);
289: }
290:
291: sendAcknowledge(address, outBuffer,
292: gConnectIdentifier,
293: gRequestIdentifier,
294: sessionIdentifier);
295: } else {
296: gConnectionTuple.notify();
297: }
298:
299: return null;
300: }
301:
302: gConnectionTuple[4] = gRequestIdentifier;
303: gConnectionTuple.notify();
304: }
305:
306: Object result = null;
307: synchronized (connection) {
308: boolean tunnel = (gRequestVal == TunnelManager.Protocol.USER_DATA_TUNNEL);
309: if ((tunnel == false)
310: && (command instanceof InvokeMethodCommand)) {
311: tunnel = (((InvokeMethodCommand) command)
312: .getAccessLevel() == org.myoodb.MyOodbAccess.TUNNEL);
313: }
314:
315: if (tunnel == true) {
316: ((AbstractCommand) command)
317: .setTunnelIdentifier(sessionIdentifier);
318:
319: synchronized (TunnelManager.INVOKE_COMMANDS) {
320: TunnelManager.INVOKE_COMMANDS.put(
321: sessionIdentifier, command);
322: }
323: }
324:
325: try {
326: connection.send(command);
327: result = connection.receive(-1);
328:
329: synchronized (gConnectionTuple) {
330: gConnectionTuple[5] = true;
331: }
332: } finally {
333: if (tunnel == true) {
334: synchronized (TunnelManager.INVOKE_COMMANDS) {
335: result = TunnelManager.INVOKE_COMMANDS
336: .remove(sessionIdentifier);
337: }
338: }
339: }
340: }
341:
342: FastByteArrayOutputStream response = new FastByteArrayOutputStream(
343: outBuffer);
344: prepareResponse(getObjectOutputStream(response),
345: gRequestVal, gConnectIdentifier,
346: gRequestIdentifier, sessionIdentifier, result);
347:
348: if (response.size() > outBuffer.length) {
349: String errMsg = command.getClass()
350: + " response greater than max buffer: "
351: + response.size();
352: throw new org.myoodb.exception.PermissionException(
353: errMsg);
354: }
355:
356: gResponseTuple = new Object[] { gAbandon,
357: gRequestIdentifier, gConnectionTuple, response };
358: } else if (gConnectionTuple != null) {
359: gConnectionTuple[2] = System.currentTimeMillis();
360:
361: synchronized (gConnectionTuple) {
362: if (gRequestIdentifier > (Long) gConnectionTuple[4]) {
363: gConnectionTuple[4] = gRequestIdentifier;
364: gConnectionTuple.notify();
365: }
366: }
367: }
368: } catch (Exception e) {
369: if (LOGGER.isDebugEnabled() == true) {
370: LOGGER.debug(null, e);
371: }
372:
373: if (gRequestVal >= TunnelManager.Protocol.USER_DATA) {
374: if ((e instanceof org.myoodb.exception.RemoteException) == false) {
375: e = new org.myoodb.exception.InternalException(e);
376: }
377:
378: FastByteArrayOutputStream response = new FastByteArrayOutputStream(
379: outBuffer);
380: prepareResponse(getObjectOutputStream(response),
381: gRequestVal, gConnectIdentifier,
382: gRequestIdentifier, -1, e);
383: gResponseTuple = new Object[] { gAbandon,
384: gRequestIdentifier, gConnectionTuple, response };
385: }
386: }
387:
388: return gResponseTuple;
389: }
390:
391: public void mainloop() throws IOException {
392: byte[] inBuffer = new byte[AbstractConnection.getBufferSize()];
393:
394: while (MyOodbManager.getTheManagerShuttingdownFlag() == false) {
395: Object[] buffers = null;
396:
397: synchronized (m_bufferPool) {
398: if (m_bufferPool.size() == 0) {
399: buffers = new Object[] {
400: new Object[5],
401: new byte[AbstractConnection.getBufferSize()] };
402: } else {
403: buffers = (Object[]) m_bufferPool.removeFirst();
404: }
405: }
406:
407: DatagramPacket packet = new DatagramPacket(inBuffer, 0,
408: inBuffer.length);
409: m_socket.receive(packet);
410:
411: try {
412: FastByteArrayInputStream byteStream = new FastByteArrayInputStream(
413: inBuffer, 0, packet.getLength());
414: ObjectInputStream in = getObjectInputStream(byteStream);
415:
416: boolean serverMsg = (org.myoodb.MyOodbDatabase
417: .getGameConfigurationFlag() == false) ? true
418: : in.readBoolean();
419:
420: Object[] data = (Object[]) buffers[0];
421: data[0] = in.readShort();
422: data[1] = in.readLong();
423: data[2] = in.readLong();
424: data[3] = in.readLong();
425:
426: if ((Short) data[0] >= TunnelManager.Protocol.USER_DATA) {
427: if (serverMsg == false) {
428: AbstractDatagramSocket
429: .setInputObjectStreamContext(in, data);
430: }
431:
432: data[4] = in.readObject();
433: }
434:
435: if (serverMsg == true) {
436: Runnable runnable = new RequestRunnable(buffers,
437: packet.getSocketAddress());
438: m_threadManager.execute(runnable);
439: } else {
440: synchronized (m_bufferPool) {
441: buffers[0] = new Object[5];
442: m_bufferPool.add(buffers);
443: }
444:
445: AbstractDatagramSocket.notifyReceiver(data);
446: }
447: } catch (Exception e) {
448: LOGGER.error(null, e);
449:
450: synchronized (m_bufferPool) {
451: m_bufferPool.add(buffers);
452: }
453: }
454: }
455: }
456:
457: class RequestRunnable implements Runnable {
458: private Object[] m_buffers;
459: private SocketAddress m_address;
460:
461: public RequestRunnable(Object[] buffers, SocketAddress address) {
462: m_buffers = buffers;
463: m_address = address;
464: }
465:
466: public void run() {
467: Thread.currentThread().setName(
468: "MyOodb Udp Tunnel Request Thread: " + m_address);
469:
470: try {
471: Object[] data = (Object[]) m_buffers[0];
472: byte[] outBuffer = (byte[]) m_buffers[1];
473:
474: Object[] responseTuple = processRequest(data,
475: outBuffer, m_address);
476: if (responseTuple != null) {
477: boolean abandon = (Boolean) responseTuple[0];
478: long requestIdentifier = (Long) responseTuple[1];
479: Object[] connectionTuple = (Object[]) responseTuple[2];
480: FastByteArrayOutputStream response = (FastByteArrayOutputStream) responseTuple[3];
481:
482: if ((abandon == false) && (connectionTuple != null)) {
483: long globalTime = 0;
484: int waitTime = AbstractDatagramSocket
485: .getResponseRetryTime();
486: int holdOffTime = AbstractDatagramSocket
487: .getRequestRetryTime();
488:
489: long connectIdentifier = (Long) connectionTuple[3];
490: DatabaseConnection connection = (DatabaseConnection) connectionTuple[0];
491:
492: do {
493: long currentTime = System
494: .currentTimeMillis();
495: if ((currentTime - globalTime) > holdOffTime) {
496: m_socket.send(new DatagramPacket(
497: outBuffer, 0, response.size(),
498: m_address));
499: }
500: /*
501: else if (LOGGER.isDebugEnabled() == true)
502: {
503: LOGGER.debug("no udp response retry: " + connectIdentifier + ", " + requestIdentifier + ", " + connectionTuple[4]);
504: }
505: */
506:
507: globalTime = currentTime;
508:
509: synchronized (connectionTuple) {
510: try {
511: connectionTuple.wait(waitTime);
512: } catch (java.lang.InterruptedException e) {
513: // nothing to do
514: }
515: }
516:
517: if (requestIdentifier < (Long) connectionTuple[4]) {
518: break;
519: }
520:
521: if (LOGGER.isDebugEnabled() == true) {
522: LOGGER.debug("udp response loop: "
523: + connectIdentifier + ", "
524: + requestIdentifier + ", "
525: + connectionTuple[4]);
526: }
527:
528: if (waitTime < AbstractDatagramSocket
529: .getConnectionTimeout()) {
530: waitTime += AbstractDatagramSocket
531: .getResponseStepTime();
532: }
533: } while (connection.isConnected() == true);
534: } else {
535: m_socket.send(new DatagramPacket(outBuffer, 0,
536: response.size(), m_address));
537: }
538: }
539: } catch (IOException e) {
540: LOGGER.error(null, e);
541: } finally {
542: int bufferReuseSize = TunnelManager.CONNECTIONS.size() * 2;
543:
544: if (org.myoodb.MyOodbDatabase
545: .getGameConfigurationFlag() == true) {
546: bufferReuseSize += AbstractDatagramSocket.RECEIVE_BUFFERS
547: .size();
548: }
549:
550: if (m_bufferPool.size() <= bufferReuseSize) {
551: synchronized (m_bufferPool) {
552: m_bufferPool.add(m_buffers);
553: }
554: }
555: }
556: }
557: }
558:
559: class TransactionThread extends Thread {
560: public TransactionThread(Runnable runnable) {
561: super (runnable);
562: }
563: }
564:
565: class TransactionThreadFactory implements
566: java.util.concurrent.ThreadFactory {
567: public Thread newThread(Runnable runnable) {
568: Thread thread = new TransactionThread(runnable);
569: thread
570: .setPriority(MyOodbManager.TRANSACTION_THREAD_PRIORITY + 1);
571: return thread;
572: }
573: }
574: }
|