package org.apache.flink.cdc.connectors.base.source.metrics;

import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.class */
public class SourceReaderMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderMetrics.class);
    public static final long UNDEFINED = -1;
    public static final String NAMESPACE_GROUP_KEY = "namespace";
    public static final String SCHEMA_GROUP_KEY = "schema";
    public static final String TABLE_GROUP_KEY = "table";
    public static final String NUM_SNAPSHOT_RECORDS = "numSnapshotRecords";
    public static final String NUM_INSERT_DML_RECORDS = "numInsertDMLRecords";
    public static final String NUM_UPDATE_DML_RECORDS = "numUpdateDMLRecords";
    public static final String NUM_DELETE_DML_RECORDS = "numDeleteDMLRecords";
    public static final String NUM_DDL_RECORDS = "numDDLRecords";
    public static final String CURRENT_EVENT_TIME_LAG = "currentEventTimeLag";
    private final SourceReaderMetricGroup metricGroup;
    private final Counter snapshotCounter;
    private final Counter insertCounter;
    private final Counter updateCounter;
    private final Counter deleteCounter;
    private final Counter schemaChangeCounter;
    private final Counter numRecordsInErrorsCounter;
    private final Map<TableId, TableMetrics> tableMetricsMap = new HashMap();
    private volatile long fetchDelay = -1;
    private volatile long lastReceivedEventTime = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics$TableMetrics.class */
    public static class TableMetrics {
        private final Counter recordsCounter;
        private final Counter snapshotCounter;
        private final Counter insertCounter;
        private final Counter updateCounter;
        private final Counter deleteCounter;
        private final Counter schemaChangeCounter;

        public TableMetrics(String str, String str2, String str3, MetricGroup metricGroup) {
            String processNull = processNull(str);
            String processNull2 = processNull(str2);
            MetricGroup addGroup = metricGroup.addGroup("namespace", processNull).addGroup("schema", processNull2).addGroup("table", processNull(str3));
            this.recordsCounter = addGroup.counter("numRecordsIn");
            this.snapshotCounter = addGroup.counter(SourceReaderMetrics.NUM_SNAPSHOT_RECORDS);
            this.insertCounter = addGroup.counter(SourceReaderMetrics.NUM_INSERT_DML_RECORDS);
            this.updateCounter = addGroup.counter(SourceReaderMetrics.NUM_UPDATE_DML_RECORDS);
            this.deleteCounter = addGroup.counter(SourceReaderMetrics.NUM_DELETE_DML_RECORDS);
            this.schemaChangeCounter = addGroup.counter(SourceReaderMetrics.NUM_DDL_RECORDS);
        }

        private String processNull(String str) {
            return StringUtils.isBlank(str) ? "" : str;
        }

        public void markSnapshotRecord() {
            this.recordsCounter.inc();
            this.snapshotCounter.inc();
        }

        public void markInsertRecord() {
            this.recordsCounter.inc();
            this.insertCounter.inc();
        }

        public void markDeleteRecord() {
            this.recordsCounter.inc();
            this.deleteCounter.inc();
        }

        public void markUpdateRecord() {
            this.recordsCounter.inc();
            this.updateCounter.inc();
        }

        public void markSchemaChangeRecord() {
            this.recordsCounter.inc();
            this.schemaChangeCounter.inc();
        }
    }

    public SourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
        this.metricGroup = sourceReaderMetricGroup;
        this.numRecordsInErrorsCounter = sourceReaderMetricGroup.getNumRecordsInErrorsCounter();
        sourceReaderMetricGroup.gauge("currentFetchEventTimeLag", this::getFetchDelay);
        sourceReaderMetricGroup.gauge(CURRENT_EVENT_TIME_LAG, this::getCurrentEventTimeLag);
        this.snapshotCounter = sourceReaderMetricGroup.counter(NUM_SNAPSHOT_RECORDS);
        this.insertCounter = sourceReaderMetricGroup.counter(NUM_INSERT_DML_RECORDS);
        this.updateCounter = sourceReaderMetricGroup.counter(NUM_UPDATE_DML_RECORDS);
        this.deleteCounter = sourceReaderMetricGroup.counter(NUM_DELETE_DML_RECORDS);
        this.schemaChangeCounter = sourceReaderMetricGroup.counter(NUM_DDL_RECORDS);
    }

    public long getFetchDelay() {
        return this.fetchDelay;
    }

    public void recordFetchDelay(long j) {
        this.fetchDelay = j;
    }

    public void addNumRecordsInErrors(long j) {
        this.numRecordsInErrorsCounter.inc(j);
    }

    public void updateLastReceivedEventTime(Long l) {
        if (l == null || l.longValue() <= 0) {
            return;
        }
        this.lastReceivedEventTime = l.longValue();
    }

    public void markRecord() {
        try {
            this.metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
        } catch (Exception e) {
            LOG.warn("Failed to update record counters.", e);
        }
    }

    public void updateRecordCounters(SourceRecord sourceRecord) {
        catchAndWarnLogAllExceptions(() -> {
            if (!SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
                if (SourceRecordUtils.isSchemaChangeEvent(sourceRecord)) {
                    this.schemaChangeCounter.inc();
                    TableId tableId = SourceRecordUtils.getTableId(sourceRecord);
                    if (tableId != null) {
                        getTableMetrics(tableId).markSchemaChangeRecord();
                        return;
                    }
                    return;
                }
                return;
            }
            TableMetrics tableMetrics = getTableMetrics(SourceRecordUtils.getTableId(sourceRecord));
            switch (Envelope.operationFor(sourceRecord)) {
                case READ:
                    this.snapshotCounter.inc();
                    tableMetrics.markSnapshotRecord();
                    return;
                case CREATE:
                    this.insertCounter.inc();
                    tableMetrics.markInsertRecord();
                    return;
                case DELETE:
                    this.deleteCounter.inc();
                    tableMetrics.markDeleteRecord();
                    return;
                case UPDATE:
                    this.updateCounter.inc();
                    tableMetrics.markUpdateRecord();
                    return;
                default:
                    return;
            }
        });
    }

    private TableMetrics getTableMetrics(TableId tableId) {
        return this.tableMetricsMap.computeIfAbsent(tableId, tableId2 -> {
            return new TableMetrics(tableId2.catalog(), tableId2.schema(), tableId2.table(), this.metricGroup);
        });
    }

    private void catchAndWarnLogAllExceptions(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            LOG.warn("Failed to update metrics", e);
        }
    }

    private long getCurrentEventTimeLag() {
        if (this.lastReceivedEventTime == -1) {
            return -1L;
        }
        return SystemClock.getInstance().absoluteTimeMillis() - this.lastReceivedEventTime;
    }
}
