1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  *
   4  * Redistribution and use in source and binary forms, with or without
   5  * modification, are permitted provided that the following conditions
   6  * are met:
   7  *
   8  *   - Redistributions of source code must retain the above copyright
   9  *     notice, this list of conditions and the following disclaimer.
  10  *
  11  *   - Redistributions in binary form must reproduce the above copyright
  12  *     notice, this list of conditions and the following disclaimer in the
  13  *     documentation and/or other materials provided with the distribution.
  14  *
  15  *   - Neither the name of Oracle nor the names of its
  16  *     contributors may be used to endorse or promote products derived
  17  *     from this software without specific prior written permission.
  18  *
  19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
  20  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
  21  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  23  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  24  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  25  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  26  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  27  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  28  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  29  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  30  */
  31 
  32 /*
  33  * This source code is provided to illustrate the usage of a given feature
  34  * or technique and has been deliberately simplified. Additional steps
  35  * required for a production-quality application, such as security checks,
  36  * input validation and proper error handling, might not be present in
  37  * this sample code.
  38  */
import static java.lang.Double.parseDouble;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
  60 
  61 /**
  62  * CSVProcessor is a tool for processing CSV file. There are several command
  63  * line options. Please consult printUsageAndExit(...) method for more info.
  64  * This sample shows examples of using next features:
  65  * <ol>
  66  * <li>Lambda and bulk operations. Working with streams: map(...), filter(...),
  67  * sorted(...) methods. collect(...) method with different collectors:
  68  * Collectors.maxBy(...), Collectors.minBy(...), Collectors.toList(),
  69  * Collectors.toCollection(...), Collectors.groupingBy(...),
  70  * Collectors.toDoubleSummaryStatistics(...), a custom Collector.</li>
  71  * <li>Static method reference for printing values.</li>
  72  * <li>Try-with-resources feature for closing files.</li>
  73  * <li>Switch by String feature.
  74  * <li>Other new API: Pattern.asPredicate(), BinaryOperator
  75  * BufferedReader.lines(), Collection.forEach(...), Comparator.comparing(...),
  76  * Comparator.reversed(), Arrays.stream(...).</li>
  77  * </ol>
  78  *
  79  * @author Andrey Nazarov
  80  */
  81 public class CSVProcessor {
  82 
  83     //Number of characters that may be read
  84     private static final int READ_AHEAD_LIMIT = 100_000_000;
  85 
  86     /**
  87      * The main method for the CSVProcessor program. Run program with empty
  88      * argument list to see possible arguments.
  89      *
  90      * @param args the argument list for CSVProcessor.
  91      */
  92     public static void main(String[] args) {
  93         if (args.length < 2) {
  94             printUsageAndExit();
  95         }
  96         try (BufferedReader br = new BufferedReader(
  97                 Files.newBufferedReader(Paths.get(args[args.length - 1]),
  98                         StandardCharsets.UTF_8))) {
  99             //assume first line contains column names
 100             List<String> header = Arrays.stream(br.readLine().split(","))
 101                     .map(String::trim).collect(Collectors.toList());
 102             //calculate an index of the column in question
 103             int column = getColumnNumber(header, args[1]);
 104             switch (args[0]) {
 105                 case "sort":
 106                     verifyArgumentNumber(args, 4);
 107                     //define sort order
 108                     boolean isAsc;
 109                     switch (args[2].toUpperCase()) {
 110                         case "ASC":
 111                             isAsc = true;
 112                             break;
 113                         case "DESC":
 114                             isAsc = false;
 115                             break;
 116                         default:
 117                             printUsageAndExit("Illegal argument" + args[2]);
 118                             return;//should not be reached
 119                     }
 120                     /*
 121                      * create comparator that compares lines by comparing values
 122                      * in the specified column.
 123                      */
 124                     Comparator<String> cmp
 125                             = Comparator.comparing(str -> getCell(str, column),
 126                                     String.CASE_INSENSITIVE_ORDER);
 127                     /*
 128                      * sorted(...) is used to sort records.
 129                      * forEach(...) is used to output sorted records.
 130                      */
 131                     br.lines().sorted(isAsc ? cmp : cmp.reversed())
 132                             .forEach(System.out::println);
 133                     break;
 134                 case "search":
 135                     verifyArgumentNumber(args, 4);
 136                     /*
 137                      * records are filtered by a regex.
 138                      * forEach(...) is used to output filtered records.
 139                      */
 140                     Predicate<String> pattern
 141                             = Pattern.compile(args[2]).asPredicate();
 142                     br.lines().filter(str -> pattern.test(getCell(str, column)))
 143                             .forEach(System.out::println);
 144                     break;
 145                 case "groupby":
 146                     verifyArgumentNumber(args, 3);
 147                     /*
 148                      * group lines by values in the column with collect(...),
 149                      * print with forEach(...) for every distinct value within
 150                      * the column.
 151                      */
 152                     br.lines().collect(
 153                             Collectors.groupingBy(
 154                                     str -> getCell(str, column),
 155                                     Collectors.toCollection(TreeSet::new)))
 156                             .forEach((str, set) -> {
 157                                 System.out.println(str + ":");
 158                                 set.forEach(System.out::println);
 159                             });
 160                     break;
 161                 case "stat":
 162                     verifyArgumentNumber(args, 3);
 163 
 164                     /*
 165                      * BufferedReader will be read several times.
 166                      * We mark this point to return here after each pass.
 167                      */
 168                     br.mark(READ_AHEAD_LIMIT);
 169 
 170                     //Statistics can be collected by a custom collector in one pass
 171                     System.out.println(
 172                             br.lines().collect(new Statistics(column)));
 173                     br.reset();
 174 
 175                     /*
 176                      * Alternatively, statistics can be collected
 177                      * by built-in API in several passes.
 178                      */
 179                     statInSeveralPasses(br, column);
 180                     break;
 181                 default:
 182                     printUsageAndExit("Illegal argument" + args[0]);
 183             }
 184         } catch (IOException e) {
 185             printUsageAndExit(e.toString());
 186         }
 187     }
 188 
 189     private static void statInSeveralPasses(BufferedReader br, int column)
 190             throws IOException {
 191         System.out.println("#-----Statistic in several passes-------#");
 192         //create comparator to compare records by the column.
 193         Comparator<String> comparator
 194                 = Comparator.comparing(
 195                         (String str) -> parseDouble(getCell(str, column)));
 196         //find max record by Collectors.maxBy(...)
 197         System.out.println(
 198                 "Max: " + br.lines().collect(Collectors.maxBy(comparator)));
 199         br.reset();
 200         //find min record by Collectors.minBy(...)
 201         System.out.println(
 202                 "Min: " + br.lines().collect(Collectors.minBy(comparator)));
 203         br.reset();
 204         //Compute average value and sum with Collectors.toDoubleSummaryStatistics(...)
 205         DoubleSummaryStatistics doubleSummaryStatistics
 206                 = br.lines().collect(
 207                         Collectors.summarizingDouble(
 208                                 str -> parseDouble(getCell(str, column))));
 209         System.out.println("Average: " + doubleSummaryStatistics.getAverage());
 210         System.out.println("Sum: " + doubleSummaryStatistics.getSum());
 211     }
 212 
 213     private static void verifyArgumentNumber(String[] args, int n) {
 214         if (args.length != n) {
 215             printUsageAndExit("Expected " + n + " arguments but was "
 216                     + args.length);
 217         }
 218     }
 219 
 220     private static int getColumnNumber(List<String> header, String name) {
 221         int column = header.indexOf(name);
 222         if (column == -1) {
 223             printUsageAndExit("There is no column with name " + name);
 224         }
 225         return column;
 226     }
 227 
 228     private static String getCell(String record, int column) {
 229         return record.split(",")[column].trim();
 230     }
 231 
 232     private static void printUsageAndExit(String... str) {
 233         System.out.println("Usages:");
 234 
 235         System.out.println("CSVProcessor sort COLUMN_NAME ASC|DESC FILE");
 236         System.out.println("Sort lines by column COLUMN_NAME in CSV FILE\n");
 237 
 238         System.out.println("CSVProcessor search COLUMN_NAME REGEX FILE");
 239         System.out.println("Search for REGEX in column COLUMN_NAME in CSV FILE\n");
 240 
 241         System.out.println("CSVProcessor groupby COLUMN_NAME FILE");
 242         System.out.println("Split lines into different groups according column "
 243                 + "COLUMN_NAME value\n");
 244 
 245         System.out.println("CSVProcessor stat COLUMN_NAME FILE");
 246         System.out.println("Compute max/min/average/sum  statistics by column "
 247                 + "COLUMN_NAME\n");
 248 
 249         Arrays.asList(str).forEach(System.out::println);
 250         System.exit(1);
 251     }
 252 
 253     /*
 254      * This is custom implementation of Collector interface.
 255      * Statitics objects gather max,min,sum,average statistics.
 256      */
 257     private static class Statistics
 258             implements Collector<String, Statistics, Statistics> {
 259 
 260 
 261         /*
 262          * @implNote This implementation is not thread safe.
 263          * However, it is safe to use Statistics on a parallel stream, because
 264          * the parallel implementation of
 265          * {@link java.util.stream.Stream#collect Stream.collect()}
 266          * provides the necessary partitioning, isolation, and merging of results for
 267          * safe and efficient parallel execution.
 268          */
 269         private String maxRecord;
 270         private String minRecord;
 271 
 272         private double sum;
 273         private int lineCount;
 274         private final BinaryOperator<String> maxOperator;
 275         private final BinaryOperator<String> minOperator;
 276         private final int column;
 277 
 278         public Statistics(int column) {
 279             this.column = column;
 280             Comparator<String> cmp = Comparator.comparing(
 281                     (String str) -> parseDouble(getCell(str, column)));
 282             maxOperator = BinaryOperator.maxBy(cmp);
 283             minOperator = BinaryOperator.minBy(cmp);
 284         }
 285 
 286         /*
 287          * Process line
 288          */
 289         public Statistics accept(String line) {
 290             maxRecord = maxRecord == null
 291                     ? line : maxOperator.apply(maxRecord, line);
 292             minRecord = minRecord == null
 293                     ? line : minOperator.apply(minRecord, line);
 294 
 295             sum += parseDouble(getCell(line, column));
 296             lineCount++;
 297             return this;
 298         }
 299 
 300 
 301         /*
 302          * Merge two Statistics
 303          */
 304         public Statistics combine(Statistics stat) {
 305             maxRecord = maxOperator.apply(maxRecord, stat.getMaxRecord());
 306             minRecord = minOperator.apply(minRecord, stat.getMinRecord());
 307             sum += stat.getSum();
 308             lineCount += stat.getLineCount();
 309             return this;
 310         }
 311 
 312         @Override
 313         public String toString() {
 314             StringBuilder sb = new StringBuilder();
 315             sb.append("#------Statistics------#\n");
 316             sb.append("Max: ").append(getMaxRecord()).append("\n");
 317             sb.append("Min: ").append(getMinRecord()).append("\n");
 318             sb.append("Sum = ").append(getSum()).append("\n");
 319             sb.append("Average = ").append(average()).append("\n");
 320             sb.append("#------Statistics------#\n");
 321             return sb.toString();
 322         }
 323 
 324         @Override
 325         public Supplier<Statistics> supplier() {
 326             return () -> new Statistics(column);
 327         }
 328 
 329         @Override
 330         public BiConsumer<Statistics, String> accumulator() {
 331             return Statistics::accept;
 332         }
 333 
 334         @Override
 335         public BinaryOperator<Statistics> combiner() {
 336             return Statistics::combine;
 337 
 338         }
 339 
 340         @Override
 341         public Function<Statistics, Statistics> finisher() {
 342             return stat -> stat;
 343         }
 344 
 345         @Override
 346         public Set<Characteristics> characteristics() {
 347             return EnumSet.of(Characteristics.IDENTITY_FINISH);
 348         }
 349 
 350         private String getMaxRecord() {
 351             return maxRecord;
 352         }
 353 
 354         private String getMinRecord() {
 355             return minRecord;
 356         }
 357 
 358         private double getSum() {
 359             return sum;
 360         }
 361 
 362         private double average() {
 363             return sum / lineCount;
 364         }
 365 
 366         private int getLineCount() {
 367             return lineCount;
 368         }
 369 
 370     }
 371 
 372 }