Specialize intarray sorting
authorJohn Naylor <[email protected]>
Tue, 18 Feb 2025 04:04:55 +0000 (11:04 +0700)
committerJohn Naylor <[email protected]>
Tue, 18 Feb 2025 04:04:55 +0000 (11:04 +0700)
There is at least one report in the field of storing millions of
integers in arrays, so it seems like a good time to specialize
intarray's qsort function. In doing so, streamline the comparators:
Previously there were three, two for each direction for sorting
and one passed to qunique_arg. To preserve the early exit in the
case of descending input, pass the direction as an argument to
the comparator. This requires giving up duplicate detection, which
previously allowed skipping the qunique_arg() call. Testing showed
no regressions this way.

In passing, get rid of nearby checks that the input has at least
two elements, since preserving them would make some macros less
readable. These are not necessary for correctness, and seem like
premature optimizations.

Author: Andrey M. Borodin <[email protected]>
Discussion: https://postgr.es/m/098A3E67-E4A6-4086-9C66-B1EAEB1DFE1C@yandex-team.ru

contrib/intarray/_int.h
contrib/intarray/_int_tool.c

index 0352cbd36826b48bd6138ae42d7cd5e9f99d444a..ab899a81b4f49ccfb10f6b512a110d36ed4d4fd9 100644 (file)
@@ -41,17 +41,17 @@ typedef struct
 #define SORT(x) \
    do { \
        int     _nelems_ = ARRNELEMS(x); \
-       if (_nelems_ > 1) \
-           isort(ARRPTR(x), _nelems_); \
+       bool _ascending = true; \
+       isort(ARRPTR(x), _nelems_, &_ascending); \
    } while(0)
 
 /* sort the elements of the array and remove duplicates */
 #define PREPAREARR(x) \
    do { \
        int     _nelems_ = ARRNELEMS(x); \
-       if (_nelems_ > 1) \
-           if (isort(ARRPTR(x), _nelems_)) \
-               (x) = _int_unique(x); \
+       bool _ascending = true; \
+       isort(ARRPTR(x), _nelems_, &_ascending); \
+       (x) = _int_unique(x); \
    } while(0)
 
 /* "wish" function */
@@ -109,7 +109,7 @@ typedef struct
 /*
  * useful functions
  */
-bool       isort(int32 *a, int len);
+void       isort(int32 *a, size_t len, void *arg);
 ArrayType  *new_intArrayType(int num);
 ArrayType  *copy_intArrayType(ArrayType *a);
 ArrayType  *resize_intArrayType(ArrayType *a, int num);
@@ -176,16 +176,12 @@ bool      execconsistent(QUERYTYPE *query, ArrayType *array, bool calcnot);
 bool       gin_bool_consistent(QUERYTYPE *query, bool *check);
 bool       query_has_required_values(QUERYTYPE *query);
 
-int            compASC(const void *a, const void *b);
-int            compDESC(const void *a, const void *b);
-
 /* sort, either ascending or descending */
 #define QSORT(a, direction) \
    do { \
        int     _nelems_ = ARRNELEMS(a); \
-       if (_nelems_ > 1) \
-           qsort((void*) ARRPTR(a), _nelems_, sizeof(int32), \
-                 (direction) ? compASC : compDESC ); \
+       bool _ascending = (direction) ? true : false; \
+       isort(ARRPTR(a), _nelems_, &_ascending); \
    } while(0)
 
 #endif                         /* ___INT_H__ */
index c85280c8422c47a9ac39fa0b5430b991a821008d..0d0cdc289cde7d17e32e40b94d655d01f2852758 100644 (file)
@@ -186,36 +186,38 @@ rt__int_size(ArrayType *a, float *size)
    *size = (float) ARRNELEMS(a);
 }
 
-/* qsort_arg comparison function for isort() */
-static int
+/* comparison function for isort() and _int_unique() */
+static inline int
 isort_cmp(const void *a, const void *b, void *arg)
 {
    int32       aval = *((const int32 *) a);
    int32       bval = *((const int32 *) b);
 
-   if (aval < bval)
-       return -1;
-   if (aval > bval)
-       return 1;
-
-   /*
-    * Report if we have any duplicates.  If there are equal keys, qsort must
-    * compare them at some point, else it wouldn't know whether one should go
-    * before or after the other.
-    */
-   *((bool *) arg) = true;
+   if (*((bool *) arg))
+   {
+       /* compare for ascending order */
+       if (aval < bval)
+           return -1;
+       if (aval > bval)
+           return 1;
+   }
+   else
+   {
+       if (aval > bval)
+           return -1;
+       if (aval < bval)
+           return 1;
+   }
    return 0;
 }
 
-/* Sort the given data (len >= 2).  Return true if any duplicates found */
-bool
-isort(int32 *a, int len)
-{
-   bool        r = false;
-
-   qsort_arg(a, len, sizeof(int32), isort_cmp, &r);
-   return r;
-}
+#define ST_SORT isort
+#define ST_ELEMENT_TYPE int32
+#define ST_COMPARE(a, b, ascending) isort_cmp(a, b, ascending)
+#define ST_COMPARE_ARG_TYPE void
+#define ST_SCOPE
+#define ST_DEFINE
+#include "lib/sort_template.h"
 
 /* Create a new int array with room for "num" elements */
 ArrayType *
@@ -311,10 +313,10 @@ ArrayType *
 _int_unique(ArrayType *r)
 {
    int         num = ARRNELEMS(r);
-   bool        duplicates_found;   /* not used */
+   bool        ascending = true;
 
    num = qunique_arg(ARRPTR(r), num, sizeof(int), isort_cmp,
-                     &duplicates_found);
+                     &ascending);
 
    return resize_intArrayType(r, num);
 }
@@ -393,15 +395,3 @@ int_to_intset(int32 elem)
    aa[0] = elem;
    return result;
 }
-
-int
-compASC(const void *a, const void *b)
-{
-   return pg_cmp_s32(*(const int32 *) a, *(const int32 *) b);
-}
-
-int
-compDESC(const void *a, const void *b)
-{
-   return pg_cmp_s32(*(const int32 *) b, *(const int32 *) a);
-}