001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection;
022:
023: import java.io.IOException;
024: import java.io.InputStreamReader;
025: import java.io.LineNumberReader;
026: import java.lang.management.ManagementFactory;
027: import java.net.SocketTimeoutException;
028: import java.nio.BufferUnderflowException;
029: import java.util.LinkedHashMap;
030: import java.util.Map;
031: import java.util.Map.Entry;
032: import java.util.concurrent.CountDownLatch;
033: import java.util.concurrent.TimeUnit;
034: import java.util.logging.Level;
035: import java.util.logging.Logger;
036:
037: import javax.management.JMException;
038: import javax.management.MBeanServer;
039: import javax.management.ObjectName;
040:
041: import org.xsocket.DataConverter;
042:
043: /**
044: * utility class for jmx support
045: *
046: * @author grro@xsocket.org
047: */
048: @SuppressWarnings("unchecked")
049: public final class ConnectionUtils {
050:
051: private static final Logger LOG = Logger
052: .getLogger(ConnectionUtils.class.getName());
053:
054: public static final String DEFAULT_DOMAIN = "org.xsocket.connection";
055: public static final String SERVER_TRHREAD_PREFIX = "xServer";
056:
057: private static String versionInfo = null;
058:
059: private ConnectionUtils() {
060: }
061:
062: /**
063: * validate, based on a leading int length field. The length field will be removed
064: *
065: * @param connection the connection
066: * @return the length
067: * @throws IOException if an exception occurs
068: * @throws BufferUnderflowException if not enough data is available
069: */
070: public static int validateSufficientDatasizeByIntLengthField(
071: INonBlockingConnection connection) throws IOException,
072: BufferUnderflowException {
073:
074: connection.resetToReadMark();
075: connection.markReadPosition();
076:
077: // check if enough data is available
078: int length = connection.readInt();
079: if (connection.available() < length) {
080: if (LOG.isLoggable(Level.FINE)) {
081: LOG.fine("[" + connection.getId()
082: + "]insufficient data. require " + length
083: + " got " + connection.available());
084: }
085: throw new BufferUnderflowException();
086:
087: } else {
088: // ...yes, remove mark
089: connection.removeReadMark();
090: return length;
091: }
092: }
093:
094: /**
095: * validate, based on a leading int length field, that enough data (getNumberOfAvailableBytes() >= length) is available. If not,
096: * an BufferUnderflowException will been thrown. Example:
097: * <pre>
098: * //client
099: * connection.setAutoflush(false); // avoid immediate write
100: * ...
101: * connection.markWritePosition(); // mark current position
102: * connection.write((int) 0); // write "emtpy" length field
103: *
104: * // write and count written size
105: * int written = connection.write(CMD_PUT);
106: * written += ...
107: *
108: * connection.resetToWriteMark(); // return to length field position
109: * connection.write(written); // and update it
110: * connection.flush(); // flush (marker will be removed implicit)
111: * ...
112: *
113: *
114: * // server
115: * class MyHandler implements IDataHandler {
116: * ...
117: * public boolean onData(INonBlockingConnection connection) throws IOException, BufferUnderflowException {
118: * int length = ConnectionUtils.validateSufficientDatasizeByIntLengthField(connection);
119: *
120: * // enough data (BufferUnderflowException hasn`t been thrown)
121: * byte cmd = connection.readByte();
122: * ...
123: * }
124: * }
125: * </pre>
126: *
127: * @param connection the connection
128: * @param removeLengthField true, if length field should be removed
129: * @return the length
130: * @throws IOException if an exception occurs
131: * @throws BufferUnderflowException if not enough data is available
132: */
133: public static int validateSufficientDatasizeByIntLengthField(
134: INonBlockingConnection connection, boolean removeLengthField)
135: throws IOException, BufferUnderflowException {
136:
137: connection.resetToReadMark();
138: connection.markReadPosition();
139:
140: // check if enough data is available
141: int length = connection.readInt();
142: if (connection.available() < length) {
143: if (LOG.isLoggable(Level.FINE)) {
144: LOG.fine("[" + connection.getId()
145: + "]insufficient data. require " + length
146: + " got " + connection.available());
147: }
148: throw new BufferUnderflowException();
149:
150: } else {
151: // ...yes, remove mark
152: if (!removeLengthField) {
153: connection.resetToReadMark();
154: }
155:
156: connection.removeReadMark();
157: return length;
158: }
159: }
160:
161: /**
162: * starts the given server within a dedicated thread. This method blocks
163: * until the server is open. If the server hasn't been started within
164: * 60 sec a timeout exception will been thrown
165: *
166: * @param server the server to start
167: * @throws SocketTimeoutException is the timeout has been reached
168: */
169: public static void start(IServer server)
170: throws SocketTimeoutException {
171: start(server, 60);
172: }
173:
174: /**
175: * starts the given server within a dedicated thread. This method blocks
176: * until the server is open.
177: *
178: * @param server the server to start
179: * @param timeoutSec the maximum time to wait
180: *
181: * @throws SocketTimeoutException is the timeout has been reached
182: */
183: public static void start(IServer server, int timeoutSec)
184: throws SocketTimeoutException {
185:
186: final CountDownLatch startedSignal = new CountDownLatch(1);
187:
188: // create and add startup listener
189: IServerListener startupListener = new IServerListener() {
190:
191: public void onInit() {
192: startedSignal.countDown();
193: };
194:
195: public void onDestroy() {
196: };
197: };
198: server.addListener(startupListener);
199:
200: // start server within a dedicated thread
201: Thread t = new Thread(server);
202: t.setName("xServer");
203: t.start();
204:
205: // wait until server has been started (onInit has been called)
206: boolean isStarted = false;
207: try {
208: isStarted = startedSignal.await(timeoutSec,
209: TimeUnit.SECONDS);
210: } catch (InterruptedException e) {
211: throw new RuntimeException("start signal doesn't occured. "
212: + e.toString());
213: }
214:
215: // timeout occurred?
216: if (!isStarted) {
217: throw new SocketTimeoutException(
218: "start timeout ("
219: + DataConverter
220: .toFormatedDuration((long) timeoutSec * 1000)
221: + ")");
222: }
223:
224: // update thread name
225: t.setName(SERVER_TRHREAD_PREFIX + ":" + server.getLocalPort());
226:
227: // remove the startup listener
228: server.removeListener(startupListener);
229: }
230:
231: /**
232: * creates and registers a mbean for the given server on the platform MBeanServer
233: *
234: * <br/><br/><b>This is a xSocket preview functionality and subject to change. This
235: * method could be removed by the final version</b>
236: *
237: * @param server the server to register
238: * @return the objectName
239: * @throws JMException if an jmx exception occurs
240: */
241: public static ObjectName registerMBean(IServer server)
242: throws JMException {
243: return registerMBean(server, DEFAULT_DOMAIN);
244: }
245:
246: /**
247: * creates and registers a mbean for the given server on the platform MBeanServer
248: * under the given domain name
249: *
250: * <br/><br/><b>This is a xSocket preview functionality and subject to change. This
251: * method could be removed by the final version</b>
252: *
253: * @param server the server to register
254: * @param domain the domain name to use
255: * @return the objectName
256: * @throws JMException if an jmx exception occurs
257: */
258: public static ObjectName registerMBean(IServer server, String domain)
259: throws JMException {
260: return registerMBean(server, domain, ManagementFactory
261: .getPlatformMBeanServer());
262: }
263:
264: /**
265: * creates and registers a mbean for the given server on the given MBeanServer
266: * under the given domain name
267: *
268: * <br/><br/><b>This is a xSocket preview functionality and subject to change. This
269: * method could be removed by the final version</b>
270: *
271: * @param mbeanServer the mbean server to use
272: * @param server the server to register
273: * @param domain the domain name to use
274: * @return the objectName
275: * @throws JMException if an jmx exception occurs
276: */
277: public static ObjectName registerMBean(IServer server,
278: String domain, MBeanServer mbeanServer) throws JMException {
279: return ServerMBeanProxyFactory.createAndRegister(server,
280: domain, mbeanServer);
281: }
282:
283: /**
284: * creates and registers a mbean for the given connection pool on the platform MBeanServer
285: *
286: * <br/><br/><b>This is a xSocket preview functionality and subject to change. This
287: * method could be removed by the final version</b>
288: *
289: * @param pool the pool to register
290: * @return the objectName
291: * @throws JMException if an jmx exception occurs
292: */
293: @SuppressWarnings("unchecked")
294: public static ObjectName registerMBean(IConnectionPool pool)
295: throws JMException {
296: return registerMBean(pool, DEFAULT_DOMAIN);
297: }
298:
299: /**
300: * creates and registers a mbean for the given connection pool on the platform MBeanServer
301: * under the given domain name
302: *
303: * <br/><br/><b>This is a xSocket preview functionality and subject to change. This
304: * method could be removed by the final version</b>
305: *
306: * @param pool the pool to register
307: * @param domain the domain name to use
308: * @return the objectName
309: * @throws JMException if an jmx exception occurs
310: */
311: @SuppressWarnings("unchecked")
312: public static ObjectName registerMBean(IConnectionPool pool,
313: String domain) throws JMException {
314: return registerMBean(pool, domain, ManagementFactory
315: .getPlatformMBeanServer());
316: }
317:
318: /**
319: * creates and registers a mbean for the given pool on the given MBeanServer
320: * under the given domain name
321: *
322: * <br/><br/><b>This is a xSocket preview functionality and subject to change. This
323: * method could be removed by the final version</b>
324: *
325: * @param mbeanServer the mbean server to use
326: * @param pool the pool to register
327: * @param domain the domain name to use
328: * @return the objectName
329: * @throws JMException if an jmx exception occurs
330: */
331: @SuppressWarnings("unchecked")
332: public static ObjectName registerMBean(IConnectionPool pool,
333: String domain, MBeanServer mbeanServer) throws JMException {
334: return ConnectionPoolMBeanProxyFactory.createAndRegister(pool,
335: domain, mbeanServer);
336: }
337:
338: /**
339: * get the version string of xSocket (core)
340: *
341: * @return the version string
342: */
343: public static String getVersionInfo() {
344: if (versionInfo == null) {
345:
346: versionInfo = "<unknown>";
347:
348: try {
349: InputStreamReader isr = new InputStreamReader(
350: ConnectionUtils.class
351: .getResourceAsStream("/org/xsocket/version.txt"));
352: if (isr != null) {
353: LineNumberReader lnr = new LineNumberReader(isr);
354: String line = null;
355: do {
356: line = lnr.readLine();
357: if (line != null) {
358: if (line
359: .startsWith("Implementation-Version=")) {
360: versionInfo = line.substring(
361: "Implementation-Version="
362: .length(),
363: line.length()).trim();
364: }
365: }
366: } while (line != null);
367:
368: lnr.close();
369: }
370: } catch (Exception ignore) {
371: }
372: }
373:
374: return versionInfo;
375: }
376:
377: /**
378: * creates a new bound cache
379: *
380: * @param <T> the map value type
381: * @param maxSize the max size of the cache
382: * @return the new map cache
383: */
384:
385: public static <T> Map<Class, T> newMapCache(int maxSize) {
386: return new MapCache<T>(maxSize);
387: }
388:
389: @SuppressWarnings("unchecked")
390: private static final class MapCache<T> extends
391: LinkedHashMap<Class, T> {
392:
393: private static final long serialVersionUID = 4513864504007457500L;
394:
395: private int maxSize = 0;
396:
397: MapCache(int maxSize) {
398: this .maxSize = maxSize;
399: }
400:
401: @Override
402: protected boolean removeEldestEntry(Entry<Class, T> eldest) {
403: return size() > maxSize;
404: }
405: }
406: }
|