0001: /*
0002: * JacORB - a free Java ORB
0003: *
0004: * Copyright (C) 1997-2004 Gerald Brose.
0005: *
0006: * This library is free software; you can redistribute it and/or
0007: * modify it under the terms of the GNU Library General Public
0008: * License as published by the Free Software Foundation; either
0009: * version 2 of the License, or (at your option) any later version.
0010: *
0011: * This library is distributed in the hope that it will be useful,
0012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0014: * Library General Public License for more details.
0015: *
0016: * You should have received a copy of the GNU Library General Public
0017: * License along with this library; if not, write to the Free
0018: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
0019: */
0020:
0021: package org.jacorb.imr;
0022:
0023: import org.apache.avalon.framework.logger.Logger;
0024: import org.apache.avalon.framework.configuration.*;
0025:
0026: import org.omg.GIOP.*;
0027:
0028: import org.jacorb.imr.RegistrationPackage.*;
0029: import org.jacorb.imr.AdminPackage.*;
0030:
0031: import org.jacorb.orb.*;
0032: import org.jacorb.orb.giop.*;
0033: import org.jacorb.orb.iiop.*;
0034: import org.jacorb.orb.listener.NullTCPConnectionListener;
0035:
0036: import org.jacorb.poa.util.POAUtil;
0037:
0038: import org.jacorb.util.ObjectUtil;
0039:
0040: import org.omg.PortableServer.*;
0041:
0042: import java.io.*;
0043: import java.net.ServerSocket;
0044: import java.net.Socket;
0045: import java.net.InetAddress;
0046: import java.lang.reflect.Method;
0047:
0048: /**
0049: * This is the main class of the JacORB implementation repository.
0050: * It keeps track of the registered POAs with lifespan policy
0051: * PERSISTENT and provides a way for migrating and restarting
0052: * the POAS servers.
0053: *
0054: * @author Nicolas Noffke
0055: *
0056: * $Id: ImplementationRepositoryImpl.java,v 1.66 2007/02/06 23:47:53 andre.spiegel Exp $
0057: */
0058:
0059: public class ImplementationRepositoryImpl extends
0060: ImplementationRepositoryPOA {
0061: /**
0062: * <code>orb</code> is the ORB instance for the IMR.
0063: */
0064: private org.omg.CORBA.ORB orb;
0065:
0066: private org.jacorb.config.Configuration configuration = null;
0067:
0068: /** the specific logger for this component */
0069: private Logger logger = null;
0070:
0071: private String iorFile = null;
0072:
0073: /**
0074: * the file containing the serialized server table. Also
0075: * used for writing the table to on shutdown.
0076: */
0077: private File table_file;
0078: private ServerTable server_table;
0079: private File table_file_backup;
0080: private SocketListener listener;
0081: private Thread listenerThread;
0082:
0083: private int object_activation_retries = 5;
0084: private int object_activation_sleep = 50;
0085:
0086: private boolean allow_auto_register = false;
0087: private boolean check_object_liveness = false;
0088:
0089: private long poaActivationTimeout = 120000; //2 min
0090:
0091: private WriteThread wt;
0092: private boolean updatePending;
0093: private Shutdown shutdownThread;
0094:
0095: /**
0096: * The constructor.
0097: * It builds up the server table and starts up the SocketListener thread.
0098: */
0099: public ImplementationRepositoryImpl(org.omg.CORBA.ORB orb) {
0100: this .orb = orb;
0101:
0102: shutdownThread = new Shutdown();
0103: shutdownThread.setDaemon(true);
0104: shutdownThread.setName("Shutdown Thread");
0105: addShutdownHook(shutdownThread);
0106: }
0107:
0108: public void configure(Configuration myConfiguration)
0109: throws ConfigurationException {
0110: configuration = (org.jacorb.config.Configuration) myConfiguration;
0111:
0112: logger = configuration.getNamedLogger("jacorb.imr");
0113:
0114: String defaultTableFile = "table.dat";
0115: String tableFileStr = configuration.getAttribute(
0116: "jacorb.imr.table_file", defaultTableFile);
0117:
0118: //NOTE: deliberate use of ref equivalence check here. I need to find
0119: //out if the default case has taken place, in which case, i assume
0120: //that the default string ref is just passed through.
0121: if (tableFileStr == defaultTableFile) {
0122: if (this .logger.isWarnEnabled()) {
0123: this .logger
0124: .warn("No file for the server table specified! Please configure the property jacorb.imr.table_file!");
0125: this .logger
0126: .warn("Will create \"table.dat\" in current directory, if necessary");
0127: }
0128: }
0129:
0130: table_file = new File(tableFileStr);
0131: boolean _new_table = false;
0132:
0133: // try to open table file
0134: if (!table_file.exists()) {
0135: _new_table = true;
0136: if (this .logger.isInfoEnabled()) {
0137: this .logger.info("Table file " + tableFileStr
0138: + " does not exist - autocreating it.");
0139: }
0140:
0141: try {
0142: table_file.createNewFile();
0143: } catch (IOException ex) {
0144: throw new ConfigurationException(
0145: "Failed to create table file", ex);
0146: }
0147: } else {
0148: if (table_file.isDirectory()) {
0149: throw new ConfigurationException(
0150: "The table file is a directory! Please check "
0151: + table_file.getAbsolutePath());
0152: }
0153:
0154: if (!table_file.canRead()) {
0155: throw new ConfigurationException(
0156: "The table file is not readable! Please check "
0157: + table_file.getAbsolutePath());
0158: }
0159:
0160: if (!table_file.canWrite()) {
0161: throw new ConfigurationException(
0162: "The table file is not writable! Please check "
0163: + table_file.getAbsolutePath());
0164: }
0165: }
0166:
0167: try {
0168: if (_new_table) {
0169: this .server_table = new ServerTable();
0170: save_server_table(table_file);
0171: } else {
0172: try {
0173: ObjectInputStream _in = new ObjectInputStream(
0174: new FileInputStream(table_file));
0175: server_table = (ServerTable) _in.readObject();
0176: _in.close();
0177: } catch (Exception ex) {
0178: this .logger
0179: .warn(
0180: "Failed to read ServerTable -- creating an empty one",
0181: ex);
0182:
0183: server_table = new ServerTable();
0184: save_server_table(table_file);
0185: }
0186: }
0187: } catch (FileOpFailed ex) {
0188: this .logger.error("Failed to read ServerTable", ex);
0189: }
0190:
0191: //should be set. if not, throw
0192: this .iorFile = configuration
0193: .getAttribute("jacorb.imr.ior_file");
0194:
0195: String _backup_file_str = configuration.getAttribute(
0196: "jacorb.imr.backup_file", "");
0197:
0198: //set up server table backup file
0199: if (_backup_file_str.length() == 0) {
0200: this .logger
0201: .warn("No backup file specified!. No backup file will be created");
0202: }
0203:
0204: if (_backup_file_str.length() > 0) {
0205: table_file_backup = new File(_backup_file_str);
0206:
0207: // try to open backup file
0208: if (!table_file_backup.exists()) {
0209: _new_table = true;
0210:
0211: if (this .logger.isInfoEnabled()) {
0212: this .logger.info("Backup file " + _backup_file_str
0213: + " does not exist - autocreating it.");
0214: }
0215:
0216: try {
0217: table_file_backup.createNewFile();
0218: } catch (IOException ex) {
0219: throw new ConfigurationException(
0220: "Failed to create backup file", ex);
0221: }
0222: } else {
0223: if (table_file_backup.isDirectory()) {
0224: throw new ConfigurationException(
0225: "The backup file is a directory! Please check "
0226: + table_file_backup
0227: .getAbsolutePath());
0228: }
0229:
0230: if (!table_file_backup.canRead()) {
0231: throw new ConfigurationException(
0232: "The backup file is not readable! Please check "
0233: + table_file_backup
0234: .getAbsolutePath());
0235: }
0236:
0237: if (!table_file_backup.canWrite()) {
0238: throw new ConfigurationException(
0239: "The backup file is not writable! Please check "
0240: + table_file_backup
0241: .getAbsolutePath());
0242: }
0243: }
0244: }
0245:
0246: this .object_activation_retries = configuration
0247: .getAttributeAsInteger(
0248: "jacorb.imr.object_activation_retries", 5);
0249:
0250: this .object_activation_sleep = configuration
0251: .getAttributeAsInteger(
0252: "jacorb.imr.object_activation_sleep", 50);
0253:
0254: this .allow_auto_register = configuration.getAttributeAsBoolean(
0255: "jacorb.imr.allow_auto_register", false);
0256: this .check_object_liveness = configuration
0257: .getAttributeAsBoolean(
0258: "jacorb.imr.check_object_liveness", false);
0259:
0260: this .poaActivationTimeout = configuration
0261: .getAttributeAsInteger("jacorb.imr.timeout", 120000);
0262:
0263: this .listener = new SocketListener();
0264: this .listener.configure(configuration);
0265:
0266: this .listenerThread = new Thread(listener);
0267: this .listenerThread.setPriority(Thread.MAX_PRIORITY);
0268: this .listenerThread.start();
0269:
0270: this .wt = new WriteThread();
0271: this .wt.setName("IMR Write Thread");
0272: this .wt.setDaemon(true);
0273: this .wt.start();
0274: }
0275:
0276: public String getIORFile() {
0277: return this .iorFile;
0278: }
0279:
0280: // implementation of org.jacorb.imr.RegistrationOperations interface
0281:
0282: /**
0283: * This method sets a server down, i.e. not.active. If a request for
0284: * that server is encountered, the server is tried to be restarted.
0285: *
0286: * @param server the servers name.
0287: * @exception org.jacorb.imr.UnknownServerName No server with name
0288: * <code>server</code> has been registered.
0289: */
0290: public void set_server_down(String server) throws UnknownServerName {
0291: if (this .logger.isDebugEnabled()) {
0292: this .logger.debug("ImR: server " + server
0293: + " is going down... ");
0294: }
0295:
0296: ImRServerInfo _server = server_table.getServer(server);
0297: _server.setDown();
0298: }
0299:
0300: /**
0301: * This method registers a POA. It has actually two functions:
0302: * <ul>
0303: * <li> Register a POA that has not yet been registered. It is the added to the
0304: * server table. </li>
0305: * <li> Reactivating a POA that is not active, but has already an entry
0306: * in the server table</li> </ul>
0307: * The reason for using only one method for those two tasks is that it is
0308: * much more difficult for the ORB, which does the registering, to distinguish
0309: * between an newly created POA and a restarted one.
0310: *
0311: * @param name the POAs name.
0312: * @param server the logical server name of the server the running in.
0313: * @param host the POAs host.
0314: * @param port the POas port.
0315: * @exception org.jacorb.imr.RegistrationPackage.IllegalPOAName the POAs name is not valid.
0316: * @exception org.jacorb.imr.RegistrationPackage.DuplicatePOAName an active POA with
0317: * <code>name</code> is currently registered.
0318: * @exception org.jacorb.imr.UnknownServerName The server has not been registered.
0319: */
0320: public void register_poa(String name, String server, String host,
0321: int port) throws IllegalPOAName, DuplicatePOAName,
0322: UnknownServerName {
0323: ImRServerInfo _server = null;
0324: ImRPOAInfo _poa = null;
0325: boolean remap = false;
0326:
0327: updatePending = true;
0328:
0329: if (this .logger.isDebugEnabled()) {
0330: this .logger.debug("ImR: registering poa " + name
0331: + " for server: " + server + " on " + host);
0332: }
0333:
0334: if (allow_auto_register && !server_table.hasServer(server)) {
0335: try {
0336: register_server(server, "", "");
0337: } catch (IllegalServerName isn) {
0338: //ignore
0339: } catch (DuplicateServerName dsn) {
0340: //ignore
0341: }
0342: }
0343:
0344: _server = server_table.getServer(server);
0345: _poa = server_table.getPOA(name);
0346:
0347: if (_poa == null) {
0348: //New POAInfo is to be created
0349: _poa = new ImRPOAInfo(name, host, port, _server,
0350: poaActivationTimeout);
0351: _server.addPOA(_poa);
0352: server_table.putPOA(name, _poa);
0353:
0354: this .logger.debug("ImR: new poa registered");
0355: } else {
0356: // Existing POA is reactivated
0357:
0358: // Need to check whether the old server is alive. if it is then
0359: // throw an exception otherwise we can remap the currently
0360: // registered name.
0361: if ((_poa.active) || (!server.equals(_poa.server.name))) {
0362: byte[] first = _poa.name.getBytes();
0363: byte[] id = new byte[first.length + 1];
0364: System.arraycopy(first, 0, id, 0, first.length);
0365: id[first.length] = org.jacorb.poa.POAConstants.OBJECT_KEY_SEP_BYTE;
0366:
0367: // If host and port are the same then it must be a replacement as
0368: // we could not have got to here if the original was running - we
0369: // would have got a socket exception.
0370: if (_poa.host.equals(host) && _poa.port == port) {
0371: remap = true;
0372: } else {
0373: // Otherwise try a ping
0374: remap = !(checkServerActive(_poa.host, _poa.port,
0375: id));
0376: }
0377:
0378: if (remap == false) {
0379: throw new DuplicatePOAName("POA " + name
0380: + " has already been registered "
0381: + "for server " + _poa.server.name);
0382: }
0383: this .logger.debug("ImR: Remapping server/port");
0384: }
0385:
0386: _poa.reactivate(host, port);
0387: this .logger.debug("ImR: register_poa, reactivated");
0388: }
0389:
0390: synchronized (wt) {
0391: wt.notify();
0392: }
0393: }
0394:
0395: /**
0396: * Register a new host with a server startup daemon.
0397: * @param host a HostInfo object containing the hosts name and a reference to its
0398: * ServerStartupDaemon object.
0399: *
0400: * @exception org.jacorb.imr.RegistrationPackage.IllegalHostName <code>name</code> is not valid.
0401: * @exception org.jacorb.imr.RegistrationPackage.InvalidSSDRef It was impossible to connect
0402: * to the daemon.
0403: */
0404: public void register_host(HostInfo host) throws IllegalHostName,
0405: InvalidSSDRef {
0406:
0407: if (host.name == null || host.name.length() == 0)
0408: throw new IllegalHostName(host.name);
0409:
0410: try {
0411: host.ssd_ref.get_system_load();
0412: } catch (Exception e) {
0413: this .logger.error("Exception while getting system load", e);
0414: throw new InvalidSSDRef();
0415: }
0416: updatePending = true;
0417:
0418: server_table.putHost(host.name, new ImRHostInfo(host));
0419:
0420: synchronized (wt) {
0421: wt.notify();
0422: }
0423: }
0424:
0425: /**
0426: * Get host and port (wrapped inside an ImRInfo object) of this repository.
0427: * @return the ImRInfo object of this repository.
0428: */
0429:
0430: public ImRInfo get_imr_info() {
0431: return new ImRInfo(listener.getAddress(), listener.getPort());
0432: }
0433:
0434: // implementation of org.jacorb.imr.AdminOperations interface
0435:
0436: /**
0437: * List all hosts currently registered with this repository.
0438: * It is not guaranteed that the references inside the HostInfo
0439: * objects are still valid.
0440: *
0441: * @return an array containing all known hosts.
0442: */
0443: public HostInfo[] list_hosts() {
0444: return server_table.getHosts();
0445: }
0446:
0447: /**
0448: * List all registered server. The ServerInfo objects contain also
0449: * a list of the associated POAs.
0450: *
0451: * @return an array containing all registered servers.
0452: */
0453: public ServerInfo[] list_servers() {
0454: ServerInfo[] servers;
0455:
0456: if (check_object_liveness) {
0457: this .logger.debug("ImR: Checking servers");
0458:
0459: servers = server_table.getServers();
0460:
0461: for (int k = 0; k < servers.length; k++) {
0462: if (servers[k].active && servers[k].poas.length > 0) {
0463: byte[] first = servers[k].poas[0].name.getBytes();
0464: byte[] id = new byte[first.length + 1];
0465: System.arraycopy(first, 0, id, 0, first.length);
0466: id[first.length] = org.jacorb.poa.POAConstants.OBJECT_KEY_SEP_BYTE;
0467:
0468: if (!checkServerActive(servers[k].poas[0].host,
0469: servers[k].poas[0].port, id)) {
0470: try {
0471: if (this .logger.isDebugEnabled()) {
0472: this .logger
0473: .debug("ImR: Setting server "
0474: + servers[k].name
0475: + " down");
0476: }
0477:
0478: // Server is not active so set it down
0479: server_table.getServer(servers[k].name)
0480: .setDown();
0481:
0482: // Save retrieving the list again.
0483: servers[k].active = false;
0484: } catch (UnknownServerName e) {
0485: if (this .logger.isErrorEnabled()) {
0486: this .logger.error(
0487: "ImR: Internal error - unknown server "
0488: + servers[k].name, e);
0489: }
0490: }
0491: }
0492: }
0493: }
0494: } else {
0495: servers = server_table.getServers();
0496: }
0497:
0498: return servers;
0499: }
0500:
0501: /**
0502: * Get the ServerInfo object of a specific server.
0503: *
0504: * @param server the servers name.
0505: * @return the ServerInfo object of the server with name <code>server</code>
0506: * @exception UnknownServerName the server <code>server</code> has not been registered.
0507: */
0508: public ServerInfo get_server_info(String server)
0509: throws UnknownServerName {
0510: return server_table.getServer(server).toServerInfo();
0511: }
0512:
0513: /**
0514: * Register a logical server. The logical server corresponds to a process
0515: * which has a number of POAs.
0516: *
0517: * @param name the servers name.
0518: * @param command the startup command for this server if it should be restarted
0519: * on demand. Has to be empty (NOT null) if the server should not be restarted.
0520: * @param host the host on which the server should be restarted. Should not
0521: * be null, but is ignored if no startup command is specified.
0522: *
0523: * @exception org.jacorb.imr.AdminPackage.IllegalServerName the servers name is not valid.
0524: * @exception org.jacorb.imr.AdminPackage.DuplicateServerName a server with <code>name</code>
0525: * has already been registered.
0526: */
0527: public void register_server(String name, String command, String host)
0528: throws IllegalServerName, DuplicateServerName {
0529: updatePending = true;
0530:
0531: ImRServerInfo _server = new ImRServerInfo(name, host, command);
0532: server_table.putServer(name, _server);
0533:
0534: if (this .logger.isDebugEnabled()) {
0535: this .logger.debug("ImR: server " + name + " on " + host
0536: + " registered");
0537: }
0538:
0539: synchronized (wt) {
0540: wt.notify();
0541: }
0542: }
0543:
0544: /**
0545: * Remove a logical server from the server table. If a server is removed, all of its POAs
0546: * are removed as well.
0547: *
0548: * @param name the servers name.
0549: * @exception org.jacorb.imr.UnknownServerName a server with <code>name</code> has not been registered.
0550: */
0551: public void unregister_server(String name) throws UnknownServerName {
0552: updatePending = true;
0553:
0554: ImRServerInfo _server = server_table.getServer(name);
0555: String[] _poas = _server.getPOANames();
0556:
0557: // remove POAs
0558: for (int _i = 0; _i < _poas.length; _i++)
0559: server_table.removePOA(_poas[_i]);
0560:
0561: server_table.removeServer(name);
0562:
0563: if (this .logger.isDebugEnabled()) {
0564: this .logger.debug("ImR: server " + name + " unregistered");
0565: }
0566:
0567: synchronized (wt) {
0568: wt.notify();
0569: }
0570: }
0571:
0572: /**
0573: * Updates the server with a new command and host. For migrating purposes.
0574: *
0575: * @param name the servers name.
0576: * @param command the new startup command for this server.
0577: * @param host the new host.
0578: * @exception UnknownServerName a server with <code>name</code>
0579: * has not been registered.
0580: */
0581: public void edit_server(String name, String command, String host)
0582: throws UnknownServerName {
0583: updatePending = true;
0584:
0585: ImRServerInfo _server = server_table.getServer(name);
0586:
0587: _server.command = command;
0588: _server.host = host;
0589:
0590: if (this .logger.isDebugEnabled()) {
0591: this .logger.debug("ImR: server " + name + " edited");
0592: }
0593:
0594: synchronized (wt) {
0595: wt.notify();
0596: }
0597: }
0598:
0599: /**
0600: * Hold a server. This causes all requests for this server to be delayed
0601: * until it is released. Holding a server is useful for migrating or
0602: * maintaining it. There is not timeout set, so requests might be delayed
0603: * indefinetly (or, at least, until the communication layer protests;-).
0604: *
0605: * @param name the servers name.
0606: * @exception org.jacorb.imr.UnknownServerName a server with <code>name</code> has not been registered.
0607: */
0608: public void hold_server(String name) throws UnknownServerName {
0609: ImRServerInfo _server = server_table.getServer(name);
0610: _server.holding = true;
0611: }
0612:
0613: /**
0614: * Release a server from state "holding".
0615: *
0616: * @param name the servers name.
0617: * @exception org.jacorb.imr.UnknownServerName a server with <code>name</code> has not been registered.
0618: */
0619: public void release_server(String name) throws UnknownServerName {
0620: ImRServerInfo _server = server_table.getServer(name);
0621: _server.release();
0622: }
0623:
0624: /**
0625: * Start a server.
0626: *
0627: * @param name the servers name.
0628: * @exception org.jacorb.imr.UnknownServerName a server with <code>name</code>
0629: * has not been registered.
0630: */
0631: public void start_server(String name) throws UnknownServerName,
0632: ServerStartupFailed {
0633: restartServer(server_table.getServer(name));
0634: }
0635:
0636: /**
0637: * Save the server table to a backup file.
0638: * @exception org.jacorb.imr.AdminPackage.FileOpFailed something went wrong.
0639: */
0640: public void save_server_table() throws FileOpFailed {
0641: if (table_file_backup != null) {
0642: save_server_table(table_file_backup);
0643: }
0644: }
0645:
0646: /**
0647: * Shut the repository down orderly, i.e. with saving of the server table.
0648: * The actual shutdown is done in the SocketListener thread because, if
0649: * done from here, the orb wont shut don correctly because this connection
0650: * is still active. (See end of SocketListener.run())
0651: *
0652: * @param wait wait_for_completion (from ORB.shutdown()). If false, then the ORB
0653: * is forced down, ignoring any open connection.
0654: */
0655: public void shutdown(boolean wait) {
0656: synchronized (wt) {
0657: wt.shutdown();
0658: wt.notify();
0659: }
0660: if (listener != null) {
0661: listener.stopListening(wait);
0662: try {
0663: synchronized (listener) {
0664: // Wait at most 5 seconds for the listener to shutdown.
0665: listenerThread.join(5000);
0666: }
0667: } catch (InterruptedException e) {
0668: this .logger.warn("shutdown wait was interrupted", e);
0669: }
0670: }
0671: try {
0672: save_server_table();
0673: } catch (FileOpFailed f) {
0674: this .logger.error("ImR: Failed to save backup table.", f);
0675: }
0676: this .logger.debug("ImR: Finished shutting down");
0677: }
0678:
0679: /**
0680: * Remove a host from the servertable. Hosts are removed
0681: * automatically on server startup, if they can't be accessed.
0682: *
0683: * @param name the hosts name.
0684: * @exception UnknownHostName no host with that name known.
0685: */
0686: public void unregister_host(String name) throws UnknownHostName {
0687: if (server_table.removeHost(name) == null)
0688: throw new UnknownHostName(name);
0689: }
0690:
0691: /**
0692: * Convenience method which does the actual serialization.
0693: *
0694: * @param save_to the file where to write to.
0695: */
0696: private void save_server_table(File save_to) throws FileOpFailed {
0697: try {
0698: ObjectOutputStream _out = new ObjectOutputStream(
0699: new FileOutputStream(save_to));
0700:
0701: server_table.table_lock.gainExclusiveLock();
0702: _out.writeObject(server_table);
0703: server_table.table_lock.releaseExclusiveLock();
0704:
0705: _out.flush();
0706: _out.close();
0707: } catch (Exception e) {
0708: this .logger.error("Exception while saving server table", e);
0709: throw new FileOpFailed();
0710: }
0711: updatePending = false;
0712: }
0713:
0714: /**
0715: * Prints the usage screen and exits.
0716: */
0717: public static void usage() {
0718: System.out
0719: .println("Usage: The following properties are useful in conjunction with the \nImplementationRepository:");
0720: System.out
0721: .println("\t \"jacorb.imr.endpoint_host\" Address to listen on for requests");
0722: System.out
0723: .println("\t \"jacorb.imr.endpoint_port\" Port to listen on for requests");
0724: System.out
0725: .println("\t \"jacorb.imr.table_file\" The file to store the server table into");
0726: System.out
0727: .println("\t \"jacorb.imr.backup_file\" The file to store the server table backup into");
0728: System.out
0729: .println("\t \"jacorb.imr.ior_file\" The file to store the ImRs IOR into");
0730: System.out
0731: .println("\t \"jacorb.imr.allow_auto_register\" if set to \"on\", servers that don't \n\talready have an entry on their first call to the imr, will get \n\tautomatically registered. Otherwise, an UnknownServer exception \n\tis thrown.");
0732: System.exit(0);
0733: }
0734:
0735: /**
0736: * The main method. "Parses" the arguments and sets the corresponding
0737: * attributes up, creates a new ImplementationRepositoryImpl instance and
0738: * runs the ORB.
0739: */
0740: public static void main(String[] args) {
0741: // translate any properties set on the commandline but after the
0742: // class name to a properties
0743: java.util.Properties argProps = ObjectUtil.argsToProps(args);
0744: argProps.setProperty("jacorb.implname", "the_ImR");
0745: argProps.setProperty("jacorb.use_imr", "off");
0746:
0747: boolean printIOR = false;
0748:
0749: for (int i = 0; i < args.length; i++) {
0750: if ("-printIOR".equals(args[i])) {
0751: printIOR = true;
0752: }
0753: }
0754:
0755: //Write IOR to file
0756: try {
0757: org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args,
0758: argProps);
0759:
0760: ImplementationRepositoryImpl _imr = new ImplementationRepositoryImpl(
0761: orb);
0762: _imr.configure(((org.jacorb.orb.ORB) orb)
0763: .getConfiguration());
0764:
0765: POA root_poa = POAHelper.narrow(orb
0766: .resolve_initial_references("RootPOA"));
0767: root_poa.the_POAManager().activate();
0768:
0769: org.omg.CORBA.Policy[] policies = new org.omg.CORBA.Policy[2];
0770:
0771: policies[0] = root_poa
0772: .create_lifespan_policy(LifespanPolicyValue.PERSISTENT);
0773: policies[1] = root_poa
0774: .create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID);
0775:
0776: POA imr_poa = root_poa.create_POA("ImRPOA", root_poa
0777: .the_POAManager(), policies);
0778:
0779: for (int i = 0; i < policies.length; i++) {
0780: policies[i].destroy();
0781: }
0782:
0783: byte[] id = "ImR".getBytes();
0784:
0785: imr_poa.activate_object_with_id(id, _imr);
0786:
0787: PrintWriter _out = new PrintWriter(new FileOutputStream(
0788: new File(_imr.getIORFile())));
0789:
0790: final org.omg.CORBA.Object imrReference = imr_poa
0791: .servant_to_reference(_imr);
0792: _out.println(orb.object_to_string(imrReference));
0793: _out.flush();
0794: _out.close();
0795:
0796: if (printIOR) {
0797: System.out.println("SERVER IOR: "
0798: + orb.object_to_string(imrReference));
0799: System.out.flush();
0800: }
0801:
0802: orb.run();
0803: } catch (Exception _e) {
0804: _e.printStackTrace();
0805: usage();
0806: System.exit(1);
0807: }
0808: }
0809:
0810: private void restartServer(ImRServerInfo server)
0811: throws ServerStartupFailed {
0812: // server might be holding
0813: server.awaitRelease();
0814:
0815: if (!server.active) {
0816: if (this .logger.isDebugEnabled()) {
0817: this .logger.debug("ImR: server " + server.name
0818: + " is down");
0819: }
0820:
0821: if (server.command.length() == 0) {
0822: //server can't be restarted, send exception
0823: throw new ServerStartupFailed("Server " + server.name
0824: + " can't be restarted because"
0825: + " of missing startup command");
0826: }
0827:
0828: // we have to synchronize here to avoid a server to be
0829: // restarted multiple times by requests that are
0830: // received in the gap between the first try to
0831: // restart and the reactivation of the POAs.
0832: // restarting is set back to false when the first POA
0833: // is reactivated and the server goes back to active
0834: // (see ImRPOAInfo.reactivate()).
0835: if (server.shouldBeRestarted()) {
0836: try {
0837: // If there is no SSD for the host, we get an
0838: // NullPointerException. In a further
0839: // version, we might choose another random
0840: // SSD.
0841: ImRHostInfo _host = server_table
0842: .getHost(server.host);
0843:
0844: if (_host == null) {
0845: throw new ServerStartupFailed(
0846: "Unknown host: >>" + server.host + "<<");
0847: }
0848:
0849: if (this .logger.isDebugEnabled()) {
0850: this .logger.debug("ImR: will restart "
0851: + server.name);
0852: }
0853:
0854: _host.startServer(server.command, orb);
0855: } catch (ServerStartupFailed ssf) {
0856: server.setNotRestarting();
0857:
0858: throw ssf;
0859: } catch (Exception e) {
0860: server.setNotRestarting();
0861:
0862: this .logger.error(
0863: "Exception while restarting server", e);
0864:
0865: // sth wrong with daemon, remove from table
0866: server_table.removeHost(server.host);
0867:
0868: throw new ServerStartupFailed(
0869: "Failed to connect to host!");
0870: }
0871: } else {
0872: if (this .logger.isDebugEnabled()) {
0873: this .logger
0874: .debug("ImR: somebody else is restarting "
0875: + server.name);
0876: }
0877: }
0878: } else {
0879: if (this .logger.isDebugEnabled()) {
0880: this .logger.debug("ImR: server " + server.name
0881: + " is active");
0882: }
0883: }
0884: }
0885:
0886: // Shutdown hook methods are done via reflection as these were
0887: // not supported prior to the JDK 1.3.
0888:
0889: private void addShutdownHook(Thread thread) {
0890: Method method = getHookMethod("addShutdownHook");
0891:
0892: if (method != null) {
0893: invokeHookMethod(method, thread);
0894: }
0895: }
0896:
0897: private Method getHookMethod(String name) {
0898: Method method = null;
0899: Class[] params = new Class[1];
0900:
0901: params[0] = Thread.class;
0902: try {
0903: method = Runtime.class.getMethod(name, params);
0904: } catch (Throwable ex) {
0905: }
0906:
0907: return method;
0908: }
0909:
0910: private void invokeHookMethod(Method method, Thread thread) {
0911: Object[] args = new Object[1];
0912:
0913: args[0] = thread;
0914: try {
0915: method.invoke(Runtime.getRuntime(), args);
0916: } catch (Throwable ex) {
0917: if (this .logger.isErrorEnabled()) {
0918: this .logger.error("Failed to invoke Runtime."
0919: + method.getName(), ex);
0920: }
0921: }
0922: }
0923:
0924: /**
0925: * Inner class SocketListener, responsible for accepting
0926: * connection requests. *Very* close to inner class Listener in
0927: * orb/BasicAdapter.java.
0928: * <br> When a connection is accepted a
0929: * new RequestReceptor thread is started.
0930: */
0931: private class SocketListener implements Runnable {
0932: private ServerSocket server_socket;
0933: private int port = 0;
0934: private String address;
0935: private int timeout = 0;
0936: private boolean run = true;
0937: private boolean wait = false;
0938:
0939: private MessageReceptorPool receptor_pool = null;
0940: private RequestListener request_listener = null;
0941: private ReplyListener reply_listener = null;
0942:
0943: private TransportManager transport_manager = null;
0944:
/**
 * The constructor. It only creates the request and reply listeners;
 * the server socket is opened later in configure().
 */
public SocketListener() {
    this.request_listener = new ImRRequestListener();
    this.reply_listener = new NoBiDirServerReplyListener();
}
0952:
0953: public void configure(Configuration myConfiguration)
0954: throws ConfigurationException {
0955: // Moved from the constructor to facilitate logging.
0956: receptor_pool = new MessageReceptorPool("server",
0957: "ImplementationRepository", myConfiguration);
0958:
0959: try {
0960: int endpoint_port = configuration
0961: .getAttributeAsInteger(
0962: "jacorb.imr.endpoint_port_number", 0);
0963:
0964: String endpoint_host = configuration.getAttribute(
0965: "jacorb.imr.endpoint_host", "");
0966:
0967: if (endpoint_host.length() > 0) {
0968: server_socket = new ServerSocket(endpoint_port, 50, //default backlog, see jdk doc
0969: InetAddress.getByName(endpoint_host));
0970: } else {
0971: //no explicit address given, listen anywhere
0972: server_socket = new ServerSocket(endpoint_port);
0973: }
0974:
0975: org.jacorb.orb.dns.DNSLookup lookup = new org.jacorb.orb.dns.DNSLookup();
0976: lookup.configure(configuration);
0977:
0978: // First deal with DNS; if we are not using DNS do fallback.
0979: if (endpoint_host.length() > 0) {
0980: address = lookup.inverseLookup(InetAddress
0981: .getByName(endpoint_host));
0982: } else {
0983: address = lookup.inverseLookup(InetAddress
0984: .getLocalHost());
0985: }
0986:
0987: if (address == null) {
0988: if (endpoint_host.length() > 0) {
0989: address = endpoint_host;
0990: } else {
0991: address = InetAddress.getLocalHost().toString();
0992: }
0993: }
0994:
0995: if (address.indexOf("/") >= 0)
0996: address = address
0997: .substring(address.indexOf("/") + 1);
0998:
0999: port = server_socket.getLocalPort();
1000:
1001: if (logger.isDebugEnabled()) {
1002: logger.debug("ImR Listener at " + port + ", "
1003: + address);
1004: }
1005: } catch (Exception e) {
1006: throw new ConfigurationException(
1007: "Listener: Couldn't init", e);
1008: }
1009:
1010: this .transport_manager = new TransportManager(
1011: (org.jacorb.orb.ORB) orb);
1012: this .transport_manager.configure(configuration);
1013: }
1014:
1015: /**
1016: * Get the port this SocketListener is listening on.
1017: *
1018: * @return the port
1019: */
1020: public int getPort() {
1021: return port;
1022: }
1023:
1024: /**
1025: * The internet address of the Socket this thread is listening on.
1026: *
1027: * @return the address of the socket.
1028: */
1029: public String getAddress() {
1030: return address;
1031: }
1032:
1033: /**
1034: * Set the connection timeout.
1035: *
1036: * @param timeout the timeout.
1037: */
1038: public void setTimeout(int timeout) {
1039: this .timeout = timeout;
1040: }
1041:
1042: /**
1043: * The threads main event loop. Listenes on the socket
1044: * and starts new RequestReceptor threads on accepting.
1045: * <br> On termination does the actual shutdown of the
1046: * repository.
1047: */
1048: public void run() {
1049: while (run) {
1050: try {
1051: Socket socket = server_socket.accept();
1052: socket.setSoTimeout(timeout);
1053:
1054: org.jacorb.orb.iiop.ServerIIOPConnection transport = new ServerIIOPConnection(
1055: socket, false,
1056: new NullTCPConnectionListener()); // TODO // no SSL
1057: transport.configure(configuration);
1058:
1059: GIOPConnection connection = new ServerGIOPConnection(
1060: transport.get_server_profile(), transport,
1061: request_listener, reply_listener, null,
1062: null);
1063: connection.configure(configuration);
1064: receptor_pool.connectionCreated(connection);
1065: } catch (Exception _e) {
1066: // when finishing, we do a close() on
1067: // server_socket from "outside" and that causes an
1068: // exception here. But since we wanted it this
1069: // way, we don't display the Exception to avoid
1070: // confusing users.
1071: if (run) {
1072: logger.debug(
1073: "Internal Exception, can be ignored",
1074: _e);
1075: }
1076: }
1077: }
1078:
1079: // doing the actual shutdown of the implementation
1080: // repository here
1081: orb.shutdown(wait);
1082: }
1083:
1084: /**
1085: * Causes the event loop to terminate by closing the ServerSocket.
1086: *
1087: * @param wait for ORB.shutdown().
1088: */
1089: public void stopListening(boolean wait) {
1090: run = false;
1091: this .wait = wait;
1092:
1093: try {
1094: server_socket.close();
1095: } catch (Exception _e) {
1096: logger.error("Exception while closing server socket",
1097: _e);
1098: }
1099: }
1100: }
1101:
1102: private boolean checkServerActive(String host, int port,
1103: byte[] object_key) {
1104: ClientConnectionManager cm = null;
1105: IIOPAddress address = null;
1106: ClientConnection connection = null;
1107: LocateRequestOutputStream lros = null;
1108: LocateReplyReceiver receiver = null;
1109: LocateReplyInputStream lris = null;
1110: boolean result = false;
1111:
1112: cm = ((org.jacorb.orb.ORB) orb).getClientConnectionManager();
1113: try {
1114: address = new IIOPAddress(host, port);
1115: address.configure(configuration);
1116:
1117: IIOPProfile iiopProfile = new IIOPProfile(address,
1118: object_key);
1119: iiopProfile.configure(configuration);
1120:
1121: connection = cm.getConnection(iiopProfile);
1122: } catch (ConfigurationException e) {
1123: logger.error("Failed to configure", e);
1124: }
1125:
1126: if (this .logger.isDebugEnabled()) {
1127: this .logger.debug("Pinging " + host + " / " + port);
1128: }
1129:
1130: try {
1131: lros = new LocateRequestOutputStream(object_key, connection
1132: .getId(), 2);
1133: receiver = new LocateReplyReceiver((org.jacorb.orb.ORB) orb);
1134:
1135: connection.sendRequest(lros, receiver, lros.getRequestId(),
1136: true); // response expected
1137:
1138: lris = receiver.getReply();
1139:
1140: switch (lris.rep_hdr.locate_status.value()) {
1141: case LocateStatusType_1_2._UNKNOWN_OBJECT:
1142: case LocateStatusType_1_2._OBJECT_HERE:
1143: case LocateStatusType_1_2._OBJECT_FORWARD:
1144: case LocateStatusType_1_2._OBJECT_FORWARD_PERM:
1145: case LocateStatusType_1_2._LOC_SYSTEM_EXCEPTION:
1146: case LocateStatusType_1_2._LOC_NEEDS_ADDRESSING_MODE:
1147: default: {
1148: result = true;
1149: break;
1150: }
1151: }
1152: } catch (Throwable ex) {
1153: this .logger.debug("Exception while checking server active",
1154: ex);
1155:
1156: result = false;
1157: } finally {
1158: cm.releaseConnection(connection);
1159: }
1160: return result;
1161: }
1162:
1163: /**
1164: * Inner class ImRRequestListener. Receives messages.
1165: */
1166: private class ImRRequestListener implements RequestListener {
1167: public ImRRequestListener() {
1168: }
1169:
1170: /**
1171: * receive and dispatch requests
1172: *
1173: * @param request a <code>byte[]</code> value
1174: * @param connection a <code>GIOPConnection</code> value
1175: */
1176: public void requestReceived(byte[] request,
1177: GIOPConnection connection) {
1178: logger.debug("requestReceived");
1179: connection.incPendingMessages();
1180:
1181: final RequestInputStream in = new RequestInputStream(orb,
1182: request);
1183:
1184: try {
1185: replyNewLocation(((org.jacorb.orb.ORB) orb)
1186: .mapObjectKey(ParsedIOR.extractObjectKey(
1187: in.req_hdr.target,
1188: (org.jacorb.orb.ORB) orb)),
1189: in.req_hdr.request_id, in.getGIOPMinor(),
1190: connection, false);
1191: } finally {
1192: in.close();
1193: }
1194: }
1195:
1196: public void locateRequestReceived(byte[] request,
1197: GIOPConnection connection) {
1198: connection.incPendingMessages();
1199:
1200: final LocateRequestInputStream in = new LocateRequestInputStream(
1201: orb, request);
1202:
1203: try {
1204: replyNewLocation(ParsedIOR.extractObjectKey(
1205: in.req_hdr.target, (org.jacorb.orb.ORB) orb),
1206: in.req_hdr.request_id, in.getGIOPMinor(),
1207: connection, true);
1208: } finally {
1209: in.close();
1210: }
1211: }
1212:
1213: public void cancelRequestReceived(byte[] request,
1214: GIOPConnection connection) {
1215: //ignore
1216: }
1217:
1218: public void fragmentReceived(byte[] fragment,
1219: GIOPConnection connection) {
1220: //ignore
1221: }
1222:
1223: public void connectionClosed() {
1224: }
1225:
1226: /**
1227: * The actual core method of the implementation repository.
1228: * Causes servers to start, looks up new POA locations in
1229: * the server table.
1230: */
1231: private void replyNewLocation(byte[] object_key,
1232: int request_id, int giop_minor,
1233: GIOPConnection connection, boolean isLocateRequest) {
1234: String _poa_name = POAUtil.extractImplName(object_key)
1235: + '/' + POAUtil.extractPOAName(object_key);
1236:
1237: // look up POA in table
1238: ImRPOAInfo _poa = server_table.getPOA(_poa_name);
1239: if (_poa == null) {
1240: sendSysException(new org.omg.CORBA.TRANSIENT("POA "
1241: + _poa_name + " unknown"), connection,
1242: request_id, giop_minor);
1243: return;
1244: }
1245:
1246: // get server of POA
1247: ImRServerInfo _server = _poa.server;
1248:
1249: if (logger.isDebugEnabled()) {
1250: logger.debug("ImR: Looking up: " + _server.name);
1251: }
1252:
1253: // There is only point pinging the remote object if server
1254: // is active and either the QoS to ping returned objects
1255: // is true or the ServerStartUpDaemon is active and there
1256: // is a command to run - if not, even if the server isn't
1257: // actually active, we can't restart it so just allow this
1258: // to fall through and throw the TRANSIENT below.
1259: boolean ssd_valid = ((_server.command.length() != 0) && (server_table
1260: .getHost(_server.host) != null));
1261:
1262: if (_server.active && (check_object_liveness || ssd_valid)) {
1263: // At this point the server *might* be running - we
1264: // just want to verify it.
1265: if (!checkServerActive(_poa.host, _poa.port, object_key)) {
1266: // Server is not active so set it down
1267: _server.setDown();
1268: }
1269: }
1270:
1271: try {
1272: restartServer(_server);
1273: } catch (ServerStartupFailed ssf) {
1274: if (logger.isInfoEnabled()) {
1275: logger.info("Object (" + _server.name + ") on "
1276: + _poa.host + '/' + _poa.port
1277: + " not reachable");
1278: }
1279:
1280: sendSysException(
1281: new org.omg.CORBA.TRANSIENT(ssf.reason),
1282: connection, request_id, giop_minor);
1283: return;
1284: }
1285:
1286: // POA might not be active
1287: boolean _old_poa_state = _poa.active;
1288:
1289: // wait for POA to be reregistered.
1290: if (!_poa.awaitActivation()) {
1291: // timeout reached
1292: sendSysException(new org.omg.CORBA.TRANSIENT(
1293: "Timeout exceeded"), connection, request_id,
1294: giop_minor);
1295: return;
1296: }
1297:
1298: ReplyOutputStream out = new ReplyOutputStream(request_id,
1299: org.omg.GIOP.ReplyStatusType_1_2.LOCATION_FORWARD,
1300: giop_minor, isLocateRequest, logger);
1301:
1302: // The typecode is for org.omg.CORBA.Object, but avoiding
1303: // creation of new ObjectHolder Instance.
1304: IIOPAddress addr = new IIOPAddress(_poa.host,
1305: (short) _poa.port);
1306: org.omg.IOP.IOR _ior = null;
1307: try {
1308: addr.configure(configuration);
1309: IIOPProfile p = new IIOPProfile(addr, object_key,
1310: giop_minor);
1311: p.configure(configuration);
1312: _ior = ParsedIOR.createObjectIOR(p);
1313: } catch (ConfigurationException e) {
1314: logger.error("Error while configuring address/profile",
1315: e);
1316: }
1317:
1318: if (!_old_poa_state) {
1319: // if POA has been reactivated, we have to wait for
1320: // the requested object to become ready again. This is
1321: // for avoiding clients to get confused by
1322: // OBJECT_NOT_EXIST exceptions which they might get
1323: // when trying to contact the server too early.
1324:
1325: org.omg.CORBA.Object _object = orb
1326: .string_to_object((new ParsedIOR(
1327: (org.jacorb.orb.ORB) orb, _ior))
1328: .getIORString());
1329:
1330: // Sort of busy waiting here, no other way possible
1331: for (int _i = 0; _i < object_activation_retries; _i++) {
1332: try {
1333: Thread.sleep(object_activation_sleep);
1334:
1335: // This will usually throw an OBJECT_NOT_EXIST
1336: if (!_object._non_existent()) // "CORBA ping"
1337: {
1338: break;
1339: }
1340: } catch (Exception _e) {
1341: logger.info(
1342: "Exception while waiting for object",
1343: _e);
1344: }
1345: }
1346: }
1347:
1348: try {
1349: // write new location to stream
1350: out.write_IOR(_ior);
1351:
1352: if (logger.isDebugEnabled()) {
1353: logger.debug("ImR: Sending location forward for "
1354: + _server.name);
1355: }
1356:
1357: connection.sendReply(out);
1358: } catch (IOException _e) {
1359: logger
1360: .error("Exception while writing new location",
1361: _e);
1362:
1363: sendSysException(new org.omg.CORBA.UNKNOWN(_e
1364: .toString()), connection, request_id,
1365: giop_minor);
1366: }
1367: }
1368:
1369: /**
1370: * Convenience method for sending a CORBA System Exception back to
1371: * the client.
1372: *
1373: * @param sys_ex the exception to send back.
1374: */
1375: private void sendSysException(
1376: org.omg.CORBA.SystemException sys_ex,
1377: GIOPConnection connection, int request_id,
1378: int giop_minor) {
1379: ReplyOutputStream out = new ReplyOutputStream(request_id,
1380: org.omg.GIOP.ReplyStatusType_1_2.SYSTEM_EXCEPTION,
1381: giop_minor, false, logger);
1382:
1383: SystemExceptionHelper.write(out, sys_ex);
1384:
1385: try {
1386: connection.sendReply(out);
1387: } catch (IOException _e) {
1388: logger
1389: .error(
1390: "Exception while sending SystemException to client",
1391: _e);
1392: }
1393: }
1394: }
1395:
1396: /**
1397: * <code>WriteThread</code> runs as a background thread which will write the
1398: * server table out whenever any modifications are made.
1399: */
1400: private class WriteThread extends Thread {
1401: boolean done;
1402:
1403: public WriteThread() {
1404: }
1405:
1406: /**
1407: * <code>run</code> continiously loops until the shutdown is called.
1408: */
1409: public void run() {
1410: while (true) {
1411: try {
1412: save_server_table(table_file);
1413: } catch (FileOpFailed ex) {
1414: logger.error("Exception while saving server table",
1415: ex);
1416: }
1417:
1418: if (done) {
1419: break;
1420: }
1421:
1422: // If by the time we have written the server table another request has arrived
1423: // which requires an update don't bother entering the wait state.
1424: if (!updatePending) {
1425: try {
1426: synchronized (this ) {
1427: this .wait();
1428: }
1429: } catch (InterruptedException ex) {
1430: }
1431:
1432: logger
1433: .debug("ImR: IMR write thread waking up to save server table... ");
1434: }
1435: }
1436: }
1437:
1438: /**
1439: * <code>shutdown</code> toggles the thread to shut itself down.
1440: */
1441: public void shutdown() {
1442: done = true;
1443: }
1444: }
1445:
1446: /**
1447: * <code>Shutdown</code> is a thread that is run the Java 1.3 (and greater)
1448: * virtual machine upon receiving a Ctrl-C or kill -INT.
1449: */
1450: private class Shutdown extends Thread {
1451: public synchronized void run() {
1452: logger.debug("ImR: Shutting down");
1453: shutdown(true);
1454: }
1455: }
1456:
1457: } // ImplementationRepositoryImpl
|