001: package org.sakaiproject.search.index.impl;
002:
003: import java.io.ByteArrayInputStream;
004: import java.io.File;
005: import java.io.FileInputStream;
006: import java.io.FileOutputStream;
007: import java.io.IOException;
008: import java.io.InputStream;
009: import java.util.HashMap;
010: import java.util.Iterator;
011: import java.util.List;
012: import java.util.Map;
013: import java.util.zip.ZipEntry;
014: import java.util.zip.ZipInputStream;
015: import java.util.zip.ZipOutputStream;
016:
017: import org.apache.commons.logging.Log;
018: import org.apache.commons.logging.LogFactory;
019: import org.sakaiproject.search.api.SearchService;
020: import org.sakaiproject.search.index.SegmentInfo;
021:
022: public class ClusterSegmentsStorage {
023:
024: private static final String PACKFILE = "packet";
025:
026: private static final Log log = LogFactory
027: .getLog(ClusterSegmentsStorage.class);
028:
029: private String searchIndexDirectory;
030:
031: private boolean debug = false;
032:
033: private boolean localStructuredStorage = false;
034:
035: private JDBCClusterIndexStore clusterIndexStore;
036:
037: private SearchService searchService;
038:
039: public ClusterSegmentsStorage(SearchService searchService,
040: String searchIndexDirectory,
041: JDBCClusterIndexStore clusterIndexStore,
042: boolean localStructuredStorage, boolean debug) {
043: this .localStructuredStorage = localStructuredStorage;
044: this .clusterIndexStore = clusterIndexStore;
045: this .searchIndexDirectory = searchIndexDirectory;
046: this .debug = debug;
047: this .searchService = searchService;
048: }
049:
050: /**
051: * unpack a segment from a zip
052: *
053: * @param addsi
054: * @param packetStream
055: * @param version
056: */
057: protected void unpackSegment(SegmentInfo addsi,
058: InputStream packetStream, long version) throws IOException {
059: log
060: .debug("================================Starting Unpack Segment==============================");
061: ZipInputStream zin = new ZipInputStream(packetStream);
062: ZipEntry zipEntry = null;
063: FileOutputStream fout = null;
064: try {
065: File loc = addsi.getSegmentLocation();
066: boolean locationExists = false;
067: File unpackBase = new File(searchIndexDirectory);
068: if (loc.exists()) {
069: locationExists = true;
070: unpackBase = new File(searchIndexDirectory, "unpack");
071: }
072: byte[] buffer = new byte[4096];
073: while ((zipEntry = zin.getNextEntry()) != null) {
074:
075: long ts = zipEntry.getTime();
076: // the zip entry needs to be a full path from the
077: // searchIndexDirectory... hence this is correct
078:
079: File f = new File(unpackBase, zipEntry.getName());
080: if (log.isDebugEnabled())
081: log.debug(" Unpack " + f.getAbsolutePath());
082: f.getParentFile().mkdirs();
083:
084: fout = new FileOutputStream(f);
085: int len;
086: while ((len = zin.read(buffer)) > 0) {
087: fout.write(buffer, 0, len);
088: }
089: zin.closeEntry();
090: fout.close();
091: f.setLastModified(ts);
092: }
093:
094: if (locationExists) {
095: Map<String, File> moved = new HashMap<String, File>();
096: moveAll(new File(unpackBase, loc.getName()), loc, moved);
097: deleteAll(unpackBase);
098: // unfortunately we have to remove the files befor the reload
099: // otherwise the checksums will fail.
100: deleteSome(loc, moved);
101: // force a reload before we delete the files,
102: // since this is in the locked thread, this node will reload
103: searchService.reload();
104:
105: }
106:
107: try {
108: addsi.checkSegmentValidity(searchService
109: .hasDiagnostics(), "Unpack Segment");
110: } catch (Exception ex) {
111: try {
112: addsi.checkSegmentValidity(true,
113: "Unpack Segment Failed");
114: } catch (Exception e) {
115:
116: }
117: throw new RuntimeException("Segment " + addsi.getName()
118: + " is corrupted ");
119: }
120:
121: addsi.setVersion(version);
122: addsi.setCreated();
123:
124: } finally {
125: try {
126: fout.close();
127: } catch (Exception ex) {
128: }
129: }
130: log
131: .debug("================================Done Unpack Segment==============================");
132:
133: }
134:
135: /**
136: * @param log2
137: * @param moved
138: */
139: private void deleteSome(File f, Map<String, File> moved) {
140: if (f.isDirectory()) {
141: File[] fs = f.listFiles();
142: for (int i = 0; i < fs.length; i++) {
143: deleteSome(fs[i], moved);
144: }
145: if (moved.get(f.getPath()) == null) {
146: f.delete();
147: log.debug(" deleted " + f.getPath());
148: }
149: } else {
150: if (moved.get(f.getPath()) == null) {
151: f.delete();
152: log.debug(" deleted " + f.getPath());
153: }
154: }
155:
156: }
157:
158: /**
159: * @param file
160: */
161: private void moveAll(File src, File dest, Map<String, File> moved) {
162:
163: if (src.isDirectory()) {
164: File[] fs = src.listFiles();
165: for (int i = 0; i < fs.length; i++) {
166: moveAll(fs[i], new File(dest, fs[i].getName()), moved);
167: }
168: } else {
169: if (dest.exists()) {
170: dest.delete();
171: } else {
172: File p = dest.getParentFile();
173: if (!p.exists()) {
174: p.mkdirs();
175: }
176: }
177: src.renameTo(dest);
178: log.debug(" renamed " + src.getPath() + " to "
179: + dest.getPath());
180: }
181: moved.put(dest.getPath(), dest);
182: }
183:
184: /**
185: * @param loc
186: */
187: private void deleteAll(File f) {
188: if (f.isDirectory()) {
189: File[] fs = f.listFiles();
190: for (int i = 0; i < fs.length; i++) {
191: deleteAll(fs[i]);
192: }
193: f.delete();
194: log.debug(" deleted " + f.getPath());
195: } else {
196: f.delete();
197: log.debug(" deleted " + f.getPath());
198: }
199: }
200:
201: /**
202: * unpack a segment from a zip
203: *
204: * @param addsi
205: * @param packetStream
206: * @param version
207: */
208: protected void unpackPatch(InputStream packetStream)
209: throws IOException {
210: log
211: .debug("================================Start Unpack Patch==============================");
212: ZipInputStream zin = new ZipInputStream(packetStream);
213: ZipEntry zipEntry = null;
214: FileOutputStream fout = null;
215: try {
216: byte[] buffer = new byte[4096];
217: while ((zipEntry = zin.getNextEntry()) != null) {
218:
219: long ts = zipEntry.getTime();
220: // the zip index name is the full path from the
221: // searchIndexDirectory
222: File f = new File(searchIndexDirectory, zipEntry
223: .getName());
224: if (log.isDebugEnabled())
225: log.debug(" Unpack "
226: + f.getAbsolutePath());
227: f.getParentFile().mkdirs();
228: fout = new FileOutputStream(f);
229:
230: int len;
231: while ((len = zin.read(buffer)) > 0) {
232: fout.write(buffer, 0, len);
233: }
234: zin.closeEntry();
235: fout.close();
236: f.setLastModified(ts);
237: }
238:
239: } finally {
240: try {
241: fout.close();
242: } catch (Exception ex) {
243: }
244: }
245: log
246: .debug("================================Done Unpack Patch==============================");
247:
248: }
249:
250: /**
251: * pack a segment into the zip
252: *
253: * @param addsi
254: * @return
255: * @throws IOException
256: */
257: protected File packSegment(SegmentInfo addsi, long newVersion)
258: throws IOException {
259:
260: log
261: .debug("================================Start Pack Segment==============================");
262: // just prior to packing a segment we can say its created
263: addsi.setCreated();
264:
265: File tmpFile = new File(searchIndexDirectory, PACKFILE
266: + String.valueOf(System.currentTimeMillis()) + ".zip");
267: ZipOutputStream zout = new ZipOutputStream(
268: new FileOutputStream(tmpFile));
269: addsi.setTimeStamp(newVersion);
270:
271: byte[] buffer = new byte[4096];
272: addFile(addsi.getSegmentLocation(), zout, buffer, 0);
273: zout.close();
274: // touch the version
275:
276: try {
277: if (log.isDebugEnabled())
278: log.debug(" Packed Name[" + tmpFile.getName()
279: + "]length[" + tmpFile.length() + "][" + addsi
280: + "]");
281: } catch (Exception e) {
282: e.printStackTrace();
283: }
284: log
285: .debug("================================Done Pack Segment==============================");
286: return tmpFile;
287: }
288:
289: /**
290: * pack a segment into the zip
291: *
292: * @param addsi
293: * @return
294: * @throws IOException
295: */
296: protected File packPatch() throws IOException {
297: log
298: .debug("================================Start Pack Patch==============================");
299:
300: File tmpFile = new File(searchIndexDirectory, PACKFILE
301: + String.valueOf(System.currentTimeMillis()) + ".zip");
302: ZipOutputStream zout = new ZipOutputStream(
303: new FileOutputStream(tmpFile));
304: byte[] buffer = new byte[4096];
305:
306: ZipEntry ze = new ZipEntry("lastpatchmarker");
307: ze.setTime(System.currentTimeMillis());
308: zout.putNextEntry(ze);
309: try {
310: ByteArrayInputStream fin = new ByteArrayInputStream(
311: "--PATCH MARKER--".getBytes());
312: try {
313: int len = 0;
314: while ((len = fin.read(buffer)) > 0) {
315: zout.write(buffer, 0, len);
316: }
317: } finally {
318: fin.close();
319: }
320: } finally {
321: zout.closeEntry();
322: }
323: // itertate over all segments present locally
324:
325: List l = clusterIndexStore.getLocalSegments();
326: for (Iterator li = l.iterator(); li.hasNext();) {
327: SegmentInfoImpl sgi = (SegmentInfoImpl) li.next();
328: if (sgi.isCreated()) { // Only add segment locations that are created
329: File f = sgi.getSegmentLocation();
330: addFile(f, zout, buffer, sgi.getVersion());
331: }
332: }
333: zout.close();
334: log
335: .debug("================================Done Pack Patch==============================");
336:
337: return tmpFile;
338: }
339:
340: /**
341: * add a file to the zout stream
342: *
343: * @param f
344: * @param zout
345: * @param buffer
346: * @throws IOException
347: */
348: private void addFile(File f, ZipOutputStream zout, byte[] buffer,
349: long modtime) throws IOException {
350: FileInputStream fin = null;
351: try {
352: if (f.isDirectory()) {
353: File[] files = f.listFiles();
354: if (files != null) {
355: for (int i = 0; i < files.length; i++) {
356: if (files[i].isDirectory()) {
357: addFile(files[i], zout, buffer, modtime);
358: } else {
359: if (files[i].lastModified() > modtime) {
360: log.debug(" Add "
361: + files[i].getPath());
362: addSingleFile(files[i], zout, buffer);
363: } else {
364: log.debug(" Ignore "
365: + files[i].getPath());
366: }
367: }
368: }
369: }
370: } else {
371: if (f.lastModified() > modtime) {
372: addSingleFile(f, zout, buffer);
373: }
374: }
375: } finally {
376: try {
377: fin.close();
378: } catch (Exception e) {
379: }
380: }
381: }
382:
383: private void addSingleFile(File file, ZipOutputStream zout,
384: byte[] buffer) throws IOException {
385: String path = file.getPath();
386: if (path.startsWith(searchIndexDirectory)) {
387: path = path.substring(searchIndexDirectory.length());
388: }
389: ZipEntry ze = new ZipEntry(path);
390: ze.setTime(file.lastModified());
391: zout.putNextEntry(ze);
392: try {
393: InputStream fin = new FileInputStream(file);
394: try {
395: int len = 0;
396: while ((len = fin.read(buffer)) > 0) {
397: zout.write(buffer, 0, len);
398: }
399: } finally {
400: fin.close();
401: }
402: } finally {
403: zout.closeEntry();
404: }
405:
406: }
407:
408: }
|