0001: // $Id: ENCRYPT.java,v 1.26.2.1 2007/04/27 08:03:51 belaban Exp $
0002:
0003: package org.jgroups.protocols;
0004:
0005: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
0006: import org.jgroups.*;
0007: import org.jgroups.stack.Protocol;
0008: import org.jgroups.util.QueueClosedException;
0009: import org.jgroups.util.Streamable;
0010: import org.jgroups.util.Util;
0011:
0012: import javax.crypto.*;
0013: import javax.crypto.spec.SecretKeySpec;
0014: import java.io.DataInputStream;
0015: import java.io.DataOutputStream;
0016: import java.io.IOException;
0017: import java.io.InputStream;
0018: import java.security.*;
0019: import java.security.cert.CertificateException;
0020: import java.security.spec.X509EncodedKeySpec;
0021: import java.util.Map;
0022: import java.util.Properties;
0023: import java.util.WeakHashMap;
0024:
0025: /**
0026: * ENCRYPT layer. Encrypt and decrypt the group communication in JGroups
0027: *
0028: * The file can be used in two ways:
0029: * <ul>
0030: * <li> Option 1. Configured with a secretKey in a keystore so it can be used at any
0031: * layer in JGroups without the need for a coordinator, or if you want protection against passive
0032: * monitoring but do not want the key exchange overhead and complexity. In this mode all nodes must be distributed with
0033: * the same keystore file.
0034: * <li> Option 2. Configured with algorithms and key sizes. The Encrypt Layer in this mode sould be used between the
0035: * FRAG and PBCast layers in the stack. The coordinator then chooses the
0036: * secretkey which it distributes amongst all the peers. In this form no keystore exists as the keys are
0037: * distributed using a public/private key exchange. View changes that identify a new controller will result in a new session key
0038: * being generated and then distributed to all peers. This overhead can be substantial in a an application with
0039: * a reasonable peer churn.
0040: * </ul>
0041: * <p>
0042: * <p>
0043: * Each message is identified as encrypted with a specific encryption header which identifies
0044: * the type of encrypt header and an MD5 digest that identifies the version of the key being used
0045: * to encrypt/decrypt the messages.
0046: * <p>
0047: * <p>
0048: *<h2>Option 1</h2><br>
0049: * This is the simplest option and can be used by simply inserting the Encryption layer
0050: * at any point in the JGroup stack - it will encrypt all Events of a type MSG that
0051: * have a non-null message buffer. The format of the entry in this form is:<br>
0052: * <ENCRYPT key_store_name="defaultStore.keystore" store_password="changeit" alias="myKey"/><br>
0053: * An example bare-bones.xml file showing the keystore version can be found in the conf
0054: * ina file called EncryptKeyStore.xml - along with a defaultStore.keystore file.<br>
0055: * In order to use the Encrypt layer in this manner it is necessary to have the secretKey already generated
0056: * in a keystore file. The directory containing the keystore file must be on the application's classpath.
0057: * You cannot create a SecretKey keystore file using the keytool application shipped with the JDK.
0058: * A java file called KeyStoreGenerator is included in the demo
0059: * package that can be used from the command line (or IDE) to generate a suitable keystore.
0060: * <p>
0061: * <p>
0062: *<h2>Option 2</h2><br>
0063: * This option is suited to an application that does not ship with a known key but instead it is generated
0064: * and distributed by the controller. The secret key is first generated by the Controller (in JGroup terms). When a view change
0065: * occurs a peer will request the secret key by sending a key request with its own public key. The controller
0066: * encrypts the secret key with this key and sends it back to the peer who then decrypts it and installs the
0067: * key as its own secret key.
0068: * <br>All encryption and decryption of Messages is done using this key. When a peer receives
0069: * a view change that shows a different keyserver it will repeat this process - the view change event
0070: * also trigger the encrypt layer to queue up and down messages until the new key is installed.
0071: * The previous keys are retained so that messages sent before the view change that are queued can be decrypted
0072: * if the key is different.
0073: * <br>
0074: * An example EncryptNoKeyStore.xml is included in the conf file as a guide.
0075: * <p><p>
0076: * <br> Note: the current version does not support the concept of perfect forward encryption (PFE)
0077: * which means that if a peer leaves the group the keys are re-generated preventing the departed peer from
0078: * decrypting future messages if it chooses to listen in on the group. This is not included as it really requires
0079: * a suitable authentication scheme as well to make this feature useful as there is nothing to stop the peer rejoining and receiving the new
0080: * key. A future release will address this issue.
0081: *
0082: * @author Steve Woodcock
0083: * @author Bela Ban
0084: */
0085:
0086: public class ENCRYPT extends Protocol {
0087:
0088: static final String DEFAULT_SYM_ALGO = "Blowfish";
0089: // address info
0090: Address local_addr = null;
0091: // keyserver address
0092: Address keyServerAddr = null;
0093:
0094: //used to see whether we are the key server
0095: boolean keyServer = false;
0096:
0097: // encryption properties in no supplied key mode
0098: String asymProvider = null;
0099: final String symProvider = null;
0100: String asymAlgorithm = "RSA";
0101: String symAlgorithm = DEFAULT_SYM_ALGO;
0102: int asymInit = 512; // initial public/private key length
0103: int symInit = 56; // initial shared key length
0104:
0105: // properties for functioning in supplied key mode
0106: private boolean suppliedKey = false;
0107: private String keyStoreName;
0108: private String storePassword = "changeit"; //JDK default
0109: private String keyPassword = "changeit"; //JDK default
0110: private String alias = "mykey"; // JDK default
0111:
0112: // public/private Key
0113: KeyPair Kpair; // to store own's public/private Key
0114:
0115: // for client to store server's public Key
0116: PublicKey serverPubKey = null;
0117:
0118: // needed because we do simultaneous encode/decode with these ciphers - which
0119: // would be a threading issue
0120: Cipher symEncodingCipher;
0121: Cipher symDecodingCipher;
0122:
0123: // version filed for secret key
0124: private String symVersion = null;
0125: // dhared secret key to encrypt/decrypt messages
0126: SecretKey secretKey = null;
0127:
0128: // map to hold previous keys so we can decrypt some earlier messages if we need to
0129: final Map keyMap = new WeakHashMap();
0130:
0131: // queues to buffer data while we are swapping shared key
0132: // or obtsining key for first time
0133:
0134: private boolean queue_up = true;
0135:
0136: private boolean queue_down = false;
0137:
0138: // queue to hold upcoming messages while key negotiation is happening
0139: private LinkedQueue upMessageQueue = new LinkedQueue();
0140:
0141: // queue to hold downcoming messages while key negotiation is happening
0142: private LinkedQueue downMessageQueue = new LinkedQueue();
0143: // decrypting cypher for secret key requests
0144: private Cipher asymCipher;
0145:
0146: /** determines whether to encrypt the entire message, or just the buffer */
0147: private boolean encrypt_entire_message = false;
0148:
0149: public ENCRYPT() {
0150: }
0151:
0152: public String getName() {
0153: return "ENCRYPT";
0154: }
0155:
0156: /*
0157: * GetAlgorithm: Get the algorithm name from "algorithm/mode/padding"
0158: * taken from original ENCRYPT file
0159: */
0160: private String getAlgorithm(String s) {
0161: int index = s.indexOf("/");
0162: if (index == -1)
0163: return s;
0164:
0165: return s.substring(0, index);
0166: }
0167:
0168: public boolean setProperties(Properties props) {
0169: String str;
0170:
0171: super .setProperties(props);
0172: // asymmetric key length
0173: str = props.getProperty("asym_init");
0174: if (str != null) {
0175: asymInit = Integer.parseInt(str);
0176: props.remove("asym_init");
0177:
0178: if (log.isInfoEnabled())
0179: log.info("Asym algo bits used is " + asymInit);
0180: }
0181:
0182: // symmetric key length
0183: str = props.getProperty("sym_init");
0184: if (str != null) {
0185: symInit = Integer.parseInt(str);
0186: props.remove("sym_init");
0187:
0188: if (log.isInfoEnabled())
0189: log.info("Sym algo bits used is " + symInit);
0190: }
0191:
0192: // asymmetric algorithm name
0193: str = props.getProperty("asym_algorithm");
0194: if (str != null) {
0195: asymAlgorithm = str;
0196: props.remove("asym_algorithm");
0197:
0198: if (log.isInfoEnabled())
0199: log.info("Asym algo used is " + asymAlgorithm);
0200: }
0201:
0202: // symmetric algorithm name
0203: str = props.getProperty("sym_algorithm");
0204: if (str != null) {
0205: symAlgorithm = str;
0206: props.remove("sym_algorithm");
0207:
0208: if (log.isInfoEnabled())
0209: log.info("Sym algo used is " + symAlgorithm);
0210: }
0211:
0212: // symmetric algorithm name
0213: str = props.getProperty("asym_provider");
0214: if (str != null) {
0215: asymProvider = str;
0216: props.remove("asym_provider");
0217:
0218: if (log.isInfoEnabled())
0219: log.info("asymProvider used is " + asymProvider);
0220: }
0221:
0222: //symmetric algorithm name
0223: str = props.getProperty("key_store_name");
0224: if (str != null) {
0225: keyStoreName = str;
0226: props.remove("key_store_name");
0227:
0228: if (log.isInfoEnabled())
0229: log.info("key_store_name used is " + keyStoreName);
0230: }
0231:
0232: // symmetric algorithm name
0233: str = props.getProperty("store_password");
0234: if (str != null) {
0235: storePassword = str;
0236: props.remove("store_password");
0237:
0238: if (log.isInfoEnabled())
0239: log.info("store_password used is not null");
0240: }
0241:
0242: // symmetric algorithm name
0243: str = props.getProperty("key_password");
0244: if (str != null) {
0245: keyPassword = str;
0246: props.remove("key_password");
0247:
0248: if (log.isInfoEnabled())
0249: log.info("key_password used is not null");
0250: } else if (storePassword != null) {
0251: keyPassword = storePassword;
0252:
0253: if (log.isInfoEnabled())
0254: log.info("key_password used is same as store_password");
0255: }
0256:
0257: // symmetric algorithm name
0258: str = props.getProperty("alias");
0259: if (str != null) {
0260: alias = str;
0261: props.remove("alias");
0262:
0263: if (log.isInfoEnabled())
0264: log.info("alias used is " + alias);
0265: }
0266:
0267: str = props.getProperty("encrypt_entire_message");
0268: if (str != null) {
0269: this .encrypt_entire_message = new Boolean(str)
0270: .booleanValue();
0271: props.remove("encrypt_entire_message");
0272: }
0273:
0274: if (props.size() > 0) {
0275:
0276: if (log.isErrorEnabled())
0277: log.error("these properties are not recognized:"
0278: + props);
0279: return false;
0280: }
0281:
0282: return true;
0283: }
0284:
0285: public void init() throws Exception {
0286: if (keyStoreName == null) {
0287: initSymKey();
0288: initKeyPair();
0289: } else {
0290: initConfiguredKey();
0291: }
0292: initSymCiphers(symAlgorithm, getSecretKey());
0293: }
0294:
0295: /**
0296: * Initialisation if a supplied key is defined in the properties. This
0297: * supplied key must be in a keystore which can be generated using the
0298: * keystoreGenerator file in demos. The keystore must be on the classpath
0299: * to find it.
0300: *
0301: * @throws KeyStoreException
0302: * @throws Exception
0303: * @throws IOException
0304: * @throws NoSuchAlgorithmException
0305: * @throws CertificateException
0306: * @throws UnrecoverableKeyException
0307: */
0308: private void initConfiguredKey() throws KeyStoreException,
0309: Exception, IOException, NoSuchAlgorithmException,
0310: CertificateException, UnrecoverableKeyException {
0311: InputStream inputStream = null;
0312: // must not use default keystore type - as does not support secret keys
0313: KeyStore store = KeyStore.getInstance("JCEKS");
0314:
0315: SecretKey tempKey = null;
0316: try {
0317: // load in keystore using this thread's classloader
0318: inputStream = Thread.currentThread()
0319: .getContextClassLoader().getResourceAsStream(
0320: keyStoreName);
0321: // we can't find a keystore here -
0322: if (inputStream == null) {
0323: throw new Exception("Unable to load keystore "
0324: + keyStoreName + " ensure file is on classpath");
0325: }
0326: // we have located a file lets load the keystore
0327: try {
0328: store.load(inputStream, storePassword.toCharArray());
0329: // loaded keystore - get the key
0330: tempKey = (SecretKey) store.getKey(alias, keyPassword
0331: .toCharArray());
0332: } catch (IOException e) {
0333: throw new Exception("Unable to load keystore "
0334: + keyStoreName + ": " + e);
0335: } catch (NoSuchAlgorithmException e) {
0336: throw new Exception("No Such algorithm " + keyStoreName
0337: + ": " + e);
0338: } catch (CertificateException e) {
0339: throw new Exception("Certificate exception "
0340: + keyStoreName + ": " + e);
0341: }
0342:
0343: if (tempKey == null) {
0344: throw new Exception("Unable to retrieve key '" + alias
0345: + "' from keystore " + keyStoreName);
0346: }
0347: //set the key here
0348: setSecretKey(tempKey);
0349:
0350: if (symAlgorithm.equals(DEFAULT_SYM_ALGO)) {
0351: symAlgorithm = tempKey.getAlgorithm();
0352: }
0353:
0354: // set the fact we are using a supplied key
0355:
0356: suppliedKey = true;
0357: queue_down = false;
0358: queue_up = false;
0359: } finally {
0360: Util.close(inputStream);
0361: }
0362:
0363: }
0364:
0365: /**
0366: * Used to initialise the symmetric key if none is supplied in a keystore.
0367: * @throws Exception
0368: */
0369: public void initSymKey() throws Exception {
0370: KeyGenerator keyGen = null;
0371: // see if we have a provider specified
0372: if (symProvider != null && symProvider.trim().length() > 0) {
0373: keyGen = KeyGenerator.getInstance(
0374: getAlgorithm(symAlgorithm), symProvider);
0375: } else {
0376: keyGen = KeyGenerator
0377: .getInstance(getAlgorithm(symAlgorithm));
0378: }
0379: // generate the key using the defined init properties
0380: keyGen.init(symInit);
0381: secretKey = keyGen.generateKey();
0382:
0383: setSecretKey(secretKey);
0384:
0385: if (log.isInfoEnabled())
0386: log.info(" Symmetric key generated ");
0387: }
0388:
0389: /**
0390: * Initialises the Ciphers for both encryption and decryption using the
0391: * generated or supplied secret key.
0392: *
0393: * @param algorithm
0394: * @param secret
0395: * @throws Exception
0396: */
0397: private void initSymCiphers(String algorithm, SecretKey secret)
0398: throws Exception {
0399:
0400: if (log.isInfoEnabled())
0401: log.info(" Initializing symmetric ciphers");
0402:
0403: symEncodingCipher = Cipher.getInstance(algorithm);
0404: symDecodingCipher = Cipher.getInstance(algorithm);
0405: symEncodingCipher.init(Cipher.ENCRYPT_MODE, secret);
0406: symDecodingCipher.init(Cipher.DECRYPT_MODE, secret);
0407:
0408: //set the version
0409: MessageDigest digest = MessageDigest.getInstance("MD5");
0410: digest.reset();
0411: digest.update(secret.getEncoded());
0412:
0413: symVersion = new String(digest.digest(), "UTF-8");
0414: if (log.isInfoEnabled()) {
0415: // log.info(" Initialized symmetric ciphers with secret key (" + symVersion.length() + " bytes) " +symVersion);
0416: StringBuffer sb = new StringBuffer(
0417: " Initialized symmetric ciphers with secret key ("
0418: + symVersion.length() + " bytes) ");
0419: char[] arr = symVersion.toCharArray();
0420: for (int i = 0; i < arr.length; i++) {
0421: char c = arr[i];
0422: sb.append((int) c);
0423: }
0424: log.info(sb.toString());
0425: }
0426: }
0427:
0428: /**
0429: * Generates the public/private key pair from the init params
0430: * @throws Exception
0431: */
0432: public void initKeyPair() throws Exception {
0433: // generate keys according to the specified algorithms
0434: // generate publicKey and Private Key
0435: KeyPairGenerator KpairGen = null;
0436: if (asymProvider != null && asymProvider.trim().length() > 0) {
0437: KpairGen = KeyPairGenerator.getInstance(
0438: getAlgorithm(asymAlgorithm), asymProvider);
0439: } else {
0440: KpairGen = KeyPairGenerator
0441: .getInstance(getAlgorithm(asymAlgorithm));
0442:
0443: }
0444: KpairGen.initialize(asymInit, new SecureRandom());
0445: Kpair = KpairGen.generateKeyPair();
0446:
0447: // set up the Cipher to decrypt secret key responses encrypted with our key
0448:
0449: asymCipher = Cipher.getInstance(asymAlgorithm);
0450: asymCipher.init(Cipher.DECRYPT_MODE, Kpair.getPrivate());
0451:
0452: if (log.isInfoEnabled())
0453: log.info(" asym algo initialized");
0454: }
0455:
0456: /** Just remove if you don't need to reset any state */
0457: public void reset() {
0458: }
0459:
0460: /* (non-Javadoc)
0461: * @see org.jgroups.stack.Protocol#up(org.jgroups.Event)
0462: */
0463: public void up(Event evt) {
0464:
0465: switch (evt.getType()) {
0466:
0467: // we need to know what our address is
0468: case Event.SET_LOCAL_ADDRESS:
0469: local_addr = (Address) evt.getArg();
0470: if (log.isDebugEnabled())
0471: log.debug("set local address to " + local_addr);
0472: break;
0473: case Event.VIEW_CHANGE:
0474: View view = (View) evt.getArg();
0475: if (log.isInfoEnabled())
0476: log.info("handling view: " + view);
0477: if (!suppliedKey) {
0478: handleViewChange(view);
0479: }
0480: break;
0481: // we try and decrypt all messages
0482: case Event.MSG:
0483: try {
0484: handleUpMessage(evt);
0485: } catch (Exception e) {
0486: log.warn("exception occurred decrypting message", e);
0487: }
0488: return;
0489: default:
0490: break;
0491: }
0492:
0493: passUp(evt);
0494: }
0495:
0496: private synchronized void handleViewChange(View view) {
0497:
0498: // if view is a bit broken set me as keyserver
0499: if (view.getMembers() == null
0500: || view.getMembers().get(0) == null) {
0501: becomeKeyServer(local_addr);
0502: return;
0503: }
0504: // otherwise get keyserver from view controller
0505: Address tmpKeyServer = (Address) view.getMembers().get(0);
0506:
0507: //I am new keyserver - either first member of group or old key server is no more and
0508: // I have been voted new controller
0509: if (tmpKeyServer.equals(local_addr)
0510: && (keyServerAddr == null || (!tmpKeyServer
0511: .equals(keyServerAddr)))) {
0512: becomeKeyServer(tmpKeyServer);
0513: // a new keyserver has been set and it is not me
0514: } else if (keyServerAddr == null
0515: || (!tmpKeyServer.equals(keyServerAddr))) {
0516: handleNewKeyServer(tmpKeyServer);
0517: } else {
0518: if (log.isDebugEnabled())
0519: log.debug("Membership has changed but I do not care");
0520: }
0521: }
0522:
0523: /**
0524: * Handles becoming server - resetting queue settings
0525: * and setting keyserver address to be local address.
0526: *
0527: * @param tmpKeyServer
0528: */
0529: private void becomeKeyServer(Address tmpKeyServer) {
0530: keyServerAddr = tmpKeyServer;
0531: keyServer = true;
0532: if (log.isInfoEnabled())
0533: log.info("I have become key server " + keyServerAddr);
0534: queue_down = false;
0535: queue_up = false;
0536: }
0537:
0538: /**
0539: * Sets up the peer for a new keyserver - this is
0540: * setting queueing to buffer messages until we have a new
0541: * secret key from the key server and sending a key request
0542: * to the new keyserver.
0543: *
0544: * @param newKeyServer
0545: */
0546: private void handleNewKeyServer(Address newKeyServer) {
0547: // start queueing until we have new key
0548: // to make sure we are not sending with old key
0549: queue_up = true;
0550: queue_down = true;
0551: // set new keyserver address
0552: keyServerAddr = newKeyServer;
0553: keyServer = false;
0554: if (log.isInfoEnabled())
0555: log.info("Sending key request");
0556:
0557: // create a key request message
0558: sendKeyRequest();
0559: }
0560:
0561: /**
0562: * @param evt
0563: */
0564: private void handleUpMessage(Event evt) throws Exception {
0565: Message msg = (Message) evt.getArg();
0566:
0567: if (msg == null) {
0568: if (log.isTraceEnabled())
0569: log.trace("null message - passing straight up");
0570: passUp(evt);
0571: return;
0572: }
0573:
0574: if (msg.getLength() == 0) {
0575: passUp(evt);
0576: return;
0577: }
0578:
0579: EncryptHeader hdr = (EncryptHeader) msg
0580: .getHeader(EncryptHeader.KEY);
0581:
0582: // try and get the encryption header
0583: if (hdr == null) {
0584: if (log.isTraceEnabled())
0585: log
0586: .trace("dropping message as ENCRYPT header is null or has not been recognized, msg will not be passed up, "
0587: + "headers are " + msg.getHeaders());
0588: return;
0589: }
0590:
0591: if (log.isTraceEnabled())
0592: log.trace("header received " + hdr);
0593:
0594: // if a normal message try and decrypt it
0595: if (hdr.getType() == EncryptHeader.ENCRYPT) {
0596: // if msg buffer is empty, and we didn't encrypt the entire message, just pass up
0597: if (!hdr.encrypt_entire_msg
0598: && ((Message) evt.getArg()).getLength() == 0) {
0599: if (log.isTraceEnabled())
0600: log
0601: .trace("passing up message as it has an empty buffer ");
0602: passUp(evt);
0603: return;
0604: }
0605:
0606: // if queueing then pass into queue to be dealt with later
0607: if (queue_up) {
0608: if (log.isTraceEnabled())
0609: log
0610: .trace("queueing up message as no session key established: "
0611: + evt.getArg());
0612: upMessageQueue.put(evt);
0613: } else {
0614: // make sure we pass up any queued messages first
0615: // could be more optimised but this can wait
0616: // we only need this if not using supplied key
0617: if (!suppliedKey) {
0618: drainUpQueue();
0619: }
0620: // try and decrypt the message
0621: Message tmpMsg = decryptMessage(symDecodingCipher, msg);
0622: if (tmpMsg != null) {
0623: if (log.isTraceEnabled())
0624: log.trace("decrypted message " + tmpMsg);
0625: passUp(new Event(Event.MSG, tmpMsg));
0626: } else {
0627: log.warn("Unrecognised cipher discarding message");
0628: }
0629: }
0630: } else {
0631: // check if we had some sort of encrypt control
0632: // header if using supplied key we should not
0633: // process it
0634: if (suppliedKey) {
0635: if (log.isWarnEnabled()) {
0636: log.warn("We received an encrypt header of "
0637: + hdr.getType()
0638: + " while in configured mode");
0639: }
0640: } else {
0641: // see what sort of encrypt control message we
0642: // have received
0643: switch (hdr.getType()) {
0644: // if a key request
0645: case EncryptHeader.KEY_REQUEST:
0646: if (log.isInfoEnabled()) {
0647: log.info("received a key request from peer");
0648: }
0649:
0650: //if a key request send response key back
0651: try {
0652: // extract peer's public key
0653: PublicKey tmpKey = generatePubKey(msg
0654: .getBuffer());
0655: // send back the secret key we have
0656: sendSecretKey(getSecretKey(), tmpKey, msg
0657: .getSrc());
0658: } catch (Exception e) {
0659: log
0660: .warn("unable to reconstitute peer's public key");
0661: }
0662: break;
0663: case EncryptHeader.SECRETKEY:
0664: if (log.isInfoEnabled()) {
0665: log
0666: .info("received a secretkey response from keyserver");
0667: }
0668:
0669: try {
0670: SecretKey tmp = decodeKey(msg.getBuffer());
0671: if (tmp == null) {
0672: // unable to understand response
0673: // lets try again
0674: sendKeyRequest();
0675: } else {
0676: // otherwise lets set the reurned key
0677: // as the shared key
0678: setKeys(tmp, hdr.getVersion());
0679: if (log.isInfoEnabled()) {
0680: log.info("Decoded secretkey response");
0681: }
0682: }
0683: } catch (Exception e) {
0684: log
0685: .warn("unable to process received public key");
0686: }
0687: break;
0688: default:
0689: log.warn("Received ignored encrypt header of "
0690: + hdr.getType());
0691: break;
0692: }
0693: }
0694: }
0695: }
0696:
0697: /**
0698: * used to drain the up queue - synchronized so we
0699: * can call it safely despite access from potentially two threads at once
0700: * @throws QueueClosedException
0701: * @throws Exception
0702: */
0703: private void drainUpQueue() throws QueueClosedException, Exception {
0704: //we do not synchronize here as we only have one up thread so we should never get an issue
0705: //synchronized(upLock){
0706: Event tmp = null;
0707: while ((tmp = (Event) upMessageQueue.poll(0L)) != null) {
0708: if (tmp != null) {
0709: Message msg = decryptMessage(symDecodingCipher,
0710: (Message) tmp.getArg());
0711:
0712: if (msg != null) {
0713: if (log.isTraceEnabled()) {
0714: log.trace("passing up message from drain "
0715: + msg);
0716: }
0717: passUp(new Event(Event.MSG, msg));
0718: } else {
0719: log
0720: .warn("discarding message in queue up drain as cannot decode it");
0721: }
0722: }
0723: }
0724: }
0725:
0726: /**
0727: * Sets the keys for the app. and drains the queues - the drains could
0728: * be called att he same time as the up/down messages calling in to
0729: * the class so we may have an extra call to the drain methods but this slight expense
0730: * is better than the alternative of waiting until the next message to
0731: * trigger the drains which may never happen.
0732: *
0733: * @param key
0734: * @param version
0735: * @throws Exception
0736: */
0737: private void setKeys(SecretKey key, String version)
0738: throws Exception {
0739:
0740: // put the previous key into the map
0741: // if the keys are already there then they will overwrite
0742: keyMap.put(getSymVersion(), getSymDecodingCipher());
0743:
0744: setSecretKey(key);
0745: initSymCiphers(key.getAlgorithm(), key);
0746: setSymVersion(version);
0747:
0748: // drain the up queue
0749: log.info("setting queue up to false in setKeys");
0750: queue_up = false;
0751: drainUpQueue();
0752:
0753: queue_down = false;
0754: drainDownQueue();
0755: }
0756:
0757: /**
0758: * Does the actual work for decrypting - if version does not match current cipher
0759: * then tries to use previous cipher
0760: * @param cipher
0761: * @param msg
0762: * @return
0763: * @throws Exception
0764: */
0765: private Message decryptMessage(Cipher cipher, Message msg)
0766: throws Exception {
0767: EncryptHeader hdr = (EncryptHeader) msg
0768: .getHeader(EncryptHeader.KEY);
0769: if (!hdr.getVersion().equals(getSymVersion())) {
0770: log
0771: .warn("attempting to use stored cipher as message does not uses current encryption version ");
0772: cipher = (Cipher) keyMap.get(hdr.getVersion());
0773: if (cipher == null) {
0774: log
0775: .warn("Unable to find a matching cipher in previous key map");
0776: return null;
0777: } else {
0778: if (log.isTraceEnabled())
0779: log
0780: .trace("decrypting using previous cipher version "
0781: + hdr.getVersion());
0782: return _decrypt(cipher, msg, hdr.encrypt_entire_msg);
0783: }
0784: }
0785:
0786: else {
0787:
0788: // reset buffer with decrypted message
0789: return _decrypt(cipher, msg, hdr.encrypt_entire_msg);
0790: }
0791: }
0792:
0793: private Message _decrypt(Cipher cipher, Message msg,
0794: boolean decrypt_entire_msg) throws Exception {
0795: if (!decrypt_entire_msg) {
0796: msg.setBuffer(cipher.doFinal(msg.getBuffer()));
0797: return msg;
0798: }
0799:
0800: byte[] decrypted_msg = cipher.doFinal(msg.getBuffer());
0801: Message ret = (Message) Util.streamableFromByteBuffer(
0802: Message.class, decrypted_msg);
0803: if (ret.getDest() == null)
0804: ret.setDest(msg.getDest());
0805: if (ret.getSrc() == null)
0806: ret.setSrc(msg.getSrc());
0807: return ret;
0808: }
0809:
0810: /**
0811: * @param secret
0812: * @param pubKey
0813: * @throws InvalidKeyException
0814: * @throws IllegalStateException
0815: * @throws IllegalBlockSizeException
0816: * @throws BadPaddingException
0817: */
0818: private void sendSecretKey(SecretKey secret, PublicKey pubKey,
0819: Address source) throws InvalidKeyException,
0820: IllegalStateException, IllegalBlockSizeException,
0821: BadPaddingException, NoSuchPaddingException,
0822: NoSuchAlgorithmException {
0823: Message newMsg;
0824:
0825: if (log.isDebugEnabled())
0826: log.debug("encoding shared key ");
0827:
0828: // create a cipher with peer's public key
0829: Cipher tmp = Cipher.getInstance(asymAlgorithm);
0830: tmp.init(Cipher.ENCRYPT_MODE, pubKey);
0831:
0832: //encrypt current secret key
0833: byte[] encryptedKey = tmp.doFinal(secret.getEncoded());
0834:
0835: //SW logout encrypted bytes we are sending so we
0836: // can match the clients log to see if they match
0837: if (log.isDebugEnabled())
0838: log
0839: .debug(" Generated encoded key which only client can decode:"
0840: + formatArray(encryptedKey));
0841:
0842: newMsg = new Message(source, local_addr, encryptedKey);
0843:
0844: newMsg.putHeader(EncryptHeader.KEY, new EncryptHeader(
0845: EncryptHeader.SECRETKEY, getSymVersion()));
0846:
0847: if (log.isDebugEnabled())
0848: log.debug(" Sending version " + getSymVersion()
0849: + " encoded key to client");
0850: passDown(new Event(Event.MSG, newMsg));
0851: }
0852:
0853: /**
0854: * @param msg
0855: * @return
0856: */
0857: // private PublicKey handleKeyRequest(Message msg)
0858: // {
0859: // Message newMsg;
0860: // if (log.isDebugEnabled())
0861: // log.debug("Request for key recieved");
0862: //
0863: // //SW log the clients encoded public key so we can
0864: // // see if they match
0865: // if (log.isDebugEnabled())
0866: // log.debug("Got peer's encoded public key:"
0867: // + formatArray(msg.getBuffer()));
0868: //
0869: // PublicKey pubKey = generatePubKey(msg.getBuffer());
0870: //
0871: // //SW log the clients resulting public key so we can
0872: // // see if it is created correctly
0873: // if (log.isDebugEnabled())
0874: // log.debug("Generated requestors public key" + pubKey);
0875: //
0876: // /*
0877: // * SW why do we send this as the client does not use it ? - although we
0878: // * could make use to provide some authentication later on rahter than
0879: // * just encryption send server's publicKey
0880: // */
0881: // newMsg = new Message(msg.getSrc(), local_addr, Kpair.getPublic()
0882: // .getEncoded());
0883: //
0884: // //SW Log out our public key in encoded format so we
0885: // // can match with the client debugging to
0886: // // see if they match
0887: // if (log.isInfoEnabled())
0888: // log.debug("encoded key is "
0889: // + formatArray(Kpair.getPublic().getEncoded()));
0890: //
0891: //
0892: // newMsg.putHeader(EncryptHeader.KEY, new EncryptHeader(
0893: // EncryptHeader.SERVER_PUBKEY, getSymVersion()));
0894: //
0895: //
0896: // passDown(new Event(Event.MSG, newMsg));
0897: // return pubKey;
0898: // }
0899:
0900: /**
0901: * @return Message
0902: */
0903:
0904: private Message sendKeyRequest() {
0905:
0906: // send client's public key to server and request
0907: // server's public key
0908: Message newMsg = new Message(keyServerAddr, local_addr, Kpair
0909: .getPublic().getEncoded());
0910:
0911: newMsg.putHeader(EncryptHeader.KEY, new EncryptHeader(
0912: EncryptHeader.KEY_REQUEST, getSymVersion()));
0913: passDown(new Event(Event.MSG, newMsg));
0914: return newMsg;
0915: }
0916:
0917: /* (non-Javadoc)
0918: * @see org.jgroups.stack.Protocol#down(org.jgroups.Event)
0919: */
0920: public void down(Event evt) {
0921: switch (evt.getType()) {
0922:
0923: case Event.MSG:
0924: try {
0925: if (queue_down) {
0926: if (log.isTraceEnabled())
0927: log
0928: .trace("queueing down message as no session key established"
0929: + evt.getArg());
0930: downMessageQueue.put(evt); // queue messages if we are waiting for a new key
0931: } else {
0932: // make sure the down queue is drained first to keep ordering
0933: if (!suppliedKey) {
0934: drainDownQueue();
0935: }
0936: sendDown(evt);
0937: }
0938:
0939: } catch (Exception e) {
0940: log.warn("unable to send down event " + e);
0941: }
0942: return;
0943:
0944: case Event.VIEW_CHANGE:
0945: View view = (View) evt.getArg();
0946: if (log.isInfoEnabled())
0947: log.info("handling view: " + view);
0948: if (!suppliedKey) {
0949: handleViewChange(view);
0950: }
0951: break;
0952: default:
0953: break;
0954: }
0955: passDown(evt);
0956: }
0957:
0958: /**
0959: * @throws Exception
0960: * @throws QueueClosedException
0961: */
0962: private void drainDownQueue() throws Exception,
0963: QueueClosedException {
0964: // we do not synchronize here as we only have one down thread so we should never get an issue
0965: // first lets replay any oustanding events
0966: Event tmp = null;
0967: while ((tmp = (Event) downMessageQueue.poll(0L)) != null) {
0968: sendDown(tmp);
0969: }
0970: }
0971:
0972: /**
0973: * @param evt
0974: * @throws Exception
0975: */
0976: private void sendDown(Event evt) throws Exception {
0977: if (evt.getType() != Event.MSG) {
0978: return;
0979: }
0980:
0981: Message msg = (Message) evt.getArg();
0982: if (msg.getLength() == 0) {
0983: passDown(evt);
0984: return;
0985: }
0986:
0987: EncryptHeader hdr = new EncryptHeader(EncryptHeader.ENCRYPT,
0988: getSymVersion());
0989: hdr.encrypt_entire_msg = this .encrypt_entire_message;
0990:
0991: if (encrypt_entire_message) {
0992: byte[] serialized_msg = Util.streamableToByteBuffer(msg);
0993: byte[] encrypted_msg = encryptMessage(symEncodingCipher,
0994: serialized_msg);
0995: Message tmp = msg.copy(false); // we need to preserve headers which may already be present
0996: tmp.setBuffer(encrypted_msg);
0997: tmp.setSrc(local_addr);
0998: tmp.putHeader(EncryptHeader.KEY, hdr);
0999: passDown(new Event(Event.MSG, tmp));
1000: return;
1001: }
1002:
1003: // put our encrypt header on the message
1004: msg.putHeader(EncryptHeader.KEY, hdr);
1005:
1006: // copy neeeded because same message (object) may be retransmitted -> no double encryption
1007: Message msgEncrypted = msg.copy(false);
1008: msgEncrypted.setBuffer(encryptMessage(symEncodingCipher, msg
1009: .getBuffer()));
1010: passDown(new Event(Event.MSG, msgEncrypted));
1011: }
1012:
1013: /**
1014: *
1015: * @param cipher
1016: * @param plain
1017: * @return
1018: * @throws Exception
1019: */
1020: private byte[] encryptMessage(Cipher cipher, byte[] plain)
1021: throws Exception {
1022: return cipher.doFinal(plain);
1023: }
1024:
1025: private SecretKeySpec decodeKey(byte[] encodedKey) throws Exception {
1026: // try and decode secrey key sent from keyserver
1027: byte[] keyBytes = asymCipher.doFinal(encodedKey);
1028:
1029: SecretKeySpec keySpec = null;
1030: try {
1031: keySpec = new SecretKeySpec(keyBytes,
1032: getAlgorithm(symAlgorithm));
1033:
1034: // test reconstituted key to see if valid
1035: Cipher temp = Cipher.getInstance(symAlgorithm);
1036: temp.init(Cipher.SECRET_KEY, keySpec);
1037: } catch (Exception e) {
1038: log.fatal(e);
1039: keySpec = null;
1040: }
1041: return keySpec;
1042: }
1043:
1044: /**
1045: * used to reconstitute public key sent in byte form from peer
1046: * @param encodedKey
1047: * @return PublicKey
1048: */
1049: private PublicKey generatePubKey(byte[] encodedKey) {
1050: PublicKey pubKey = null;
1051: try {
1052: KeyFactory KeyFac = KeyFactory
1053: .getInstance(getAlgorithm(asymAlgorithm));
1054: X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(
1055: encodedKey);
1056: pubKey = KeyFac.generatePublic(x509KeySpec);
1057: } catch (Exception e) {
1058: e.printStackTrace();
1059: }
1060: return pubKey;
1061: }
1062:
1063: /*
1064: * simple helper method so we can see the format of the byte arrays in a
1065: * readable form could be better to use Base64 but will do for now
1066: */
1067: private String formatArray(byte[] array) {
1068: StringBuffer buf = new StringBuffer();
1069: for (int i = 0; i < array.length; i++) {
1070: buf.append(Integer.toHexString(array[i]));
1071: }
1072: return buf.toString();
1073: }
1074:
1075: /**
1076: * @return Returns the asymInit.
1077: */
1078: protected int getAsymInit() {
1079: return asymInit;
1080: }
1081:
1082: /**
1083: * @return Returns the asymProvider.
1084: */
1085: protected String getAsymProvider() {
1086: return asymProvider;
1087: }
1088:
1089: /**
1090: * @return Returns the desKey.
1091: */
1092: protected SecretKey getDesKey() {
1093: return secretKey;
1094: }
1095:
1096: /**
1097: * @return Returns the kpair.
1098: */
1099: protected KeyPair getKpair() {
1100: return Kpair;
1101: }
1102:
1103: /**
1104: * @return Returns the asymCipher.
1105: */
1106: protected Cipher getAsymCipher() {
1107: return asymCipher;
1108: }
1109:
1110: /**
1111: * @return Returns the serverPubKey.
1112: */
1113: protected PublicKey getServerPubKey() {
1114: return serverPubKey;
1115: }
1116:
1117: /**
1118: * @return Returns the symAlgorithm.
1119: */
1120: protected String getSymAlgorithm() {
1121: return symAlgorithm;
1122: }
1123:
1124: /**
1125: * @return Returns the symInit.
1126: */
1127: protected int getSymInit() {
1128: return symInit;
1129: }
1130:
1131: /**
1132: * @return Returns the symProvider.
1133: */
1134: protected String getSymProvider() {
1135: return symProvider;
1136: }
1137:
1138: /**
1139: * @return Returns the asymAlgorithm.
1140: */
1141: protected String getAsymAlgorithm() {
1142: return asymAlgorithm;
1143: }
1144:
1145: /**
1146: * @return Returns the symVersion.
1147: */
1148: private String getSymVersion() {
1149: return symVersion;
1150: }
1151:
1152: /**
1153: * @param symVersion
1154: * The symVersion to set.
1155: */
1156: private void setSymVersion(String symVersion) {
1157: this .symVersion = symVersion;
1158: }
1159:
1160: /**
1161: * @return Returns the secretKey.
1162: */
1163: private SecretKey getSecretKey() {
1164: return secretKey;
1165: }
1166:
1167: /**
1168: * @param secretKey The secretKey to set.
1169: */
1170: private void setSecretKey(SecretKey secretKey) {
1171: this .secretKey = secretKey;
1172: }
1173:
1174: /**
1175: * @return Returns the keyStoreName.
1176: */
1177: protected String getKeyStoreName() {
1178: return keyStoreName;
1179: }
1180:
1181: /**
1182: * @return Returns the symDecodingCipher.
1183: */
1184: protected Cipher getSymDecodingCipher() {
1185: return symDecodingCipher;
1186: }
1187:
1188: /**
1189: * @return Returns the symEncodingCipher.
1190: */
1191: protected Cipher getSymEncodingCipher() {
1192: return symEncodingCipher;
1193: }
1194:
1195: /**
1196: * @return Returns the local_addr.
1197: */
1198: protected Address getLocal_addr() {
1199: return local_addr;
1200: }
1201:
1202: /**
1203: * @param local_addr The local_addr to set.
1204: */
1205: protected void setLocal_addr(Address local_addr) {
1206: this .local_addr = local_addr;
1207: }
1208:
1209: /**
1210: * @return Returns the keyServerAddr.
1211: */
1212: protected Address getKeyServerAddr() {
1213: return keyServerAddr;
1214: }
1215:
1216: /**
1217: * @param keyServerAddr The keyServerAddr to set.
1218: */
1219: protected void setKeyServerAddr(Address keyServerAddr) {
1220: this .keyServerAddr = keyServerAddr;
1221: }
1222:
1223: public static class EncryptHeader extends org.jgroups.Header
1224: implements Streamable {
1225: short type;
1226: public static final short ENCRYPT = 0;
1227: public static final short KEY_REQUEST = 1;
1228: public static final short SERVER_PUBKEY = 2;
1229: public static final short SECRETKEY = 3;
1230: public static final short SECRETKEY_READY = 4;
1231:
1232: // adding key for Message object purpose
1233: static final String KEY = "encrypt";
1234:
1235: String version;
1236:
1237: boolean encrypt_entire_msg = false;
1238:
1239: public EncryptHeader() {
1240: }
1241:
1242: public EncryptHeader(short type) {
1243: //this(type, 0l);
1244: this .type = type;
1245: this .version = "";
1246: }
1247:
1248: public EncryptHeader(short type, String version) {
1249: this .type = type;
1250: this .version = version;
1251: }
1252:
1253: public void writeExternal(java.io.ObjectOutput out)
1254: throws IOException {
1255: out.writeShort(type);
1256: out.writeObject(version);
1257: }
1258:
1259: public void readExternal(java.io.ObjectInput in)
1260: throws IOException, ClassNotFoundException {
1261: type = in.readShort();
1262: version = (String) in.readObject();
1263: }
1264:
1265: public void writeTo(DataOutputStream out) throws IOException {
1266: out.writeShort(type);
1267: Util.writeString(version, out);
1268: out.writeBoolean(encrypt_entire_msg);
1269: }
1270:
1271: public void readFrom(DataInputStream in) throws IOException,
1272: IllegalAccessException, InstantiationException {
1273: type = in.readShort();
1274: version = Util.readString(in);
1275: encrypt_entire_msg = in.readBoolean();
1276: }
1277:
1278: public String toString() {
1279: return "ENCRYPT [type=" + type + " version=\"" + version
1280: + "\"]";
1281: }
1282:
1283: public long size() {
1284: long retval = Global.SHORT_SIZE + Global.BYTE_SIZE
1285: + Global.BYTE_SIZE;
1286: if (version != null)
1287: retval += version.length() + 2;
1288: return retval;
1289: }
1290:
1291: /*
1292: * (non-Javadoc)
1293: *
1294: * @see java.lang.Object#equals(java.lang.Object)
1295: */
1296: public boolean equals(Object obj) {
1297: if (obj instanceof EncryptHeader) {
1298: return ((((EncryptHeader) obj).getType() == type) && ((((EncryptHeader) obj)
1299: .getVersion().equals(version))));
1300: }
1301: return false;
1302: }
1303:
1304: /**
1305: * @return Returns the type.
1306: */
1307: protected short getType() {
1308: return type;
1309: }
1310:
1311: /**
1312: * @return Returns the version.
1313: */
1314: protected String getVersion() {
1315: return version;
1316: }
1317: }
1318: }
|