package org.apache.flink.cdc.connectors.base.source;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.StreamSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsStateSerializer;
import org.apache.flink.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.FlinkRuntimeException;

/**
 * A {@link Source} producing records of type {@code T}, using {@link SourceSplitBase} as its
 * split type and {@link PendingSplitsState} as its enumerator checkpoint type.
 *
 * <p>The source configuration of type {@code C} is not held directly; it is created on demand
 * from the {@link SourceConfig.Factory} handed to the constructor.
 *
 * @param <T> type of the emitted records, as reported by the deserialization schema
 * @param <C> concrete {@link SourceConfig} type produced by the config factory
 */
@Experimental
public class IncrementalSource<T, C extends SourceConfig>
        implements Source<T, SourceSplitBase, PendingSplitsState>, ResultTypeQueryable<T> {

    private static final long serialVersionUID = 1L;

    /** Factory used to build a {@code C} for the enumerator (index 0) and for each reader. */
    protected final SourceConfig.Factory<C> configFactory;

    /** Dialect passed on to the split assigners, split readers and the source reader. */
    protected final DataSourceDialect<C> dataSourceDialect;

    /** Offset factory shared by the assigners, the record emitter and the split serializer. */
    protected final OffsetFactory offsetFactory;

    /** Schema that deserializes records and reports the produced type. */
    protected final DebeziumDeserializationSchema<T> deserializationSchema;

    /** Split serializer whose offset factory is the one given to the constructor. */
    protected final SourceSplitSerializer sourceSplitSerializer;

    /** Hooks forwarded to every split reader; starts out as {@link SnapshotPhaseHooks#empty()}. */
    protected SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();

    /**
     * Stores the collaborators and builds the split serializer around {@code offsetFactory}.
     *
     * @param factory factory for the source configuration
     * @param debeziumDeserializationSchema schema used to deserialize emitted records
     * @param offsetFactory offset factory, also exposed through the split serializer
     * @param dataSourceDialect dialect of the underlying data source
     */
    public IncrementalSource(
            SourceConfig.Factory<C> factory,
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            final OffsetFactory offsetFactory,
            DataSourceDialect<C> dataSourceDialect) {
        this.configFactory = factory;
        this.dataSourceDialect = dataSourceDialect;
        this.offsetFactory = offsetFactory;
        this.deserializationSchema = debeziumDeserializationSchema;
        this.sourceSplitSerializer =
                new SourceSplitSerializer() {
                    @Override
                    public OffsetFactory getOffsetFactory() {
                        // Hands back the constructor argument captured by this anonymous class.
                        return offsetFactory;
                    }
                };
    }

    /**
     * Reports whether this source is bounded.
     *
     * @return {@link Boundedness#BOUNDED} when the startup options of a configuration created
     *     for index 0 are snapshot-only, otherwise {@link Boundedness#CONTINUOUS_UNBOUNDED}
     */
    public Boundedness getBoundedness() {
        if (this.configFactory.create2(0).getStartupOptions().isSnapshotOnly()) {
            return Boundedness.BOUNDED;
        }
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Builds the source reader for one subtask.
     *
     * <p>Decompiler note: this method was renamed from {@code createReader}.
     *
     * @param sourceReaderContext reader context supplying the subtask index, metric group and
     *     configuration
     * @return a reader wired with a fresh element queue, a split-reader supplier, a record
     *     emitter and the shared split serializer and dialect
     * @throws Exception declared by the signature
     */
    @Override
    public IncrementalSourceReader<T, C> mo333createReader(SourceReaderContext sourceReaderContext) throws Exception {
        // The configuration is created per subtask, keyed by the subtask index.
        C readerConfig = this.configFactory.create2(sourceReaderContext.getIndexOfSubtask());
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        SourceReaderMetrics readerMetrics = new SourceReaderMetrics(sourceReaderContext.metricGroup());
        IncrementalSourceReaderContext readerContext = new IncrementalSourceReaderContext(sourceReaderContext);
        return new IncrementalSourceReader<>(
                elementsQueue,
                // The subtask index is looked up each time the supplier is invoked.
                () -> new IncrementalSourceSplitReader(
                        sourceReaderContext.getIndexOfSubtask(),
                        this.dataSourceDialect,
                        readerConfig,
                        readerContext,
                        this.snapshotHooks),
                createRecordEmitter(readerConfig, readerMetrics),
                sourceReaderContext.getConfiguration(),
                readerContext,
                readerConfig,
                this.sourceSplitSerializer,
                this.dataSourceDialect);
    }

    /**
     * Creates a fresh split enumerator.
     *
     * <p>A {@link StreamSplitAssigner} is used when the startup options are stream-only; in
     * every other case a {@link HybridSplitAssigner} is built from the data collections
     * discovered through the dialect.
     *
     * @param splitEnumeratorContext enumerator context handed to the assigner and enumerator
     * @return the new enumerator
     * @throws FlinkRuntimeException if building the hybrid assigner (including the discovery
     *     calls on the dialect) throws any exception
     */
    public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(SplitEnumeratorContext<SourceSplitBase> splitEnumeratorContext) {
        C enumeratorConfig = this.configFactory.create2(0);
        SplitAssigner assigner;
        if (!enumeratorConfig.getStartupOptions().isStreamOnly()) {
            try {
                assigner =
                        new HybridSplitAssigner(
                                enumeratorConfig,
                                splitEnumeratorContext.currentParallelism(),
                                this.dataSourceDialect.discoverDataCollections(enumeratorConfig),
                                this.dataSourceDialect.isDataCollectionIdCaseSensitive(enumeratorConfig),
                                this.dataSourceDialect,
                                this.offsetFactory,
                                splitEnumeratorContext);
            } catch (Exception cause) {
                throw new FlinkRuntimeException("Failed to discover captured tables for enumerator", cause);
            }
        } else {
            assigner =
                    new StreamSplitAssigner(
                            enumeratorConfig,
                            this.dataSourceDialect,
                            this.offsetFactory,
                            splitEnumeratorContext);
        }
        return new IncrementalSourceEnumerator(splitEnumeratorContext, enumeratorConfig, assigner, getBoundedness());
    }

    /**
     * Recreates a split enumerator from checkpointed state.
     *
     * @param splitEnumeratorContext enumerator context handed to the assigner and enumerator
     * @param pendingSplitsState restored state; must be a {@link HybridPendingSplitsState} or a
     *     {@link StreamPendingSplitsState}
     * @return an enumerator whose assigner matches the kind of restored state
     * @throws UnsupportedOperationException if the state is of neither supported kind
     */
    public SplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(SplitEnumeratorContext<SourceSplitBase> splitEnumeratorContext, PendingSplitsState pendingSplitsState) {
        C enumeratorConfig = this.configFactory.create2(0);
        SplitAssigner restoredAssigner;
        if (pendingSplitsState instanceof HybridPendingSplitsState) {
            restoredAssigner =
                    new HybridSplitAssigner(
                            enumeratorConfig,
                            splitEnumeratorContext.currentParallelism(),
                            (HybridPendingSplitsState) pendingSplitsState,
                            this.dataSourceDialect,
                            this.offsetFactory,
                            splitEnumeratorContext);
        } else if (pendingSplitsState instanceof StreamPendingSplitsState) {
            restoredAssigner =
                    new StreamSplitAssigner(
                            enumeratorConfig,
                            (StreamPendingSplitsState) pendingSplitsState,
                            this.dataSourceDialect,
                            this.offsetFactory,
                            splitEnumeratorContext);
        } else {
            throw new UnsupportedOperationException("Unsupported restored PendingSplitsState: " + pendingSplitsState);
        }
        return new IncrementalSourceEnumerator(splitEnumeratorContext, enumeratorConfig, restoredAssigner, getBoundedness());
    }

    /**
     * @return the split serializer created in the constructor
     */
    public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
        return this.sourceSplitSerializer;
    }

    /**
     * @return a new {@link PendingSplitsStateSerializer} wrapping the result of
     *     {@link #getSplitSerializer()}, which is cast to {@link SourceSplitSerializer}
     */
    public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() {
        SourceSplitSerializer splitSerializer = (SourceSplitSerializer) getSplitSerializer();
        return new PendingSplitsStateSerializer(splitSerializer);
    }

    /**
     * @return the produced type reported by the deserialization schema
     */
    public TypeInformation<T> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    /**
     * Builds the record emitter used by the source reader.
     *
     * <p>Decompiler note: the access modifier of this method was changed from
     * {@code protected}.
     *
     * @param sourceConfig configuration consulted for the include-schema-changes flag
     * @param sourceReaderMetrics metrics object handed to the emitter
     * @return a new {@link IncrementalSourceRecordEmitter}
     */
    public RecordEmitter<SourceRecords, T, SourceSplitState> createRecordEmitter(SourceConfig sourceConfig, SourceReaderMetrics sourceReaderMetrics) {
        boolean includeSchemaChanges = sourceConfig.isIncludeSchemaChanges();
        return new IncrementalSourceRecordEmitter(
                this.deserializationSchema,
                sourceReaderMetrics,
                includeSchemaChanges,
                this.offsetFactory);
    }

    /**
     * Replaces the snapshot-phase hooks passed to split readers created afterwards.
     *
     * @param snapshotPhaseHooks hooks to install
     */
    @VisibleForTesting
    public void setSnapshotHooks(SnapshotPhaseHooks snapshotPhaseHooks) {
        this.snapshotHooks = snapshotPhaseHooks;
    }

    /**
     * Bridge overload emitted by the decompiler: casts both arguments and delegates to
     * {@link #restoreEnumerator(SplitEnumeratorContext, PendingSplitsState)}.
     */
    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator(
                (SplitEnumeratorContext<SourceSplitBase>) splitEnumeratorContext,
                (PendingSplitsState) obj);
    }
}
