001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test;
018:
019: import java.io.IOException;
020: import java.nio.channels.SelectionKey;
021: import java.util.Iterator;
022: import java.nio.channels.Selector;
023: import org.apache.catalina.tribes.transport.nio.NioSender;
024: import org.apache.catalina.tribes.membership.MemberImpl;
025: import org.apache.catalina.tribes.io.ChannelData;
026: import org.apache.catalina.tribes.io.XByteBuffer;
027: import org.apache.catalina.tribes.Member;
028: import org.apache.catalina.tribes.Channel;
029:
030: /**
031: * <p>Title: </p>
032: *
033: * <p>Description: </p>
034: *
035: * <p>Company: </p>
036: *
037: * @author not attributable
038: * @version 1.0
039: */
040: public class TestNioSender {
041: private Selector selector = null;
042: private int counter = 0;
043: MemberImpl mbr;
044: private static int testOptions = Channel.SEND_OPTIONS_DEFAULT;
045:
046: public TestNioSender() {
047:
048: }
049:
050: public synchronized int inc() {
051: return ++counter;
052: }
053:
054: public synchronized ChannelData getMessage(Member mbr) {
055: String msg = new String("Thread-"
056: + Thread.currentThread().getName() + " Message:"
057: + inc());
058: ChannelData data = new ChannelData(true);
059: data.setMessage(new XByteBuffer(msg.getBytes(), false));
060: data.setAddress(mbr);
061:
062: return data;
063: }
064:
065: public void init() throws Exception {
066: selector = Selector.open();
067: mbr = new MemberImpl("localhost", 4444, 0);
068: NioSender sender = new NioSender();
069: sender.setDestination(mbr);
070: sender.setDirectBuffer(true);
071: sender.setSelector(selector);
072: sender.setMessage(XByteBuffer
073: .createDataPackage(getMessage(mbr)));
074: sender.connect();
075: }
076:
077: public void run() {
078: while (true) {
079:
080: int selectedKeys = 0;
081: try {
082: selectedKeys = selector.select(100);
083: // if ( selectedKeys == 0 ) {
084: // System.out.println("No registered interests. Sleeping for a second.");
085: // Thread.sleep(100);
086: } catch (Exception e) {
087: e.printStackTrace();
088: continue;
089: }
090:
091: if (selectedKeys == 0) {
092: continue;
093: }
094:
095: Iterator it = selector.selectedKeys().iterator();
096: while (it.hasNext()) {
097: SelectionKey sk = (SelectionKey) it.next();
098: it.remove();
099: try {
100: int readyOps = sk.readyOps();
101: sk.interestOps(sk.interestOps() & ~readyOps);
102: NioSender sender = (NioSender) sk.attachment();
103: if (sender
104: .process(
105: sk,
106: (testOptions & Channel.SEND_OPTIONS_USE_ACK) == Channel.SEND_OPTIONS_USE_ACK)) {
107: System.out
108: .println("Message completed for handler:"
109: + sender);
110: Thread.currentThread().sleep(2000);
111: sender.reset();
112: sender.setMessage(XByteBuffer
113: .createDataPackage(getMessage(mbr)));
114: }
115:
116: } catch (Throwable t) {
117: t.printStackTrace();
118: return;
119: }
120: }
121: }
122: }
123:
124: public static void main(String[] args) throws Exception {
125: TestNioSender sender = new TestNioSender();
126: sender.init();
127: sender.run();
128: }
129: }
|