001: package org.apache.synapse.transport.amqp;
002:
003: import java.util.HashMap;
004: import java.util.Iterator;
005: import java.util.List;
006: import java.util.Map;
007:
008: import org.apache.axiom.om.OMElement;
009: import org.apache.axis2.AxisFault;
010: import org.apache.axis2.addressing.EndpointReference;
011: import org.apache.axis2.context.ConfigurationContext;
012: import org.apache.axis2.description.AxisService;
013: import org.apache.axis2.description.Parameter;
014: import org.apache.axis2.description.ParameterIncludeImpl;
015: import org.apache.axis2.description.TransportInDescription;
016: import org.apache.commons.logging.Log;
017: import org.apache.commons.logging.LogFactory;
018: import org.apache.qpidity.nclient.Client;
019: import org.apache.qpidity.nclient.Connection;
020: import org.apache.synapse.transport.base.AbstractTransportListener;
021: import org.apache.synapse.transport.base.BaseUtils;
022: import org.apache.synapse.transport.jms.JMSConnectionFactory;
023: import org.apache.synapse.transport.jms.JMSConstants;
024: import org.apache.synapse.transport.jms.JMSUtils;
025:
026: public class AMQPListener extends AbstractTransportListener {
027: public static final String TRANSPORT_NAME = "AMQP";
028: private static final Log log = LogFactory
029: .getLog(AMQPListener.class);
030:
031: /** A Map containing the AMQP connections managed by this, keyed by name */
032: private Map<String, AMQPConnection> connections = new HashMap<String, AMQPConnection>();
033: /** A Map of service name to the AMQP EPR addresses */
034: private Map serviceNameToEPRMap = new HashMap();
035:
036: @Override
037: public void init(ConfigurationContext cfgCtx,
038: TransportInDescription transportIn) throws AxisFault {
039: setTransportName(TRANSPORT_NAME);
040: super .init(cfgCtx, transportIn);
041: loadConnectionDefinitions(transportIn);
042:
043: if (connections.isEmpty()) {
044: log
045: .warn("No AMQP connections are defined. Cannot listen on AMQP");
046: return;
047: }
048:
049: log.info("AMQP Transport Receiver/Listener initialized...");
050: }
051:
052: @Override
053: public void start() throws AxisFault {
054: for (String conName : connections.keySet()) {
055: AMQPConnection conDef = connections.get(conName);
056: conDef.start();
057: }
058: super .start();
059: }
060:
061: // Need to clean up the sessions as well
062: @Override
063: public void stop() throws AxisFault {
064: for (String conName : connections.keySet()) {
065: AMQPConnection connection = connections.get(conName);
066: try {
067: connection.stop();
068: } catch (Exception e) {
069: throw new AMQPSynapseException(
070: "Error creating a connection to the broker", e);
071: }
072: }
073: super .stop();
074: }
075:
076: @Override
077: protected void startListeningForService(AxisService service) {
078: if (service.getName().startsWith("__")) {
079: return;
080: }
081:
082: AMQPConnection con = getConnectionFactory(service);
083: if (con == null) {
084: String msg = "Service "
085: + service.getName()
086: + " does not specify"
087: + "an AMQP connection or refers to an invalid connection. "
088: + "This service is being marked as faulty and will not be "
089: + "available over the AMQP transport";
090: log.warn(msg);
091: BaseUtils.markServiceAsFaulty(service.getName(), msg,
092: service.getAxisConfiguration());
093: return;
094: }
095:
096: // compute service EPR and keep for later use
097: List<AMQPBinding> bindings = AMQPUtils
098: .getBindingsForService(service);
099:
100: serviceNameToEPRMap.put(service.getName(), URIParser.getEPR(
101: bindings, con.getUrl()));
102:
103: log.info("Starting to listen for service " + service.getName());
104:
105: // create bindings for the service
106: }
107:
108: @Override
109: protected void stopListeningForService(AxisService service) {
110: // TODO Auto-generated method stub
111:
112: }
113:
114: public EndpointReference[] getEPRsForService(String serviceName,
115: String ip) throws AxisFault {
116: //Strip out the operation name
117: if (serviceName.indexOf('/') != -1) {
118: serviceName = serviceName.substring(0, serviceName
119: .indexOf('/'));
120: }
121: return new EndpointReference[] { new EndpointReference(
122: (String) serviceNameToEPRMap.get(serviceName)) };
123: }
124:
125: /**
126: * Create an AMQP Connection instances for the definitions in the transport listener,
127: * and add these map keyed by name
128: *
129: * @param transprtIn the transport-in description for AMQP
130: */
131: private void loadConnectionDefinitions(
132: TransportInDescription transprtIn) {
133:
134: // iterate through all defined connection definitions
135: Iterator conIter = transprtIn.getParameters().iterator();
136:
137: while (conIter.hasNext()) {
138: Parameter conParams = (Parameter) conIter.next();
139:
140: ParameterIncludeImpl pi = new ParameterIncludeImpl();
141: AMQPConnection conDef = new AMQPConnection();
142: try {
143: pi.deserializeParameters((OMElement) conParams
144: .getValue());
145: } catch (AxisFault axisFault) {
146: log.error(
147: "Error reading parameters for AMQP Connection definitions"
148: + conParams.getName(), axisFault);
149: }
150: conDef.setName((String) conParams.getValue());
151:
152: Iterator params = pi.getParameters().iterator();
153: while (params.hasNext()) {
154:
155: Parameter p = (Parameter) params.next();
156:
157: if (AMQPConstants.CONNECTION_URL_PARAM.equals(p
158: .getName())) {
159: conDef.setUrl((String) p.getValue());
160: } else if (AMQPConstants.EXCHANGE_NAME_PARAM.equals(p
161: .getName())) {
162: conDef.setExchangeName((String) p.getValue());
163: } else if (AMQPConstants.EXCHANGE_TYPE_PARAM.equals(p
164: .getName())) {
165: conDef.setExchangeType((String) p.getValue());
166: }
167: }
168:
169: connections.put(conDef.getName(), conDef);
170: }
171: }
172:
173: /**
174: * Return the connection for this service. If this service
175: * refers to an invalid connection or defaults to a non-existent default
176: * connection, this returns null
177: *
178: * @param service the AxisService
179: * @return the AMQPConnection to be used, or null if reference is invalid
180: */
181: private AMQPConnection getConnectionFactory(AxisService service) {
182: Parameter conNameParam = service
183: .getParameter(AMQPConstants.CONNECTION_NAME_PARAM);
184: Parameter conURLParam = service
185: .getParameter(AMQPConstants.CONNECTION_URL_PARAM);
186:
187: // validate connection factory name (specified or default)
188: if (conNameParam != null) {
189: String conFac = (String) conNameParam.getValue();
190: if (connections.containsKey(conFac)) {
191: return (AMQPConnection) connections.get(conFac);
192: } else {
193: return null;
194: }
195:
196: // Next see if service defines it's own connection
197: } else if (conURLParam != null) {
198: AMQPConnection con = new AMQPConnection();
199: con.setUrl((String) conURLParam.getValue());
200: con.start();
201: connections.put(service.getName(), con);
202: return con;
203:
204: // Next see if there is a default defined in axis2.xml
205: } else if (connections
206: .containsKey(AMQPConstants.DEFAULT_CONNECTION)) {
207: return (AMQPConnection) connections
208: .get(AMQPConstants.DEFAULT_CONNECTION);
209:
210: } else {
211: return null;
212: }
213: }
214: }
|