001: /*
002: *
003: *
004: * Copyright 1990-2007 Sun Microsystems, Inc. All Rights Reserved.
005: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
006: *
007: * This program is free software; you can redistribute it and/or
008: * modify it under the terms of the GNU General Public License version
009: * 2 only, as published by the Free Software Foundation.
010: *
011: * This program is distributed in the hope that it will be useful, but
012: * WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * General Public License version 2 for more details (a copy is
015: * included at /legal/license.txt).
016: *
017: * You should have received a copy of the GNU General Public License
018: * version 2 along with this work; if not, write to the Free Software
019: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA
021: *
022: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
023: * Clara, CA 95054 or visit www.sun.com if you need additional
024: * information or have any questions.
025: */
026: package com.sun.kvem.jsr082.obex;
027:
028: import javax.obex.Operation;
029: import javax.obex.HeaderSet;
030: import javax.obex.ResponseCodes;
031: import java.io.InputStream;
032: import java.io.OutputStream;
033: import java.io.IOException;
034: import java.io.DataInputStream;
035: import java.io.DataOutputStream;
036:
037: /**
038: * The class implements client side of put/get operation.
039: */
040: final class ClientOperation implements Operation {
041:
042: /** Debug information, should be false for RR. */
043: private static final boolean DEBUG = false;
044:
045: private ClientSessionImpl stream;
046: private HeaderSetImpl recvHeaders;
047: private HeaderSetImpl sentHeaders;
048: private byte[] head;
049: private Object lock = new Object();
050:
051: private boolean inputStreamOpened;
052: private boolean inputStreamClosed;
053: private boolean outputStreamOpened;
054: private boolean outputStreamClosed;
055:
056: private boolean inputStreamEof;
057:
058: private boolean firstDataBlock = true;
059: private int openObjects = 1;
060:
061: private OperationInputStream is;
062: private OperationOutputStream os;
063:
064: /**
065: * True if this operation is get operation.
066: * Otherwise it is put operation.
067: */
068: private boolean isGet;
069:
070: /**
071: * Output stream finished, receiving data from input stream.
072: */
073: private boolean requestEnd;
074: private boolean operationEnd;
075: private boolean operationClosed;
076:
077: private boolean restartable = true;
078: private boolean restarting;
079:
080: /**
081: * Determines whether aborting of operation is in progress.
082: * If <code>true</code> any read/write calls are denied.
083: */
084: private boolean abortingOperation;
085:
086: ClientOperation(ClientSessionImpl stream,
087: HeaderSetImpl sendHeaders, boolean isGet)
088: throws IOException {
089: if (DEBUG) {
090: System.out
091: .println("clientOperation.constructor(): isGet = "
092: + isGet);
093: }
094: this .isGet = isGet;
095: head = new byte[] {
096: isGet ? (byte) ObexPacketStream.OPCODE_GET
097: : (byte) ObexPacketStream.OPCODE_PUT, 0, 0 };
098:
099: stream.operation = this ;
100: stream.isEof = false;
101: this .stream = stream;
102: this .sentHeaders = sendHeaders == null ? new HeaderSetImpl(
103: HeaderSetImpl.OWNER_CLIENT) : sendHeaders;
104: recvHeaders = new HeaderSetImpl(HeaderSetImpl.OWNER_CLIENT);
105: is = new OperationInputStream();
106: os = new OperationOutputStream();
107:
108: // starting send process, fill in send buffer
109: stream.packetBegin(head);
110: if (sendHeaders == null
111: || sendHeaders.getHeader(HeaderSet.TARGET) == null) {
112:
113: // early TARGET vs CONNECTION ID conflict check
114: stream.packetAddConnectionID(stream.getConnectionID(),
115: sendHeaders);
116: }
117: stream.packetAddAuthResponses();
118: stream.packetAddHeaders(sendHeaders);
119:
120: // if buffer is overflowed - begining to send packets
121: while (stream.challengesToSend
122: || !stream.queuedHeaders.isEmpty()) {
123: if (!packetExchange()) {
124: // Some headers may be lost if server early finish the
125: // operation.
126: return;
127: }
128: }
129: }
130:
131: /**
132: * Finish and send packet, received response, start new packet.
133: * @return packetType == OPCODE_CONTINUE.
134: */
135: private boolean packetExchange() throws IOException {
136: if (DEBUG) {
137: System.out.println("client: packetExchange()");
138: }
139: if (operationEnd) {
140: if (requestEnd && stream.shouldSendAuthResponse()
141: && restartOperation()) {
142: return true;
143: }
144: return false;
145: }
146: if (!requestEnd) {
147: // finish packet end send it
148: stream.packetEndStripConnID();
149:
150: // receive packet
151: stream.recvPacket();
152: operationEnd = stream.packetType != ObexPacketStream.OPCODE_CONTINUE;
153: synchronized (recvHeaders) {
154: stream.parsePacketHeaders(recvHeaders, 3);
155: }
156:
157: // check code
158: if (operationEnd) {
159: if (stream.shouldSendAuthResponse()
160: && restartOperation()) {
161: return true;
162: }
163: operationEnd = requestEnd = true;
164: return false;
165: }
166:
167: // begin new packet
168: stream.packetBegin(head);
169: stream.packetAddAuthResponses();
170: stream.packetAddHeaders(null);
171: return true;
172: }
173:
174: // requestEnd = true
175:
176: stream.parseEnd();
177: stream.sendPacket(head, -1, null, false);
178: stream.recvPacket();
179: operationEnd = stream.packetType != ObexPacketStream.OPCODE_CONTINUE;
180:
181: // check of errorcode should be done before after data parsing
182: stream.parsePacketDataBegin(recvHeaders, 3);
183: return true;
184: }
185:
186: private void requestEnd() throws IOException {
187: if (DEBUG) {
188: System.out.println("client: requestEnd()");
189: }
190: synchronized (lock) {
191: if (requestEnd) {
192: return;
193: }
194: requestEnd = true;
195: }
196: head[0] |= ObexPacketStream.OPCODE_FINAL;
197:
198: if (operationEnd) {
199: return;
200: }
201:
202: if (outputStreamOpened) {
203: boolean res = stream.packetEOFBody();
204: if (!res) { // error adding EOFB previous packet too long
205: if (!packetExchange()) {
206: return;
207: }
208: stream.packetEOFBody();
209: }
210: }
211:
212: stream.packetMarkFinal();
213: stream.packetEndStripConnID();
214: stream.recvPacket();
215: operationEnd = stream.packetType != ObexPacketStream.OPCODE_CONTINUE;
216:
217: if (!isGet) {
218: stream.parsePacketHeaders(recvHeaders, 3);
219: return;
220: }
221:
222: stream.parsePacketDataBegin(recvHeaders, 3);
223:
224: while (true) {
225: // special request to check data availability
226: int hasData = stream.parsePacketData(recvHeaders, null, 0,
227: 0);
228: if (hasData == 1 || stream.isEof)
229: break;
230:
231: if (stream.shouldSendAuthResponse() && restartOperation()) {
232: return;
233: }
234: if (!packetExchange()) {
235: return;
236: }
237: }
238: }
239:
240: private void notRestartable() {
241: restartable = false;
242: sentHeaders = null;
243: }
244:
245: private boolean restartOperation() throws IOException {
246: if (DEBUG) {
247: System.out.println("client: restartOperation()");
248: }
249: if (!restartable) {
250: return false;
251: }
252: HeaderSetImpl headers = sentHeaders;
253: notRestartable();
254: operationEnd = false;
255: boolean prevRequestEnd = requestEnd;
256: requestEnd = false;
257: head[0] = isGet ? (byte) ObexPacketStream.OPCODE_GET
258: : (byte) ObexPacketStream.OPCODE_PUT;
259:
260: recvHeaders = new HeaderSetImpl(HeaderSetImpl.OWNER_CLIENT);
261: stream.queuedHeaders.removeAllElements();
262: stream.isEof = false;
263:
264: // starting send process, fill in send buffer
265: stream.packetBegin(head);
266: stream.packetAddConnectionID(stream.getConnectionID(), headers);
267: stream.packetAddAuthResponses();
268: stream.packetAddHeaders(headers);
269:
270: // if buffer is overflowed - begining to send packets
271: while (!stream.queuedHeaders.isEmpty()) {
272: if (!packetExchange()) {
273: return true;
274: }
275: }
276: if (prevRequestEnd) {
277: requestEnd();
278: }
279: restarting = true;
280: return true;
281: }
282:
283: private void sendAbortPacket() throws IOException {
284: if (operationEnd) {
285: return;
286: }
287:
288: inputStreamClosed = true;
289: outputStreamClosed = true;
290: operationEnd = true;
291: requestEnd = true;
292: stream.queuedHeaders.removeAllElements();
293: stream
294: .sendPacket(ObexPacketStream.PACKET_ABORT, -1, null,
295: true);
296: stream.recvPacket();
297: stream.parsePacketHeaders(recvHeaders, 3);
298:
299: if (stream.packetType != ResponseCodes.OBEX_HTTP_OK) {
300: stream.brokenLink();
301: }
302: }
303:
304: public void abort() throws IOException {
305: abortingOperation = true;
306: synchronized (lock) {
307: if (DEBUG) {
308: System.out.println("client: abort()");
309: }
310: if (operationClosed) {
311: throw new IOException("operation closed");
312: }
313: try {
314: if (operationEnd) {
315: throw new IOException("operation already finished");
316: }
317: sendAbortPacket();
318: } finally {
319: operationClosed = true;
320: openObjects = 0;
321: stream.operation = null;
322: }
323: }
324: }
325:
326: public HeaderSet getReceivedHeaders() throws IOException {
327: synchronized (lock) {
328: if (DEBUG) {
329: System.out.println("client: getReceivedHeaders()");
330: }
331: if (operationClosed) {
332: throw new IOException("operation closed");
333: }
334:
335: HeaderSetImpl res = new HeaderSetImpl(recvHeaders);
336: res.packetType = ObexPacketStream
337: .validateStatus(res.packetType);
338: return res;
339: }
340: }
341:
342: public int getResponseCode() throws IOException {
343: synchronized (lock) {
344: if (DEBUG) {
345: System.out.println("client: getResponseCodes()");
346: }
347: if (operationClosed) {
348: throw new IOException("operation closed");
349: }
350:
351: requestEnd();
352:
353: inputStreamOpened = false;
354: outputStreamOpened = false;
355:
356: inputStreamClosed = true;
357: outputStreamClosed = true;
358:
359: openObjects = 1;
360:
361: return ObexPacketStream
362: .validateStatus(recvHeaders.packetType);
363: }
364: }
365:
366: public void sendHeaders(HeaderSet headers) throws IOException {
367: synchronized (lock) {
368: if (DEBUG) {
369: System.out.println("client: sendHeaders()");
370: }
371: if (operationClosed) {
372: throw new IOException("operation closed");
373: }
374: if (headers == null) {
375: throw new NullPointerException("null headerset");
376: }
377: if (!(headers instanceof HeaderSetImpl)) {
378: throw new IllegalArgumentException(
379: "wrong headerset class");
380: }
381: HeaderSetImpl headersImpl = (HeaderSetImpl) headers;
382: if (!headersImpl.isSendable()) {
383: throw new IllegalArgumentException(
384: "not created with createHeaderSet");
385: }
386: if (operationEnd) {
387: throw new IOException("operation finished");
388: }
389:
390: if (restartable) {
391: // store the headers to accumulated headers
392: sentHeaders.merge(headersImpl);
393: }
394:
395: stream.packetAddHeaders(headersImpl);
396:
397: if (requestEnd) {
398: return;
399: }
400:
401: if (!stream.queuedHeaders.isEmpty()) {
402: if (!packetExchange()) {
403: throw new IOException(
404: "server finished operation, not all headers sent");
405: }
406: }
407: }
408: }
409:
410: public String getEncoding() {
411: return null; // acording to docs
412: }
413:
414: public long getLength() {
415: Long res = (Long) recvHeaders.getHeader(HeaderSetImpl.LENGTH);
416: if (res == null) {
417: return -1;
418: }
419: return res.longValue();
420: }
421:
422: public String getType() {
423: return (String) recvHeaders.getHeader(HeaderSetImpl.TYPE);
424: }
425:
426: public DataOutputStream openDataOutputStream() throws IOException {
427: return new DataOutputStream(openOutputStream());
428: }
429:
430: public OutputStream openOutputStream() throws IOException {
431: synchronized (lock) {
432: if (DEBUG) {
433: System.out.println("client: openOutputStream()");
434: }
435: if (operationClosed) {
436: throw new IOException("operation closed");
437: }
438: if (outputStreamOpened) {
439: throw new IOException(
440: "no more output streams available");
441: }
442: if (requestEnd) {
443: throw new IOException("too late to open output stream");
444: }
445: outputStreamOpened = true;
446: openObjects++;
447: return os;
448: }
449: }
450:
451: public DataInputStream openDataInputStream() throws IOException {
452: return new DataInputStream(openInputStream());
453: }
454:
455: public InputStream openInputStream() throws IOException {
456: synchronized (lock) {
457: if (DEBUG) {
458: System.out.println("client: openInputStream()");
459: }
460: if (operationClosed) {
461: throw new IOException("operation closed");
462: }
463: if (inputStreamOpened) {
464: throw new IOException("no more input streams available");
465: }
466: inputStreamOpened = true;
467: openObjects++;
468: if (!isGet) {
469: return new FakeInputStream();
470: }
471:
472: // flush rest of headers and data
473: requestEnd();
474:
475: return is;
476: }
477: }
478:
479: private void terminate() throws IOException {
480: if (DEBUG) {
481: System.out.println("client: terminate() = "
482: + (openObjects - 1));
483: }
484: openObjects--;
485: if (openObjects != 0) {
486: return;
487: }
488:
489: // all closed what was opened.
490: sendAbortPacket();
491: stream.operation = null;
492: }
493:
494: public void close() throws IOException {
495: synchronized (lock) {
496: if (DEBUG) {
497: System.out.println("client: op.close()");
498: }
499: if (!operationClosed) {
500: operationClosed = true;
501: terminate();
502: }
503: }
504: }
505:
506: private class OperationOutputStream extends OutputStream {
507: OperationOutputStream() {
508: }
509:
510: public void write(int b) throws IOException {
511: write(new byte[] { (byte) b }, 0, 1);
512: }
513:
514: public void write(byte[] b, int offset, int len)
515: throws IOException {
516: int initialOffset = offset;
517: int initialLen = len;
518: boolean firstDataPacket = true;
519: synchronized (lock) {
520: if (DEBUG) {
521: // System.out.println("client: write()");
522: }
523: if (outputStreamClosed || requestEnd) {
524: throw new IOException("stream closed");
525: }
526: if (len < 0 || offset < 0 || offset + len > b.length) {
527: throw new ArrayIndexOutOfBoundsException();
528: }
529: while (len > 0) {
530: if (abortingOperation) {
531: throw new IOException("operation aborted");
532: }
533: int wr = stream.packetAddData(b, offset, len);
534: if (wr != len || firstDataBlock && firstDataPacket) {
535: firstDataPacket = false;
536: restarting = false;
537: if (!packetExchange()) {
538: // fix CR: when sending and closin the socket
539: if (!stream.shouldSendAuthResponse()) {
540: if (wr == 0) {
541: break;
542: }
543: } else {
544: throw new IOException(
545: "server rejected the data");
546: }
547: }
548: if (restarting) {
549: len = initialLen;
550: offset = initialOffset;
551: }
552: }
553: len -= wr;
554: offset += wr;
555: }
556: }
557: firstDataBlock = false;
558: notRestartable();
559: }
560:
561: public void flush() throws IOException {
562: synchronized (lock) {
563: if (DEBUG) {
564: System.out.println("client: flush()");
565: }
566:
567: if (outputStreamClosed || requestEnd) {
568: throw new IOException("stream closed");
569: }
570: if (stream.packetLength != 3) {
571: packetExchange();
572: }
573: }
574: }
575:
576: public void close() throws IOException {
577: synchronized (lock) {
578: if (DEBUG) {
579: System.out.println("client: os.close()");
580: }
581: if (!outputStreamClosed) {
582: outputStreamClosed = true;
583: requestEnd();
584: terminate();
585: }
586: }
587: }
588: }
589:
590: private class OperationInputStream extends InputStream {
591: OperationInputStream() {
592: }
593:
594: public int read() throws IOException {
595: byte[] b = new byte[1];
596: int len = read(b, 0, 1);
597: if (len == -1) {
598: return -1;
599: }
600: return b[0] & 0xFF;
601: }
602:
603: public int read(byte[] b, int offset, int len)
604: throws IOException {
605: synchronized (lock) {
606: if (DEBUG) {
607: // System.out.println("client: read()");
608: }
609: if (inputStreamClosed) {
610: throw new IOException("stream closed");
611: }
612: // Nullpointer exception thrown here
613: if (len < 0 || offset < 0 || offset + len > b.length) {
614: throw new ArrayIndexOutOfBoundsException();
615: }
616: if (len == 0) {
617: return 0;
618: }
619:
620: if (inputStreamEof) {
621: notRestartable();
622: return -1;
623: }
624: int result = 0;
625: while (true) {
626: if (abortingOperation) {
627: throw new IOException("operation aborted");
628: }
629: int rd = stream.parsePacketData(recvHeaders, b,
630: offset, len);
631: if (rd != 0) {
632: offset += rd;
633: len -= rd;
634: result += rd;
635: if (len == 0) {
636: notRestartable();
637: return result;
638: }
639: }
640:
641: // need more data, packet is finished
642:
643: // check if stream is finished
644: if (stream.isEof) {
645: // received END_OF_BODY
646: while (!operationEnd) {
647: // strange, no response code - waiting
648: stream.parseEnd();
649: stream.sendPacket(head, -1, null, false);
650: stream.recvPacket();
651: operationEnd = stream.packetType != ObexPacketStream.OPCODE_CONTINUE;
652: stream.parsePacketHeaders(recvHeaders, 3);
653: }
654:
655: inputStreamEof = true;
656: notRestartable();
657: return (result == 0) ? -1 : result;
658: }
659:
660: if (stream.packetType != ObexPacketStream.OPCODE_CONTINUE) {
661: throw new IOException(
662: "server errorcode received");
663: }
664: packetExchange();
665: }
666: }
667: }
668:
669: public void close() throws IOException {
670: synchronized (lock) {
671: if (DEBUG) {
672: System.out.println("client: is.close()");
673: }
674: if (inputStreamClosed) {
675: return;
676: }
677: inputStreamEof = false;
678:
679: // sending abort packet if operation not ended
680: try {
681: sendAbortPacket();
682: } catch (IOException e) {
683: // nothing, link should be marked already as broken
684: }
685:
686: inputStreamClosed = true;
687: terminate();
688: }
689: }
690: }
691:
692: private class FakeInputStream extends InputStream {
693: FakeInputStream() {
694: }
695:
696: public int read() throws IOException {
697: throw new IOException("not supported");
698: }
699:
700: public void close() throws IOException {
701: }
702: }
703: }
|