package org.apache.flink.connector.elasticsearch.lookup;

import java.io.IOException;
import java.lang.AutoCloseable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.search.SearchRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.Strings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.index.query.BoolQueryBuilder;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.index.query.TermQueryBuilder;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.search.builder.SearchSourceBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/elasticsearch/lookup/ElasticsearchRowDataLookupFunction.class */
public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends LookupFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDataLookupFunction.class);
    private static final long serialVersionUID = 1;
    private final DeserializationSchema<RowData> deserializationSchema;
    private final String index;
    private final String type;
    private final String[] producedNames;
    private final String[] lookupKeys;
    private final int maxRetryTimes;
    private final DataFormatConverters.DataFormatConverter[] converters;
    private SearchRequest searchRequest;
    private SearchSourceBuilder searchSourceBuilder;
    private final ElasticsearchApiCallBridge<C> callBridge;
    private final NetworkClientConfig networkClientConfig;
    private final List<HttpHost> hosts;
    private transient C client;

    public ElasticsearchRowDataLookupFunction(DeserializationSchema<RowData> deserializationSchema, int i, String str, String str2, String[] strArr, DataType[] dataTypeArr, String[] strArr2, List<HttpHost> list, NetworkClientConfig networkClientConfig, ElasticsearchApiCallBridge<C> elasticsearchApiCallBridge) {
        Preconditions.checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
        Preconditions.checkNotNull(Integer.valueOf(i), "No maxRetryTimes supplied.");
        Preconditions.checkNotNull(strArr, "No fieldNames supplied.");
        Preconditions.checkNotNull(dataTypeArr, "No fieldTypes supplied.");
        Preconditions.checkNotNull(strArr2, "No keyNames supplied.");
        Preconditions.checkNotNull(list, "No hosts supplied.");
        Preconditions.checkNotNull(networkClientConfig, "No networkClientConfig supplied.");
        Preconditions.checkNotNull(elasticsearchApiCallBridge, "No ElasticsearchApiCallBridge supplied.");
        this.deserializationSchema = deserializationSchema;
        this.maxRetryTimes = i;
        this.index = str;
        this.type = str2;
        this.producedNames = strArr;
        this.lookupKeys = strArr2;
        this.converters = new DataFormatConverters.DataFormatConverter[strArr2.length];
        Map map = (Map) IntStream.range(0, strArr.length).boxed().collect(Collectors.toMap(num -> {
            return strArr[num.intValue()];
        }, num2 -> {
            return num2;
        }));
        for (int i2 = 0; i2 < strArr2.length; i2++) {
            Integer num3 = (Integer) map.get(strArr2[i2]);
            Preconditions.checkArgument(num3 != null, "Lookup keys %s not selected", new Object[]{Arrays.toString(strArr2)});
            this.converters[i2] = DataFormatConverters.getConverterForDataType(dataTypeArr[num3.intValue()]);
        }
        this.networkClientConfig = networkClientConfig;
        this.hosts = list;
        this.callBridge = elasticsearchApiCallBridge;
    }

    public void open(FunctionContext functionContext) throws Exception {
        this.client = this.callBridge.createClient(this.networkClientConfig, this.hosts);
        this.searchRequest = new SearchRequest(this.index);
        if (this.type == null) {
            this.searchRequest.types(Strings.EMPTY_ARRAY);
        } else {
            this.searchRequest.types(this.type);
        }
        this.searchSourceBuilder = new SearchSourceBuilder();
        this.searchSourceBuilder.fetchSource(this.producedNames, (String[]) null);
        this.deserializationSchema.open((DeserializationSchema.InitializationContext) null);
    }

    public Collection<RowData> lookup(RowData rowData) {
        ArrayList arrayList;
        Tuple2<String, String[]> search;
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        for (int i = 0; i < this.lookupKeys.length; i++) {
            boolQueryBuilder.must(new TermQueryBuilder(this.lookupKeys[i], this.converters[i].toExternal(rowData, i)));
        }
        this.searchSourceBuilder.query(boolQueryBuilder);
        this.searchRequest.source(this.searchSourceBuilder);
        for (int i2 = 0; i2 <= this.maxRetryTimes; i2++) {
            try {
                arrayList = new ArrayList();
                search = this.callBridge.search(this.client, this.searchRequest);
            } catch (IOException e) {
                LOG.error(String.format("Elasticsearch search error, retry times = %d", Integer.valueOf(i2)), e);
                if (i2 >= this.maxRetryTimes) {
                    throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
                }
                try {
                    Thread.sleep(1000 * i2);
                } catch (InterruptedException e2) {
                    LOG.warn("Interrupted while waiting to retry failed elasticsearch search, aborting");
                    throw new FlinkRuntimeException(e2);
                }
            }
            if (((String[]) search.f1).length > 0) {
                for (String str : (String[]) search.f1) {
                    arrayList.add(parseSearchResult(str));
                }
                arrayList.trimToSize();
                return arrayList;
            }
            continue;
        }
        return Collections.emptyList();
    }

    private RowData parseSearchResult(String str) {
        RowData rowData = null;
        try {
            rowData = (RowData) this.deserializationSchema.deserialize(str.getBytes());
        } catch (IOException e) {
            LOG.error("Deserialize search hit failed: " + e.getMessage());
        }
        return rowData;
    }
}
