--- old/src/java.base/share/classes/java/util/DualPivotQuicksort.java 2018-01-18 06:09:10.556930431 -0800 +++ new/src/java.base/share/classes/java/util/DualPivotQuicksort.java 2018-01-18 06:09:10.188896279 -0800 @@ -1,16 +1,16 @@ /* - * Copyright (c) 2009, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2009, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this + * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * @@ -25,23 +25,21 @@ package java.util; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveAction; + /** - * This class implements the Dual-Pivot Quicksort algorithm by - * Vladimir Yaroslavskiy, Jon Bentley, and Josh Bloch. The algorithm - * offers O(n log(n)) performance on many data sets that cause other - * quicksorts to degrade to quadratic performance, and is typically + * This class implements powerful and fully optimized versions, both + * sequential and parallel, of the Dual-Pivot Quicksort algorithm by + * Vladimir Yaroslavskiy, Jon Bentley and Josh Bloch. This algorithm + * offers O(n log(n)) performance on all data sets, and is typically * faster than traditional (one-pivot) Quicksort implementations. * - * All exposed methods are package-private, designed to be invoked - * from public methods (in class Arrays) after performing any - * necessary array bounds checks and expanding parameters into the - * required forms. - * * @author Vladimir Yaroslavskiy * @author Jon Bentley * @author Josh Bloch * - * @version 2011.02.11 m765.827.12i:5\7pm + * @version 2018.02.18 * @since 1.7 */ final class DualPivotQuicksort { @@ -51,3131 +49,1858 @@ */ private DualPivotQuicksort() {} - /* - * Tuning parameters. + /** + * If the length of the leftmost part to be sorted is less than + * this constant, heap sort is used in preference to Quicksort. */ + private static final int HEAP_SORT_THRESHOLD = 69; /** - * The maximum number of runs in merge sort. + * If the length of non-leftmost part to be sorted is less than this + * constant, nano insertion sort is used in preference to Quicksort. */ - private static final int MAX_RUN_COUNT = 67; + private static final int NANO_INSERTION_SORT_THRESHOLD = 36; /** - * If the length of an array to be sorted is less than this - * constant, Quicksort is used in preference to merge sort. + * If the length of non-leftmost part to be sorted is less than this + * constant, pair insertion sort is used in preference to Quicksort. */ - private static final int QUICKSORT_THRESHOLD = 286; + private static final int PAIR_INSERTION_SORT_THRESHOLD = 88; /** - * If the length of an array to be sorted is less than this - * constant, insertion sort is used in preference to Quicksort. + * The minimal length of an array, required by parallel Quicksort. */ - private static final int INSERTION_SORT_THRESHOLD = 47; + private static final int PARALLEL_QUICKSORT_THRESHOLD = 1024; /** * If the length of a byte array to be sorted is greater than this * constant, counting sort is used in preference to insertion sort. */ - private static final int COUNTING_SORT_THRESHOLD_FOR_BYTE = 29; + private static final int COUNTING_SORT_THRESHOLD_FOR_BYTE = 41; /** * If the length of a short or char array to be sorted is greater * than this constant, counting sort is used in preference to Quicksort. */ - private static final int COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR = 3200; + private static final int COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR = 2180; - /* - * Sorting methods for seven primitive types. + /** + * if the depth of the Quicksort recursion exceeds this + * constant, heap sort is used in preference to Quicksort. */ + private static final int MAX_RECURSION_DEPTH = 100; /** - * Sorts the specified range of the array using the given - * workspace array slice if possible for merging - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - static void sort(int[] a, int left, int right, - int[] work, int workBase, int workLen) { - // Use Quicksort on small arrays - if (right - left < QUICKSORT_THRESHOLD) { - sort(a, left, right, true); - return; - } - - /* - * Index run[i] is the start of i-th run - * (ascending or descending sequence). - */ - int[] run = new int[MAX_RUN_COUNT + 1]; - int count = 0; run[0] = left; - - // Check if the array is nearly sorted - for (int k = left; k < right; run[count] = k) { - // Equal items in the beginning of the sequence - while (k < right && a[k] == a[k + 1]) - k++; - if (k == right) break; // Sequence finishes with equal items - if (a[k] < a[k + 1]) { // ascending - while (++k <= right && a[k - 1] <= a[k]); - } else if (a[k] > a[k + 1]) { // descending - while (++k <= right && a[k - 1] >= a[k]); - // Transform into an ascending sequence - for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) { - int t = a[lo]; a[lo] = a[hi]; a[hi] = t; - } - } + * This constant is combination of maximum Quicksort recursion depth + * and binary flag, where the right bit "0" indicates that the whole + * array is considered as the leftmost part. + */ + private static final int LEFTMOST_BITS = MAX_RECURSION_DEPTH << 1; - // Merge a transformed descending sequence followed by an - // ascending sequence - if (run[count] > left && a[run[count]] >= a[run[count] - 1]) { - count--; - } + /** + * This class implements the parallel Dual-Pivot Quicksort. + */ + @SuppressWarnings("serial") + private static class Sorter extends RecursiveAction { - /* - * The array is not highly structured, - * use Quicksort instead of merge sort. - */ - if (++count == MAX_RUN_COUNT) { - sort(a, left, right, true); - return; - } + Sorter(T a, int bits, int low, int high) { + this.a = a; + this.bits = bits; + this.low = low; + this.high = high; } - // These invariants should hold true: - // run[0] = 0 - // run[] = right + 1; (terminator) + @Override + protected void compute() { + Class clazz = a.getClass(); - if (count == 0) { - // A single equal run - return; - } else if (count == 1 && run[count] > right) { - // Either a single ascending or a transformed descending run. - // Always check that a final run is a proper terminator, otherwise - // we have an unterminated trailing run, to handle downstream. - return; - } - right++; - if (run[count] < right) { - // Corner case: the final run is not a terminator. This may happen - // if a final run is an equals run, or there is a single-element run - // at the end. Fix up by adding a proper terminator at the end. - // Note that we terminate with (right + 1), incremented earlier. - run[++count] = right; - } - - // Determine alternation base for merge - byte odd = 0; - for (int n = 1; (n <<= 1) < count; odd ^= 1); - - // Use or create temporary array b for merging - int[] b; // temp array; alternates with a - int ao, bo; // array offsets from 'left' - int blen = right - left; // space needed for b - if (work == null || workLen < blen || workBase + blen > work.length) { - work = new int[blen]; - workBase = 0; - } - if (odd == 0) { - System.arraycopy(a, left, work, workBase, blen); - b = a; - bo = 0; - a = work; - ao = workBase - left; - } else { - b = work; - ao = 0; - bo = workBase - left; - } - - // Merging - for (int last; count > 1; count = last) { - for (int k = (last = 0) + 2; k <= count; k += 2) { - int hi = run[k], mi = run[k - 1]; - for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) { - if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) { - b[i + bo] = a[p++ + ao]; - } else { - b[i + bo] = a[q++ + ao]; - } - } - run[++last] = hi; - } - if ((count & 1) != 0) { - for (int i = right, lo = run[count - 1]; --i >= lo; - b[i + bo] = a[i + ao] - ); - run[++last] = right; + if (clazz == int[].class) { + sort((int[]) a, bits, true, low, high); + } else if (clazz == long[].class) { + sort((long[]) a, bits, true, low, high); + } else if (clazz == char[].class) { + sort((char[]) a, bits, true, low, high); + } else if (clazz == short[].class) { + sort((short[]) a, bits, true, low, high); + } else if (clazz == float[].class) { + sort((float[]) a, bits, true, low, high); + } else if (clazz == double[].class) { + sort((double[]) a, bits, true, low, high); + } else { + throw new IllegalArgumentException( + "Unknown type of array: " + clazz.getName()); } - int[] t = a; a = b; b = t; - int o = ao; ao = bo; bo = o; } + + private T a; + private int bits; + private int low; + private int high; } /** - * Sorts the specified range of the array by Dual-Pivot Quicksort. + * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param leftmost indicates if this part is the leftmost in the range - */ - private static void sort(int[] a, int left, int right, boolean leftmost) { - int length = right - left + 1; - - // Use insertion sort on tiny arrays - if (length < INSERTION_SORT_THRESHOLD) { - if (leftmost) { - /* - * Traditional (without sentinel) insertion sort, - * optimized for server VM, is used in case of - * the leftmost part. - */ - for (int i = left, j = i; i < right; j = ++i) { - int ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; - } - } else { - /* - * Skip the longest ascending sequence. - */ - do { - if (left >= right) { - return; - } - } while (a[++left] >= a[left - 1]); + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + static void sort(int[] a, boolean parallel, int low, int high) { + sort(a, LEFTMOST_BITS, parallel, low, high); + } - /* - * Every element from adjoining part plays the role - * of sentinel, therefore this allows us to avoid the - * left range check on each iteration. Moreover, we use - * the more optimized algorithm, so called pair insertion - * sort, which is faster (in the context of Quicksort) - * than traditional implementation of insertion sort. - */ - for (int k = left; ++left <= right; k = ++left) { - int a1 = a[k], a2 = a[left]; + /** + * Sorts the specified range of the array by the Dual-Pivot Quicksort. + * + * @param a the array to be sorted + * @param bits the recursion depth of Quicksort and the leftmost flag + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + private static void sort(int[] a, int bits, boolean parallel, int low, int high) { + int end = high - 1, length = high - low; - if (a1 < a2) { - a2 = a1; a1 = a[left]; - } - while (a1 < a[--k]) { - a[k + 2] = a[k]; - } - a[++k + 1] = a1; + /* + * Run insertion sorts on non-leftmost part. + */ + if ((bits & 1) > 0) { - while (a2 < a[--k]) { - a[k + 1] = a[k]; - } - a[k + 1] = a2; - } - int last = a[right]; + /* + * Use nano insertion sort on tiny part. + */ + if (length < NANO_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.nanoInsertionSort(a, low, high); + return; + } - while (last < a[--right]) { - a[right + 1] = a[right]; - } - a[right + 1] = last; + /* + * Use pair insertion sort on small part. + */ + if (length < PAIR_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.pairInsertionSort(a, low, end); + return; } + } + + /* + * Switch to heap sort on the leftmost part or + * if the execution time is becoming quadratic. + */ + if (length < HEAP_SORT_THRESHOLD || (bits -= 2) < 0) { + SortingAlgorithms.heapSort(a, low, end); return; } - // Inexpensive approximation of length / 7 - int seventh = (length >> 3) + (length >> 6) + 1; + /* + * Check if the array is nearly sorted + * and then try to sort it by Merging sort. + */ + if (SortingAlgorithms.mergingSort(a, parallel, low, high)) { + return; + } /* - * Sort five evenly spaced elements around (and including) the - * center element in the range. These elements will be used for - * pivot selection as described below. The choice for spacing - * these elements was empirically determined to work well on - * a wide variety of inputs. + * The splitting of the array, defined by the following + * step, is related to the inexpensive approximation of + * the golden ratio. */ - int e3 = (left + right) >>> 1; // The midpoint - int e2 = e3 - seventh; - int e1 = e2 - seventh; - int e4 = e3 + seventh; - int e5 = e4 + seventh; + int step = (length >> 3) * 3 + 3; - // Sort these elements using insertion sort - if (a[e2] < a[e1]) { int t = a[e2]; a[e2] = a[e1]; a[e1] = t; } + /* + * Five elements around (and including) the central element in + * the array will be used for pivot selection as described below. + * The unequal choice of spacing these elements was empirically + * determined to work well on a wide variety of inputs. + */ + int e1 = low + step; + int e5 = end - step; + int e3 = (e1 + e5) >>> 1; + int e2 = (e1 + e3) >>> 1; + int e4 = (e3 + e5) >>> 1; - if (a[e3] < a[e2]) { int t = a[e3]; a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - if (a[e4] < a[e3]) { int t = a[e4]; a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - } - if (a[e5] < a[e4]) { int t = a[e5]; a[e5] = a[e4]; a[e4] = t; - if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } + /* + * Sort these elements in place by the combination + * of 5-element sorting network and insertion sort. + */ + if (a[e5] < a[e3]) { int t = a[e5]; a[e5] = a[e3]; a[e3] = t; } + if (a[e4] < a[e2]) { int t = a[e4]; a[e4] = a[e2]; a[e2] = t; } + if (a[e5] < a[e4]) { int t = a[e5]; a[e5] = a[e4]; a[e4] = t; } + if (a[e3] < a[e2]) { int t = a[e3]; a[e3] = a[e2]; a[e2] = t; } + if (a[e4] < a[e3]) { int t = a[e4]; a[e4] = a[e3]; a[e3] = t; } + + if (a[e1] > a[e2]) { int t = a[e1]; a[e1] = a[e2]; a[e2] = t; + if (t > a[e3]) { a[e2] = a[e3]; a[e3] = t; + if (t > a[e4]) { a[e3] = a[e4]; a[e4] = t; + if (t > a[e5]) { a[e4] = a[e5]; a[e5] = t; } } } } // Pointers - int less = left; // The index of the first element of center part - int great = right; // The index before the first element of right part + int lower = low; // The index of the last element of the left part + int upper = end; // The index of the first element of the right part + + if (a[e1] < a[e2] && a[e2] < a[e3] && a[e3] < a[e4] && a[e4] < a[e5]) { - if (a[e1] != a[e2] && a[e2] != a[e3] && a[e3] != a[e4] && a[e4] != a[e5]) { /* - * Use the second and fourth of the five sorted elements as pivots. - * These values are inexpensive approximations of the first and - * second terciles of the array. Note that pivot1 <= pivot2. + * Use the first and the fifth elements as the pivots. + * These values are inexpensive approximation of tertiles. + * Note, that pivot1 < pivot2. */ - int pivot1 = a[e2]; - int pivot2 = a[e4]; + int pivot1 = a[e1]; + int pivot2 = a[e5]; /* * The first and the last elements to be sorted are moved to the * locations formerly occupied by the pivots. When partitioning - * is complete, the pivots are swapped back into their final + * is completed, the pivots are swapped back into their final * positions, and excluded from subsequent sorting. */ - a[e2] = a[left]; - a[e4] = a[right]; + a[e1] = a[lower]; + a[e5] = a[upper]; /* - * Skip elements, which are less or greater than pivot values. + * Skip elements, which are less or greater than the pivots. */ - while (a[++less] < pivot1); - while (a[--great] > pivot2); + while (a[++lower] < pivot1); + while (a[--upper] > pivot2); /* - * Partitioning: + * Backwards 3-interval partitioning * - * left part center part right part - * +--------------------------------------------------------------+ - * | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 | - * +--------------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * left part central part right part + * +------------------------------------------------------------+ + * | < pivot1 | ? | pivot1 <= && <= pivot2 | > pivot2 | + * +------------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot1 - * pivot1 <= all in [less, k) <= pivot2 - * all in (great, right) > pivot2 + * all in (low, lower] < pivot1 + * pivot1 <= all in (k, upper) <= pivot2 + * all in [upper, end) > pivot2 * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - outer: - for (int k = less - 1; ++k <= great; ) { + for (int $ = --lower, k = ++upper; --k > lower; ) { int ak = a[k]; - if (ak < pivot1) { // Move a[k] to left part - a[k] = a[less]; - /* - * Here and below we use "a[i] = b; i++;" instead - * of "a[i++] = b;" due to performance issue. - */ - a[less] = ak; - ++less; - } else if (ak > pivot2) { // Move a[k] to right part - while (a[great] > pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] < pivot1) { // a[great] <= pivot2 - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // pivot1 <= a[great] <= pivot2 - a[k] = a[great]; - } - /* - * Here and below we use "a[i] = b; i--;" instead - * of "a[i--] = b;" due to performance issue. - */ - a[great] = ak; - --great; - } - } - - // Swap pivots into their final positions - a[left] = a[less - 1]; a[less - 1] = pivot1; - a[right] = a[great + 1]; a[great + 1] = pivot2; - - // Sort left and right parts recursively, excluding known pivots - sort(a, left, less - 2, leftmost); - sort(a, great + 2, right, false); - - /* - * If center part is too large (comprises > 4/7 of the array), - * swap internal pivot values to ends. - */ - if (less < e1 && e5 < great) { - /* - * Skip elements, which are equal to pivot values. - */ - while (a[less] == pivot1) { - ++less; - } - - while (a[great] == pivot2) { - --great; - } - - /* - * Partitioning: - * - * left part center part right part - * +----------------------------------------------------------+ - * | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 | - * +----------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great - * - * Invariants: - * - * all in (*, less) == pivot1 - * pivot1 < all in [less, k) < pivot2 - * all in (great, *) == pivot2 - * - * Pointer k is the first index of ?-part. - */ - outer: - for (int k = less - 1; ++k <= great; ) { - int ak = a[k]; - if (ak == pivot1) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else if (ak == pivot2) { // Move a[k] to right part - while (a[great] == pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] == pivot1) { // a[great] < pivot2 - a[k] = a[less]; - /* - * Even though a[great] equals to pivot1, the - * assignment a[less] = pivot1 may be incorrect, - * if a[great] and pivot1 are floating-point zeros - * of different signs. Therefore in float and - * double sorting methods we have to use more - * accurate assignment a[less] = a[great]. - */ - a[less] = pivot1; - ++less; - } else { // pivot1 < a[great] < pivot2 - a[k] = a[great]; - } - a[great] = ak; - --great; + + if (ak < pivot1) { // Move a[k] to the left side + while (a[++lower] < pivot1); + + if (lower > k) { + lower = k; + break; + } + if (a[lower] > pivot2) { // a[lower] >= pivot1 + a[k] = a[--upper]; + a[upper] = a[lower]; + } else { // pivot1 <= a[lower] <= pivot2 + a[k] = a[lower]; } + a[lower] = ak; + } else if (ak > pivot2) { // Move a[k] to the right side + a[k] = a[--upper]; + a[upper] = ak; } } - // Sort center part recursively - sort(a, less, great, false); + /* + * Swap the pivots into their final positions. + */ + a[low] = a[lower]; a[lower] = pivot1; + a[end] = a[upper]; a[upper] = pivot2; + /* + * Sort all parts recursively, excluding known pivots. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits | 1, lower + 1, upper), + new Sorter(a, bits | 1, upper + 1, high), + new Sorter(a, bits, low, lower) + ); + } else { + sort(a, bits | 1, false, upper + 1, high); + sort(a, bits, false, low, lower); + sort(a, bits | 1, false, lower + 1, upper); + } } else { // Partitioning with one pivot + /* - * Use the third of the five sorted elements as pivot. - * This value is inexpensive approximation of the median. + * Use the third element as the pivot. This value + * is inexpensive approximation of the median. */ int pivot = a[e3]; /* - * Partitioning degenerates to the traditional 3-way - * (or "Dutch National Flag") schema: - * - * left part center part right part - * +-------------------------------------------------+ - * | < pivot | == pivot | ? | > pivot | - * +-------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * The first element to be sorted is moved to the location + * formerly occupied by the pivot. When partitioning is + * completed, the pivot is swapped back into its final + * position, and excluded from subsequent sorting. + */ + a[e3] = a[lower]; + + /* + * Traditional 3-way (Dutch National Flag) partitioning + * + * left part central part right part + * +------------------------------------------------------+ + * | < pivot | ? | == pivot | > pivot | + * +------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot - * all in [less, k) == pivot - * all in (great, right) > pivot + * all in (low, lower] < pivot + * all in (k, upper) == pivot + * all in [upper, end] > pivot * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - for (int k = less; k <= great; ++k) { + for (int k = ++upper; --k > lower; ) { if (a[k] == pivot) { continue; } int ak = a[k]; - if (ak < pivot) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else { // a[k] > pivot - Move a[k] to right part - while (a[great] > pivot) { - --great; - } - if (a[great] < pivot) { // a[great] <= pivot - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // a[great] == pivot - /* - * Even though a[great] equals to pivot, the - * assignment a[k] = pivot may be incorrect, - * if a[great] and pivot are floating-point - * zeros of different signs. Therefore in float - * and double sorting methods we have to use - * more accurate assignment a[k] = a[great]. - */ - a[k] = pivot; + + if (ak < pivot) { // Move a[k] to the left side + while (a[++lower] < pivot); + + if (lower > k) { + lower = k; + break; + } + a[k] = pivot; + + if (a[lower] > pivot) { + a[--upper] = a[lower]; } - a[great] = ak; - --great; + a[lower] = ak; + } else { // ak > pivot - Move a[k] to the right side + a[k] = pivot; + a[--upper] = ak; } } /* - * Sort left and right parts recursively. - * All elements from center part are equal + * Swap the pivot into its final position. + */ + a[low] = a[lower]; a[lower] = pivot; + + /* + * Sort the left and the right parts recursively, excluding + * known pivot. All elements from the central part are equal * and, therefore, already sorted. */ - sort(a, left, less - 1, leftmost); - sort(a, great + 1, right, false); + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits, low, lower), + new Sorter(a, bits | 1, upper, high) + ); + } else { + sort(a, bits | 1, false, upper, high); + sort(a, bits, false, low, lower); + } } } /** - * Sorts the specified range of the array using the given - * workspace array slice if possible for merging + * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - static void sort(long[] a, int left, int right, - long[] work, int workBase, int workLen) { - // Use Quicksort on small arrays - if (right - left < QUICKSORT_THRESHOLD) { - sort(a, left, right, true); - return; - } + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + static void sort(long[] a, boolean parallel, int low, int high) { + sort(a, LEFTMOST_BITS, parallel, low, high); + } + + /** + * Sorts the specified range of the array by the Dual-Pivot Quicksort. + * + * @param a the array to be sorted + * @param bits the recursion depth of Quicksort and the leftmost flag + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + private static void sort(long[] a, int bits, boolean parallel, int low, int high) { + int end = high - 1, length = high - low; /* - * Index run[i] is the start of i-th run - * (ascending or descending sequence). + * Run insertion sorts on non-leftmost part. */ - int[] run = new int[MAX_RUN_COUNT + 1]; - int count = 0; run[0] = left; + if ((bits & 1) > 0) { - // Check if the array is nearly sorted - for (int k = left; k < right; run[count] = k) { - // Equal items in the beginning of the sequence - while (k < right && a[k] == a[k + 1]) - k++; - if (k == right) break; // Sequence finishes with equal items - if (a[k] < a[k + 1]) { // ascending - while (++k <= right && a[k - 1] <= a[k]); - } else if (a[k] > a[k + 1]) { // descending - while (++k <= right && a[k - 1] >= a[k]); - // Transform into an ascending sequence - for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) { - long t = a[lo]; a[lo] = a[hi]; a[hi] = t; - } - } - - // Merge a transformed descending sequence followed by an - // ascending sequence - if (run[count] > left && a[run[count]] >= a[run[count] - 1]) { - count--; + /* + * Use nano insertion sort on tiny part. + */ + if (length < NANO_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.nanoInsertionSort(a, low, high); + return; } /* - * The array is not highly structured, - * use Quicksort instead of merge sort. + * Use pair insertion sort on small part. */ - if (++count == MAX_RUN_COUNT) { - sort(a, left, right, true); + if (length < PAIR_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.pairInsertionSort(a, low, end); return; } } - // These invariants should hold true: - // run[0] = 0 - // run[] = right + 1; (terminator) - - if (count == 0) { - // A single equal run - return; - } else if (count == 1 && run[count] > right) { - // Either a single ascending or a transformed descending run. - // Always check that a final run is a proper terminator, otherwise - // we have an unterminated trailing run, to handle downstream. + /* + * Switch to heap sort on the leftmost part or + * if the execution time is becoming quadratic. + */ + if (length < HEAP_SORT_THRESHOLD || (bits -= 2) < 0) { + SortingAlgorithms.heapSort(a, low, end); return; } - right++; - if (run[count] < right) { - // Corner case: the final run is not a terminator. This may happen - // if a final run is an equals run, or there is a single-element run - // at the end. Fix up by adding a proper terminator at the end. - // Note that we terminate with (right + 1), incremented earlier. - run[++count] = right; - } - - // Determine alternation base for merge - byte odd = 0; - for (int n = 1; (n <<= 1) < count; odd ^= 1); - - // Use or create temporary array b for merging - long[] b; // temp array; alternates with a - int ao, bo; // array offsets from 'left' - int blen = right - left; // space needed for b - if (work == null || workLen < blen || workBase + blen > work.length) { - work = new long[blen]; - workBase = 0; - } - if (odd == 0) { - System.arraycopy(a, left, work, workBase, blen); - b = a; - bo = 0; - a = work; - ao = workBase - left; - } else { - b = work; - ao = 0; - bo = workBase - left; - } - - // Merging - for (int last; count > 1; count = last) { - for (int k = (last = 0) + 2; k <= count; k += 2) { - int hi = run[k], mi = run[k - 1]; - for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) { - if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) { - b[i + bo] = a[p++ + ao]; - } else { - b[i + bo] = a[q++ + ao]; - } - } - run[++last] = hi; - } - if ((count & 1) != 0) { - for (int i = right, lo = run[count - 1]; --i >= lo; - b[i + bo] = a[i + ao] - ); - run[++last] = right; - } - long[] t = a; a = b; b = t; - int o = ao; ao = bo; bo = o; - } - } - - /** - * Sorts the specified range of the array by Dual-Pivot Quicksort. - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param leftmost indicates if this part is the leftmost in the range - */ - private static void sort(long[] a, int left, int right, boolean leftmost) { - int length = right - left + 1; - - // Use insertion sort on tiny arrays - if (length < INSERTION_SORT_THRESHOLD) { - if (leftmost) { - /* - * Traditional (without sentinel) insertion sort, - * optimized for server VM, is used in case of - * the leftmost part. - */ - for (int i = left, j = i; i < right; j = ++i) { - long ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; - } - } else { - /* - * Skip the longest ascending sequence. - */ - do { - if (left >= right) { - return; - } - } while (a[++left] >= a[left - 1]); - - /* - * Every element from adjoining part plays the role - * of sentinel, therefore this allows us to avoid the - * left range check on each iteration. Moreover, we use - * the more optimized algorithm, so called pair insertion - * sort, which is faster (in the context of Quicksort) - * than traditional implementation of insertion sort. - */ - for (int k = left; ++left <= right; k = ++left) { - long a1 = a[k], a2 = a[left]; - if (a1 < a2) { - a2 = a1; a1 = a[left]; - } - while (a1 < a[--k]) { - a[k + 2] = a[k]; - } - a[++k + 1] = a1; - - while (a2 < a[--k]) { - a[k + 1] = a[k]; - } - a[k + 1] = a2; - } - long last = a[right]; - - while (last < a[--right]) { - a[right + 1] = a[right]; - } - a[right + 1] = last; - } + /* + * Check if the array is nearly sorted + * and then try to sort it by Merging sort. + */ + if (SortingAlgorithms.mergingSort(a, parallel, low, high)) { return; } - // Inexpensive approximation of length / 7 - int seventh = (length >> 3) + (length >> 6) + 1; + /* + * The splitting of the array, defined by the following + * step, is related to the inexpensive approximation of + * the golden ratio. + */ + int step = (length >> 3) * 3 + 3; /* - * Sort five evenly spaced elements around (and including) the - * center element in the range. These elements will be used for - * pivot selection as described below. The choice for spacing - * these elements was empirically determined to work well on - * a wide variety of inputs. + * Five elements around (and including) the central element in + * the array will be used for pivot selection as described below. + * The unequal choice of spacing these elements was empirically + * determined to work well on a wide variety of inputs. */ - int e3 = (left + right) >>> 1; // The midpoint - int e2 = e3 - seventh; - int e1 = e2 - seventh; - int e4 = e3 + seventh; - int e5 = e4 + seventh; + int e1 = low + step; + int e5 = end - step; + int e3 = (e1 + e5) >>> 1; + int e2 = (e1 + e3) >>> 1; + int e4 = (e3 + e5) >>> 1; - // Sort these elements using insertion sort - if (a[e2] < a[e1]) { long t = a[e2]; a[e2] = a[e1]; a[e1] = t; } + /* + * Sort these elements in place by the combination + * of 5-element sorting network and insertion sort. + */ + if (a[e5] < a[e3]) { long t = a[e5]; a[e5] = a[e3]; a[e3] = t; } + if (a[e4] < a[e2]) { long t = a[e4]; a[e4] = a[e2]; a[e2] = t; } + if (a[e5] < a[e4]) { long t = a[e5]; a[e5] = a[e4]; a[e4] = t; } + if (a[e3] < a[e2]) { long t = a[e3]; a[e3] = a[e2]; a[e2] = t; } + if (a[e4] < a[e3]) { long t = a[e4]; a[e4] = a[e3]; a[e3] = t; } - if (a[e3] < a[e2]) { long t = a[e3]; a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - if (a[e4] < a[e3]) { long t = a[e4]; a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - } - if (a[e5] < a[e4]) { long t = a[e5]; a[e5] = a[e4]; a[e4] = t; - if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } + if (a[e1] > a[e2]) { long t = a[e1]; a[e1] = a[e2]; a[e2] = t; + if (t > a[e3]) { a[e2] = a[e3]; a[e3] = t; + if (t > a[e4]) { a[e3] = a[e4]; a[e4] = t; + if (t > a[e5]) { a[e4] = a[e5]; a[e5] = t; } } } } // Pointers - int less = left; // The index of the first element of center part - int great = right; // The index before the first element of right part + int lower = low; // The index of the last element of the left part + int upper = end; // The index of the first element of the right part + + if (a[e1] < a[e2] && a[e2] < a[e3] && a[e3] < a[e4] && a[e4] < a[e5]) { - if (a[e1] != a[e2] && a[e2] != a[e3] && a[e3] != a[e4] && a[e4] != a[e5]) { /* - * Use the second and fourth of the five sorted elements as pivots. - * These values are inexpensive approximations of the first and - * second terciles of the array. Note that pivot1 <= pivot2. + * Use the first and the fifth elements as the pivots. + * These values are inexpensive approximation of tertiles. + * Note, that pivot1 < pivot2. */ - long pivot1 = a[e2]; - long pivot2 = a[e4]; + long pivot1 = a[e1]; + long pivot2 = a[e5]; /* * The first and the last elements to be sorted are moved to the * locations formerly occupied by the pivots. When partitioning - * is complete, the pivots are swapped back into their final + * is completed, the pivots are swapped back into their final * positions, and excluded from subsequent sorting. */ - a[e2] = a[left]; - a[e4] = a[right]; + a[e1] = a[lower]; + a[e5] = a[upper]; /* - * Skip elements, which are less or greater than pivot values. + * Skip elements, which are less or greater than the pivots. */ - while (a[++less] < pivot1); - while (a[--great] > pivot2); + while (a[++lower] < pivot1); + while (a[--upper] > pivot2); /* - * Partitioning: + * Backwards 3-interval partitioning * - * left part center part right part - * +--------------------------------------------------------------+ - * | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 | - * +--------------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * left part central part right part + * +------------------------------------------------------------+ + * | < pivot1 | ? | pivot1 <= && <= pivot2 | > pivot2 | + * +------------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot1 - * pivot1 <= all in [less, k) <= pivot2 - * all in (great, right) > pivot2 + * all in (low, lower] < pivot1 + * pivot1 <= all in (k, upper) <= pivot2 + * all in [upper, end) > pivot2 * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - outer: - for (int k = less - 1; ++k <= great; ) { + for (int $ = --lower, k = ++upper; --k > lower; ) { long ak = a[k]; - if (ak < pivot1) { // Move a[k] to left part - a[k] = a[less]; - /* - * Here and below we use "a[i] = b; i++;" instead - * of "a[i++] = b;" due to performance issue. - */ - a[less] = ak; - ++less; - } else if (ak > pivot2) { // Move a[k] to right part - while (a[great] > pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] < pivot1) { // a[great] <= pivot2 - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // pivot1 <= a[great] <= pivot2 - a[k] = a[great]; - } - /* - * Here and below we use "a[i] = b; i--;" instead - * of "a[i--] = b;" due to performance issue. - */ - a[great] = ak; - --great; - } - } - - // Swap pivots into their final positions - a[left] = a[less - 1]; a[less - 1] = pivot1; - a[right] = a[great + 1]; a[great + 1] = pivot2; - - // Sort left and right parts recursively, excluding known pivots - sort(a, left, less - 2, leftmost); - sort(a, great + 2, right, false); - - /* - * If center part is too large (comprises > 4/7 of the array), - * swap internal pivot values to ends. - */ - if (less < e1 && e5 < great) { - /* - * Skip elements, which are equal to pivot values. - */ - while (a[less] == pivot1) { - ++less; - } - - while (a[great] == pivot2) { - --great; - } - - /* - * Partitioning: - * - * left part center part right part - * +----------------------------------------------------------+ - * | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 | - * +----------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great - * - * Invariants: - * - * all in (*, less) == pivot1 - * pivot1 < all in [less, k) < pivot2 - * all in (great, *) == pivot2 - * - * Pointer k is the first index of ?-part. - */ - outer: - for (int k = less - 1; ++k <= great; ) { - long ak = a[k]; - if (ak == pivot1) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else if (ak == pivot2) { // Move a[k] to right part - while (a[great] == pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] == pivot1) { // a[great] < pivot2 - a[k] = a[less]; - /* - * Even though a[great] equals to pivot1, the - * assignment a[less] = pivot1 may be incorrect, - * if a[great] and pivot1 are floating-point zeros - * of different signs. Therefore in float and - * double sorting methods we have to use more - * accurate assignment a[less] = a[great]. - */ - a[less] = pivot1; - ++less; - } else { // pivot1 < a[great] < pivot2 - a[k] = a[great]; - } - a[great] = ak; - --great; + + if (ak < pivot1) { // Move a[k] to the left side + while (a[++lower] < pivot1); + + if (lower > k) { + lower = k; + break; } + if (a[lower] > pivot2) { // a[lower] >= pivot1 + a[k] = a[--upper]; + a[upper] = a[lower]; + } else { // pivot1 <= a[lower] <= pivot2 + a[k] = a[lower]; + } + a[lower] = ak; + } else if (ak > pivot2) { // Move a[k] to the right side + a[k] = a[--upper]; + a[upper] = ak; } } - // Sort center part recursively - sort(a, less, great, false); + /* + * Swap the pivots into their final positions. + */ + a[low] = a[lower]; a[lower] = pivot1; + a[end] = a[upper]; a[upper] = pivot2; + /* + * Sort all parts recursively, excluding known pivots. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits | 1, lower + 1, upper), + new Sorter(a, bits | 1, upper + 1, high), + new Sorter(a, bits, low, lower) + ); + } else { + sort(a, bits | 1, false, upper + 1, high); + sort(a, bits, false, low, lower); + sort(a, bits | 1, false, lower + 1, upper); + } } else { // Partitioning with one pivot + /* - * Use the third of the five sorted elements as pivot. - * This value is inexpensive approximation of the median. + * Use the third element as the pivot. This value + * is inexpensive approximation of the median. */ long pivot = a[e3]; /* - * Partitioning degenerates to the traditional 3-way - * (or "Dutch National Flag") schema: - * - * left part center part right part - * +-------------------------------------------------+ - * | < pivot | == pivot | ? | > pivot | - * +-------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * The first element to be sorted is moved to the location + * formerly occupied by the pivot. When partitioning is + * completed, the pivot is swapped back into its final + * position, and excluded from subsequent sorting. + */ + a[e3] = a[lower]; + + /* + * Traditional 3-way (Dutch National Flag) partitioning + * + * left part central part right part + * +------------------------------------------------------+ + * | < pivot | ? | == pivot | > pivot | + * +------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot - * all in [less, k) == pivot - * all in (great, right) > pivot + * all in (low, lower] < pivot + * all in (k, upper) == pivot + * all in [upper, end] > pivot * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - for (int k = less; k <= great; ++k) { + for (int k = ++upper; --k > lower; ) { if (a[k] == pivot) { continue; } long ak = a[k]; - if (ak < pivot) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else { // a[k] > pivot - Move a[k] to right part - while (a[great] > pivot) { - --great; - } - if (a[great] < pivot) { // a[great] <= pivot - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // a[great] == pivot - /* - * Even though a[great] equals to pivot, the - * assignment a[k] = pivot may be incorrect, - * if a[great] and pivot are floating-point - * zeros of different signs. Therefore in float - * and double sorting methods we have to use - * more accurate assignment a[k] = a[great]. - */ - a[k] = pivot; + + if (ak < pivot) { // Move a[k] to the left side + while (a[++lower] < pivot); + + if (lower > k) { + lower = k; + break; + } + a[k] = pivot; + + if (a[lower] > pivot) { + a[--upper] = a[lower]; } - a[great] = ak; - --great; + a[lower] = ak; + } else { // ak > pivot - Move a[k] to the right side + a[k] = pivot; + a[--upper] = ak; } } /* - * Sort left and right parts recursively. - * All elements from center part are equal + * Swap the pivot into its final position. + */ + a[low] = a[lower]; a[lower] = pivot; + + /* + * Sort the left and the right parts recursively, excluding + * known pivot. All elements from the central part are equal * and, therefore, already sorted. */ - sort(a, left, less - 1, leftmost); - sort(a, great + 1, right, false); + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits, low, lower), + new Sorter(a, bits | 1, upper, high) + ); + } else { + sort(a, bits | 1, false, upper, high); + sort(a, bits, false, low, lower); + } } } /** - * Sorts the specified range of the array using the given - * workspace array slice if possible for merging + * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - static void sort(short[] a, int left, int right, - short[] work, int workBase, int workLen) { - // Use counting sort on large arrays - if (right - left > COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR) { - int[] count = new int[NUM_SHORT_VALUES]; - - for (int i = left - 1; ++i <= right; - count[a[i] - Short.MIN_VALUE]++ - ); - for (int i = NUM_SHORT_VALUES, k = right + 1; k > left; ) { - while (count[--i] == 0); - short value = (short) (i + Short.MIN_VALUE); - int s = count[i]; - - do { - a[--k] = value; - } while (--s > 0); - } - } else { // Use Dual-Pivot Quicksort on small arrays - doSort(a, left, right, work, workBase, workLen); + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + static void sort(byte[] a, boolean parallel, int low, int high) { + if (high - low > COUNTING_SORT_THRESHOLD_FOR_BYTE) { + SortingAlgorithms.countingSort(a, low, high); + } else { + SortingAlgorithms.insertionSort(a, low, high); } } - /** The number of distinct short values. */ - private static final int NUM_SHORT_VALUES = 1 << 16; - /** * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - private static void doSort(short[] a, int left, int right, - short[] work, int workBase, int workLen) { - // Use Quicksort on small arrays - if (right - left < QUICKSORT_THRESHOLD) { - sort(a, left, right, true); - return; + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + static void sort(char[] a, boolean parallel, int low, int high) { + if (high - low > COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR) { + SortingAlgorithms.countingSort(a, low, high); + } else { + sort(a, LEFTMOST_BITS, parallel, low, high); } + } + + /** + * Sorts the specified range of the array by the Dual-Pivot Quicksort. + * + * @param a the array to be sorted + * @param bits the recursion depth of Quicksort and the leftmost flag + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + private static void sort(char[] a, int bits, boolean parallel, int low, int high) { + int end = high - 1, length = high - low; /* - * Index run[i] is the start of i-th run - * (ascending or descending sequence). + * Run insertion sorts on non-leftmost part. */ - int[] run = new int[MAX_RUN_COUNT + 1]; - int count = 0; run[0] = left; - - // Check if the array is nearly sorted - for (int k = left; k < right; run[count] = k) { - // Equal items in the beginning of the sequence - while (k < right && a[k] == a[k + 1]) - k++; - if (k == right) break; // Sequence finishes with equal items - if (a[k] < a[k + 1]) { // ascending - while (++k <= right && a[k - 1] <= a[k]); - } else if (a[k] > a[k + 1]) { // descending - while (++k <= right && a[k - 1] >= a[k]); - // Transform into an ascending sequence - for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) { - short t = a[lo]; a[lo] = a[hi]; a[hi] = t; - } - } + if ((bits & 1) > 0) { - // Merge a transformed descending sequence followed by an - // ascending sequence - if (run[count] > left && a[run[count]] >= a[run[count] - 1]) { - count--; + /* + * Use nano insertion sort on tiny part. + */ + if (length < NANO_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.nanoInsertionSort(a, low, high); + return; } /* - * The array is not highly structured, - * use Quicksort instead of merge sort. + * Use pair insertion sort on small part. */ - if (++count == MAX_RUN_COUNT) { - sort(a, left, right, true); + if (length < PAIR_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.pairInsertionSort(a, low, end); return; } } - // These invariants should hold true: - // run[0] = 0 - // run[] = right + 1; (terminator) - - if (count == 0) { - // A single equal run - return; - } else if (count == 1 && run[count] > right) { - // Either a single ascending or a transformed descending run. - // Always check that a final run is a proper terminator, otherwise - // we have an unterminated trailing run, to handle downstream. + /* + * Switch to heap sort on the leftmost part or + * if the execution time is becoming quadratic. + */ + if (length < HEAP_SORT_THRESHOLD || (bits -= 2) < 0) { + SortingAlgorithms.heapSort(a, low, end); return; } - right++; - if (run[count] < right) { - // Corner case: the final run is not a terminator. This may happen - // if a final run is an equals run, or there is a single-element run - // at the end. Fix up by adding a proper terminator at the end. - // Note that we terminate with (right + 1), incremented earlier. - run[++count] = right; - } - - // Determine alternation base for merge - byte odd = 0; - for (int n = 1; (n <<= 1) < count; odd ^= 1); - - // Use or create temporary array b for merging - short[] b; // temp array; alternates with a - int ao, bo; // array offsets from 'left' - int blen = right - left; // space needed for b - if (work == null || workLen < blen || workBase + blen > work.length) { - work = new short[blen]; - workBase = 0; - } - if (odd == 0) { - System.arraycopy(a, left, work, workBase, blen); - b = a; - bo = 0; - a = work; - ao = workBase - left; - } else { - b = work; - ao = 0; - bo = workBase - left; - } - - // Merging - for (int last; count > 1; count = last) { - for (int k = (last = 0) + 2; k <= count; k += 2) { - int hi = run[k], mi = run[k - 1]; - for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) { - if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) { - b[i + bo] = a[p++ + ao]; - } else { - b[i + bo] = a[q++ + ao]; - } - } - run[++last] = hi; - } - if ((count & 1) != 0) { - for (int i = right, lo = run[count - 1]; --i >= lo; - b[i + bo] = a[i + ao] - ); - run[++last] = right; - } - short[] t = a; a = b; b = t; - int o = ao; ao = bo; bo = o; - } - } - - /** - * Sorts the specified range of the array by Dual-Pivot Quicksort. - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param leftmost indicates if this part is the leftmost in the range - */ - private static void sort(short[] a, int left, int right, boolean leftmost) { - int length = right - left + 1; - - // Use insertion sort on tiny arrays - if (length < INSERTION_SORT_THRESHOLD) { - if (leftmost) { - /* - * Traditional (without sentinel) insertion sort, - * optimized for server VM, is used in case of - * the leftmost part. - */ - for (int i = left, j = i; i < right; j = ++i) { - short ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; - } - } else { - /* - * Skip the longest ascending sequence. - */ - do { - if (left >= right) { - return; - } - } while (a[++left] >= a[left - 1]); - - /* - * Every element from adjoining part plays the role - * of sentinel, therefore this allows us to avoid the - * left range check on each iteration. Moreover, we use - * the more optimized algorithm, so called pair insertion - * sort, which is faster (in the context of Quicksort) - * than traditional implementation of insertion sort. - */ - for (int k = left; ++left <= right; k = ++left) { - short a1 = a[k], a2 = a[left]; - - if (a1 < a2) { - a2 = a1; a1 = a[left]; - } - while (a1 < a[--k]) { - a[k + 2] = a[k]; - } - a[++k + 1] = a1; - while (a2 < a[--k]) { - a[k + 1] = a[k]; - } - a[k + 1] = a2; - } - short last = a[right]; - - while (last < a[--right]) { - a[right + 1] = a[right]; - } - a[right + 1] = last; - } + /* + * Check if the array is nearly sorted + * and then try to sort it by Merging sort. + */ + if (SortingAlgorithms.mergingSort(a, parallel, low, high)) { return; } - // Inexpensive approximation of length / 7 - int seventh = (length >> 3) + (length >> 6) + 1; + /* + * The splitting of the array, defined by the following + * step, is related to the inexpensive approximation of + * the golden ratio. + */ + int step = (length >> 3) * 3 + 3; /* - * Sort five evenly spaced elements around (and including) the - * center element in the range. These elements will be used for - * pivot selection as described below. The choice for spacing - * these elements was empirically determined to work well on - * a wide variety of inputs. + * Five elements around (and including) the central element in + * the array will be used for pivot selection as described below. + * The unequal choice of spacing these elements was empirically + * determined to work well on a wide variety of inputs. */ - int e3 = (left + right) >>> 1; // The midpoint - int e2 = e3 - seventh; - int e1 = e2 - seventh; - int e4 = e3 + seventh; - int e5 = e4 + seventh; + int e1 = low + step; + int e5 = end - step; + int e3 = (e1 + e5) >>> 1; + int e2 = (e1 + e3) >>> 1; + int e4 = (e3 + e5) >>> 1; - // Sort these elements using insertion sort - if (a[e2] < a[e1]) { short t = a[e2]; a[e2] = a[e1]; a[e1] = t; } + /* + * Sort these elements in place by the combination + * of 5-element sorting network and insertion sort. + */ + if (a[e5] < a[e3]) { char t = a[e5]; a[e5] = a[e3]; a[e3] = t; } + if (a[e4] < a[e2]) { char t = a[e4]; a[e4] = a[e2]; a[e2] = t; } + if (a[e5] < a[e4]) { char t = a[e5]; a[e5] = a[e4]; a[e4] = t; } + if (a[e3] < a[e2]) { char t = a[e3]; a[e3] = a[e2]; a[e2] = t; } + if (a[e4] < a[e3]) { char t = a[e4]; a[e4] = a[e3]; a[e3] = t; } - if (a[e3] < a[e2]) { short t = a[e3]; a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - if (a[e4] < a[e3]) { short t = a[e4]; a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - } - if (a[e5] < a[e4]) { short t = a[e5]; a[e5] = a[e4]; a[e4] = t; - if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } + if (a[e1] > a[e2]) { char t = a[e1]; a[e1] = a[e2]; a[e2] = t; + if (t > a[e3]) { a[e2] = a[e3]; a[e3] = t; + if (t > a[e4]) { a[e3] = a[e4]; a[e4] = t; + if (t > a[e5]) { a[e4] = a[e5]; a[e5] = t; } } } } // Pointers - int less = left; // The index of the first element of center part - int great = right; // The index before the first element of right part + int lower = low; // The index of the last element of the left part + int upper = end; // The index of the first element of the right part + + if (a[e1] < a[e2] && a[e2] < a[e3] && a[e3] < a[e4] && a[e4] < a[e5]) { - if (a[e1] != a[e2] && a[e2] != a[e3] && a[e3] != a[e4] && a[e4] != a[e5]) { /* - * Use the second and fourth of the five sorted elements as pivots. - * These values are inexpensive approximations of the first and - * second terciles of the array. Note that pivot1 <= pivot2. + * Use the first and the fifth elements as the pivots. + * These values are inexpensive approximation of tertiles. + * Note, that pivot1 < pivot2. */ - short pivot1 = a[e2]; - short pivot2 = a[e4]; + char pivot1 = a[e1]; + char pivot2 = a[e5]; /* * The first and the last elements to be sorted are moved to the * locations formerly occupied by the pivots. When partitioning - * is complete, the pivots are swapped back into their final + * is completed, the pivots are swapped back into their final * positions, and excluded from subsequent sorting. */ - a[e2] = a[left]; - a[e4] = a[right]; + a[e1] = a[lower]; + a[e5] = a[upper]; /* - * Skip elements, which are less or greater than pivot values. + * Skip elements, which are less or greater than the pivots. */ - while (a[++less] < pivot1); - while (a[--great] > pivot2); + while (a[++lower] < pivot1); + while (a[--upper] > pivot2); /* - * Partitioning: + * Backwards 3-interval partitioning * - * left part center part right part - * +--------------------------------------------------------------+ - * | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 | - * +--------------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * left part central part right part + * +------------------------------------------------------------+ + * | < pivot1 | ? | pivot1 <= && <= pivot2 | > pivot2 | + * +------------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot1 - * pivot1 <= all in [less, k) <= pivot2 - * all in (great, right) > pivot2 + * all in (low, lower] < pivot1 + * pivot1 <= all in (k, upper) <= pivot2 + * all in [upper, end) > pivot2 * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - outer: - for (int k = less - 1; ++k <= great; ) { - short ak = a[k]; - if (ak < pivot1) { // Move a[k] to left part - a[k] = a[less]; - /* - * Here and below we use "a[i] = b; i++;" instead - * of "a[i++] = b;" due to performance issue. - */ - a[less] = ak; - ++less; - } else if (ak > pivot2) { // Move a[k] to right part - while (a[great] > pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] < pivot1) { // a[great] <= pivot2 - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // pivot1 <= a[great] <= pivot2 - a[k] = a[great]; - } - /* - * Here and below we use "a[i] = b; i--;" instead - * of "a[i--] = b;" due to performance issue. - */ - a[great] = ak; - --great; - } - } - - // Swap pivots into their final positions - a[left] = a[less - 1]; a[less - 1] = pivot1; - a[right] = a[great + 1]; a[great + 1] = pivot2; - - // Sort left and right parts recursively, excluding known pivots - sort(a, left, less - 2, leftmost); - sort(a, great + 2, right, false); - - /* - * If center part is too large (comprises > 4/7 of the array), - * swap internal pivot values to ends. - */ - if (less < e1 && e5 < great) { - /* - * Skip elements, which are equal to pivot values. - */ - while (a[less] == pivot1) { - ++less; - } - - while (a[great] == pivot2) { - --great; - } - - /* - * Partitioning: - * - * left part center part right part - * +----------------------------------------------------------+ - * | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 | - * +----------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great - * - * Invariants: - * - * all in (*, less) == pivot1 - * pivot1 < all in [less, k) < pivot2 - * all in (great, *) == pivot2 - * - * Pointer k is the first index of ?-part. - */ - outer: - for (int k = less - 1; ++k <= great; ) { - short ak = a[k]; - if (ak == pivot1) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else if (ak == pivot2) { // Move a[k] to right part - while (a[great] == pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] == pivot1) { // a[great] < pivot2 - a[k] = a[less]; - /* - * Even though a[great] equals to pivot1, the - * assignment a[less] = pivot1 may be incorrect, - * if a[great] and pivot1 are floating-point zeros - * of different signs. Therefore in float and - * double sorting methods we have to use more - * accurate assignment a[less] = a[great]. - */ - a[less] = pivot1; - ++less; - } else { // pivot1 < a[great] < pivot2 - a[k] = a[great]; - } - a[great] = ak; - --great; + for (int $ = --lower, k = ++upper; --k > lower; ) { + char ak = a[k]; + + if (ak < pivot1) { // Move a[k] to the left side + while (a[++lower] < pivot1); + + if (lower > k) { + lower = k; + break; } + if (a[lower] > pivot2) { // a[lower] >= pivot1 + a[k] = a[--upper]; + a[upper] = a[lower]; + } else { // pivot1 <= a[lower] <= pivot2 + a[k] = a[lower]; + } + a[lower] = ak; + } else if (ak > pivot2) { // Move a[k] to the right side + a[k] = a[--upper]; + a[upper] = ak; } } - // Sort center part recursively - sort(a, less, great, false); + /* + * Swap the pivots into their final positions. + */ + a[low] = a[lower]; a[lower] = pivot1; + a[end] = a[upper]; a[upper] = pivot2; + /* + * Sort all parts recursively, excluding known pivots. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits | 1, lower + 1, upper), + new Sorter(a, bits | 1, upper + 1, high), + new Sorter(a, bits, low, lower) + ); + } else { + sort(a, bits | 1, false, upper + 1, high); + sort(a, bits, false, low, lower); + sort(a, bits | 1, false, lower + 1, upper); + } } else { // Partitioning with one pivot + /* - * Use the third of the five sorted elements as pivot. - * This value is inexpensive approximation of the median. + * Use the third element as the pivot. This value + * is inexpensive approximation of the median. */ - short pivot = a[e3]; + char pivot = a[e3]; /* - * Partitioning degenerates to the traditional 3-way - * (or "Dutch National Flag") schema: - * - * left part center part right part - * +-------------------------------------------------+ - * | < pivot | == pivot | ? | > pivot | - * +-------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * The first element to be sorted is moved to the location + * formerly occupied by the pivot. When partitioning is + * completed, the pivot is swapped back into its final + * position, and excluded from subsequent sorting. + */ + a[e3] = a[lower]; + + /* + * Traditional 3-way (Dutch National Flag) partitioning + * + * left part central part right part + * +------------------------------------------------------+ + * | < pivot | ? | == pivot | > pivot | + * +------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot - * all in [less, k) == pivot - * all in (great, right) > pivot + * all in (low, lower] < pivot + * all in (k, upper) == pivot + * all in [upper, end] > pivot * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - for (int k = less; k <= great; ++k) { + for (int k = ++upper; --k > lower; ) { if (a[k] == pivot) { continue; } - short ak = a[k]; - if (ak < pivot) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else { // a[k] > pivot - Move a[k] to right part - while (a[great] > pivot) { - --great; - } - if (a[great] < pivot) { // a[great] <= pivot - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // a[great] == pivot - /* - * Even though a[great] equals to pivot, the - * assignment a[k] = pivot may be incorrect, - * if a[great] and pivot are floating-point - * zeros of different signs. Therefore in float - * and double sorting methods we have to use - * more accurate assignment a[k] = a[great]. - */ - a[k] = pivot; + char ak = a[k]; + + if (ak < pivot) { // Move a[k] to the left side + while (a[++lower] < pivot); + + if (lower > k) { + lower = k; + break; + } + a[k] = pivot; + + if (a[lower] > pivot) { + a[--upper] = a[lower]; } - a[great] = ak; - --great; + a[lower] = ak; + } else { // ak > pivot - Move a[k] to the right side + a[k] = pivot; + a[--upper] = ak; } } /* - * Sort left and right parts recursively. - * All elements from center part are equal + * Swap the pivot into its final position. + */ + a[low] = a[lower]; a[lower] = pivot; + + /* + * Sort the left and the right parts recursively, excluding + * known pivot. All elements from the central part are equal * and, therefore, already sorted. */ - sort(a, left, less - 1, leftmost); - sort(a, great + 1, right, false); + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits, low, lower), + new Sorter(a, bits | 1, upper, high) + ); + } else { + sort(a, bits | 1, false, upper, high); + sort(a, bits, false, low, lower); + } } } /** - * Sorts the specified range of the array using the given - * workspace array slice if possible for merging + * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - static void sort(char[] a, int left, int right, - char[] work, int workBase, int workLen) { - // Use counting sort on large arrays - if (right - left > COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR) { - int[] count = new int[NUM_CHAR_VALUES]; - - for (int i = left - 1; ++i <= right; - count[a[i]]++ - ); - for (int i = NUM_CHAR_VALUES, k = right + 1; k > left; ) { - while (count[--i] == 0); - char value = (char) i; - int s = count[i]; - - do { - a[--k] = value; - } while (--s > 0); - } - } else { // Use Dual-Pivot Quicksort on small arrays - doSort(a, left, right, work, workBase, workLen); + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + static void sort(short[] a, boolean parallel, int low, int high) { + if (high - low > COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR) { + SortingAlgorithms.countingSort(a, low, high); + } else { + sort(a, LEFTMOST_BITS, parallel, low, high); } } - /** The number of distinct char values. */ - private static final int NUM_CHAR_VALUES = 1 << 16; - /** - * Sorts the specified range of the array. + * Sorts the specified range of the array by the Dual-Pivot Quicksort. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - private static void doSort(char[] a, int left, int right, - char[] work, int workBase, int workLen) { - // Use Quicksort on small arrays - if (right - left < QUICKSORT_THRESHOLD) { - sort(a, left, right, true); - return; - } + * @param bits the recursion depth of Quicksort and the leftmost flag + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + private static void sort(short[] a, int bits, boolean parallel, int low, int high) { + int end = high - 1, length = high - low; /* - * Index run[i] is the start of i-th run - * (ascending or descending sequence). + * Run insertion sorts on non-leftmost part. */ - int[] run = new int[MAX_RUN_COUNT + 1]; - int count = 0; run[0] = left; - - // Check if the array is nearly sorted - for (int k = left; k < right; run[count] = k) { - // Equal items in the beginning of the sequence - while (k < right && a[k] == a[k + 1]) - k++; - if (k == right) break; // Sequence finishes with equal items - if (a[k] < a[k + 1]) { // ascending - while (++k <= right && a[k - 1] <= a[k]); - } else if (a[k] > a[k + 1]) { // descending - while (++k <= right && a[k - 1] >= a[k]); - // Transform into an ascending sequence - for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) { - char t = a[lo]; a[lo] = a[hi]; a[hi] = t; - } - } + if ((bits & 1) > 0) { - // Merge a transformed descending sequence followed by an - // ascending sequence - if (run[count] > left && a[run[count]] >= a[run[count] - 1]) { - count--; + /* + * Use nano insertion sort on tiny part. + */ + if (length < NANO_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.nanoInsertionSort(a, low, high); + return; } /* - * The array is not highly structured, - * use Quicksort instead of merge sort. + * Use pair insertion sort on small part. */ - if (++count == MAX_RUN_COUNT) { - sort(a, left, right, true); + if (length < PAIR_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.pairInsertionSort(a, low, end); return; } } - // These invariants should hold true: - // run[0] = 0 - // run[] = right + 1; (terminator) - - if (count == 0) { - // A single equal run - return; - } else if (count == 1 && run[count] > right) { - // Either a single ascending or a transformed descending run. - // Always check that a final run is a proper terminator, otherwise - // we have an unterminated trailing run, to handle downstream. + /* + * Switch to heap sort on the leftmost part or + * if the execution time is becoming quadratic. + */ + if (length < HEAP_SORT_THRESHOLD || (bits -= 2) < 0) { + SortingAlgorithms.heapSort(a, low, end); return; } - right++; - if (run[count] < right) { - // Corner case: the final run is not a terminator. This may happen - // if a final run is an equals run, or there is a single-element run - // at the end. Fix up by adding a proper terminator at the end. - // Note that we terminate with (right + 1), incremented earlier. - run[++count] = right; - } - - // Determine alternation base for merge - byte odd = 0; - for (int n = 1; (n <<= 1) < count; odd ^= 1); - - // Use or create temporary array b for merging - char[] b; // temp array; alternates with a - int ao, bo; // array offsets from 'left' - int blen = right - left; // space needed for b - if (work == null || workLen < blen || workBase + blen > work.length) { - work = new char[blen]; - workBase = 0; - } - if (odd == 0) { - System.arraycopy(a, left, work, workBase, blen); - b = a; - bo = 0; - a = work; - ao = workBase - left; - } else { - b = work; - ao = 0; - bo = workBase - left; - } - - // Merging - for (int last; count > 1; count = last) { - for (int k = (last = 0) + 2; k <= count; k += 2) { - int hi = run[k], mi = run[k - 1]; - for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) { - if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) { - b[i + bo] = a[p++ + ao]; - } else { - b[i + bo] = a[q++ + ao]; - } - } - run[++last] = hi; - } - if ((count & 1) != 0) { - for (int i = right, lo = run[count - 1]; --i >= lo; - b[i + bo] = a[i + ao] - ); - run[++last] = right; - } - char[] t = a; a = b; b = t; - int o = ao; ao = bo; bo = o; - } - } - - /** - * Sorts the specified range of the array by Dual-Pivot Quicksort. - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param leftmost indicates if this part is the leftmost in the range - */ - private static void sort(char[] a, int left, int right, boolean leftmost) { - int length = right - left + 1; - - // Use insertion sort on tiny arrays - if (length < INSERTION_SORT_THRESHOLD) { - if (leftmost) { - /* - * Traditional (without sentinel) insertion sort, - * optimized for server VM, is used in case of - * the leftmost part. - */ - for (int i = left, j = i; i < right; j = ++i) { - char ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; - } - } else { - /* - * Skip the longest ascending sequence. - */ - do { - if (left >= right) { - return; - } - } while (a[++left] >= a[left - 1]); - - /* - * Every element from adjoining part plays the role - * of sentinel, therefore this allows us to avoid the - * left range check on each iteration. Moreover, we use - * the more optimized algorithm, so called pair insertion - * sort, which is faster (in the context of Quicksort) - * than traditional implementation of insertion sort. - */ - for (int k = left; ++left <= right; k = ++left) { - char a1 = a[k], a2 = a[left]; - - if (a1 < a2) { - a2 = a1; a1 = a[left]; - } - while (a1 < a[--k]) { - a[k + 2] = a[k]; - } - a[++k + 1] = a1; - while (a2 < a[--k]) { - a[k + 1] = a[k]; - } - a[k + 1] = a2; - } - char last = a[right]; - - while (last < a[--right]) { - a[right + 1] = a[right]; - } - a[right + 1] = last; - } + /* + * Check if the array is nearly sorted + * and then try to sort it by Merging sort. + */ + if (SortingAlgorithms.mergingSort(a, parallel, low, high)) { return; } - // Inexpensive approximation of length / 7 - int seventh = (length >> 3) + (length >> 6) + 1; + /* + * The splitting of the array, defined by the following + * step, is related to the inexpensive approximation of + * the golden ratio. + */ + int step = (length >> 3) * 3 + 3; /* - * Sort five evenly spaced elements around (and including) the - * center element in the range. These elements will be used for - * pivot selection as described below. The choice for spacing - * these elements was empirically determined to work well on - * a wide variety of inputs. + * Five elements around (and including) the central element in + * the array will be used for pivot selection as described below. + * The unequal choice of spacing these elements was empirically + * determined to work well on a wide variety of inputs. */ - int e3 = (left + right) >>> 1; // The midpoint - int e2 = e3 - seventh; - int e1 = e2 - seventh; - int e4 = e3 + seventh; - int e5 = e4 + seventh; + int e1 = low + step; + int e5 = end - step; + int e3 = (e1 + e5) >>> 1; + int e2 = (e1 + e3) >>> 1; + int e4 = (e3 + e5) >>> 1; - // Sort these elements using insertion sort - if (a[e2] < a[e1]) { char t = a[e2]; a[e2] = a[e1]; a[e1] = t; } + /* + * Sort these elements in place by the combination + * of 5-element sorting network and insertion sort. + */ + if (a[e5] < a[e3]) { short t = a[e5]; a[e5] = a[e3]; a[e3] = t; } + if (a[e4] < a[e2]) { short t = a[e4]; a[e4] = a[e2]; a[e2] = t; } + if (a[e5] < a[e4]) { short t = a[e5]; a[e5] = a[e4]; a[e4] = t; } + if (a[e3] < a[e2]) { short t = a[e3]; a[e3] = a[e2]; a[e2] = t; } + if (a[e4] < a[e3]) { short t = a[e4]; a[e4] = a[e3]; a[e3] = t; } - if (a[e3] < a[e2]) { char t = a[e3]; a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - if (a[e4] < a[e3]) { char t = a[e4]; a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - } - if (a[e5] < a[e4]) { char t = a[e5]; a[e5] = a[e4]; a[e4] = t; - if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } + if (a[e1] > a[e2]) { short t = a[e1]; a[e1] = a[e2]; a[e2] = t; + if (t > a[e3]) { a[e2] = a[e3]; a[e3] = t; + if (t > a[e4]) { a[e3] = a[e4]; a[e4] = t; + if (t > a[e5]) { a[e4] = a[e5]; a[e5] = t; } } } } // Pointers - int less = left; // The index of the first element of center part - int great = right; // The index before the first element of right part + int lower = low; // The index of the last element of the left part + int upper = end; // The index of the first element of the right part + + if (a[e1] < a[e2] && a[e2] < a[e3] && a[e3] < a[e4] && a[e4] < a[e5]) { - if (a[e1] != a[e2] && a[e2] != a[e3] && a[e3] != a[e4] && a[e4] != a[e5]) { /* - * Use the second and fourth of the five sorted elements as pivots. - * These values are inexpensive approximations of the first and - * second terciles of the array. Note that pivot1 <= pivot2. + * Use the first and the fifth elements as the pivots. + * These values are inexpensive approximation of tertiles. + * Note, that pivot1 < pivot2. */ - char pivot1 = a[e2]; - char pivot2 = a[e4]; + short pivot1 = a[e1]; + short pivot2 = a[e5]; /* * The first and the last elements to be sorted are moved to the * locations formerly occupied by the pivots. When partitioning - * is complete, the pivots are swapped back into their final + * is completed, the pivots are swapped back into their final * positions, and excluded from subsequent sorting. */ - a[e2] = a[left]; - a[e4] = a[right]; + a[e1] = a[lower]; + a[e5] = a[upper]; /* - * Skip elements, which are less or greater than pivot values. + * Skip elements, which are less or greater than the pivots. */ - while (a[++less] < pivot1); - while (a[--great] > pivot2); + while (a[++lower] < pivot1); + while (a[--upper] > pivot2); /* - * Partitioning: + * Backwards 3-interval partitioning * - * left part center part right part - * +--------------------------------------------------------------+ - * | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 | - * +--------------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * left part central part right part + * +------------------------------------------------------------+ + * | < pivot1 | ? | pivot1 <= && <= pivot2 | > pivot2 | + * +------------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot1 - * pivot1 <= all in [less, k) <= pivot2 - * all in (great, right) > pivot2 + * all in (low, lower] < pivot1 + * pivot1 <= all in (k, upper) <= pivot2 + * all in [upper, end) > pivot2 * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - outer: - for (int k = less - 1; ++k <= great; ) { - char ak = a[k]; - if (ak < pivot1) { // Move a[k] to left part - a[k] = a[less]; - /* - * Here and below we use "a[i] = b; i++;" instead - * of "a[i++] = b;" due to performance issue. - */ - a[less] = ak; - ++less; - } else if (ak > pivot2) { // Move a[k] to right part - while (a[great] > pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] < pivot1) { // a[great] <= pivot2 - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // pivot1 <= a[great] <= pivot2 - a[k] = a[great]; - } - /* - * Here and below we use "a[i] = b; i--;" instead - * of "a[i--] = b;" due to performance issue. - */ - a[great] = ak; - --great; - } - } - - // Swap pivots into their final positions - a[left] = a[less - 1]; a[less - 1] = pivot1; - a[right] = a[great + 1]; a[great + 1] = pivot2; - - // Sort left and right parts recursively, excluding known pivots - sort(a, left, less - 2, leftmost); - sort(a, great + 2, right, false); - - /* - * If center part is too large (comprises > 4/7 of the array), - * swap internal pivot values to ends. - */ - if (less < e1 && e5 < great) { - /* - * Skip elements, which are equal to pivot values. - */ - while (a[less] == pivot1) { - ++less; - } - - while (a[great] == pivot2) { - --great; - } - - /* - * Partitioning: - * - * left part center part right part - * +----------------------------------------------------------+ - * | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 | - * +----------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great - * - * Invariants: - * - * all in (*, less) == pivot1 - * pivot1 < all in [less, k) < pivot2 - * all in (great, *) == pivot2 - * - * Pointer k is the first index of ?-part. - */ - outer: - for (int k = less - 1; ++k <= great; ) { - char ak = a[k]; - if (ak == pivot1) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else if (ak == pivot2) { // Move a[k] to right part - while (a[great] == pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] == pivot1) { // a[great] < pivot2 - a[k] = a[less]; - /* - * Even though a[great] equals to pivot1, the - * assignment a[less] = pivot1 may be incorrect, - * if a[great] and pivot1 are floating-point zeros - * of different signs. Therefore in float and - * double sorting methods we have to use more - * accurate assignment a[less] = a[great]. - */ - a[less] = pivot1; - ++less; - } else { // pivot1 < a[great] < pivot2 - a[k] = a[great]; - } - a[great] = ak; - --great; + for (int $ = --lower, k = ++upper; --k > lower; ) { + short ak = a[k]; + + if (ak < pivot1) { // Move a[k] to the left side + while (a[++lower] < pivot1); + + if (lower > k) { + lower = k; + break; } + if (a[lower] > pivot2) { // a[lower] >= pivot1 + a[k] = a[--upper]; + a[upper] = a[lower]; + } else { // pivot1 <= a[lower] <= pivot2 + a[k] = a[lower]; + } + a[lower] = ak; + } else if (ak > pivot2) { // Move a[k] to the right side + a[k] = a[--upper]; + a[upper] = ak; } } - // Sort center part recursively - sort(a, less, great, false); + /* + * Swap the pivots into their final positions. + */ + a[low] = a[lower]; a[lower] = pivot1; + a[end] = a[upper]; a[upper] = pivot2; + /* + * Sort all parts recursively, excluding known pivots. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits | 1, lower + 1, upper), + new Sorter(a, bits | 1, upper + 1, high), + new Sorter(a, bits, low, lower) + ); + } else { + sort(a, bits | 1, false, upper + 1, high); + sort(a, bits, false, low, lower); + sort(a, bits | 1, false, lower + 1, upper); + } } else { // Partitioning with one pivot + /* - * Use the third of the five sorted elements as pivot. - * This value is inexpensive approximation of the median. + * Use the third element as the pivot. This value + * is inexpensive approximation of the median. */ - char pivot = a[e3]; + short pivot = a[e3]; /* - * Partitioning degenerates to the traditional 3-way - * (or "Dutch National Flag") schema: - * - * left part center part right part - * +-------------------------------------------------+ - * | < pivot | == pivot | ? | > pivot | - * +-------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * The first element to be sorted is moved to the location + * formerly occupied by the pivot. When partitioning is + * completed, the pivot is swapped back into its final + * position, and excluded from subsequent sorting. + */ + a[e3] = a[lower]; + + /* + * Traditional 3-way (Dutch National Flag) partitioning + * + * left part central part right part + * +------------------------------------------------------+ + * | < pivot | ? | == pivot | > pivot | + * +------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot - * all in [less, k) == pivot - * all in (great, right) > pivot + * all in (low, lower] < pivot + * all in (k, upper) == pivot + * all in [upper, end] > pivot * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - for (int k = less; k <= great; ++k) { + for (int k = ++upper; --k > lower; ) { if (a[k] == pivot) { continue; } - char ak = a[k]; - if (ak < pivot) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else { // a[k] > pivot - Move a[k] to right part - while (a[great] > pivot) { - --great; - } - if (a[great] < pivot) { // a[great] <= pivot - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // a[great] == pivot - /* - * Even though a[great] equals to pivot, the - * assignment a[k] = pivot may be incorrect, - * if a[great] and pivot are floating-point - * zeros of different signs. Therefore in float - * and double sorting methods we have to use - * more accurate assignment a[k] = a[great]. - */ - a[k] = pivot; + short ak = a[k]; + + if (ak < pivot) { // Move a[k] to the left side + while (a[++lower] < pivot); + + if (lower > k) { + lower = k; + break; + } + a[k] = pivot; + + if (a[lower] > pivot) { + a[--upper] = a[lower]; } - a[great] = ak; - --great; + a[lower] = ak; + } else { // ak > pivot - Move a[k] to the right side + a[k] = pivot; + a[--upper] = ak; } } /* - * Sort left and right parts recursively. - * All elements from center part are equal - * and, therefore, already sorted. + * Swap the pivot into its final position. */ - sort(a, left, less - 1, leftmost); - sort(a, great + 1, right, false); - } - } + a[low] = a[lower]; a[lower] = pivot; - /** The number of distinct byte values. */ - private static final int NUM_BYTE_VALUES = 1 << 8; - - /** - * Sorts the specified range of the array. - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - */ - static void sort(byte[] a, int left, int right) { - // Use counting sort on large arrays - if (right - left > COUNTING_SORT_THRESHOLD_FOR_BYTE) { - int[] count = new int[NUM_BYTE_VALUES]; - - for (int i = left - 1; ++i <= right; - count[a[i] - Byte.MIN_VALUE]++ - ); - for (int i = NUM_BYTE_VALUES, k = right + 1; k > left; ) { - while (count[--i] == 0); - byte value = (byte) (i + Byte.MIN_VALUE); - int s = count[i]; - - do { - a[--k] = value; - } while (--s > 0); - } - } else { // Use insertion sort on small arrays - for (int i = left, j = i; i < right; j = ++i) { - byte ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; + /* + * Sort the left and the right parts recursively, excluding + * known pivot. All elements from the central part are equal + * and, therefore, already sorted. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits, low, lower), + new Sorter(a, bits | 1, upper, high) + ); + } else { + sort(a, bits | 1, false, upper, high); + sort(a, bits, false, low, lower); } } } /** - * Sorts the specified range of the array using the given - * workspace array slice if possible for merging + * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted */ - static void sort(float[] a, int left, int right, - float[] work, int workBase, int workLen) { + static void sort(float[] a, boolean parallel, int low, int high) { /* - * Phase 1: Move NaNs to the end of the array. + * Phase 1. Count the number of negative zero -0.0f, + * turn them into positive zero, and move all NaNs + * to the end of the array. */ - while (left <= right && Float.isNaN(a[right])) { - --right; - } - for (int k = right; --k >= left; ) { - float ak = a[k]; - if (ak != ak) { // a[k] is NaN - a[k] = a[right]; - a[right] = ak; - --right; + int numNegativeZero = 0; + + for (int k = high; k > low; ) { + float ak = a[--k]; + + if (ak == 0.0f && Float.floatToRawIntBits(ak) < 0) { // ak is -0.0f + numNegativeZero += 1; + a[k] = 0.0f; + } else if (ak != ak) { // ak is NaN + a[k] = a[--high]; + a[high] = ak; } } /* - * Phase 2: Sort everything except NaNs (which are already in place). + * Phase 2. Sort everything except NaNs, + * which are already in place. */ - doSort(a, left, right, work, workBase, workLen); + sort(a, LEFTMOST_BITS, parallel, low, high); /* - * Phase 3: Place negative zeros before positive zeros. + * Phase 3. Turn positive zero 0.0f + * back into negative zero -0.0f. */ - int hi = right; + if (++numNegativeZero == 1) { + return; + } /* - * Find the first zero, or first positive, or last negative element. + * Find the position one less than + * the index of the first zero. */ - while (left < hi) { - int middle = (left + hi) >>> 1; - float middleValue = a[middle]; + while (low <= high) { + int middle = (low + high) >>> 1; - if (middleValue < 0.0f) { - left = middle + 1; + if (a[middle] < 0) { + low = middle + 1; } else { - hi = middle; + high = middle - 1; } } /* - * Skip the last negative value (if any) or all leading negative zeros. - */ - while (left <= right && Float.floatToRawIntBits(a[left]) < 0) { - ++left; - } - - /* - * Move negative zeros to the beginning of the sub-range. - * - * Partitioning: - * - * +----------------------------------------------------+ - * | < 0.0 | -0.0 | 0.0 | ? ( >= 0.0 ) | - * +----------------------------------------------------+ - * ^ ^ ^ - * | | | - * left p k - * - * Invariants: - * - * all in (*, left) < 0.0 - * all in [left, p) == -0.0 - * all in [p, k) == 0.0 - * all in [k, right] >= 0.0 - * - * Pointer k is the first index of ?-part. + * Replace the required number of 0.0f by -0.0f. */ - for (int k = left, p = left - 1; ++k <= right; ) { - float ak = a[k]; - if (ak != 0.0f) { - break; - } - if (Float.floatToRawIntBits(ak) < 0) { // ak is -0.0f - a[k] = 0.0f; - a[++p] = -0.0f; - } + while (--numNegativeZero > 0) { + a[++high] = -0.0f; } } /** - * Sorts the specified range of the array. + * Sorts the specified range of the array by the Dual-Pivot Quicksort. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - private static void doSort(float[] a, int left, int right, - float[] work, int workBase, int workLen) { - // Use Quicksort on small arrays - if (right - left < QUICKSORT_THRESHOLD) { - sort(a, left, right, true); - return; - } + * @param bits the recursion depth of Quicksort and the leftmost flag + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + private static void sort(float[] a, int bits, boolean parallel, int low, int high) { + int end = high - 1, length = high - low; /* - * Index run[i] is the start of i-th run - * (ascending or descending sequence). + * Run insertion sorts on non-leftmost part. */ - int[] run = new int[MAX_RUN_COUNT + 1]; - int count = 0; run[0] = left; - - // Check if the array is nearly sorted - for (int k = left; k < right; run[count] = k) { - // Equal items in the beginning of the sequence - while (k < right && a[k] == a[k + 1]) - k++; - if (k == right) break; // Sequence finishes with equal items - if (a[k] < a[k + 1]) { // ascending - while (++k <= right && a[k - 1] <= a[k]); - } else if (a[k] > a[k + 1]) { // descending - while (++k <= right && a[k - 1] >= a[k]); - // Transform into an ascending sequence - for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) { - float t = a[lo]; a[lo] = a[hi]; a[hi] = t; - } - } + if ((bits & 1) > 0) { - // Merge a transformed descending sequence followed by an - // ascending sequence - if (run[count] > left && a[run[count]] >= a[run[count] - 1]) { - count--; + /* + * Use nano insertion sort on tiny part. + */ + if (length < NANO_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.nanoInsertionSort(a, low, high); + return; } /* - * The array is not highly structured, - * use Quicksort instead of merge sort. + * Use pair insertion sort on small part. */ - if (++count == MAX_RUN_COUNT) { - sort(a, left, right, true); + if (length < PAIR_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.pairInsertionSort(a, low, end); return; } } - // These invariants should hold true: - // run[0] = 0 - // run[] = right + 1; (terminator) - - if (count == 0) { - // A single equal run - return; - } else if (count == 1 && run[count] > right) { - // Either a single ascending or a transformed descending run. - // Always check that a final run is a proper terminator, otherwise - // we have an unterminated trailing run, to handle downstream. + /* + * Switch to heap sort on the leftmost part or + * if the execution time is becoming quadratic. + */ + if (length < HEAP_SORT_THRESHOLD || (bits -= 2) < 0) { + SortingAlgorithms.heapSort(a, low, end); return; } - right++; - if (run[count] < right) { - // Corner case: the final run is not a terminator. This may happen - // if a final run is an equals run, or there is a single-element run - // at the end. Fix up by adding a proper terminator at the end. - // Note that we terminate with (right + 1), incremented earlier. - run[++count] = right; - } - - // Determine alternation base for merge - byte odd = 0; - for (int n = 1; (n <<= 1) < count; odd ^= 1); - - // Use or create temporary array b for merging - float[] b; // temp array; alternates with a - int ao, bo; // array offsets from 'left' - int blen = right - left; // space needed for b - if (work == null || workLen < blen || workBase + blen > work.length) { - work = new float[blen]; - workBase = 0; - } - if (odd == 0) { - System.arraycopy(a, left, work, workBase, blen); - b = a; - bo = 0; - a = work; - ao = workBase - left; - } else { - b = work; - ao = 0; - bo = workBase - left; - } - - // Merging - for (int last; count > 1; count = last) { - for (int k = (last = 0) + 2; k <= count; k += 2) { - int hi = run[k], mi = run[k - 1]; - for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) { - if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) { - b[i + bo] = a[p++ + ao]; - } else { - b[i + bo] = a[q++ + ao]; - } - } - run[++last] = hi; - } - if ((count & 1) != 0) { - for (int i = right, lo = run[count - 1]; --i >= lo; - b[i + bo] = a[i + ao] - ); - run[++last] = right; - } - float[] t = a; a = b; b = t; - int o = ao; ao = bo; bo = o; - } - } - - /** - * Sorts the specified range of the array by Dual-Pivot Quicksort. - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param leftmost indicates if this part is the leftmost in the range - */ - private static void sort(float[] a, int left, int right, boolean leftmost) { - int length = right - left + 1; - - // Use insertion sort on tiny arrays - if (length < INSERTION_SORT_THRESHOLD) { - if (leftmost) { - /* - * Traditional (without sentinel) insertion sort, - * optimized for server VM, is used in case of - * the leftmost part. - */ - for (int i = left, j = i; i < right; j = ++i) { - float ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; - } - } else { - /* - * Skip the longest ascending sequence. - */ - do { - if (left >= right) { - return; - } - } while (a[++left] >= a[left - 1]); - - /* - * Every element from adjoining part plays the role - * of sentinel, therefore this allows us to avoid the - * left range check on each iteration. Moreover, we use - * the more optimized algorithm, so called pair insertion - * sort, which is faster (in the context of Quicksort) - * than traditional implementation of insertion sort. - */ - for (int k = left; ++left <= right; k = ++left) { - float a1 = a[k], a2 = a[left]; - - if (a1 < a2) { - a2 = a1; a1 = a[left]; - } - while (a1 < a[--k]) { - a[k + 2] = a[k]; - } - a[++k + 1] = a1; - - while (a2 < a[--k]) { - a[k + 1] = a[k]; - } - a[k + 1] = a2; - } - float last = a[right]; - while (last < a[--right]) { - a[right + 1] = a[right]; - } - a[right + 1] = last; - } + /* + * Check if the array is nearly sorted + * and then try to sort it by Merging sort. + */ + if (SortingAlgorithms.mergingSort(a, parallel, low, high)) { return; } - // Inexpensive approximation of length / 7 - int seventh = (length >> 3) + (length >> 6) + 1; + /* + * The splitting of the array, defined by the following + * step, is related to the inexpensive approximation of + * the golden ratio. + */ + int step = (length >> 3) * 3 + 3; /* - * Sort five evenly spaced elements around (and including) the - * center element in the range. These elements will be used for - * pivot selection as described below. The choice for spacing - * these elements was empirically determined to work well on - * a wide variety of inputs. + * Five elements around (and including) the central element in + * the array will be used for pivot selection as described below. + * The unequal choice of spacing these elements was empirically + * determined to work well on a wide variety of inputs. */ - int e3 = (left + right) >>> 1; // The midpoint - int e2 = e3 - seventh; - int e1 = e2 - seventh; - int e4 = e3 + seventh; - int e5 = e4 + seventh; + int e1 = low + step; + int e5 = end - step; + int e3 = (e1 + e5) >>> 1; + int e2 = (e1 + e3) >>> 1; + int e4 = (e3 + e5) >>> 1; - // Sort these elements using insertion sort - if (a[e2] < a[e1]) { float t = a[e2]; a[e2] = a[e1]; a[e1] = t; } + /* + * Sort these elements in place by the combination + * of 5-element sorting network and insertion sort. + */ + if (a[e5] < a[e3]) { float t = a[e5]; a[e5] = a[e3]; a[e3] = t; } + if (a[e4] < a[e2]) { float t = a[e4]; a[e4] = a[e2]; a[e2] = t; } + if (a[e5] < a[e4]) { float t = a[e5]; a[e5] = a[e4]; a[e4] = t; } + if (a[e3] < a[e2]) { float t = a[e3]; a[e3] = a[e2]; a[e2] = t; } + if (a[e4] < a[e3]) { float t = a[e4]; a[e4] = a[e3]; a[e3] = t; } - if (a[e3] < a[e2]) { float t = a[e3]; a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - if (a[e4] < a[e3]) { float t = a[e4]; a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - } - if (a[e5] < a[e4]) { float t = a[e5]; a[e5] = a[e4]; a[e4] = t; - if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } + if (a[e1] > a[e2]) { float t = a[e1]; a[e1] = a[e2]; a[e2] = t; + if (t > a[e3]) { a[e2] = a[e3]; a[e3] = t; + if (t > a[e4]) { a[e3] = a[e4]; a[e4] = t; + if (t > a[e5]) { a[e4] = a[e5]; a[e5] = t; } } } } // Pointers - int less = left; // The index of the first element of center part - int great = right; // The index before the first element of right part + int lower = low; // The index of the last element of the left part + int upper = end; // The index of the first element of the right part + + if (a[e1] < a[e2] && a[e2] < a[e3] && a[e3] < a[e4] && a[e4] < a[e5]) { - if (a[e1] != a[e2] && a[e2] != a[e3] && a[e3] != a[e4] && a[e4] != a[e5]) { /* - * Use the second and fourth of the five sorted elements as pivots. - * These values are inexpensive approximations of the first and - * second terciles of the array. Note that pivot1 <= pivot2. + * Use the first and the fifth elements as the pivots. + * These values are inexpensive approximation of tertiles. + * Note, that pivot1 < pivot2. */ - float pivot1 = a[e2]; - float pivot2 = a[e4]; + float pivot1 = a[e1]; + float pivot2 = a[e5]; /* * The first and the last elements to be sorted are moved to the * locations formerly occupied by the pivots. When partitioning - * is complete, the pivots are swapped back into their final + * is completed, the pivots are swapped back into their final * positions, and excluded from subsequent sorting. */ - a[e2] = a[left]; - a[e4] = a[right]; + a[e1] = a[lower]; + a[e5] = a[upper]; /* - * Skip elements, which are less or greater than pivot values. + * Skip elements, which are less or greater than the pivots. */ - while (a[++less] < pivot1); - while (a[--great] > pivot2); + while (a[++lower] < pivot1); + while (a[--upper] > pivot2); /* - * Partitioning: + * Backwards 3-interval partitioning * - * left part center part right part - * +--------------------------------------------------------------+ - * | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 | - * +--------------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * left part central part right part + * +------------------------------------------------------------+ + * | < pivot1 | ? | pivot1 <= && <= pivot2 | > pivot2 | + * +------------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot1 - * pivot1 <= all in [less, k) <= pivot2 - * all in (great, right) > pivot2 + * all in (low, lower] < pivot1 + * pivot1 <= all in (k, upper) <= pivot2 + * all in [upper, end) > pivot2 * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - outer: - for (int k = less - 1; ++k <= great; ) { + for (int $ = --lower, k = ++upper; --k > lower; ) { float ak = a[k]; - if (ak < pivot1) { // Move a[k] to left part - a[k] = a[less]; - /* - * Here and below we use "a[i] = b; i++;" instead - * of "a[i++] = b;" due to performance issue. - */ - a[less] = ak; - ++less; - } else if (ak > pivot2) { // Move a[k] to right part - while (a[great] > pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] < pivot1) { // a[great] <= pivot2 - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // pivot1 <= a[great] <= pivot2 - a[k] = a[great]; - } - /* - * Here and below we use "a[i] = b; i--;" instead - * of "a[i--] = b;" due to performance issue. - */ - a[great] = ak; - --great; - } - } - - // Swap pivots into their final positions - a[left] = a[less - 1]; a[less - 1] = pivot1; - a[right] = a[great + 1]; a[great + 1] = pivot2; - - // Sort left and right parts recursively, excluding known pivots - sort(a, left, less - 2, leftmost); - sort(a, great + 2, right, false); - - /* - * If center part is too large (comprises > 4/7 of the array), - * swap internal pivot values to ends. - */ - if (less < e1 && e5 < great) { - /* - * Skip elements, which are equal to pivot values. - */ - while (a[less] == pivot1) { - ++less; - } - - while (a[great] == pivot2) { - --great; - } - - /* - * Partitioning: - * - * left part center part right part - * +----------------------------------------------------------+ - * | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 | - * +----------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great - * - * Invariants: - * - * all in (*, less) == pivot1 - * pivot1 < all in [less, k) < pivot2 - * all in (great, *) == pivot2 - * - * Pointer k is the first index of ?-part. - */ - outer: - for (int k = less - 1; ++k <= great; ) { - float ak = a[k]; - if (ak == pivot1) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else if (ak == pivot2) { // Move a[k] to right part - while (a[great] == pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] == pivot1) { // a[great] < pivot2 - a[k] = a[less]; - /* - * Even though a[great] equals to pivot1, the - * assignment a[less] = pivot1 may be incorrect, - * if a[great] and pivot1 are floating-point zeros - * of different signs. Therefore in float and - * double sorting methods we have to use more - * accurate assignment a[less] = a[great]. - */ - a[less] = a[great]; - ++less; - } else { // pivot1 < a[great] < pivot2 - a[k] = a[great]; - } - a[great] = ak; - --great; + + if (ak < pivot1) { // Move a[k] to the left side + while (a[++lower] < pivot1); + + if (lower > k) { + lower = k; + break; + } + if (a[lower] > pivot2) { // a[lower] >= pivot1 + a[k] = a[--upper]; + a[upper] = a[lower]; + } else { // pivot1 <= a[lower] <= pivot2 + a[k] = a[lower]; } + a[lower] = ak; + } else if (ak > pivot2) { // Move a[k] to the right side + a[k] = a[--upper]; + a[upper] = ak; } } - // Sort center part recursively - sort(a, less, great, false); + /* + * Swap the pivots into their final positions. + */ + a[low] = a[lower]; a[lower] = pivot1; + a[end] = a[upper]; a[upper] = pivot2; + /* + * Sort all parts recursively, excluding known pivots. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits | 1, lower + 1, upper), + new Sorter(a, bits | 1, upper + 1, high), + new Sorter(a, bits, low, lower) + ); + } else { + sort(a, bits | 1, false, upper + 1, high); + sort(a, bits, false, low, lower); + sort(a, bits | 1, false, lower + 1, upper); + } } else { // Partitioning with one pivot + /* - * Use the third of the five sorted elements as pivot. - * This value is inexpensive approximation of the median. + * Use the third element as the pivot. This value + * is inexpensive approximation of the median. */ float pivot = a[e3]; /* - * Partitioning degenerates to the traditional 3-way - * (or "Dutch National Flag") schema: - * - * left part center part right part - * +-------------------------------------------------+ - * | < pivot | == pivot | ? | > pivot | - * +-------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * The first element to be sorted is moved to the location + * formerly occupied by the pivot. When partitioning is + * completed, the pivot is swapped back into its final + * position, and excluded from subsequent sorting. + */ + a[e3] = a[lower]; + + /* + * Traditional 3-way (Dutch National Flag) partitioning + * + * left part central part right part + * +------------------------------------------------------+ + * | < pivot | ? | == pivot | > pivot | + * +------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot - * all in [less, k) == pivot - * all in (great, right) > pivot + * all in (low, lower] < pivot + * all in (k, upper) == pivot + * all in [upper, end] > pivot * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - for (int k = less; k <= great; ++k) { + for (int k = ++upper; --k > lower; ) { if (a[k] == pivot) { continue; } float ak = a[k]; - if (ak < pivot) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else { // a[k] > pivot - Move a[k] to right part - while (a[great] > pivot) { - --great; - } - if (a[great] < pivot) { // a[great] <= pivot - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // a[great] == pivot - /* - * Even though a[great] equals to pivot, the - * assignment a[k] = pivot may be incorrect, - * if a[great] and pivot are floating-point - * zeros of different signs. Therefore in float - * and double sorting methods we have to use - * more accurate assignment a[k] = a[great]. - */ - a[k] = a[great]; + + if (ak < pivot) { // Move a[k] to the left side + while (a[++lower] < pivot); + + if (lower > k) { + lower = k; + break; } - a[great] = ak; - --great; + a[k] = pivot; + + if (a[lower] > pivot) { + a[--upper] = a[lower]; + } + a[lower] = ak; + } else { // ak > pivot - Move a[k] to the right side + a[k] = pivot; + a[--upper] = ak; } } /* - * Sort left and right parts recursively. - * All elements from center part are equal + * Swap the pivot into its final position. + */ + a[low] = a[lower]; a[lower] = pivot; + + /* + * Sort the left and the right parts recursively, excluding + * known pivot. All elements from the central part are equal * and, therefore, already sorted. */ - sort(a, left, less - 1, leftmost); - sort(a, great + 1, right, false); + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits, low, lower), + new Sorter(a, bits | 1, upper, high) + ); + } else { + sort(a, bits | 1, false, upper, high); + sort(a, bits, false, low, lower); + } } } /** - * Sorts the specified range of the array using the given - * workspace array slice if possible for merging + * Sorts the specified range of the array. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted */ - static void sort(double[] a, int left, int right, - double[] work, int workBase, int workLen) { + static void sort(double[] a, boolean parallel, int low, int high) { /* - * Phase 1: Move NaNs to the end of the array. + * Phase 1. Count the number of negative zero -0.0d, + * turn them into positive zero, and move all NaNs + * to the end of the array. */ - while (left <= right && Double.isNaN(a[right])) { - --right; - } - for (int k = right; --k >= left; ) { - double ak = a[k]; - if (ak != ak) { // a[k] is NaN - a[k] = a[right]; - a[right] = ak; - --right; + int numNegativeZero = 0; + + for (int k = high; k > low; ) { + double ak = a[--k]; + + if (ak == 0.0d && Double.doubleToRawLongBits(ak) < 0) { // ak is -0.0d + numNegativeZero += 1; + a[k] = 0.0d; + } else if (ak != ak) { // ak is NaN + a[k] = a[--high]; + a[high] = ak; } } /* - * Phase 2: Sort everything except NaNs (which are already in place). + * Phase 2. Sort everything except NaNs, + * which are already in place. */ - doSort(a, left, right, work, workBase, workLen); + sort(a, LEFTMOST_BITS, parallel, low, high); /* - * Phase 3: Place negative zeros before positive zeros. + * Phase 3. Turn positive zero 0.0d + * back into negative zero -0.0d. */ - int hi = right; + if (++numNegativeZero == 1) { + return; + } /* - * Find the first zero, or first positive, or last negative element. + * Find the position one less than + * the index of the first zero. */ - while (left < hi) { - int middle = (left + hi) >>> 1; - double middleValue = a[middle]; + while (low <= high) { + int middle = (low + high) >>> 1; - if (middleValue < 0.0d) { - left = middle + 1; + if (a[middle] < 0) { + low = middle + 1; } else { - hi = middle; + high = middle - 1; } } /* - * Skip the last negative value (if any) or all leading negative zeros. - */ - while (left <= right && Double.doubleToRawLongBits(a[left]) < 0) { - ++left; - } - - /* - * Move negative zeros to the beginning of the sub-range. - * - * Partitioning: - * - * +----------------------------------------------------+ - * | < 0.0 | -0.0 | 0.0 | ? ( >= 0.0 ) | - * +----------------------------------------------------+ - * ^ ^ ^ - * | | | - * left p k - * - * Invariants: - * - * all in (*, left) < 0.0 - * all in [left, p) == -0.0 - * all in [p, k) == 0.0 - * all in [k, right] >= 0.0 - * - * Pointer k is the first index of ?-part. + * Replace the required number of 0.0d by -0.0d. */ - for (int k = left, p = left - 1; ++k <= right; ) { - double ak = a[k]; - if (ak != 0.0d) { - break; - } - if (Double.doubleToRawLongBits(ak) < 0) { // ak is -0.0d - a[k] = 0.0d; - a[++p] = -0.0d; - } + while (--numNegativeZero > 0) { + a[++high] = -0.0d; } } /** - * Sorts the specified range of the array. + * Sorts the specified range of the array by the Dual-Pivot Quicksort. * * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param work a workspace array (slice) - * @param workBase origin of usable space in work array - * @param workLen usable size of work array - */ - private static void doSort(double[] a, int left, int right, - double[] work, int workBase, int workLen) { - // Use Quicksort on small arrays - if (right - left < QUICKSORT_THRESHOLD) { - sort(a, left, right, true); - return; - } + * @param bits the recursion depth of Quicksort and the leftmost flag + * @param parallel indicates whether sorting is performed in parallel + * @param low the index of the first element, inclusive, to be sorted + * @param high the index of the last element, exclusive, to be sorted + */ + private static void sort(double[] a, int bits, boolean parallel, int low, int high) { + int end = high - 1, length = high - low; /* - * Index run[i] is the start of i-th run - * (ascending or descending sequence). + * Run insertion sorts on non-leftmost part. */ - int[] run = new int[MAX_RUN_COUNT + 1]; - int count = 0; run[0] = left; - - // Check if the array is nearly sorted - for (int k = left; k < right; run[count] = k) { - // Equal items in the beginning of the sequence - while (k < right && a[k] == a[k + 1]) - k++; - if (k == right) break; // Sequence finishes with equal items - if (a[k] < a[k + 1]) { // ascending - while (++k <= right && a[k - 1] <= a[k]); - } else if (a[k] > a[k + 1]) { // descending - while (++k <= right && a[k - 1] >= a[k]); - // Transform into an ascending sequence - for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) { - double t = a[lo]; a[lo] = a[hi]; a[hi] = t; - } - } + if ((bits & 1) > 0) { - // Merge a transformed descending sequence followed by an - // ascending sequence - if (run[count] > left && a[run[count]] >= a[run[count] - 1]) { - count--; + /* + * Use nano insertion sort on tiny part. + */ + if (length < NANO_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.nanoInsertionSort(a, low, high); + return; } /* - * The array is not highly structured, - * use Quicksort instead of merge sort. + * Use pair insertion sort on small part. */ - if (++count == MAX_RUN_COUNT) { - sort(a, left, right, true); + if (length < PAIR_INSERTION_SORT_THRESHOLD) { + SortingAlgorithms.pairInsertionSort(a, low, end); return; } } - // These invariants should hold true: - // run[0] = 0 - // run[] = right + 1; (terminator) - - if (count == 0) { - // A single equal run - return; - } else if (count == 1 && run[count] > right) { - // Either a single ascending or a transformed descending run. - // Always check that a final run is a proper terminator, otherwise - // we have an unterminated trailing run, to handle downstream. + /* + * Switch to heap sort on the leftmost part or + * if the execution time is becoming quadratic. + */ + if (length < HEAP_SORT_THRESHOLD || (bits -= 2) < 0) { + SortingAlgorithms.heapSort(a, low, end); return; } - right++; - if (run[count] < right) { - // Corner case: the final run is not a terminator. This may happen - // if a final run is an equals run, or there is a single-element run - // at the end. Fix up by adding a proper terminator at the end. - // Note that we terminate with (right + 1), incremented earlier. - run[++count] = right; - } - - // Determine alternation base for merge - byte odd = 0; - for (int n = 1; (n <<= 1) < count; odd ^= 1); - - // Use or create temporary array b for merging - double[] b; // temp array; alternates with a - int ao, bo; // array offsets from 'left' - int blen = right - left; // space needed for b - if (work == null || workLen < blen || workBase + blen > work.length) { - work = new double[blen]; - workBase = 0; - } - if (odd == 0) { - System.arraycopy(a, left, work, workBase, blen); - b = a; - bo = 0; - a = work; - ao = workBase - left; - } else { - b = work; - ao = 0; - bo = workBase - left; - } - - // Merging - for (int last; count > 1; count = last) { - for (int k = (last = 0) + 2; k <= count; k += 2) { - int hi = run[k], mi = run[k - 1]; - for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) { - if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) { - b[i + bo] = a[p++ + ao]; - } else { - b[i + bo] = a[q++ + ao]; - } - } - run[++last] = hi; - } - if ((count & 1) != 0) { - for (int i = right, lo = run[count - 1]; --i >= lo; - b[i + bo] = a[i + ao] - ); - run[++last] = right; - } - double[] t = a; a = b; b = t; - int o = ao; ao = bo; bo = o; - } - } - - /** - * Sorts the specified range of the array by Dual-Pivot Quicksort. - * - * @param a the array to be sorted - * @param left the index of the first element, inclusive, to be sorted - * @param right the index of the last element, inclusive, to be sorted - * @param leftmost indicates if this part is the leftmost in the range - */ - private static void sort(double[] a, int left, int right, boolean leftmost) { - int length = right - left + 1; - - // Use insertion sort on tiny arrays - if (length < INSERTION_SORT_THRESHOLD) { - if (leftmost) { - /* - * Traditional (without sentinel) insertion sort, - * optimized for server VM, is used in case of - * the leftmost part. - */ - for (int i = left, j = i; i < right; j = ++i) { - double ai = a[i + 1]; - while (ai < a[j]) { - a[j + 1] = a[j]; - if (j-- == left) { - break; - } - } - a[j + 1] = ai; - } - } else { - /* - * Skip the longest ascending sequence. - */ - do { - if (left >= right) { - return; - } - } while (a[++left] >= a[left - 1]); - - /* - * Every element from adjoining part plays the role - * of sentinel, therefore this allows us to avoid the - * left range check on each iteration. Moreover, we use - * the more optimized algorithm, so called pair insertion - * sort, which is faster (in the context of Quicksort) - * than traditional implementation of insertion sort. - */ - for (int k = left; ++left <= right; k = ++left) { - double a1 = a[k], a2 = a[left]; - - if (a1 < a2) { - a2 = a1; a1 = a[left]; - } - while (a1 < a[--k]) { - a[k + 2] = a[k]; - } - a[++k + 1] = a1; - - while (a2 < a[--k]) { - a[k + 1] = a[k]; - } - a[k + 1] = a2; - } - double last = a[right]; - while (last < a[--right]) { - a[right + 1] = a[right]; - } - a[right + 1] = last; - } + /* + * Check if the array is nearly sorted + * and then try to sort it by Merging sort. + */ + if (SortingAlgorithms.mergingSort(a, parallel, low, high)) { return; } - // Inexpensive approximation of length / 7 - int seventh = (length >> 3) + (length >> 6) + 1; + /* + * The splitting of the array, defined by the following + * step, is related to the inexpensive approximation of + * the golden ratio. + */ + int step = (length >> 3) * 3 + 3; /* - * Sort five evenly spaced elements around (and including) the - * center element in the range. These elements will be used for - * pivot selection as described below. The choice for spacing - * these elements was empirically determined to work well on - * a wide variety of inputs. + * Five elements around (and including) the central element in + * the array will be used for pivot selection as described below. + * The unequal choice of spacing these elements was empirically + * determined to work well on a wide variety of inputs. */ - int e3 = (left + right) >>> 1; // The midpoint - int e2 = e3 - seventh; - int e1 = e2 - seventh; - int e4 = e3 + seventh; - int e5 = e4 + seventh; + int e1 = low + step; + int e5 = end - step; + int e3 = (e1 + e5) >>> 1; + int e2 = (e1 + e3) >>> 1; + int e4 = (e3 + e5) >>> 1; - // Sort these elements using insertion sort - if (a[e2] < a[e1]) { double t = a[e2]; a[e2] = a[e1]; a[e1] = t; } + /* + * Sort these elements in place by the combination + * of 5-element sorting network and insertion sort. + */ + if (a[e5] < a[e3]) { double t = a[e5]; a[e5] = a[e3]; a[e3] = t; } + if (a[e4] < a[e2]) { double t = a[e4]; a[e4] = a[e2]; a[e2] = t; } + if (a[e5] < a[e4]) { double t = a[e5]; a[e5] = a[e4]; a[e4] = t; } + if (a[e3] < a[e2]) { double t = a[e3]; a[e3] = a[e2]; a[e2] = t; } + if (a[e4] < a[e3]) { double t = a[e4]; a[e4] = a[e3]; a[e3] = t; } - if (a[e3] < a[e2]) { double t = a[e3]; a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - if (a[e4] < a[e3]) { double t = a[e4]; a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } - } - } - if (a[e5] < a[e4]) { double t = a[e5]; a[e5] = a[e4]; a[e4] = t; - if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t; - if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t; - if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; } + if (a[e1] > a[e2]) { double t = a[e1]; a[e1] = a[e2]; a[e2] = t; + if (t > a[e3]) { a[e2] = a[e3]; a[e3] = t; + if (t > a[e4]) { a[e3] = a[e4]; a[e4] = t; + if (t > a[e5]) { a[e4] = a[e5]; a[e5] = t; } } } } // Pointers - int less = left; // The index of the first element of center part - int great = right; // The index before the first element of right part + int lower = low; // The index of the last element of the left part + int upper = end; // The index of the first element of the right part + + if (a[e1] < a[e2] && a[e2] < a[e3] && a[e3] < a[e4] && a[e4] < a[e5]) { - if (a[e1] != a[e2] && a[e2] != a[e3] && a[e3] != a[e4] && a[e4] != a[e5]) { /* - * Use the second and fourth of the five sorted elements as pivots. - * These values are inexpensive approximations of the first and - * second terciles of the array. Note that pivot1 <= pivot2. + * Use the first and the fifth elements as the pivots. + * These values are inexpensive approximation of tertiles. + * Note, that pivot1 < pivot2. */ - double pivot1 = a[e2]; - double pivot2 = a[e4]; + double pivot1 = a[e1]; + double pivot2 = a[e5]; /* * The first and the last elements to be sorted are moved to the * locations formerly occupied by the pivots. When partitioning - * is complete, the pivots are swapped back into their final + * is completed, the pivots are swapped back into their final * positions, and excluded from subsequent sorting. */ - a[e2] = a[left]; - a[e4] = a[right]; + a[e1] = a[lower]; + a[e5] = a[upper]; /* - * Skip elements, which are less or greater than pivot values. + * Skip elements, which are less or greater than the pivots. */ - while (a[++less] < pivot1); - while (a[--great] > pivot2); + while (a[++lower] < pivot1); + while (a[--upper] > pivot2); /* - * Partitioning: + * Backwards 3-interval partitioning * - * left part center part right part - * +--------------------------------------------------------------+ - * | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 | - * +--------------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * left part central part right part + * +------------------------------------------------------------+ + * | < pivot1 | ? | pivot1 <= && <= pivot2 | > pivot2 | + * +------------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot1 - * pivot1 <= all in [less, k) <= pivot2 - * all in (great, right) > pivot2 + * all in (low, lower] < pivot1 + * pivot1 <= all in (k, upper) <= pivot2 + * all in [upper, end) > pivot2 * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - outer: - for (int k = less - 1; ++k <= great; ) { + for (int $ = --lower, k = ++upper; --k > lower; ) { double ak = a[k]; - if (ak < pivot1) { // Move a[k] to left part - a[k] = a[less]; - /* - * Here and below we use "a[i] = b; i++;" instead - * of "a[i++] = b;" due to performance issue. - */ - a[less] = ak; - ++less; - } else if (ak > pivot2) { // Move a[k] to right part - while (a[great] > pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] < pivot1) { // a[great] <= pivot2 - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // pivot1 <= a[great] <= pivot2 - a[k] = a[great]; - } - /* - * Here and below we use "a[i] = b; i--;" instead - * of "a[i--] = b;" due to performance issue. - */ - a[great] = ak; - --great; - } - } - - // Swap pivots into their final positions - a[left] = a[less - 1]; a[less - 1] = pivot1; - a[right] = a[great + 1]; a[great + 1] = pivot2; - - // Sort left and right parts recursively, excluding known pivots - sort(a, left, less - 2, leftmost); - sort(a, great + 2, right, false); - - /* - * If center part is too large (comprises > 4/7 of the array), - * swap internal pivot values to ends. - */ - if (less < e1 && e5 < great) { - /* - * Skip elements, which are equal to pivot values. - */ - while (a[less] == pivot1) { - ++less; - } - - while (a[great] == pivot2) { - --great; - } - - /* - * Partitioning: - * - * left part center part right part - * +----------------------------------------------------------+ - * | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 | - * +----------------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great - * - * Invariants: - * - * all in (*, less) == pivot1 - * pivot1 < all in [less, k) < pivot2 - * all in (great, *) == pivot2 - * - * Pointer k is the first index of ?-part. - */ - outer: - for (int k = less - 1; ++k <= great; ) { - double ak = a[k]; - if (ak == pivot1) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else if (ak == pivot2) { // Move a[k] to right part - while (a[great] == pivot2) { - if (great-- == k) { - break outer; - } - } - if (a[great] == pivot1) { // a[great] < pivot2 - a[k] = a[less]; - /* - * Even though a[great] equals to pivot1, the - * assignment a[less] = pivot1 may be incorrect, - * if a[great] and pivot1 are floating-point zeros - * of different signs. Therefore in float and - * double sorting methods we have to use more - * accurate assignment a[less] = a[great]. - */ - a[less] = a[great]; - ++less; - } else { // pivot1 < a[great] < pivot2 - a[k] = a[great]; - } - a[great] = ak; - --great; + + if (ak < pivot1) { // Move a[k] to the left side + while (a[++lower] < pivot1); + + if (lower > k) { + lower = k; + break; + } + if (a[lower] > pivot2) { // a[lower] >= pivot1 + a[k] = a[--upper]; + a[upper] = a[lower]; + } else { // pivot1 <= a[lower] <= pivot2 + a[k] = a[lower]; } + a[lower] = ak; + } else if (ak > pivot2) { // Move a[k] to the right side + a[k] = a[--upper]; + a[upper] = ak; } } - // Sort center part recursively - sort(a, less, great, false); + /* + * Swap the pivots into their final positions. + */ + a[low] = a[lower]; a[lower] = pivot1; + a[end] = a[upper]; a[upper] = pivot2; + /* + * Sort all parts recursively, excluding known pivots. + */ + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits | 1, lower + 1, upper), + new Sorter(a, bits | 1, upper + 1, high), + new Sorter(a, bits, low, lower) + ); + } else { + sort(a, bits | 1, false, upper + 1, high); + sort(a, bits, false, low, lower); + sort(a, bits | 1, false, lower + 1, upper); + } } else { // Partitioning with one pivot + /* - * Use the third of the five sorted elements as pivot. - * This value is inexpensive approximation of the median. + * Use the third element as the pivot. This value + * is inexpensive approximation of the median. */ double pivot = a[e3]; /* - * Partitioning degenerates to the traditional 3-way - * (or "Dutch National Flag") schema: - * - * left part center part right part - * +-------------------------------------------------+ - * | < pivot | == pivot | ? | > pivot | - * +-------------------------------------------------+ - * ^ ^ ^ - * | | | - * less k great + * The first element to be sorted is moved to the location + * formerly occupied by the pivot. When partitioning is + * completed, the pivot is swapped back into its final + * position, and excluded from subsequent sorting. + */ + a[e3] = a[lower]; + + /* + * Traditional 3-way (Dutch National Flag) partitioning + * + * left part central part right part + * +------------------------------------------------------+ + * | < pivot | ? | == pivot | > pivot | + * +------------------------------------------------------+ + * ^ ^ ^ + * | | | + * lower k upper * * Invariants: * - * all in (left, less) < pivot - * all in [less, k) == pivot - * all in (great, right) > pivot + * all in (low, lower] < pivot + * all in (k, upper) == pivot + * all in [upper, end] > pivot * - * Pointer k is the first index of ?-part. + * Pointer k is the last index of ?-part */ - for (int k = less; k <= great; ++k) { + for (int k = ++upper; --k > lower; ) { if (a[k] == pivot) { continue; } double ak = a[k]; - if (ak < pivot) { // Move a[k] to left part - a[k] = a[less]; - a[less] = ak; - ++less; - } else { // a[k] > pivot - Move a[k] to right part - while (a[great] > pivot) { - --great; - } - if (a[great] < pivot) { // a[great] <= pivot - a[k] = a[less]; - a[less] = a[great]; - ++less; - } else { // a[great] == pivot - /* - * Even though a[great] equals to pivot, the - * assignment a[k] = pivot may be incorrect, - * if a[great] and pivot are floating-point - * zeros of different signs. Therefore in float - * and double sorting methods we have to use - * more accurate assignment a[k] = a[great]. - */ - a[k] = a[great]; + + if (ak < pivot) { // Move a[k] to the left side + while (a[++lower] < pivot); + + if (lower > k) { + lower = k; + break; } - a[great] = ak; - --great; + a[k] = pivot; + + if (a[lower] > pivot) { + a[--upper] = a[lower]; + } + a[lower] = ak; + } else { // ak > pivot - Move a[k] to the right side + a[k] = pivot; + a[--upper] = ak; } } /* - * Sort left and right parts recursively. - * All elements from center part are equal + * Swap the pivot into its final position. + */ + a[low] = a[lower]; a[lower] = pivot; + + /* + * Sort the left and the right parts recursively, excluding + * known pivot. All elements from the central part are equal * and, therefore, already sorted. */ - sort(a, left, less - 1, leftmost); - sort(a, great + 1, right, false); + if (parallel && length > PARALLEL_QUICKSORT_THRESHOLD) { + ForkJoinTask.invokeAll( + new Sorter(a, bits, low, lower), + new Sorter(a, bits | 1, upper, high) + ); + } else { + sort(a, bits | 1, false, upper, high); + sort(a, bits, false, low, lower); + } } } }