01: package dalma.endpoints.jbi;
02:
03: import dalma.spi.port.MultiplexedEndPoint;
04:
05: import javax.jbi.messaging.MessageExchange;
06: import javax.jbi.messaging.DeliveryChannel;
07: import javax.jbi.messaging.MessagingException;
08: import java.util.logging.Level;
09:
10: /**
11: * @author Kohsuke Kawaguchi
12: */
13: public class JBIEndPoint extends
14: MultiplexedEndPoint<String, MessageExchange> {
15:
16: private final DeliveryChannel channel;
17:
18: private MessageExchangeHandler meHandler;
19:
20: private Thread receiverThread;
21:
22: public JBIEndPoint(String name, DeliveryChannel channel) {
23: super (name);
24: this .channel = channel;
25: this .receiverThread = new ReceiverThread(this , channel);
26: }
27:
28: protected String getKey(MessageExchange msg) {
29: return msg.getExchangeId();
30: }
31:
32: protected void onNewMessage(MessageExchange msg) {
33: MessageExchangeHandler h = meHandler;
34: if (h != null) {
35: try {
36: h.onNewMessage(msg);
37: } catch (Exception e) {
38: logger.log(Level.WARNING, e.getMessage(), e);
39: }
40: }
41: }
42:
43: protected String send(MessageExchange msg) {
44: try {
45: channel.send(msg);
46: return msg.getExchangeId();
47: } catch (MessagingException e) {
48: throw new JBIException(e);
49: }
50: }
51:
52: protected void start() {
53: receiverThread.start();
54: }
55:
56: protected void stop() {
57: receiverThread.interrupt();
58: try {
59: receiverThread.join();
60: } catch (InterruptedException e) {
61: // process the interruption later
62: Thread.currentThread().interrupt();
63: }
64: try {
65: channel.close();
66: } catch (MessagingException e) {
67: throw new JBIException(e);
68: }
69: }
70:
71: public MessageExchangeHandler getMessageExchangeHandler() {
72: return meHandler;
73: }
74:
75: public void setMessageExchangeHandler(
76: MessageExchangeHandler meHandler) {
77: this.meHandler = meHandler;
78: }
79: }
|