001: // ICPReceiver.java
002: // $Id: ICPReceiver.java,v 1.7 2000/08/16 21:38:04 ylafon Exp $
003: // (c) COPYRIGHT MIT and INRIA, 1996.
004: // please first read the full copyright statement in file COPYRIGHT.HTML
005:
006: package org.w3c.www.protocol.http.icp;
007:
008: import java.io.IOException;
009: import java.io.PrintStream;
010:
011: import java.net.DatagramPacket;
012: import java.net.DatagramSocket;
013: import java.net.SocketException;
014: import java.net.URL;
015:
016: import org.w3c.www.protocol.http.HttpManager;
017: import org.w3c.www.protocol.http.PropRequestFilterException;
018:
019: import org.w3c.www.protocol.http.cache.CacheFilter;
020:
021: class ICPReceiver extends Thread implements ICP {
022: private final static boolean debug = false;
023: /**
024: * The default waiter queue size.
025: */
026: public static final int DEFAULT_QUEUE_SIZE = 4;
027: /**
028: * The default received datagram packet size.
029: */
030: public static final int DEFAULT_PACKET_SIZE = 512;
031: /**
032: * The port number this receiver listens on.
033: */
034: protected int port = -1;
035: /**
036: * Our socket to receive packets.
037: */
038: DatagramSocket socket = null;
039: /**
040: * Our current request identifier.
041: */
042: protected int nextid = 1;
043: /**
044: * The CacheFilter we use for answering queries.
045: */
046: CacheFilter cache = null;
047: /**
048: * The queue of objects waiting for an ICP reply.
049: */
050: ICPWaiter queue[] = null;
051: /**
052: * The ICP filter we are working with.
053: */
054: protected ICPFilter filter = null;
055:
056: protected DatagramSocket getSocket() {
057: return socket;
058: }
059:
060: /**
061: * Create a new ICP query instance.
062: * @param url The URL to be queried.
063: */
064:
065: protected ICPQuery createQuery(URL url) {
066: int rid = -1;
067: synchronized (this ) {
068: rid = nextid++;
069: }
070: return new ICPQuery(rid, url);
071: }
072:
073: /**
074: * Add a waiter for on the given request identifier.
075: * @param waiter The ICPWaiter instance for that request.
076: */
077:
078: protected synchronized void addReplyWaiter(ICPWaiter waiter) {
079: if (queue == null) {
080: queue = new ICPWaiter[DEFAULT_QUEUE_SIZE];
081: queue[0] = waiter;
082: } else {
083: // Look for a free slot:
084: for (int i = 0; i < queue.length; i++) {
085: if (queue[i] == null) {
086: queue[i] = waiter;
087: return;
088: }
089: }
090: // Resize queue:
091: ICPWaiter nqueue[] = new ICPWaiter[queue.length << 1];
092: System.arraycopy(queue, 0, nqueue, 0, queue.length);
093: nqueue[queue.length] = waiter;
094: queue = nqueue;
095: return;
096: }
097: }
098:
099: /**
100: * Remove the given waiter from the waiters queue.
101: * This waiter has completed his job, he doesn't care about what happens
102: * next at the ICP level.
103: * @param waiter The wauter to remove from our queue.
104: */
105:
106: protected synchronized void removeReplyWaiter(ICPWaiter waiter) {
107: if (queue != null) {
108: for (int i = 0; i < queue.length; i++) {
109: if (queue[i] == waiter) {
110: queue[i] = null;
111: return;
112: }
113: }
114: }
115: }
116:
117: /**
118: * Handle the given ICP reply.
119: */
120:
121: protected synchronized void handleReply(ICPReply reply)
122: throws ICPProtocolException {
123: int id = reply.getIdentifier();
124: // Do we have someone waiting for this reply ?
125: for (int i = 0; i < queue.length; i++) {
126: if (queue[i] == null)
127: continue;
128: if (queue[i].getIdentifier() == id) {
129: queue[i].notifyReply(reply);
130: return;
131: }
132: }
133: // No one was waiting for this packet throw it away:
134: if (debug)
135: System.out.println("icp: discarding reply " + id);
136: return;
137: }
138:
139: /**
140: * Handle the given ICP query.
141: * @param p The DatagramPacket that wraps up the query.
142: */
143:
144: protected synchronized void handleQuery(ICPQuery query)
145: throws ICPProtocolException {
146: // Process the query:
147: if (debug)
148: System.out.println("icp[" + port + "]: query for "
149: + query.getURL() + " from "
150: + query.getSenderAddress() + "/"
151: + query.getSenderPort());
152: // FIXME boolean hit = cache.hasResource(query.getURL().toExternalForm());
153: boolean hit = false;
154: ICPReply reply = new ICPReply(query.getIdentifier(),
155: hit ? ICP_OP_HIT : ICP_OP_MISS);
156: // Emit the reply:
157: ICPSender sender = filter.getSender(query.getSenderAddress(),
158: query.getSenderPort());
159: if (sender != null) {
160: if (debug)
161: System.out.println("icp[" + port + "]: reports "
162: + reply.getOpcode() + " for "
163: + reply.getIdentifier() + " to " + sender);
164: sender.send(reply);
165: } else {
166: if (debug)
167: System.out.println("icp[" + port
168: + "]: couldn't locate peer at "
169: + query.getSenderAddress() + "/"
170: + query.getSenderPort());
171: }
172: }
173:
174: /**
175: * Run the ICP manager.
176: */
177:
178: public void run() {
179: byte pbuf[] = new byte[DEFAULT_PACKET_SIZE];
180: readloop: while (true) {
181: // Receive next ICP packet:
182: DatagramPacket p = new DatagramPacket(pbuf, pbuf.length);
183: try {
184: socket.receive(p);
185: } catch (IOException ex) {
186: ex.printStackTrace();
187: continue readloop;
188: }
189: // Is this a query or a reply ?
190: try {
191: ICPMessage m = ICPMessage.parse(p);
192: if (m instanceof ICPQuery)
193: handleQuery((ICPQuery) m);
194: else
195: handleReply((ICPReply) m);
196: } catch (ICPProtocolException ex) {
197: ex.printStackTrace();
198: }
199: }
200: }
201:
202: ICPReceiver(HttpManager manager, ICPFilter filter, int port)
203: throws SocketException, PropRequestFilterException {
204: // Thread setting:
205: setName("ICP-Receiver");
206: setDaemon(true);
207: // Initialize instance variables:
208: this .port = port;
209: this .filter = filter;
210: this .socket = new DatagramSocket(port);
211: // Get a pointer to the cache filter os that environment:
212: try {
213: Class c = Class
214: .forName("org.w3c.www.protocol.http.cache.CacheFilter");
215: cache = (CacheFilter) manager.getGlobalFilter(c);
216: } catch (Exception ex) {
217: }
218: if (cache == null)
219: throw new PropRequestFilterException("no cache filter.");
220: start();
221: }
222: }
|