package org.apache.flink.cdc.connectors.postgres.source.fetch;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.Utils;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.class */
public class PostgresScanFetchTask extends AbstractScanFetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresScanFetchTask.class);

    /* loaded from: input_file:org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask$PostgresSnapshotSplitReadTask.class */
    public static class PostgresSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
        private static final Logger LOG = LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
        private final PostgresConnection jdbcConnection;
        private final PostgresConnectorConfig connectorConfig;
        private final PostgresEventDispatcher<TableId> eventDispatcher;
        private final SnapshotSplit snapshotSplit;
        private final PostgresOffsetContext offsetContext;
        private final PostgresSchema databaseSchema;
        private final SnapshotProgressListener<PostgresPartition> snapshotProgressListener;
        private final Clock clock;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask$PostgresSnapshotSplitReadTask$PostgresSnapshotContext.class */
        /**
         * Snapshot context used while reading a single snapshot split.
         *
         * <p>It adds no state of its own: the partition is forwarded to the relational snapshot
         * context together with an empty string as the second constructor argument.
         */
        public static class PostgresSnapshotContext extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> {

            /**
             * @param partition partition handed through to the parent context
             * @throws SQLException declared by this constructor; presumably propagated from the
             *     parent constructor
             */
            public PostgresSnapshotContext(final PostgresPartition partition) throws SQLException {
                super(partition, "");
            }
        }

        /**
         * Creates a read task bound to one snapshot split.
         *
         * @param jdbcConnection JDBC connection used to refresh the schema and run the split query
         * @param connectorConfig connector configuration; also passed to the parent class
         * @param databaseSchema schema from which the table definition of the split is resolved
         * @param offsetContext offset context installed on the snapshot context in {@code doExecute}
         * @param eventDispatcher dispatcher that receives one snapshot event per exported row
         * @param progressListener listener notified about scan progress; also passed to the parent class
         * @param split the snapshot split to read
         */
        public PostgresSnapshotSplitReadTask(PostgresConnection jdbcConnection, PostgresConnectorConfig connectorConfig, PostgresSchema databaseSchema, PostgresOffsetContext offsetContext, PostgresEventDispatcher<TableId> eventDispatcher, SnapshotProgressListener progressListener, SnapshotSplit split) {
            super(connectorConfig, progressListener);

            // What to read and from where.
            this.snapshotSplit = split;
            this.jdbcConnection = jdbcConnection;
            this.databaseSchema = databaseSchema;
            this.connectorConfig = connectorConfig;

            // Where the produced events and progress notifications go.
            this.eventDispatcher = eventDispatcher;
            this.snapshotProgressListener = progressListener;
            this.offsetContext = offsetContext;

            this.clock = Clock.SYSTEM;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Reads the split of this task and reports the snapshot as completed.
         *
         * <p>The offset context given at construction time is installed on the snapshot context
         * (the {@code previousOffset} argument is not used), the schema is refreshed through the
         * JDBC connection, and then the rows of the split's table are emitted.
         *
         * @param context change event source context (unused here)
         * @param previousOffset offset passed in by the caller (unused here)
         * @param snapshotContext must be a {@link PostgresSnapshotContext}
         * @param snapshottingTask snapshotting task descriptor (unused here)
         * @return a completed snapshot result carrying the offset of the snapshot context
         * @throws Exception if the schema refresh or the export fails
         */
        @Override
        public SnapshotResult<PostgresOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext context, PostgresOffsetContext previousOffset, AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
            final PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext;
            ctx.offset = offsetContext;

            Utils.refreshSchema(databaseSchema, jdbcConnection, true);

            final TableId splitTable = snapshotSplit.getTableId();
            createDataEvents(ctx, splitTable);

            return SnapshotResult.completed(ctx.offset);
        }

        /**
         * Emits the snapshot events of one table and then completes the snapshot on the receiver.
         *
         * @param snapshotContext context carrying the partition and offset used for the events
         * @param tableId table to export; it must be known to the database schema, otherwise a
         *     {@link NullPointerException} is thrown
         * @throws InterruptedException if dispatching is interrupted
         */
        private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId) throws InterruptedException {
            final EventDispatcher.SnapshotReceiver<PostgresPartition> receiver = eventDispatcher.getSnapshotChangeEventReceiver();
            LOG.info("Snapshotting table {}", tableId);

            final Table table = Objects.requireNonNull(databaseSchema.tableFor(tableId));
            createDataEventsForTable(snapshotContext, receiver, table);

            receiver.completeSnapshot();
        }

        /**
         * Runs the scan query of the current split and dispatches one snapshot event per row.
         *
         * <p>The statement and its result set are managed by try-with-resources, so both are
         * closed on every exit path, including when reading a row or dispatching an event fails.
         *
         * @param snapshotContext context providing the partition and the offset attached to each event
         * @param snapshotReceiver receiver the events are dispatched to
         * @param table definition of the table the split belongs to
         * @throws InterruptedException if dispatching an event is interrupted
         * @throws FlinkRuntimeException if any JDBC call fails; the {@link SQLException} is the cause
         */
        private void createDataEventsForTable(PostgresSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver, Table table) throws InterruptedException {
            final long exportStart = clock.currentTimeInMillis();
            LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());

            // A null split boundary means the split is unbounded on that side.
            final boolean noLowerBound = snapshotSplit.getSplitStart() == null;
            final boolean noUpperBound = snapshotSplit.getSplitEnd() == null;

            // Split-key columns of type "uuid" are passed to the query builder separately.
            final List<String> uuidFields = snapshotSplit.getSplitKeyType().getFieldNames().stream()
                    .filter(field -> "uuid".equals(table.columnWithName(field).typeName()))
                    .collect(Collectors.toList());

            final String selectSql = PostgresQueryUtils.buildSplitScanQuery(
                    snapshotSplit.getTableId(),
                    snapshotSplit.getSplitKeyType(),
                    noLowerBound,
                    noUpperBound,
                    uuidFields);
            LOG.debug("For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), table.id(), selectSql);

            try (PreparedStatement selectStatement = PostgresQueryUtils.readTableSplitDataStatement(
                            jdbcConnection,
                            selectSql,
                            noLowerBound,
                            noUpperBound,
                            snapshotSplit.getSplitStart(),
                            snapshotSplit.getSplitEnd(),
                            snapshotSplit.getSplitKeyType().getFieldCount(),
                            connectorConfig.getSnapshotFetchSize());
                    ResultSet rs = selectStatement.executeQuery()) {

                final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
                long rows = 0;
                Threads.Timer logTimer = getTableScanLogTimer();

                while (rs.next()) {
                    rows++;

                    // Result-set columns are 1-based; each value is stored at the table position of its column.
                    final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                    for (int i = 0; i < columnArray.getColumns().length; i++) {
                        row[columnArray.getColumns()[i].position() - 1] = rs.getObject(i + 1);
                    }

                    // Report progress only when the log timer has expired, then restart the timer.
                    if (logTimer.expired()) {
                        LOG.info("Exported {} records for split '{}' after {}", rows, snapshotSplit.splitId(), Strings.duration(clock.currentTimeInMillis() - exportStart));
                        snapshotProgressListener.rowsScanned(snapshotContext.partition, table.id(), rows);
                        logTimer = getTableScanLogTimer();
                    }

                    snapshotContext.offset.event(table.id(), clock.currentTime());
                    eventDispatcher.dispatchSnapshotEvent(
                            snapshotContext.partition,
                            table.id(),
                            new SnapshotChangeRecordEmitter<>(snapshotContext.partition, snapshotContext.offset, row, clock),
                            snapshotReceiver);
                }

                LOG.info("Finished exporting {} records for split '{}', total duration '{}'", rows, snapshotSplit.splitId(), Strings.duration(clock.currentTimeInMillis() - exportStart));
            } catch (SQLException e) {
                throw new FlinkRuntimeException("Snapshotting of table " + table.id() + " failed", e);
            }
        }

        /**
         * Creates a new timer on this task's clock that expires after {@code LOG_INTERVAL}; it is
         * used to throttle the progress log lines written during the table scan.
         */
        private Threads.Timer getTableScanLogTimer() {
            final Threads.Timer scanLogTimer = Threads.timer(clock, LOG_INTERVAL);
            return scanLogTimer;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Always returns the same snapshotting task, independent of partition and offset.
         *
         * <p>NOTE(review): the two flags are presumably (snapshotSchema, snapshotData), i.e. data
         * only - confirm against the Debezium version in use.
         *
         * @param partition ignored
         * @param previousOffset ignored
         * @return a task constructed with {@code (false, true)}
         */
        @Override
        public AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
            final AbstractSnapshotChangeEventSource.SnapshottingTask task =
                    new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
            return task;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Builds the snapshot context for the given partition.
         *
         * @param partition partition the snapshot context is created for
         * @return a new {@link PostgresSnapshotContext}
         * @throws Exception if the context cannot be constructed
         */
        @Override
        public PostgresSnapshotContext prepare(PostgresPartition partition) throws Exception {
            final PostgresSnapshotContext snapshotContext = new PostgresSnapshotContext(partition);
            return snapshotContext;
        }
    }

    /**
     * Creates the scan fetch task for one snapshot split.
     *
     * @param split the snapshot split, handed to the parent class
     */
    public PostgresScanFetchTask(final SnapshotSplit split) {
        super(split);
    }

    /**
     * Executes the scan task with the backfill replication slot set up around it.
     *
     * <p>Unless snapshot backfill is skipped, the slot is created (if missing) before the parent
     * implementation runs, and the replication connection is closed afterwards. The cleanup lives
     * in a {@code finally} block so it runs exactly once on both the success and the failure
     * path; a failing cleanup is not retried.
     *
     * @param context must be a {@link PostgresSourceFetchTaskContext} whose source config is a
     *     {@link PostgresSourceConfig}
     * @throws Exception whatever slot setup, the parent execution, or the cleanup throws
     */
    @Override
    public void execute(FetchTask.Context context) throws Exception {
        final PostgresSourceFetchTaskContext fetchContext = (PostgresSourceFetchTaskContext) context;
        final PostgresSourceConfig sourceConfig = (PostgresSourceConfig) context.getSourceConfig();
        try {
            maybeCreateSlotForBackFillReadTask(
                    fetchContext.getConnection(),
                    fetchContext.getReplicationConnection(),
                    sourceConfig.getSlotNameForBackfillTask(),
                    fetchContext.getPluginName(),
                    sourceConfig.isSkipSnapshotBackfill());
            super.execute(context);
        } finally {
            maybeDropSlotForBackFillReadTask(
                    (PostgresReplicationConnection) fetchContext.getReplicationConnection(),
                    sourceConfig.isSkipSnapshotBackfill());
        }
    }

    /**
     * Reads the snapshot data of this task's split through a {@link PostgresSnapshotSplitReadTask}.
     *
     * @param context must be a {@link PostgresSourceFetchTaskContext}
     * @throws IllegalStateException if the snapshot result is neither completed nor skipped; the
     *     task is marked as not running before the exception is thrown
     * @throws Exception if the read task itself fails
     */
    @Override
    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        final PostgresSourceFetchTaskContext fetchContext = (PostgresSourceFetchTaskContext) context;

        final PostgresSnapshotSplitReadTask readTask = new PostgresSnapshotSplitReadTask(
                fetchContext.getConnection(),
                fetchContext.getDbzConnectorConfig(),
                fetchContext.getDatabaseSchema(),
                fetchContext.getOffsetContext(),
                fetchContext.getEventDispatcher(),
                fetchContext.getSnapshotChangeEventSourceMetrics(),
                this.snapshotSplit);

        final SnapshotResult<PostgresOffsetContext> result = readTask.execute(
                new StoppableChangeEventSourceContext(),
                fetchContext.getPartition(),
                fetchContext.getOffsetContext());

        if (!result.isCompletedOrSkipped()) {
            this.taskRunning = false;
            throw new IllegalStateException(String.format("Read snapshot for postgres split %s fail", result));
        }
    }

    /**
     * Runs the backfill for this snapshot split by reading the given stream split, starting from
     * the stream split's starting offset.
     *
     * @param context must be a {@link PostgresSourceFetchTaskContext}
     * @param streamSplit stream split describing the range to backfill
     * @throws Exception if the stream read task fails
     */
    @Override
    protected void executeBackfillTask(FetchTask.Context context, StreamSplit streamSplit) throws Exception {
        final PostgresSourceFetchTaskContext fetchContext = (PostgresSourceFetchTaskContext) context;

        // Offset context positioned at the start of the stream split.
        final PostgresOffsetContext.Loader offsetLoader = new PostgresOffsetContext.Loader(fetchContext.getDbzConnectorConfig());
        final PostgresOffsetContext startingOffset = PostgresOffsetUtils.getPostgresOffsetContext(offsetLoader, streamSplit.getStartingOffset());

        final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask = new PostgresStreamFetchTask.StreamSplitReadTask(
                fetchContext.getDbzConnectorConfig(),
                fetchContext.getSnapShotter(),
                fetchContext.getConnection(),
                fetchContext.getEventDispatcher(),
                fetchContext.getWaterMarkDispatcher(),
                fetchContext.getErrorHandler(),
                fetchContext.getTaskContext().getClock(),
                fetchContext.getDatabaseSchema(),
                fetchContext.getTaskContext(),
                fetchContext.getReplicationConnection(),
                streamSplit);

        final String backfillSlotName = ((PostgresSourceConfig) fetchContext.getSourceConfig()).getSlotNameForBackfillTask();
        LOG.info("Execute backfillReadTask for split {} with slot name {}", this.snapshotSplit, backfillSlotName);

        final ChangeEventSource.ChangeEventSourceContext sourceContext = new StoppableChangeEventSourceContext();
        backfillReadTask.execute(sourceContext, fetchContext.getPartition(), startingOffset);
    }

    /**
     * Ensures the replication slot used by the backfill read task exists and is ready, unless
     * snapshot backfill is skipped.
     *
     * <p>If the slot state cannot be loaded, or no slot state is returned, the slot is created
     * through the replication connection. Afterwards the method waits for the slot to become ready.
     *
     * @param postgresConnection JDBC connection used to look up the slot and to wait for it
     * @param replicationConnection connection on which the slot is created when it is missing
     * @param slotName name of the backfill replication slot
     * @param pluginName name of the logical decoding plugin the slot is looked up with
     * @param skipSnapshotBackfill when {@code true} the method returns without doing anything
     * @throws FlinkRuntimeException wrapping any failure raised while creating or awaiting the slot
     */
    private void maybeCreateSlotForBackFillReadTask(PostgresConnection postgresConnection, ReplicationConnection replicationConnection, String slotName, String pluginName, boolean skipSnapshotBackfill) {
        if (skipSnapshotBackfill) {
            return;
        }
        try {
            SlotState slotState = null;
            try {
                slotState = postgresConnection.getReplicationSlotState(slotName, pluginName);
            } catch (SQLException e) {
                // Best effort: a failed lookup is treated like a missing slot, but the cause is kept in the log.
                LOG.info("Unable to load info of replication slot, will try to create the slot", e);
            }

            if (slotState == null) {
                try {
                    replicationConnection.createReplicationSlot();
                } catch (SQLException e) {
                    String message = "Creation of replication slot failed";
                    // The driver message may be null; guard so the original SQLException is never masked by an NPE.
                    final String reason = e.getMessage();
                    if (reason != null && reason.contains("already exists")) {
                        message += "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                    }
                    throw new FlinkRuntimeException(message, e);
                }
            }

            // NOTE(review): 30 is presumably the number of readiness checks/retries - confirm against PostgresObjectUtils.
            PostgresObjectUtils.waitForReplicationSlotReady(30, postgresConnection, slotName, pluginName);
        } catch (Throwable th) {
            throw new FlinkRuntimeException(th);
        }
    }

    /**
     * Closes the replication connection of the backfill read task via {@code close(true)},
     * unless snapshot backfill is skipped.
     *
     * <p>NOTE(review): the {@code true} flag presumably asks the connection to drop its
     * replication slot on close - confirm against {@link PostgresReplicationConnection}.
     *
     * @param replicationConnection replication connection to close
     * @param skipSnapshotBackfill when {@code true} the method returns without doing anything
     * @throws FlinkRuntimeException wrapping any failure raised while closing the connection
     */
    private void maybeDropSlotForBackFillReadTask(PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) {
        if (skipSnapshotBackfill) {
            return;
        }
        try {
            replicationConnection.close(true);
        } catch (Throwable th) {
            // Log with the cause so the failure can be diagnosed even if a caller replaces the rethrown exception.
            LOG.error("Failed to close the replication connection of the backfill read task", th);
            throw new FlinkRuntimeException(th);
        }
    }
}
