001: /*
002: * Copyright 1999-2004 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.jk.common;
018:
019: import java.net.URLEncoder;
020: import java.io.File;
021: import java.io.FileOutputStream;
022: import java.io.IOException;
023: import javax.management.ObjectName;
024:
025: import org.apache.commons.modeler.Registry;
026: import org.apache.jk.core.JkHandler;
027: import org.apache.jk.core.Msg;
028: import org.apache.jk.core.MsgContext;
029: import org.apache.jk.core.JkChannel;
030: import org.apache.jk.core.WorkerEnv;
031: import org.apache.coyote.Request;
032: import org.apache.coyote.RequestGroupInfo;
033: import org.apache.coyote.RequestInfo;
034: import org.apache.tomcat.util.threads.ThreadPool;
035: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
036:
037: /** Pass messages using unix domain sockets.
038: *
039: * @author Costin Manolache
040: */
041: public class ChannelUn extends JniHandler implements JkChannel {
042: static final int CH_OPEN = 4;
043: static final int CH_CLOSE = 5;
044: static final int CH_READ = 6;
045: static final int CH_WRITE = 7;
046:
047: String file;
048: ThreadPool tp = ThreadPool.createThreadPool(true);
049:
050: /* ==================== Tcp socket options ==================== */
051:
052: public ThreadPool getThreadPool() {
053: return tp;
054: }
055:
056: public void setFile(String f) {
057: file = f;
058: }
059:
060: public String getFile() {
061: return file;
062: }
063:
064: /* ==================== ==================== */
065: int socketNote = 1;
066: int isNote = 2;
067: int osNote = 3;
068:
069: int localId = 0;
070:
071: public void init() throws IOException {
072: if (file == null) {
073: log.debug("No file, disabling unix channel");
074: return;
075: //throw new IOException( "No file for the unix socket channel");
076: }
077: if (wEnv != null && wEnv.getLocalId() != 0) {
078: localId = wEnv.getLocalId();
079: }
080:
081: if (localId != 0) {
082: file = file + localId;
083: }
084: File socketFile = new File(file);
085: if (!socketFile.isAbsolute()) {
086: String home = wEnv.getJkHome();
087: if (home == null) {
088: log.debug("No jkhome");
089: } else {
090: File homef = new File(home);
091: socketFile = new File(homef, file);
092: log.debug("Making the file absolute " + socketFile);
093: }
094: }
095:
096: if (!socketFile.exists()) {
097: try {
098: FileOutputStream fos = new FileOutputStream(socketFile);
099: fos.write(1);
100: fos.close();
101: } catch (Throwable t) {
102: log
103: .error("Attempting to create the file failed, disabling channel"
104: + socketFile);
105: return;
106: }
107: }
108: // The socket file cannot be removed ...
109: if (!socketFile.delete()) {
110: log.error("Can't remove socket file " + socketFile);
111: return;
112: }
113:
114: super .initNative("channel.un:" + file);
115:
116: if (apr == null || !apr.isLoaded()) {
117: log.debug("Apr is not available, disabling unix channel ");
118: apr = null;
119: return;
120: }
121:
122: // Set properties and call init.
123: setNativeAttribute("file", file);
124: // unixListenSocket=apr.unSocketListen( file, 10 );
125:
126: setNativeAttribute("listen", "10");
127: // setNativeAttribute( "debug", "10" );
128:
129: // Initialize the thread pool and execution chain
130: if (next == null && wEnv != null) {
131: if (nextName != null)
132: setNext(wEnv.getHandler(nextName));
133: if (next == null)
134: next = wEnv.getHandler("dispatch");
135: if (next == null)
136: next = wEnv.getHandler("request");
137: }
138:
139: super .initJkComponent();
140: JMXRequestNote = wEnv.getNoteId(WorkerEnv.ENDPOINT_NOTE,
141: "requestNote");
142: // Run a thread that will accept connections.
143: if (this .domain != null) {
144: try {
145: tpOName = new ObjectName(domain
146: + ":type=ThreadPool,name=" + getChannelName());
147:
148: Registry.getRegistry(null, null).registerComponent(tp,
149: tpOName, null);
150:
151: rgOName = new ObjectName(domain
152: + ":type=GlobalRequestProcessor,name="
153: + getChannelName());
154: Registry.getRegistry(null, null).registerComponent(
155: global, rgOName, null);
156: } catch (Exception e) {
157: log.error("Can't register threadpool");
158: }
159: }
160: tp.start();
161: AprAcceptor acceptAjp = new AprAcceptor(this );
162: tp.runIt(acceptAjp);
163: log.info("JK: listening on unix socket: " + file);
164:
165: }
166:
167: ObjectName tpOName;
168: ObjectName rgOName;
169: RequestGroupInfo global = new RequestGroupInfo();
170: int count = 0;
171: int JMXRequestNote;
172:
173: public void start() throws IOException {
174: }
175:
176: public void destroy() throws IOException {
177: if (apr == null)
178: return;
179: try {
180: if (tp != null)
181: tp.shutdown();
182:
183: //apr.unSocketClose( unixListenSocket,3);
184: super .destroyJkComponent();
185:
186: if (tpOName != null) {
187: Registry.getRegistry().unregisterComponent(tpOName);
188: }
189: if (rgOName != null) {
190: Registry.getRegistry().unregisterComponent(rgOName);
191: }
192: } catch (Exception e) {
193: log.error("Error in destroy", e);
194: }
195: }
196:
197: public void registerRequest(Request req, MsgContext ep, int count) {
198: if (this .domain != null) {
199: try {
200:
201: RequestInfo rp = req.getRequestProcessor();
202: rp.setGlobalProcessor(global);
203: ObjectName roname = new ObjectName(getDomain()
204: + ":type=RequestProcessor,worker="
205: + getChannelName() + ",name=JkRequest" + count);
206: ep.setNote(JMXRequestNote, roname);
207:
208: Registry.getRegistry().registerComponent(rp, roname,
209: null);
210: } catch (Exception ex) {
211: log.warn("Error registering request");
212: }
213: }
214: }
215:
216: /** Open a connection - since we're listening that will block in
217: accept
218: */
219: public int open(MsgContext ep) throws IOException {
220: // Will associate a jk_endpoint with ep and call open() on it.
221: // jk_channel_un will accept a connection and set the socket info
222: // in the endpoint. MsgContext will represent an active connection.
223: return super .nativeDispatch(ep.getMsg(0), ep, CH_OPEN, 1);
224: }
225:
226: public void close(MsgContext ep) throws IOException {
227: super .nativeDispatch(ep.getMsg(0), ep, CH_CLOSE, 1);
228: }
229:
230: public int send(Msg msg, MsgContext ep) throws IOException {
231: return super .nativeDispatch(msg, ep, CH_WRITE, 0);
232: }
233:
234: public int receive(Msg msg, MsgContext ep) throws IOException {
235: int rc = super .nativeDispatch(msg, ep, CH_READ, 1);
236:
237: if (rc != 0) {
238: log.error("receive error: " + rc, new Throwable());
239: return -1;
240: }
241:
242: msg.processHeader();
243:
244: if (log.isDebugEnabled())
245: log.debug("receive: total read = " + msg.getLen());
246:
247: return msg.getLen();
248: }
249:
250: public int flush(Msg msg, MsgContext ep) throws IOException {
251: return OK;
252: }
253:
254: public boolean isSameAddress(MsgContext ep) {
255: return false; // Not supporting shutdown on this channel.
256: }
257:
258: boolean running = true;
259:
260: /** Accept incoming connections, dispatch to the thread pool
261: */
262: void acceptConnections() {
263: if (apr == null)
264: return;
265:
266: if (log.isDebugEnabled())
267: log.debug("Accepting ajp connections on " + file);
268:
269: while (running) {
270: try {
271: MsgContext ep = this .createMsgContext();
272:
273: // blocking - opening a server connection.
274: int status = this .open(ep);
275: if (status != 0 && status != 2) {
276: log.error("Error acceptin connection on " + file);
277: break;
278: }
279:
280: // if( log.isDebugEnabled() )
281: // log.debug("Accepted ajp connections ");
282:
283: AprConnection ajpConn = new AprConnection(this , ep);
284: tp.runIt(ajpConn);
285: } catch (Exception ex) {
286: ex.printStackTrace();
287: }
288: }
289: }
290:
291: /** Process a single ajp connection.
292: */
293: void processConnection(MsgContext ep) {
294: if (log.isDebugEnabled())
295: log.debug("New ajp connection ");
296: try {
297: MsgAjp recv = new MsgAjp();
298: while (running) {
299: int res = this .receive(recv, ep);
300: if (res < 0) {
301: // EOS
302: break;
303: }
304: ep.setType(0);
305: log.debug("Process msg ");
306: int status = next.invoke(recv, ep);
307: }
308: if (log.isDebugEnabled())
309: log.debug("Closing un channel");
310: try {
311: Request req = (Request) ep.getRequest();
312: if (req != null) {
313: ObjectName roname = (ObjectName) ep
314: .getNote(JMXRequestNote);
315: Registry.getRegistry().unregisterComponent(roname);
316: req.getRequestProcessor().setGlobalProcessor(null);
317: }
318: } catch (Exception ee) {
319: log.error("Error, releasing connection", ee);
320: }
321: this .close(ep);
322: } catch (Exception ex) {
323: ex.printStackTrace();
324: }
325: }
326:
327: public int invoke(Msg msg, MsgContext ep) throws IOException {
328: int type = ep.getType();
329:
330: switch (type) {
331: case JkHandler.HANDLE_RECEIVE_PACKET:
332: return receive(msg, ep);
333: case JkHandler.HANDLE_SEND_PACKET:
334: return send(msg, ep);
335: case JkHandler.HANDLE_FLUSH:
336: return flush(msg, ep);
337: }
338:
339: // return next.invoke( msg, ep );
340: return OK;
341: }
342:
343: public String getChannelName() {
344: String encodedAddr = "";
345: String address = file;
346: if (address != null) {
347: encodedAddr = "" + address;
348: if (encodedAddr.startsWith("/"))
349: encodedAddr = encodedAddr.substring(1);
350: encodedAddr = URLEncoder.encode(encodedAddr);
351: }
352: return ("jk-" + encodedAddr);
353: }
354:
355: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
356: .getLog(ChannelUn.class);
357: }
358:
359: class AprAcceptor implements ThreadPoolRunnable {
360: ChannelUn wajp;
361:
362: AprAcceptor(ChannelUn wajp) {
363: this .wajp = wajp;
364: }
365:
366: public Object[] getInitData() {
367: return null;
368: }
369:
370: public void runIt(Object thD[]) {
371: wajp.acceptConnections();
372: }
373: }
374:
375: class AprConnection implements ThreadPoolRunnable {
376: ChannelUn wajp;
377: MsgContext ep;
378:
379: AprConnection(ChannelUn wajp, MsgContext ep) {
380: this .wajp = wajp;
381: this .ep = ep;
382: }
383:
384: public Object[] getInitData() {
385: return null;
386: }
387:
388: public void runIt(Object perTh[]) {
389: wajp.processConnection(ep);
390: }
391: }
|