001: /**
002: *
003: * Licensed to the Apache Software Foundation (ASF) under one or more
004: * contributor license agreements. See the NOTICE file distributed with
005: * this work for additional information regarding copyright ownership.
006: * The ASF licenses this file to You under the Apache License, Version 2.0
007: * (the "License"); you may not use this file except in compliance with
008: * the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */package loanbroker;
018:
019: import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
020:
021: import org.apache.commons.logging.Log;
022: import org.apache.commons.logging.LogFactory;
023: import org.apache.servicemix.MessageExchangeListener;
024: import org.apache.servicemix.components.util.ComponentSupport;
025:
026: import javax.jbi.messaging.ExchangeStatus;
027: import javax.jbi.messaging.InOut;
028: import javax.jbi.messaging.MessageExchange;
029: import javax.jbi.messaging.MessagingException;
030: import javax.jbi.messaging.NormalizedMessage;
031: import javax.jbi.messaging.MessageExchange.Role;
032: import javax.jbi.servicedesc.ServiceEndpoint;
033: import javax.xml.namespace.QName;
034:
035: import java.util.ArrayList;
036: import java.util.Iterator;
037: import java.util.List;
038: import java.util.Map;
039:
040: public class LoanBroker extends ComponentSupport implements
041: MessageExchangeListener {
042:
043: private static final Log log = LogFactory.getLog(LoanBroker.class);
044:
045: public LoanBroker() {
046: super (new QName(Constants.LOANBROKER_NS,
047: Constants.LOANBROKER_SERVICE), "input");
048: }
049:
050: public void onMessageExchange(MessageExchange exchange)
051: throws MessagingException {
052: // Provider role
053: if (exchange.getRole() == Role.PROVIDER) {
054: processInputRequest(exchange);
055: // Consumer role
056: } else {
057: ServiceEndpoint ep = exchange.getEndpoint();
058: // Credit agency response
059: if (ep.getServiceName().getLocalPart().equals(
060: Constants.CREDITAGENCY_SERVICE)) {
061: processCreditAgencyResponse(exchange);
062: } else if (ep.getServiceName().getLocalPart().equals(
063: Constants.LENDERGATEWAY_SERVICE)) {
064: processLenderGatewayResponse(exchange);
065: } else {
066: processLoanQuote(exchange);
067: }
068: }
069: }
070:
071: private void processLoanQuote(MessageExchange exchange)
072: throws MessagingException {
073: log.info("Receiving loan quote");
074: // Get aggregation
075: String id = (String) getProperty(exchange,
076: Constants.PROPERTY_CORRELATIONID);
077: Aggregation ag = (Aggregation) aggregations.get(id);
078: // Get info from quote
079: LoanQuote q = new LoanQuote();
080: q.bank = exchange.getEndpoint().getServiceName().getLocalPart();
081: q.rate = (Double) getOutProperty(exchange,
082: Constants.PROPERTY_RATE);
083: done(exchange);
084: // Check if all quotes are received
085: synchronized (ag) {
086: ag.quotes.add(q);
087: if (ag.quotes.size() == ag.numbers) {
088: LoanQuote best = null;
089: for (Iterator iter = ag.quotes.iterator(); iter
090: .hasNext();) {
091: q = (LoanQuote) iter.next();
092: if (best == null
093: || q.rate.doubleValue() < best.rate
094: .doubleValue()) {
095: best = q;
096: }
097: }
098: NormalizedMessage response = ag.request.createMessage();
099: response
100: .setProperty(Constants.PROPERTY_RATE, best.rate);
101: response
102: .setProperty(Constants.PROPERTY_BANK, best.bank);
103: ag.request.setMessage(response, "out");
104: send(ag.request);
105: aggregations.remove(id);
106: }
107: }
108: }
109:
110: private void processLenderGatewayResponse(MessageExchange exchange)
111: throws MessagingException {
112: log.info("Receiving lender gateway response");
113: // Get aggregation
114: String id = (String) getProperty(exchange,
115: Constants.PROPERTY_CORRELATIONID);
116: Aggregation ag = (Aggregation) aggregations.get(id);
117: QName[] recipients = (QName[]) getOutProperty(exchange,
118: Constants.PROPERTY_RECIPIENTS);
119: ag.numbers = recipients.length;
120: for (int i = 0; i < recipients.length; i++) {
121: InOut inout = createInOutExchange(recipients[i], null, null);
122: inout.setProperty(Constants.PROPERTY_CORRELATIONID, id);
123: NormalizedMessage msg = inout.createMessage();
124: msg.setProperty(Constants.PROPERTY_SSN, ag.ssn);
125: msg.setProperty(Constants.PROPERTY_AMOUNT, ag.amount);
126: msg.setProperty(Constants.PROPERTY_DURATION, ag.duration);
127: msg.setProperty(Constants.PROPERTY_SCORE, ag.score);
128: msg.setProperty(Constants.PROPERTY_HISTORYLENGTH,
129: ag.hlength);
130: inout.setInMessage(msg);
131: send(inout);
132: }
133: done(exchange);
134: }
135:
136: private void processCreditAgencyResponse(MessageExchange exchange)
137: throws MessagingException {
138: log.info("Receiving credit agency response");
139: // Get aggregation
140: String id = (String) getProperty(exchange,
141: Constants.PROPERTY_CORRELATIONID);
142: Aggregation ag = (Aggregation) aggregations.get(id);
143: // Fill with infos
144: ag.score = (Integer) getOutProperty(exchange,
145: Constants.PROPERTY_SCORE);
146: ag.hlength = (Integer) getOutProperty(exchange,
147: Constants.PROPERTY_HISTORYLENGTH);
148: // Send to lender gateway
149: InOut inout = createInOutExchange(new QName(
150: Constants.LOANBROKER_NS,
151: Constants.LENDERGATEWAY_SERVICE), null, null);
152: inout.setProperty(Constants.PROPERTY_CORRELATIONID, id);
153: NormalizedMessage msg = inout.createMessage();
154: msg.setProperty(Constants.PROPERTY_SCORE, ag.score);
155: msg.setProperty(Constants.PROPERTY_HISTORYLENGTH, ag.hlength);
156: msg.setProperty(Constants.PROPERTY_AMOUNT, ag.amount);
157: inout.setInMessage(msg);
158: send(inout);
159: done(exchange);
160: }
161:
162: private void processInputRequest(MessageExchange exchange)
163: throws MessagingException {
164: if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
165: log.info("Receiving loan request");
166: // Create aggregation
167: String id = exchange.getExchangeId();
168: Aggregation ag = new Aggregation();
169: ag.request = exchange;
170: ag.ssn = (String) getInProperty(exchange,
171: Constants.PROPERTY_SSN);
172: ag.amount = (Double) getInProperty(exchange,
173: Constants.PROPERTY_AMOUNT);
174: ag.duration = (Integer) getInProperty(exchange,
175: Constants.PROPERTY_DURATION);
176: aggregations.put(id, ag);
177:
178: InOut inout = createInOutExchange(new QName(
179: Constants.LOANBROKER_NS,
180: Constants.CREDITAGENCY_SERVICE), null, null);
181: inout.setProperty(Constants.PROPERTY_CORRELATIONID, id);
182: NormalizedMessage msg = inout.createMessage();
183: msg.setProperty(Constants.PROPERTY_SSN, ag.ssn);
184: inout.setInMessage(msg);
185: send(inout);
186: }
187: }
188:
189: protected Object getProperty(MessageExchange me, String name) {
190: return me.getProperty(name);
191: }
192:
193: protected Object getInProperty(MessageExchange me, String name) {
194: return me.getMessage("in").getProperty(name);
195: }
196:
197: protected Object getOutProperty(MessageExchange me, String name) {
198: return me.getMessage("out").getProperty(name);
199: }
200:
201: private Map aggregations = new ConcurrentHashMap();
202:
203: public static class Aggregation {
204: public MessageExchange request;
205: public int numbers;
206: public String ssn;
207: public Double amount;
208: public Integer duration;
209: public Integer score;
210: public Integer hlength;
211: public List quotes = new ArrayList();
212: }
213:
214: public static class LoanQuote {
215: public String bank;
216: public Double rate;
217: }
218:
219: }
|