package org.apache.flink.cdc.connectors.base.relational;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.document.DocumentWriter;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.relational.handler.SchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.class */
public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatcher<P, TableId> implements WatermarkDispatcher {
    public static final String HISTORY_RECORD_FIELD = "historyRecord";
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final HistorizedDatabaseSchema historizedSchema;
    private final DataCollectionFilters.DataCollectionFilter<TableId> filter;
    private final CommonConnectorConfig connectorConfig;
    private final TopicSelector<TableId> topicSelector;
    private final Schema schemaChangeKeySchema;
    private final Schema schemaChangeValueSchema;
    private final String topic;
    private final SchemaChangeEventHandler schemaChangeEventHandler;
    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class);
    private static final DocumentWriter DOCUMENT_WRITER = DocumentWriter.defaultWriter();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher$SchemaChangeEventReceiver.class */
    public final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
        private SchemaChangeEventReceiver() {
        }

        private Struct schemaChangeRecordKey(SchemaChangeEvent schemaChangeEvent) {
            Struct struct = new Struct(JdbcSourceEventDispatcher.this.schemaChangeKeySchema);
            struct.put(HistoryRecord.Fields.DATABASE_NAME, schemaChangeEvent.getDatabase());
            return struct;
        }

        private Struct schemaChangeRecordValue(SchemaChangeEvent schemaChangeEvent) throws IOException {
            String write = JdbcSourceEventDispatcher.DOCUMENT_WRITER.write(new HistoryRecord(JdbcSourceEventDispatcher.this.schemaChangeEventHandler.parseSource(schemaChangeEvent), schemaChangeEvent.getOffset(), schemaChangeEvent.getDatabase(), schemaChangeEvent.getSchema(), schemaChangeEvent.getDdl(), schemaChangeEvent.getTableChanges()).document());
            Struct struct = new Struct(JdbcSourceEventDispatcher.this.schemaChangeValueSchema);
            struct.put("source", schemaChangeEvent.getSource());
            struct.put(JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD, write);
            return struct;
        }

        @Override // io.debezium.pipeline.spi.SchemaChangeEventEmitter.Receiver
        public void schemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws InterruptedException {
            if (SchemaChangeEvent.SchemaChangeEventType.DROP.equals(schemaChangeEvent.getType())) {
                JdbcSourceEventDispatcher.LOG.info("Received drop table event " + schemaChangeEvent + " at offset: " + schemaChangeEvent.getOffset());
            }
            JdbcSourceEventDispatcher.this.historizedSchema.applySchemaChange(schemaChangeEvent);
            if (JdbcSourceEventDispatcher.this.connectorConfig.isSchemaChangesHistoryEnabled()) {
                try {
                    JdbcSourceEventDispatcher.this.queue.enqueue(new DataChangeEvent(new SourceRecord(schemaChangeEvent.getPartition(), schemaChangeEvent.getOffset(), JdbcSourceEventDispatcher.this.topicSelector.getPrimaryTopic(), 0, JdbcSourceEventDispatcher.this.schemaChangeKeySchema, schemaChangeRecordKey(schemaChangeEvent), JdbcSourceEventDispatcher.this.schemaChangeValueSchema, schemaChangeRecordValue(schemaChangeEvent))));
                } catch (IOException e) {
                    throw new IllegalStateException(String.format("dispatch schema change event %s error ", schemaChangeEvent), e);
                }
            }
        }
    }

    public JdbcSourceEventDispatcher(CommonConnectorConfig commonConnectorConfig, TopicSelector<TableId> topicSelector, DatabaseSchema<TableId> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<TableId> dataCollectionFilter, ChangeEventCreator changeEventCreator, EventMetadataProvider eventMetadataProvider, SchemaNameAdjuster schemaNameAdjuster, SchemaChangeEventHandler schemaChangeEventHandler) {
        super(commonConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter, changeEventCreator, eventMetadataProvider, schemaNameAdjuster);
        this.historizedSchema = databaseSchema instanceof HistorizedDatabaseSchema ? (HistorizedDatabaseSchema) databaseSchema : null;
        this.filter = dataCollectionFilter;
        this.queue = changeEventQueue;
        this.connectorConfig = commonConnectorConfig;
        this.topicSelector = topicSelector;
        this.topic = topicSelector.getPrimaryTopic();
        this.schemaChangeKeySchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector." + commonConnectorConfig.getConnectorName() + ".SchemaChangeKey")).field(HistoryRecord.Fields.DATABASE_NAME, Schema.STRING_SCHEMA).build();
        this.schemaChangeValueSchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector." + commonConnectorConfig.getConnectorName() + ".SchemaChangeValue")).field("source", commonConnectorConfig.getSourceInfoStructMaker().schema()).field(HISTORY_RECORD_FIELD, Schema.OPTIONAL_STRING_SCHEMA).build();
        this.schemaChangeEventHandler = schemaChangeEventHandler;
    }

    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    /* renamed from: dispatchSchemaChangeEvent, reason: avoid collision after fix types in other method */
    public void dispatchSchemaChangeEvent2(P p, TableId tableId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        if (tableId != null && !this.filter.isIncluded(tableId) && (this.historizedSchema == null || this.historizedSchema.storeOnlyCapturedTables())) {
            LOG.trace("Filtering schema change event for {}", tableId);
            return;
        }
        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
        IncrementalSnapshotChangeEventSource<P, TableId> incrementalSnapshotChangeEventSource = getIncrementalSnapshotChangeEventSource();
        if (incrementalSnapshotChangeEventSource != null) {
            incrementalSnapshotChangeEventSource.processSchemaChange(p, tableId);
        }
    }

    @Override // io.debezium.pipeline.EventDispatcher
    public void dispatchSchemaChangeEvent(Collection<TableId> collection, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        boolean z = false;
        if (collection != null && !collection.isEmpty()) {
            Iterator<TableId> it = collection.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                if (this.filter.isIncluded(it.next())) {
                    z = true;
                    break;
                }
            }
        } else {
            z = true;
        }
        if (z || !(this.historizedSchema == null || this.historizedSchema.storeOnlyCapturedTables())) {
            schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
        } else {
            LOG.trace("Filtering schema change event for {}", collection);
        }
    }

    @Override // org.apache.flink.cdc.connectors.base.WatermarkDispatcher
    public void dispatchWatermarkEvent(Map<String, ?> map, SourceSplitBase sourceSplitBase, Offset offset, WatermarkKind watermarkKind) throws InterruptedException {
        this.queue.enqueue(new DataChangeEvent(WatermarkEvent.create(map, this.topic, sourceSplitBase.splitId(), watermarkKind, offset)));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.debezium.pipeline.EventDispatcher
    public /* bridge */ /* synthetic */ void dispatchSchemaChangeEvent(Partition partition, TableId tableId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        dispatchSchemaChangeEvent2((JdbcSourceEventDispatcher<P>) partition, tableId, schemaChangeEventEmitter);
    }
}
