001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.jk.common;
019:
020: import java.net.URLEncoder;
021: import java.io.File;
022: import java.io.FileOutputStream;
023: import java.io.IOException;
024: import javax.management.ObjectName;
025:
026: import org.apache.jk.core.JkHandler;
027: import org.apache.jk.core.Msg;
028: import org.apache.jk.core.MsgContext;
029: import org.apache.jk.core.JkChannel;
030: import org.apache.jk.core.WorkerEnv;
031: import org.apache.coyote.Request;
032: import org.apache.coyote.RequestGroupInfo;
033: import org.apache.coyote.RequestInfo;
034: import org.apache.tomcat.util.modeler.Registry;
035: import org.apache.tomcat.util.threads.ThreadPool;
036: import org.apache.tomcat.util.threads.ThreadPoolRunnable;
037:
038: /** Pass messages using unix domain sockets.
039: *
040: * @author Costin Manolache
041: */
042: public class ChannelUn extends JniHandler implements JkChannel {
043: static final int CH_OPEN = 4;
044: static final int CH_CLOSE = 5;
045: static final int CH_READ = 6;
046: static final int CH_WRITE = 7;
047:
048: String file;
049: ThreadPool tp = ThreadPool.createThreadPool(true);
050:
051: /* ==================== Tcp socket options ==================== */
052:
053: public ThreadPool getThreadPool() {
054: return tp;
055: }
056:
057: public void setFile(String f) {
058: file = f;
059: }
060:
061: public String getFile() {
062: return file;
063: }
064:
065: /* ==================== ==================== */
066: int socketNote = 1;
067: int isNote = 2;
068: int osNote = 3;
069:
070: int localId = 0;
071:
072: public void init() throws IOException {
073: if (file == null) {
074: log.debug("No file, disabling unix channel");
075: return;
076: //throw new IOException( "No file for the unix socket channel");
077: }
078: if (wEnv != null && wEnv.getLocalId() != 0) {
079: localId = wEnv.getLocalId();
080: }
081:
082: if (localId != 0) {
083: file = file + localId;
084: }
085: File socketFile = new File(file);
086: if (!socketFile.isAbsolute()) {
087: String home = wEnv.getJkHome();
088: if (home == null) {
089: log.debug("No jkhome");
090: } else {
091: File homef = new File(home);
092: socketFile = new File(homef, file);
093: log.debug("Making the file absolute " + socketFile);
094: }
095: }
096:
097: if (!socketFile.exists()) {
098: try {
099: FileOutputStream fos = new FileOutputStream(socketFile);
100: fos.write(1);
101: fos.close();
102: } catch (Throwable t) {
103: log
104: .error("Attempting to create the file failed, disabling channel"
105: + socketFile);
106: return;
107: }
108: }
109: // The socket file cannot be removed ...
110: if (!socketFile.delete()) {
111: log.error("Can't remove socket file " + socketFile);
112: return;
113: }
114:
115: super .initNative("channel.un:" + file);
116:
117: if (apr == null || !apr.isLoaded()) {
118: log.debug("Apr is not available, disabling unix channel ");
119: apr = null;
120: return;
121: }
122:
123: // Set properties and call init.
124: setNativeAttribute("file", file);
125: // unixListenSocket=apr.unSocketListen( file, 10 );
126:
127: setNativeAttribute("listen", "10");
128: // setNativeAttribute( "debug", "10" );
129:
130: // Initialize the thread pool and execution chain
131: if (next == null && wEnv != null) {
132: if (nextName != null)
133: setNext(wEnv.getHandler(nextName));
134: if (next == null)
135: next = wEnv.getHandler("dispatch");
136: if (next == null)
137: next = wEnv.getHandler("request");
138: }
139:
140: super .initJkComponent();
141: JMXRequestNote = wEnv.getNoteId(WorkerEnv.ENDPOINT_NOTE,
142: "requestNote");
143: // Run a thread that will accept connections.
144: if (this .domain != null) {
145: try {
146: tpOName = new ObjectName(domain
147: + ":type=ThreadPool,name=" + getChannelName());
148:
149: Registry.getRegistry(null, null).registerComponent(tp,
150: tpOName, null);
151:
152: rgOName = new ObjectName(domain
153: + ":type=GlobalRequestProcessor,name="
154: + getChannelName());
155: Registry.getRegistry(null, null).registerComponent(
156: global, rgOName, null);
157: } catch (Exception e) {
158: log.error("Can't register threadpool");
159: }
160: }
161: tp.start();
162: AprAcceptor acceptAjp = new AprAcceptor(this );
163: tp.runIt(acceptAjp);
164: log.info("JK: listening on unix socket: " + file);
165:
166: }
167:
168: ObjectName tpOName;
169: ObjectName rgOName;
170: RequestGroupInfo global = new RequestGroupInfo();
171: int count = 0;
172: int JMXRequestNote;
173:
174: public void start() throws IOException {
175: }
176:
177: public void destroy() throws IOException {
178: if (apr == null)
179: return;
180: try {
181: if (tp != null)
182: tp.shutdown();
183:
184: //apr.unSocketClose( unixListenSocket,3);
185: super .destroyJkComponent();
186:
187: if (tpOName != null) {
188: Registry.getRegistry(null, null).unregisterComponent(
189: tpOName);
190: }
191: if (rgOName != null) {
192: Registry.getRegistry(null, null).unregisterComponent(
193: rgOName);
194: }
195: } catch (Exception e) {
196: log.error("Error in destroy", e);
197: }
198: }
199:
200: public void registerRequest(Request req, MsgContext ep, int count) {
201: if (this .domain != null) {
202: try {
203:
204: RequestInfo rp = req.getRequestProcessor();
205: rp.setGlobalProcessor(global);
206: ObjectName roname = new ObjectName(getDomain()
207: + ":type=RequestProcessor,worker="
208: + getChannelName() + ",name=JkRequest" + count);
209: ep.setNote(JMXRequestNote, roname);
210:
211: Registry.getRegistry(null, null).registerComponent(rp,
212: roname, null);
213: } catch (Exception ex) {
214: log.warn("Error registering request");
215: }
216: }
217: }
218:
219: /** Open a connection - since we're listening that will block in
220: accept
221: */
222: public int open(MsgContext ep) throws IOException {
223: // Will associate a jk_endpoint with ep and call open() on it.
224: // jk_channel_un will accept a connection and set the socket info
225: // in the endpoint. MsgContext will represent an active connection.
226: return super .nativeDispatch(ep.getMsg(0), ep, CH_OPEN, 1);
227: }
228:
229: public void close(MsgContext ep) throws IOException {
230: super .nativeDispatch(ep.getMsg(0), ep, CH_CLOSE, 1);
231: }
232:
233: public int send(Msg msg, MsgContext ep) throws IOException {
234: return super .nativeDispatch(msg, ep, CH_WRITE, 0);
235: }
236:
237: public int receive(Msg msg, MsgContext ep) throws IOException {
238: int rc = super .nativeDispatch(msg, ep, CH_READ, 1);
239:
240: if (rc != 0) {
241: log.error("receive error: " + rc, new Throwable());
242: return -1;
243: }
244:
245: msg.processHeader();
246:
247: if (log.isDebugEnabled())
248: log.debug("receive: total read = " + msg.getLen());
249:
250: return msg.getLen();
251: }
252:
253: public int flush(Msg msg, MsgContext ep) throws IOException {
254: return OK;
255: }
256:
257: public boolean isSameAddress(MsgContext ep) {
258: return false; // Not supporting shutdown on this channel.
259: }
260:
261: boolean running = true;
262:
263: /** Accept incoming connections, dispatch to the thread pool
264: */
265: void acceptConnections() {
266: if (apr == null)
267: return;
268:
269: if (log.isDebugEnabled())
270: log.debug("Accepting ajp connections on " + file);
271:
272: while (running) {
273: try {
274: MsgContext ep = this .createMsgContext();
275:
276: // blocking - opening a server connection.
277: int status = this .open(ep);
278: if (status != 0 && status != 2) {
279: log.error("Error acceptin connection on " + file);
280: break;
281: }
282:
283: // if( log.isDebugEnabled() )
284: // log.debug("Accepted ajp connections ");
285:
286: AprConnection ajpConn = new AprConnection(this , ep);
287: tp.runIt(ajpConn);
288: } catch (Exception ex) {
289: ex.printStackTrace();
290: }
291: }
292: }
293:
294: /** Process a single ajp connection.
295: */
296: void processConnection(MsgContext ep) {
297: if (log.isDebugEnabled())
298: log.debug("New ajp connection ");
299: try {
300: MsgAjp recv = new MsgAjp();
301: while (running) {
302: int res = this .receive(recv, ep);
303: if (res < 0) {
304: // EOS
305: break;
306: }
307: ep.setType(0);
308: log.debug("Process msg ");
309: int status = next.invoke(recv, ep);
310: }
311: if (log.isDebugEnabled())
312: log.debug("Closing un channel");
313: try {
314: Request req = (Request) ep.getRequest();
315: if (req != null) {
316: ObjectName roname = (ObjectName) ep
317: .getNote(JMXRequestNote);
318: if (roname != null) {
319: Registry.getRegistry(null, null)
320: .unregisterComponent(roname);
321: }
322: req.getRequestProcessor().setGlobalProcessor(null);
323: }
324: } catch (Exception ee) {
325: log.error("Error, releasing connection", ee);
326: }
327: this .close(ep);
328: } catch (Exception ex) {
329: ex.printStackTrace();
330: }
331: }
332:
333: public int invoke(Msg msg, MsgContext ep) throws IOException {
334: int type = ep.getType();
335:
336: switch (type) {
337: case JkHandler.HANDLE_RECEIVE_PACKET:
338: return receive(msg, ep);
339: case JkHandler.HANDLE_SEND_PACKET:
340: return send(msg, ep);
341: case JkHandler.HANDLE_FLUSH:
342: return flush(msg, ep);
343: }
344:
345: // return next.invoke( msg, ep );
346: return OK;
347: }
348:
349: public String getChannelName() {
350: String encodedAddr = "";
351: String address = file;
352: if (address != null) {
353: encodedAddr = "" + address;
354: if (encodedAddr.startsWith("/"))
355: encodedAddr = encodedAddr.substring(1);
356: encodedAddr = URLEncoder.encode(encodedAddr);
357: }
358: return ("jk-" + encodedAddr);
359: }
360:
361: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
362: .getLog(ChannelUn.class);
363: }
364:
365: class AprAcceptor implements ThreadPoolRunnable {
366: ChannelUn wajp;
367:
368: AprAcceptor(ChannelUn wajp) {
369: this .wajp = wajp;
370: }
371:
372: public Object[] getInitData() {
373: return null;
374: }
375:
376: public void runIt(Object thD[]) {
377: wajp.acceptConnections();
378: }
379: }
380:
381: class AprConnection implements ThreadPoolRunnable {
382: ChannelUn wajp;
383: MsgContext ep;
384:
385: AprConnection(ChannelUn wajp, MsgContext ep) {
386: this .wajp = wajp;
387: this .ep = ep;
388: }
389:
390: public Object[] getInitData() {
391: return null;
392: }
393:
394: public void runIt(Object perTh[]) {
395: wajp.processConnection(ep);
396: }
397: }
|