001: /***************************************************************
002: * This file is part of the [fleXive](R) project.
003: *
004: * Copyright (c) 1999-2008
005: * UCS - unique computing solutions gmbh (http://www.ucs.at)
006: * All rights reserved
007: *
008: * The [fleXive](R) project is free software; you can redistribute
009: * it and/or modify it under the terms of the GNU General Public
010: * License as published by the Free Software Foundation;
011: * either version 2 of the License, or (at your option) any
012: * later version.
013: *
014: * The GNU General Public License can be found at
015: * http://www.gnu.org/copyleft/gpl.html.
016: * A copy is found in the textfile GPL.txt and important notices to the
017: * license from the author are found in LICENSE.txt distributed with
018: * these libraries.
019: *
020: * This library is distributed in the hope that it will be useful,
021: * but WITHOUT ANY WARRANTY; without even the implied warranty of
022: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
023: * GNU General Public License for more details.
024: *
025: * For further information about UCS - unique computing solutions gmbh,
026: * please see the company website: http://www.ucs.at
027: *
028: * For further information about [fleXive](R), please see the
029: * project website: http://www.flexive.org
030: *
031: *
032: * This copyright notice MUST APPEAR in all copies of the file!
033: ***************************************************************/package com.flexive.shared.stream;
034:
035: import com.flexive.shared.CacheAdmin;
036: import com.flexive.shared.exceptions.FxStreamException;
037: import com.flexive.shared.value.BinaryDescriptor;
038: import com.flexive.stream.*;
039: import org.apache.commons.logging.Log;
040: import org.apache.commons.logging.LogFactory;
041:
042: import java.io.InputStream;
043: import java.io.OutputStream;
044: import java.net.Inet4Address;
045: import java.net.InetAddress;
046: import java.net.NetworkInterface;
047: import java.net.UnknownHostException;
048: import java.util.Enumeration;
049: import java.util.List;
050:
051: /**
052: * Utitlity class to access the StreamServer
053: *
054: * @author Markus Plesser (markus.plesser@flexive.com), UCS - unique computing solutions gmbh (http://www.ucs.at)
055: */
056: public class FxStreamUtils {
057:
058: private static final transient Log LOG = LogFactory
059: .getLog(FxStreamUtils.class);
060:
061: /**
062: * 5 minutes default time to live
063: */
064: public final static long DEFAULT_TTL = 5 * 60 * 1000;
065:
066: /**
067: * Upload a binary (using an OutputStream) to the StreamServer with a default time to live
068: *
069: * @param length expected length of the stream/binary
070: * @param stream the Stream containing the binary
071: * @return payload containing server side handle of the binary, mimeType and meta data
072: * @throws FxStreamException on errors
073: * @see FxStreamUtils#DEFAULT_TTL
074: */
075: public static BinaryUploadPayload uploadBinary(long length,
076: InputStream stream) throws FxStreamException {
077: return uploadBinary(length, stream, DEFAULT_TTL);
078: }
079:
080: /**
081: * Retrieve a StreamClient, local servers are preferred, remote only as fallback
082: *
083: * @return StreamClient
084: * @throws FxStreamException on errors
085: */
086: public static StreamClient getClient() throws FxStreamException {
087: List<ServerLocation> servers = CacheAdmin.getStreamServers();
088: try {
089: StreamClient client;
090: // client = StreamClientFactory.getRemoteClient(servers.get(0).getAddress(), servers.get(0).getPort());
091: client = StreamClientFactory.getClient(servers);
092: // System.out.println("StreamClient is " + (client.isLocal() ? "LOCAL" : "REMOTE"));
093: return client;
094: } catch (StreamException e) {
095: throw new FxStreamException(e);
096: }
097: }
098:
099: /**
100: * Upload a binary (using an OutputStream) to the StreamServer with a given time to live.
101: * Warning: if using a remote connection, this method will return a few miliseconds before
102: * all binary data is stored in the DB! Local connected clients will return *after* all
103: * data is stored. This is currently a 'feature' that might be fixed sometime.
104: *
105: * @param length expected length of the stream/binary
106: * @param stream the Stream containing the binary
107: * @param timeToLive time in miliseconds the binary is guaranteed to exist server side (will be removed once expired)
108: * @return payload containing server side handle of the binary, mimeType and meta data
109: * @throws FxStreamException on errors
110: */
111: public static BinaryUploadPayload uploadBinary(long length,
112: InputStream stream, long timeToLive)
113: throws FxStreamException {
114: StreamClient client = null;
115: try {
116: client = getClient();
117: // client = StreamClientFactory.getRemoteClient(servers.get(0).getAddress(), servers.get(0).getPort());
118: DataPacket<BinaryUploadPayload> req = new DataPacket<BinaryUploadPayload>(
119: new BinaryUploadPayload(length, timeToLive), true);
120: DataPacket<BinaryUploadPayload> resp = client.connect(req);
121: if (resp.getPayload().isServerError())
122: throw new FxStreamException("ex.stream.serverError",
123: resp.getPayload().getErrorMessage());
124: if (resp.isExpectStream())
125: client.sendStream(stream, length);
126: client.close();
127: client = null;
128: return resp.getPayload();
129: } catch (StreamException e) {
130: throw new FxStreamException(e);
131: } finally {
132: try {
133: if (client != null)
134: client.close();
135: } catch (StreamException e) {
136: //ignore
137: }
138: }
139: }
140:
141: /**
142: * Probe all network interfaces and return the most suited to run a StreamServer on.
143: * Preferred are interfaces that are not site local.
144: *
145: * @return best suited host to run a StreamServer
146: * @throws UnknownHostException on errors
147: */
148: public static InetAddress probeNetworkInterfaces()
149: throws UnknownHostException {
150: try {
151: Enumeration<NetworkInterface> nifs = NetworkInterface
152: .getNetworkInterfaces();
153: NetworkInterface nif;
154: InetAddress preferred = null, fallback = null;
155: while (nifs.hasMoreElements()) {
156: nif = nifs.nextElement();
157: if (LOG.isDebugEnabled())
158: LOG.debug("Probing " + nif.getDisplayName()
159: + " ...");
160: if (nif.getDisplayName().startsWith("vmnet"))
161: continue;
162: Enumeration<InetAddress> inas = nif.getInetAddresses();
163: while (inas.hasMoreElements()) {
164: InetAddress na = inas.nextElement();
165: if (LOG.isDebugEnabled())
166: LOG.debug("Probing " + nif.getDisplayName()
167: + na);
168: if (!(na instanceof Inet4Address))
169: continue;
170: if (!na.isLoopbackAddress() && na.isReachable(1000)) {
171: if (preferred == null
172: || (preferred.isSiteLocalAddress() && !na
173: .isSiteLocalAddress()))
174: preferred = na;
175: }
176: if (fallback == null && na.isLoopbackAddress())
177: fallback = na;
178: }
179: }
180:
181: if (LOG.isDebugEnabled())
182: LOG.debug("preferred: " + preferred + " fallback: "
183: + fallback);
184: if (preferred != null)
185: return preferred;
186: if (fallback != null)
187: return fallback;
188: return InetAddress.getLocalHost();
189: } catch (Exception e) {
190: return InetAddress.getLocalHost();
191: }
192: }
193:
194: public static void downloadBinary(BinaryDownloadCallback callback,
195: List<ServerLocation> server, OutputStream stream,
196: long binaryId, int binarySize) throws FxStreamException {
197: if (server == null || server.size() == 0 || stream == null
198: || (binarySize < 0 || binarySize > 3))
199: throw new FxStreamException(
200: "ex.stream.download.param.missing");
201: StreamClient client = null;
202: try {
203: client = getClient();
204: // client = StreamClientFactory.getRemoteClient(servers.get(0).getAddress(), servers.get(0).getPort());
205: DataPacket<BinaryDownloadPayload> req = new DataPacket<BinaryDownloadPayload>(
206: new BinaryDownloadPayload(binaryId, 1, 1,
207: binarySize), true, true);
208: DataPacket<BinaryDownloadPayload> resp = client
209: .connect(req);
210: if (resp.getPayload().isServerError())
211: throw new FxStreamException("ex.stream.serverError",
212: resp.getPayload().getErrorMessage());
213: if (callback != null) {
214: callback.setMimeType(resp.getPayload().getMimeType());
215: callback.setBinarySize(resp.getPayload().getDatasize());
216: }
217: client.receiveStream(stream);
218: client.close();
219: client = null;
220: } catch (StreamException e) {
221: throw new FxStreamException(e);
222: } finally {
223: try {
224: if (client != null)
225: client.close();
226: } catch (StreamException e) {
227: //ignore
228: }
229: }
230: }
231:
232: public static void downloadBinary(List<ServerLocation> server,
233: OutputStream stream, BinaryDescriptor descriptor)
234: throws FxStreamException {
235: if (server == null || server.size() == 0
236: || descriptor.getSize() <= 0 || stream == null)
237: throw new FxStreamException(
238: "ex.stream.download.param.missing");
239: StreamClient client = null;
240: try {
241: client = getClient();
242: // client = StreamClientFactory.getRemoteClient(servers.get(0).getAddress(), servers.get(0).getPort());
243: DataPacket<BinaryDownloadPayload> req = new DataPacket<BinaryDownloadPayload>(
244: new BinaryDownloadPayload(descriptor.getId(),
245: descriptor.getVersion(), descriptor
246: .getQuality()), true, true);
247: DataPacket<BinaryDownloadPayload> resp = client
248: .connect(req);
249: if (resp.getPayload().isServerError())
250: throw new FxStreamException("ex.stream.serverError",
251: resp.getPayload().getErrorMessage());
252: client.receiveStream(stream, descriptor.getSize());
253: client.close();
254: client = null;
255: } catch (StreamException e) {
256: throw new FxStreamException(e);
257: } finally {
258: try {
259: if (client != null)
260: client.close();
261: } catch (StreamException e) {
262: //ignore
263: }
264: }
265: }
266: }
|