int i;
char *heapPtr;
- elog(LOG, "Format squeue %s for %d consumers", sqname, ncons);
+ elog(DEBUG1, "Format squeue %s for %d consumers", sqname, ncons);
/* Initialize the shared queue */
sq->sq_pid = 0;
Assert(consMap);
- elog(LOG, "Bind node %s to squeue of step %s as a producer",
+ elog(DEBUG1, "Bind node %s to squeue of step %s as a producer",
PGXC_PARENT_NODE, sqname);
/* Initialize the shared queue */
if (cstate->cs_node == nodeid)
{
/* The process already reported that queue won't read */
- elog(LOG, "Node %d of step %s is released already",
+ elog(DEBUG1, "Node %d of step %s is released already",
nodeid, sqname);
consMap[i++] = SQ_CONS_NONE;
break;
/* Producer should be different process */
Assert(sq->sq_pid != MyProcPid);
- elog(LOG, "Bind node %s to squeue of step %s as a consumer of process %d", PGXC_PARENT_NODE, sqname, sq->sq_pid);
+ elog(DEBUG1, "Bind node %s to squeue of step %s as a consumer of process %d", PGXC_PARENT_NODE, sqname, sq->sq_pid);
/* Sanity checks */
Assert(myindex);
if (cstate->cs_status == CONSUMER_ERROR ||
cstate->cs_status == CONSUMER_DONE)
{
+ int status = cstate->cs_status;
/*
* Producer failed by the time the consumer connect.
* Change status to "Done" to allow producer unbind
LWLockRelease(SQueuesLock);
ereport(ERROR,
(errcode(ERRCODE_PRODUCER_ERROR),
- errmsg("producer error")));
+ errmsg("Producer failed while we were waiting - status was %d", status)));
}
/*
* Any other status is acceptable. Normally it would be
char storename[64];
#ifdef SQUEUE_STAT
- elog(LOG, "Start buffering %s node %d, %d tuples in queue, %ld writes and %ld reads so far",
+ elog(DEBUG1, "Start buffering %s node %d, %d tuples in queue, %ld writes and %ld reads so far",
squeue->sq_key, cstate->cs_node, cstate->cs_ntuples, cstate->stat_writes, cstate->stat_reads);
#endif
*tuplestore = tuplestore_begin_datarow(false, work_mem, tmpcxt);
* are finishing
*/
SetLatch(&sqsync->sqs_producer_latch);
- elog(LOG, "EOF reached while reading from squeue, exiting");
+ elog(DEBUG1, "EOF reached while reading from squeue, exiting");
return true;
}
else if (cstate->cs_status == CONSUMER_ERROR)
*/
ereport(ERROR,
(errcode(ERRCODE_PRODUCER_ERROR),
- errmsg("producer error")));
+ errmsg("Failed to read from shared queue - producer failed and set status to %d",
+ cstate->cs_status)));
}
if (canwait)
{
if (cstate->cs_status != CONSUMER_EOF &&
cstate->cs_status != CONSUMER_DONE)
{
- elog(LOG, "Consumer %d of producer %s is cancelled", i, squeue->sq_key);
+ elog(DEBUG1, "Consumer %d of producer %s is cancelled", i, squeue->sq_key);
cstate->cs_status = CONSUMER_ERROR;
/* discard tuples which may already be in the queue */
cstate->cs_ntuples = 0;
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
- elog(LOG, "Reset producer %s", squeue->sq_key);
+ elog(DEBUG1, "Reset producer %s", squeue->sq_key);
}
else
{
* are finishing
*/
SetLatch(&sqsync->sqs_producer_latch);
- elog(LOG, "Reset consumer %d of %s", consumerIdx, squeue->sq_key);
+ elog(DEBUG1, "Reset consumer %d of %s", consumerIdx, squeue->sq_key);
}
LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock);
cstate->cs_status != CONSUMER_DONE)
{
result++;
- elog(LOG, "Consumer %d of producer %s is cancelled", i, squeue->sq_key);
+ elog(DEBUG1, "Consumer %d of producer %s is cancelled", i, squeue->sq_key);
cstate->cs_status = CONSUMER_ERROR;
/* discard tuples which may already be in the queue */
cstate->cs_ntuples = 0;
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
- elog(LOG, "Reset producer %s", squeue->sq_key);
+ elog(DEBUG1, "Reset producer %s", squeue->sq_key);
}
LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE);
#ifdef SQUEUE_STAT
if (!squeue->stat_finish)
- elog(LOG, "Finishing %s node %d, %ld writes and %ld reads so far, %ld buffer writes, %ld buffer reads, %ld tuples returned to buffer",
+ elog(DEBUG1, "Finishing %s node %d, %ld writes and %ld reads so far, %ld buffer writes, %ld buffer reads, %ld tuples returned to buffer",
squeue->sq_key, cstate->cs_node, cstate->stat_writes, cstate->stat_reads, cstate->stat_buff_writes, cstate->stat_buff_reads, cstate->stat_buff_returns);
#endif
/*
}
if (c_count == 0)
break;
- elog(LOG, "Wait while %d squeue readers finishing", c_count);
+ elog(DEBUG1, "Wait while %d squeue readers finishing", c_count);
/* wait for a notification */
wait_result = WaitLatch(&sqsync->sqs_producer_latch,
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
/* got notification, continue loop */
}
#ifdef SQUEUE_STAT
- elog(LOG, "Producer %s is done, there were %ld pauses", squeue->sq_key, squeue->stat_paused);
+ elog(DEBUG1, "Producer %s is done, there were %ld pauses", squeue->sq_key, squeue->stat_paused);
#endif
LWLockAcquire(SQueuesLock, LW_EXCLUSIVE);
elog(PANIC, "Shared queue data corruption");
LWLockRelease(SQueuesLock);
- elog(LOG, "Finalized squeue");
+ elog(DEBUG1, "Finalized squeue");
if (wait_result & WL_TIMEOUT)
elog(FATAL, "Timeout while waiting for Consumers finishing");
}
bool found;
volatile SharedQueue sq;
- elog(LOG, "Shared Queue release: %s", sqname);
+ elog(DEBUG1, "Shared Queue release: %s", sqname);
LWLockAcquire(SQueuesLock, LW_EXCLUSIVE);
sqsync->queue = NULL;
if (hash_search(SharedQueues, sqname, HASH_REMOVE, NULL) != sq)
elog(PANIC, "Shared queue data corruption");
- elog(LOG, "Finalized squeue %s", sqname);
+ elog(DEBUG1, "Finalized squeue %s", sqname);
LWLockRelease(SQueuesLock);
return;
}
*/
if (sq->sq_nodeid != PGXC_PARENT_NODE_ID)
{
- elog(LOG, "Looking for consumer %d in %s", myid, sqname);
+ elog(DEBUG1, "Looking for consumer %d in %s", myid, sqname);
/* find specified node in the consumer lists */
for (i = 0; i < sq->sq_nconsumers; i++)
{
* consumers are finishing
*/
SetLatch(&sqsync->sqs_producer_latch);
- elog(LOG, "Release consumer %d of %s", i, sqname);
+ elog(DEBUG1, "Release consumer %d of %s", i, sqname);
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
/* exit */
/* Inform producer the consumer have done the job */
cstate->cs_status = CONSUMER_DONE;
SetLatch(&sqsync->sqs_producer_latch);
- elog(LOG, "Release not bound consumer %d of %s", i, sqname);
+ elog(DEBUG1, "Release not bound consumer %d of %s", i, sqname);
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
}