Fix nasty performance problem in tsquery_rewrite().
authorTom Lane <[email protected]>
Sun, 30 Oct 2016 21:35:42 +0000 (17:35 -0400)
committerTom Lane <[email protected]>
Sun, 30 Oct 2016 21:35:42 +0000 (17:35 -0400)
tsquery_rewrite() tries to find matches to subsets of AND/OR conditions;
for example, in the query 'a | b | c' the substitution subquery 'a | c'
should match and lead to replacement of the first and third items.
That's fine, but the matching algorithm apparently takes about O(2^N)
for an N-clause query (I say "apparently" because the code is also both
unintelligible and uncommented).  We could probably do better than that
even without any extra assumptions --- but actually, we know that the
subclauses are sorted, indeed are depending on that elsewhere in this very
same function.  So we can just scan the two lists a single time to detect
matches, as though we were doing a merge join.

Also do a re-flattening call (QTNTernary()) in tsquery_rewrite_query, just
to make sure that the tree fits the expectations of the next search cycle.
I didn't try to devise a test case for this, but I'm pretty sure that the
oversight could have led to failure to match in some cases where a match
would be expected.

Improve comments, and also stick a CHECK_FOR_INTERRUPTS into
dofindsubquery, just in case it's still too slow for somebody.

Per report from Andreas Seltenreich.  Back-patch to all supported branches.

Discussion: <[email protected]>

src/backend/utils/adt/tsquery_rewrite.c

index 28f328ddb312dabba6c54b1aa13096b27fca36de..e5bed6efb38e3d3a3f110098a34b69cf4e58da8a 100644 (file)
 #include "utils/builtins.h"
 
 
-static int
-addone(int *counters, int last, int total)
-{
-   /* since this function recurses, it could be driven to stack overflow. */
-   check_stack_depth();
-
-   counters[last]++;
-   if (counters[last] >= total)
-   {
-       if (last == 0)
-           return 0;
-       if (addone(counters, last - 1, total - 1) == 0)
-           return 0;
-       counters[last] = counters[last - 1] + 1;
-   }
-   return 1;
-}
-
 /*
- * If node is equal to ex, replace it with subs. Replacement is actually done
- * by returning either node or a copy of subs.
+ * If "node" is equal to "ex", return a copy of "subs" instead.
+ * If "ex" matches a subset of node's children, return a modified version
+ * of "node" in which those children are replaced with a copy of "subs".
+ * Otherwise return "node" unmodified.
+ *
+ * The QTN_NOCHANGE bit is set in successfully modified nodes, so that
+ * we won't uselessly recurse into them.
+ * Also, set *isfind true if we make a replacement.
  */
 static QTNode *
 findeq(QTNode *node, QTNode *ex, QTNode *subs, bool *isfind)
 {
+   /* Can't match unless signature matches and node type matches. */
    if ((node->sign & ex->sign) != ex->sign ||
        node->valnode->type != ex->valnode->type)
        return node;
 
+   /* Ignore nodes marked NOCHANGE, too. */
    if (node->flags & QTN_NOCHANGE)
        return node;
 
    if (node->valnode->type == QI_OPR)
    {
+       /* Must be same operator. */
        if (node->valnode->qoperator.oper != ex->valnode->qoperator.oper)
            return node;
 
        if (node->nchild == ex->nchild)
        {
+           /*
+            * Simple case: when same number of children, match if equal.
+            * (This is reliable when the children were sorted earlier.)
+            */
            if (QTNEq(node, ex))
            {
+               /* Match; delete node and return a copy of subs instead. */
                QTNFree(node);
                if (subs)
                {
@@ -73,79 +69,92 @@ findeq(QTNode *node, QTNode *ex, QTNode *subs, bool *isfind)
                *isfind = true;
            }
        }
-       else if (node->nchild > ex->nchild)
+       else if (node->nchild > ex->nchild && ex->nchild > 0)
        {
            /*
-            * AND and NOT are commutative, so we check if a subset of the
-            * children match. For example, if tnode is A | B | C, and ex is B
-            * | C, we have a match after we convert tnode to A | (B | C).
+            * AND and OR are commutative/associative, so we should check if a
+            * subset of the children match.  For example, if node is A|B|C,
+            * and ex is B|C, we have a match after we notionally convert node
+            * to A|(B|C).  This does not work for NOT or PHRASE nodes, but we
+            * can't get here for those node types because they have a fixed
+            * number of children.
+            *
+            * Because we expect that the children are sorted, it suffices to
+            * make one pass through the two lists to find the matches.
             */
-           int        *counters = (int *) palloc(sizeof(int) * node->nchild);
-           int         i;
-           QTNode     *tnode = (QTNode *) palloc(sizeof(QTNode));
-
-           memset(tnode, 0, sizeof(QTNode));
-           tnode->child = (QTNode **) palloc(sizeof(QTNode *) * ex->nchild);
-           tnode->nchild = ex->nchild;
-           tnode->valnode = (QueryItem *) palloc(sizeof(QueryItem));
-           *(tnode->valnode) = *(ex->valnode);
-
-           for (i = 0; i < ex->nchild; i++)
-               counters[i] = i;
-
-           do
+           bool       *matched;
+           int         nmatched;
+           int         i,
+                       j;
+
+           /* Assert that the subset rule is OK */
+           Assert(node->valnode->qoperator.oper == OP_AND ||
+                  node->valnode->qoperator.oper == OP_OR);
+
+           /* matched[] will record which children of node matched */
+           matched = (bool *) palloc0(node->nchild * sizeof(bool));
+           nmatched = 0;
+           i = j = 0;
+           while (i < node->nchild && j < ex->nchild)
            {
-               tnode->sign = 0;
-               for (i = 0; i < ex->nchild; i++)
+               int         cmp = QTNodeCompare(node->child[i], ex->child[j]);
+
+               if (cmp == 0)
                {
-                   tnode->child[i] = node->child[counters[i]];
-                   tnode->sign |= tnode->child[i]->sign;
+                   /* match! */
+                   matched[i] = true;
+                   nmatched++;
+                   i++, j++;
                }
+               else if (cmp < 0)
+               {
+                   /* node->child[i] has no match, ignore it */
+                   i++;
+               }
+               else
+               {
+                   /* ex->child[j] has no match; we can give up immediately */
+                   break;
+               }
+           }
 
-               if (QTNEq(tnode, ex))
+           if (nmatched == ex->nchild)
+           {
+               /* collapse out the matched children of node */
+               j = 0;
+               for (i = 0; i < node->nchild; i++)
                {
-                   int         j = 0;
-
-                   pfree(tnode->valnode);
-                   pfree(tnode->child);
-                   pfree(tnode);
-                   if (subs)
-                   {
-                       tnode = QTNCopy(subs);
-                       tnode->flags = QTN_NOCHANGE | QTN_NEEDFREE;
-                   }
+                   if (matched[i])
+                       QTNFree(node->child[i]);
                    else
-                       tnode = NULL;
-
-                   node->child[counters[0]] = tnode;
-
-                   for (i = 1; i < ex->nchild; i++)
-                       node->child[counters[i]] = NULL;
-                   for (i = 0; i < node->nchild; i++)
-                   {
-                       if (node->child[i])
-                       {
-                           node->child[j] = node->child[i];
-                           j++;
-                       }
-                   }
+                       node->child[j++] = node->child[i];
+               }
 
-                   node->nchild = j;
+               /* and instead insert a copy of subs */
+               if (subs)
+               {
+                   subs = QTNCopy(subs);
+                   subs->flags |= QTN_NOCHANGE;
+                   node->child[j++] = subs;
+               }
 
-                   *isfind = true;
+               node->nchild = j;
+
+               /*
+                * Re-sort the node to put new child in the right place.  This
+                * is a bit bogus, because it won't matter for findsubquery's
+                * remaining processing, and it's insufficient to prepare the
+                * tree for another search (we would need to re-flatten as
+                * well, and we don't want to do that because we'd lose the
+                * QTN_NOCHANGE marking on the new child).  But it's needed to
+                * keep the results the same as the regression tests expect.
+                */
+               QTNSort(node);
 
-                   break;
-               }
-           } while (addone(counters, ex->nchild - 1, node->nchild));
-           if (tnode && (tnode->flags & QTN_NOCHANGE) == 0)
-           {
-               pfree(tnode->valnode);
-               pfree(tnode->child);
-               pfree(tnode);
+               *isfind = true;
            }
-           else
-               QTNSort(node);
-           pfree(counters);
+
+           pfree(matched);
        }
    }
    else
@@ -173,12 +182,20 @@ findeq(QTNode *node, QTNode *ex, QTNode *subs, bool *isfind)
    return node;
 }
 
+/*
+ * Recursive guts of findsubquery(): attempt to replace "ex" with "subs"
+ * at the root node, and if we failed to do so, recursively match against
+ * child nodes.
+ */
 static QTNode *
 dofindsubquery(QTNode *root, QTNode *ex, QTNode *subs, bool *isfind)
 {
    /* since this function recurses, it could be driven to stack overflow. */
    check_stack_depth();
 
+   /* also, since it's a bit expensive, let's check for query cancel. */
+   CHECK_FOR_INTERRUPTS();
+
    root = findeq(root, ex, subs, isfind);
 
    if (root && (root->flags & QTN_NOCHANGE) == 0 && root->valnode->type == QI_OPR)
@@ -192,6 +209,10 @@ dofindsubquery(QTNode *root, QTNode *ex, QTNode *subs, bool *isfind)
    return root;
 }
 
+/*
+ * Delete any void subtrees that may have been inserted when the replacement
+ * subtree is void.
+ */
 static QTNode *
 dropvoidsubtree(QTNode *root)
 {
@@ -231,6 +252,14 @@ dropvoidsubtree(QTNode *root)
    return root;
 }
 
+/*
+ * Substitute "subs" for "ex" throughout the QTNode tree at root.
+ *
+ * If isfind isn't NULL, set *isfind to show whether we made any substitution.
+ *
+ * Both "root" and "ex" must have been through QTNTernary and QTNSort
+ * to ensure reliable matching.
+ */
 QTNode *
 findsubquery(QTNode *root, QTNode *ex, QTNode *subs, bool *isfind)
 {
@@ -344,6 +373,7 @@ tsquery_rewrite_query(PG_FUNCTION_ARGS)
                {
                    /* ready the tree for another pass */
                    QTNClearFlags(tree, QTN_NOCHANGE);
+                   QTNTernary(tree);
                    QTNSort(tree);
                }
            }