001: /***************************************************************
002: * This file is part of the [fleXive](R) project.
003: *
004: * Copyright (c) 1999-2008
005: * UCS - unique computing solutions gmbh (http://www.ucs.at)
006: * All rights reserved
007: *
008: * The [fleXive](R) project is free software; you can redistribute
009: * it and/or modify it under the terms of the GNU General Public
010: * License as published by the Free Software Foundation;
011: * either version 2 of the License, or (at your option) any
012: * later version.
013: *
014: * The GNU General Public License can be found at
015: * http://www.gnu.org/copyleft/gpl.html.
016: * A copy is found in the textfile GPL.txt and important notices to the
017: * license from the author are found in LICENSE.txt distributed with
018: * these libraries.
019: *
020: * This library is distributed in the hope that it will be useful,
021: * but WITHOUT ANY WARRANTY; without even the implied warranty of
022: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
023: * GNU General Public License for more details.
024: *
025: * For further information about UCS - unique computing solutions gmbh,
026: * please see the company website: http://www.ucs.at
027: *
028: * For further information about [fleXive](R), please see the
029: * project website: http://www.flexive.org
030: *
031: *
032: * This copyright notice MUST APPEAR in all copies of the file!
033: ***************************************************************/package com.flexive.core.stream;
034:
035: import com.flexive.core.Database;
036: import com.flexive.core.DatabaseConst;
037: import com.flexive.shared.stream.BinaryUploadPayload;
038: import com.flexive.stream.DataPacket;
039: import com.flexive.stream.StreamException;
040: import com.flexive.stream.StreamProtocol;
041: import org.apache.commons.lang.RandomStringUtils;
042: import org.apache.commons.logging.Log;
043: import org.apache.commons.logging.LogFactory;
044:
045: import java.io.IOException;
046: import java.io.PipedInputStream;
047: import java.io.PipedOutputStream;
048: import java.nio.ByteBuffer;
049: import java.sql.Connection;
050: import java.sql.PreparedStatement;
051: import java.sql.SQLException;
052: import java.sql.Timestamp;
053: import java.util.Date;
054:
055: /**
056: * StreamServer protocol for receiving a binary stream
057: *
058: * @author Markus Plesser (markus.plesser@flexive.com), UCS - unique computing solutions gmbh (http://www.ucs.at)
059: */
060: public class BinaryUploadProtocol extends
061: StreamProtocol<BinaryUploadPayload> implements Runnable {
062:
063: protected static final transient Log LOG = LogFactory
064: .getLog(BinaryUploadProtocol.class);
065:
066: private long timeToLive = 0;
067: private String handle = null;
068: private long count = 0;
069: private int division = -1;
070: private long rcvTime = 0;
071: private long expectedLength = 0;
072: private boolean protoInit = false;
073: private boolean rcvStarted = false;
074: private Thread rcvThread = null;
075:
076: PipedInputStream pin = null;
077: PipedOutputStream pout = null;
078:
079: public BinaryUploadProtocol() {
080: super (BinaryUploadPayload.class);
081: }
082:
083: @Override
084: public StreamProtocol<BinaryUploadPayload> getInstance() {
085: return new BinaryUploadProtocol();
086: }
087:
088: @Override
089: public boolean canHandle(DataPacket<BinaryUploadPayload> dataPacket)
090: throws StreamException {
091: return true;
092: }
093:
094: @Override
095: public DataPacket<BinaryUploadPayload> processPacket(
096: DataPacket<BinaryUploadPayload> dataPacket)
097: throws StreamException {
098: if (dataPacket.isExpectResponse() && !this .protoInit) {
099: this .protoInit = true;
100: this .timeToLive = dataPacket.getPayload().getTimeToLive();
101: this .expectedLength = dataPacket.getPayload()
102: .getExpectedLength();
103: if (LOG.isDebugEnabled())
104: LOG.debug("Receive started at "
105: + new Date(System.currentTimeMillis()));
106: this .handle = RandomStringUtils.randomAlphanumeric(32);
107: this .division = dataPacket.getPayload().getDivision();
108: return new DataPacket<BinaryUploadPayload>(
109: new BinaryUploadPayload(handle), false, true);
110: } else {
111: if (LOG.isDebugEnabled())
112: LOG.debug("so finished ...");
113: cleanup();
114: }
115: return null;
116: }
117:
118: /**
119: * When an object implementing interface <code>Runnable</code> is used
120: * to create a thread, starting the thread causes the object's
121: * <code>run</code> method to be called in that separately executing
122: * thread.
123: * <p/>
124: * The general contract of the method <code>run</code> is that it may
125: * take any action whatsoever.
126: *
127: * @see Thread#run()
128: */
129: public void run() {
130: if (LOG.isDebugEnabled())
131: LOG.debug("thread started");
132: try {
133: createBlob();
134: } catch (Exception e) {
135: e.printStackTrace();
136: }
137: if (LOG.isDebugEnabled())
138: LOG.debug("thread finished");
139: }
140:
141: /**
142: * Create an entry in the transit table
143: *
144: * @throws SQLException on errors
145: * @throws IOException on errors
146: */
147: private void createBlob() throws SQLException, IOException {
148: //see: http://bugs.mysql.com/bug.php?id=7745
149: //default max allowed packet size is 1M
150: //info about longblob: http://dev.mysql.com/doc/refman/5.0/en/blob.html
151:
152: long time;
153: Connection con = null;
154: PreparedStatement ps = null;
155: try {
156: con = Database.getDbConnection(division);
157: ps = con
158: .prepareStatement("INSERT INTO "
159: + DatabaseConst.TBL_BINARY_TRANSIT
160: + " (BKEY,FBLOB,TFER_DONE,EXPIRE) VALUES(?,?,FALSE,?)");
161: ps.setString(1, handle);
162: ps.setBinaryStream(2, pin, (int) expectedLength);
163: ps.setLong(3, System.currentTimeMillis() + timeToLive);
164: time = System.currentTimeMillis();
165: ps.executeUpdate();
166: if (LOG.isDebugEnabled())
167: LOG.debug("Stored " + count + " bytes in "
168: + (System.currentTimeMillis() - time)
169: + "[ms] in DB");
170: try {
171: pin.close();
172: } catch (IOException e) {
173: e.printStackTrace();
174: }
175: } finally {
176: Database.closeObjects(BinaryUploadProtocol.class, con, ps);
177: }
178: }
179:
180: /**
181: * Mark the blob as received, set the size and generate meta data if applicable
182: *
183: * @param size size transferred
184: * @throws SQLException on errors
185: */
186: private void markReceived(long size) throws SQLException {
187: Connection con = null;
188: PreparedStatement ps = null;
189: try {
190: con = Database.getDbConnection(division);
191:
192: ps = con.prepareStatement("UPDATE "
193: + DatabaseConst.TBL_BINARY_TRANSIT
194: + " SET TFER_DONE=?, BLOBSIZE=? WHERE BKEY=?");
195: ps.setBoolean(1, true);
196: ps.setLong(2, size);
197: ps.setString(3, handle);
198: ps.executeUpdate();
199: } finally {
200: Database.closeObjects(BinaryUploadProtocol.class, con, ps);
201: }
202: }
203:
204: /**
205: * {@inheritDoc}
206: */
207: @Override
208: public synchronized boolean receiveStream(ByteBuffer buffer)
209: throws IOException {
210: if (!buffer.hasRemaining()) {
211: //this can only happen on remote clients
212: if (LOG.isDebugEnabled())
213: LOG.debug("aborting (empty)");
214: return false;
215: }
216: if (!rcvStarted) {
217: rcvStarted = true;
218: if (LOG.isDebugEnabled())
219: LOG.debug("(internal serverside) receive start");
220: pin = new PipedInputStream();
221: pout = new PipedOutputStream(pin);
222: rcvThread = new Thread(this );
223: rcvThread.setDaemon(true);
224: rcvThread.start();
225: rcvTime = System.currentTimeMillis();
226: }
227: if (LOG.isDebugEnabled()
228: && count + buffer.remaining() > expectedLength) {
229: LOG.debug("poss. overflow: pos=" + buffer.position()
230: + " lim=" + buffer.limit() + " cap="
231: + buffer.capacity());
232: LOG
233: .debug("Curr count: "
234: + count
235: + " count+rem="
236: + (count + buffer.remaining() + " delta:" + ((count + buffer
237: .remaining()) - expectedLength)));
238: }
239: count += buffer.remaining();
240: pout.write(buffer.array(), buffer.position(), buffer
241: .remaining());
242: // System.out.println("Received: "+count+"/"+expectedLength+". expecting more");
243: buffer.clear();
244: if (expectedLength > 0 && count >= expectedLength) {
245: if (LOG.isDebugEnabled())
246: LOG.debug("aborting");
247: return false;
248: }
249: return true;
250: }
251:
252: @Override
253: public synchronized void cleanup() {
254: if (pout == null) {
255: if (LOG.isDebugEnabled())
256: LOG.debug("skipping cleanup");
257: return;
258: }
259: try {
260: pout.close();
261: pout = null;
262: } catch (IOException e) {
263: e.printStackTrace();
264: }
265: try {
266: rcvThread.join();
267: } catch (InterruptedException e) {
268: e.printStackTrace();
269: }
270: try {
271: markReceived(count);
272: } catch (SQLException e) {
273: e.printStackTrace();
274: }
275: if (LOG.isDebugEnabled()) {
276: LOG.debug("Received " + count + " bytes in "
277: + (System.currentTimeMillis() - rcvTime) + "[ms]");
278: LOG
279: .debug("===================================================");
280: LOG.debug("=== Finished binary receive. Total length: "
281: + count);
282: LOG.debug("=== Handle was: " + handle);
283: LOG
284: .debug("===================================================");
285: }
286: }
287: }
|