001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.solr.handler;
017:
018: import org.apache.solr.request.SolrQueryRequest;
019: import org.apache.solr.request.SolrParams;
020: import org.apache.solr.request.SolrQueryResponse;
021: import org.apache.solr.util.ContentStream;
022: import org.apache.solr.core.SolrException;
023: import org.apache.solr.schema.IndexSchema;
024: import org.apache.solr.schema.SchemaField;
025: import org.apache.solr.util.StrUtils;
026: import org.apache.solr.update.*;
027: import org.apache.commons.csv.CSVStrategy;
028: import org.apache.commons.csv.CSVParser;
029: import org.apache.commons.io.IOUtils;
030:
031: import java.util.regex.Pattern;
032: import java.util.List;
033: import java.io.*;
034:
035: /**
036: * @author yonik
037: * @version $Id$
038: */
039:
040: public class CSVRequestHandler extends RequestHandlerBase {
041:
042: public void handleRequestBody(SolrQueryRequest req,
043: SolrQueryResponse rsp) throws Exception {
044: CSVLoader loader = new SingleThreadedCSVLoader(req);
045:
046: Iterable<ContentStream> streams = req.getContentStreams();
047: if (streams == null) {
048: if (!RequestHandlerUtils.handleCommit(req, rsp, false)) {
049: throw new SolrException(
050: SolrException.ErrorCode.BAD_REQUEST,
051: "missing content stream");
052: }
053: return;
054: }
055:
056: for (ContentStream stream : streams) {
057: Reader reader = stream.getReader();
058: try {
059: loader.errHeader = "CSVLoader: input="
060: + stream.getSourceInfo();
061: loader.load(reader);
062: } finally {
063: IOUtils.closeQuietly(reader);
064: }
065: }
066:
067: // perhaps commit when we are done
068: RequestHandlerUtils.handleCommit(req, rsp, false);
069: }
070:
071: //////////////////////// SolrInfoMBeans methods //////////////////////
072: @Override
073: public String getDescription() {
074: return "Add/Update multiple documents with CSV formatted rows";
075: }
076:
077: @Override
078: public String getVersion() {
079: return "$Revision:$";
080: }
081:
082: @Override
083: public String getSourceId() {
084: return "$Id:$";
085: }
086:
087: @Override
088: public String getSource() {
089: return "$URL:$";
090: }
091: }
092:
093: abstract class CSVLoader {
094: static String SEPARATOR = "separator";
095: static String FIELDNAMES = "fieldnames";
096: static String HEADER = "header";
097: static String SKIP = "skip";
098: static String SKIPLINES = "skipLines";
099: static String MAP = "map";
100: static String TRIM = "trim";
101: static String EMPTY = "keepEmpty";
102: static String SPLIT = "split";
103: static String ENCAPSULATOR = "encapsulator";
104: static String OVERWRITE = "overwrite";
105:
106: private static Pattern colonSplit = Pattern.compile(":");
107: private static Pattern commaSplit = Pattern.compile(",");
108:
109: final IndexSchema schema;
110: final SolrParams params;
111: final UpdateHandler handler;
112: final CSVStrategy strategy;
113:
114: String[] fieldnames;
115: SchemaField[] fields;
116: CSVLoader.FieldAdder[] adders;
117:
118: int skipLines; // number of lines to skip at start of file
119:
120: final AddUpdateCommand templateAdd;
121:
122: /** Add a field to a document unless it's zero length.
123: * The FieldAdder hierarchy handles all the complexity of
124: * further transforming or splitting field values to keep the
125: * main logic loop clean. All implementations of add() must be
126: * MT-safe!
127: */
128: private class FieldAdder {
129: void add(DocumentBuilder builder, int line, int column,
130: String val) {
131: if (val.length() > 0) {
132: builder.addField(fields[column].getName(), val, 1.0f);
133: }
134: }
135: }
136:
137: /** add zero length fields */
138: private class FieldAdderEmpty extends CSVLoader.FieldAdder {
139: void add(DocumentBuilder builder, int line, int column,
140: String val) {
141: builder.addField(fields[column].getName(), val, 1.0f);
142: }
143: }
144:
145: /** trim fields */
146: private class FieldTrimmer extends CSVLoader.FieldAdder {
147: private final CSVLoader.FieldAdder base;
148:
149: FieldTrimmer(CSVLoader.FieldAdder base) {
150: this .base = base;
151: }
152:
153: void add(DocumentBuilder builder, int line, int column,
154: String val) {
155: base.add(builder, line, column, val.trim());
156: }
157: }
158:
159: /** map a single value.
160: * for just a couple of mappings, this is probably faster than
161: * using a HashMap.
162: */
163: private class FieldMapperSingle extends CSVLoader.FieldAdder {
164: private final String from;
165: private final String to;
166: private final CSVLoader.FieldAdder base;
167:
168: FieldMapperSingle(String from, String to,
169: CSVLoader.FieldAdder base) {
170: this .from = from;
171: this .to = to;
172: this .base = base;
173: }
174:
175: void add(DocumentBuilder builder, int line, int column,
176: String val) {
177: if (from.equals(val))
178: val = to;
179: base.add(builder, line, column, val);
180: }
181: }
182:
183: /** Split a single value into multiple values based on
184: * a CSVStrategy.
185: */
186: private class FieldSplitter extends CSVLoader.FieldAdder {
187: private final CSVStrategy strategy;
188: private final CSVLoader.FieldAdder base;
189:
190: FieldSplitter(CSVStrategy strategy, CSVLoader.FieldAdder base) {
191: this .strategy = strategy;
192: this .base = base;
193: }
194:
195: void add(DocumentBuilder builder, int line, int column,
196: String val) {
197: CSVParser parser = new CSVParser(new StringReader(val),
198: strategy);
199: try {
200: String[] vals = parser.getLine();
201: if (vals != null) {
202: for (String v : vals)
203: base.add(builder, line, column, v);
204: } else {
205: base.add(builder, line, column, val);
206: }
207: } catch (IOException e) {
208: throw new SolrException(
209: SolrException.ErrorCode.BAD_REQUEST, "");
210: }
211: }
212: }
213:
214: String errHeader = "CSVLoader:";
215:
216: CSVLoader(SolrQueryRequest req) {
217: this .params = req.getParams();
218: handler = req.getCore().getUpdateHandler();
219: schema = req.getSchema();
220:
221: templateAdd = new AddUpdateCommand();
222: templateAdd.allowDups = false;
223: templateAdd.overwriteCommitted = true;
224: templateAdd.overwritePending = true;
225:
226: if (params.getBool(OVERWRITE, true)) {
227: templateAdd.allowDups = false;
228: templateAdd.overwriteCommitted = true;
229: templateAdd.overwritePending = true;
230: } else {
231: templateAdd.allowDups = true;
232: templateAdd.overwriteCommitted = false;
233: templateAdd.overwritePending = false;
234: }
235:
236: strategy = new CSVStrategy(',', '"',
237: CSVStrategy.COMMENTS_DISABLED, true, false, true);
238: String sep = params.get(SEPARATOR);
239: if (sep != null) {
240: if (sep.length() != 1)
241: throw new SolrException(
242: SolrException.ErrorCode.BAD_REQUEST,
243: "Invalid separator:'" + sep + "'");
244: strategy.setDelimiter(sep.charAt(0));
245: }
246:
247: String encapsulator = params.get(ENCAPSULATOR);
248: if (encapsulator != null) {
249: if (encapsulator.length() != 1)
250: throw new SolrException(
251: SolrException.ErrorCode.BAD_REQUEST,
252: "Invalid encapsulator:'" + sep + "'");
253: strategy.setEncapsulator(encapsulator.charAt(0));
254: }
255:
256: String fn = params.get(FIELDNAMES);
257: fieldnames = fn != null ? commaSplit.split(fn, -1) : null;
258:
259: Boolean hasHeader = params.getBool(HEADER);
260:
261: skipLines = params.getInt(SKIPLINES, 0);
262:
263: if (fieldnames == null) {
264: if (null == hasHeader) {
265: // assume the file has the headers if they aren't supplied in the args
266: hasHeader = true;
267: } else if (hasHeader) {
268: throw new SolrException(
269: SolrException.ErrorCode.BAD_REQUEST,
270: "CSVLoader: must specify fieldnames=<fields>* or header=true");
271: }
272: } else {
273: // if the fieldnames were supplied and the file has a header, we need to
274: // skip over that header.
275: if (hasHeader != null && hasHeader)
276: skipLines++;
277:
278: prepareFields();
279: }
280: }
281:
282: /** create the FieldAdders that control how each field is indexed */
283: void prepareFields() {
284: // Possible future optimization: for really rapid incremental indexing
285: // from a POST, one could cache all of this setup info based on the params.
286: // The link from FieldAdder to this would need to be severed for that to happen.
287:
288: fields = new SchemaField[fieldnames.length];
289: adders = new CSVLoader.FieldAdder[fieldnames.length];
290: String skipStr = params.get(SKIP);
291: List<String> skipFields = skipStr == null ? null : StrUtils
292: .splitSmart(skipStr, ',');
293:
294: CSVLoader.FieldAdder adder = new CSVLoader.FieldAdder();
295: CSVLoader.FieldAdder adderKeepEmpty = new CSVLoader.FieldAdderEmpty();
296:
297: for (int i = 0; i < fields.length; i++) {
298: String fname = fieldnames[i];
299: // to skip a field, leave the entries in fields and addrs null
300: if (fname.length() == 0
301: || (skipFields != null && skipFields
302: .contains(fname)))
303: continue;
304:
305: fields[i] = schema.getField(fname);
306: boolean keepEmpty = params
307: .getFieldBool(fname, EMPTY, false);
308: adders[i] = keepEmpty ? adderKeepEmpty : adder;
309:
310: // Order that operations are applied: split -> trim -> map -> add
311: // so create in reverse order.
312: // Creation of FieldAdders could be optimized and shared among fields
313:
314: String[] fmap = params.getFieldParams(fname, MAP);
315: if (fmap != null) {
316: for (String mapRule : fmap) {
317: String[] mapArgs = colonSplit.split(mapRule, -1);
318: if (mapArgs.length != 2)
319: throw new SolrException(
320: SolrException.ErrorCode.BAD_REQUEST,
321: "Map rules must be of the form 'from:to' ,got '"
322: + mapRule + "'");
323: adders[i] = new CSVLoader.FieldMapperSingle(
324: mapArgs[0], mapArgs[1], adders[i]);
325: }
326: }
327:
328: if (params.getFieldBool(fname, TRIM, false)) {
329: adders[i] = new CSVLoader.FieldTrimmer(adders[i]);
330: }
331:
332: if (params.getFieldBool(fname, SPLIT, false)) {
333: String sepStr = params.getFieldParam(fname, SEPARATOR);
334: char fsep = sepStr == null || sepStr.length() == 0 ? ','
335: : sepStr.charAt(0);
336: String encStr = params.getFieldParam(fname,
337: ENCAPSULATOR);
338: char fenc = encStr == null || encStr.length() == 0 ? '\''
339: : encStr.charAt(0);
340:
341: CSVStrategy fstrat = new CSVStrategy(fsep, fenc,
342: CSVStrategy.COMMENTS_DISABLED);
343: adders[i] = new CSVLoader.FieldSplitter(fstrat,
344: adders[i]);
345: }
346: }
347: }
348:
349: private void input_err(String msg, String[] line, int lineno) {
350: StringBuilder sb = new StringBuilder();
351: sb.append(errHeader + ", line=" + lineno + "," + msg
352: + "\n\tvalues={");
353: for (String val : line) {
354: sb.append("'" + val + "',");
355: }
356: sb.append('}');
357: throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, sb
358: .toString());
359: }
360:
361: /** load the CSV input */
362: void load(Reader input) throws IOException {
363: Reader reader = input;
364: if (skipLines > 0) {
365: if (!(reader instanceof BufferedReader)) {
366: reader = new BufferedReader(reader);
367: }
368: BufferedReader r = (BufferedReader) reader;
369: for (int i = 0; i < skipLines; i++) {
370: r.readLine();
371: }
372: }
373:
374: CSVParser parser = new CSVParser(reader, strategy);
375:
376: // parse the fieldnames from the header of the file
377: if (fieldnames == null) {
378: fieldnames = parser.getLine();
379: if (fieldnames == null) {
380: throw new SolrException(
381: SolrException.ErrorCode.BAD_REQUEST,
382: "Expected fieldnames in CSV input");
383: }
384: prepareFields();
385: }
386:
387: // read the rest of the CSV file
388: for (;;) {
389: int line = parser.getLineNumber(); // for error reporting in MT mode
390: String[] vals = parser.getLine();
391: if (vals == null)
392: break;
393:
394: if (vals.length != fields.length) {
395: input_err("expected " + fields.length
396: + " values but got " + vals.length, vals, line);
397: }
398:
399: addDoc(line, vals);
400: }
401: }
402:
403: /** called for each line of values (document) */
404: abstract void addDoc(int line, String[] vals) throws IOException;
405:
406: /** this must be MT safe... may be called concurrently from multiple threads. */
407: void doAdd(int line, String[] vals, DocumentBuilder builder,
408: AddUpdateCommand template) throws IOException {
409: // the line number is passed simply for error reporting in MT mode.
410: // first, create the lucene document
411: builder.startDoc();
412: for (int i = 0; i < vals.length; i++) {
413: if (fields[i] == null)
414: continue; // ignore this field
415: String val = vals[i];
416: adders[i].add(builder, line, i, val);
417: }
418: builder.endDoc();
419:
420: template.doc = builder.getDoc();
421: handler.addDoc(template);
422: }
423:
424: }
425:
426: class SingleThreadedCSVLoader extends CSVLoader {
427: protected DocumentBuilder builder;
428:
429: SingleThreadedCSVLoader(SolrQueryRequest req) {
430: super (req);
431: builder = new DocumentBuilder(schema);
432: }
433:
434: void addDoc(int line, String[] vals) throws IOException {
435: templateAdd.indexedId = null;
436: doAdd(line, vals, builder, templateAdd);
437: }
438: }
|