package kafdrop.service;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import kafdrop.config.KafkaConfiguration;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
import kafdrop.util.Deserializers;
import kafdrop.util.MessageDeserializer;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/classes/kafdrop/service/KafkaHighLevelConsumer.class */
public final class KafkaHighLevelConsumer {
    private static final int POLL_TIMEOUT_MS = 200;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaHighLevelConsumer.class);
    private KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final KafkaConfiguration kafkaConfiguration;

    public KafkaHighLevelConsumer(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @PostConstruct
    private void initializeClient() {
        if (this.kafkaConsumer == null) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put("client.id", "kafdrop-consumer");
            properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            this.kafkaConfiguration.applyCommon(properties);
            this.kafkaConsumer = new KafkaConsumer<>(properties);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setTopicPartitionSizes(List<TopicVO> list) {
        initializeClient();
        Map map = (Map) list.stream().map(topicVO -> {
            return Pair.of(topicVO, (List) topicVO.getPartitions().stream().map(topicPartitionVO -> {
                return new TopicPartition(topicVO.getName(), topicPartitionVO.getId());
            }).collect(Collectors.toList()));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        List list2 = (List) map.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        this.kafkaConsumer.assign(list2);
        Map<TopicPartition, Long> beginningOffsets = this.kafkaConsumer.beginningOffsets(list2);
        Map<TopicPartition, Long> endOffsets = this.kafkaConsumer.endOffsets(list2);
        map.forEach((topicVO2, list3) -> {
            list3.forEach(topicPartition -> {
                topicVO2.getPartition(topicPartition.partition()).ifPresent(topicPartitionVO -> {
                    Long l = (Long) beginningOffsets.get(topicPartition);
                    Long l2 = (Long) endOffsets.get(topicPartition);
                    LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(), Integer.valueOf(topicPartition.partition()), l, l2);
                    topicPartitionVO.setFirstOffset(l.longValue());
                    topicPartitionVO.setSize(l2.longValue());
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartition topicPartition, long j, int i, Deserializers deserializers) {
        initializeClient();
        List singletonList = Collections.singletonList(topicPartition);
        this.kafkaConsumer.assign(singletonList);
        this.kafkaConsumer.seek(topicPartition, j);
        ArrayList arrayList = new ArrayList(i);
        Long l = this.kafkaConsumer.beginningOffsets(singletonList).get(topicPartition);
        long longValue = this.kafkaConsumer.endOffsets(singletonList).get(topicPartition).longValue() - 1;
        if (l.longValue() > longValue) {
            return Collections.emptyList();
        }
        long j2 = j - 1;
        int i2 = 0;
        while (arrayList.size() < i && j2 < longValue) {
            List<ConsumerRecord<byte[], byte[]>> records = this.kafkaConsumer.poll(Duration.ofMillis(200L)).records(topicPartition);
            if (records.isEmpty()) {
                i2++;
                if (i2 == 3) {
                    break;
                }
            } else {
                arrayList.addAll(records);
                j2 = records.get(records.size() - 1).offset();
                i2 = 0;
            }
        }
        return (List) arrayList.subList(0, Math.min(i, arrayList.size())).stream().map(consumerRecord -> {
            return new ConsumerRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), deserialize(deserializers.getKeyDeserializer(), (byte[]) consumerRecord.key()), deserialize(deserializers.getValueDeserializer(), (byte[]) consumerRecord.value()), consumerRecord.headers(), consumerRecord.leaderEpoch());
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized List<ConsumerRecord<String, String>> getLatestRecords(String str, int i, Deserializers deserializers) {
        initializeClient();
        List<TopicPartition> list = (List) this.kafkaConsumer.partitionsFor(str).stream().map(partitionInfo -> {
            return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
        }).collect(Collectors.toList());
        this.kafkaConsumer.assign(list);
        Map<TopicPartition, Long> endOffsets = this.kafkaConsumer.endOffsets(list);
        for (TopicPartition topicPartition : list) {
            this.kafkaConsumer.seek(topicPartition, Math.max(0L, Math.max(0L, endOffsets.get(topicPartition).longValue() - 1) - i));
        }
        int size = i * list.size();
        Map map = (Map) list.stream().collect(Collectors.toMap(topicPartition2 -> {
            return topicPartition2;
        }, topicPartition3 -> {
            return new ArrayList(i);
        }));
        boolean z = true;
        while (map.size() < size && z) {
            ConsumerRecords<byte[], byte[]> poll = this.kafkaConsumer.poll(Duration.ofMillis(200L));
            z = false;
            for (TopicPartition topicPartition4 : poll.partitions()) {
                List<ConsumerRecord<byte[], byte[]>> records = poll.records(topicPartition4);
                if (!records.isEmpty()) {
                    ((List) map.get(topicPartition4)).addAll(records);
                    z = records.get(records.size() - 1).offset() < endOffsets.get(topicPartition4).longValue() - 1;
                }
            }
        }
        return (List) map.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).map(consumerRecord -> {
            return new ConsumerRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), deserialize(deserializers.getKeyDeserializer(), (byte[]) consumerRecord.key()), deserialize(deserializers.getValueDeserializer(), (byte[]) consumerRecord.value()), consumerRecord.headers(), consumerRecord.leaderEpoch());
        }).collect(Collectors.toList());
    }

    private static String deserialize(MessageDeserializer messageDeserializer, byte[] bArr) {
        return bArr != null ? messageDeserializer.deserializeMessage(ByteBuffer.wrap(bArr)) : "empty";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Map<String, List<PartitionInfo>> getAllTopics() {
        initializeClient();
        return this.kafkaConsumer.listTopics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Map<String, TopicVO> getTopicInfos(Map<String, List<PartitionInfo>> map, String[] strArr) {
        initializeClient();
        Set<String> keySet = map.keySet();
        if (strArr.length == 0) {
            strArr = (String[]) Arrays.copyOf(keySet.toArray(), keySet.size(), String[].class);
        }
        Stream stream = Arrays.stream(strArr);
        Objects.requireNonNull(keySet);
        return (Map) stream.filter((v1) -> {
            return r1.contains(v1);
        }).map(str -> {
            return Pair.of(str, getTopicInfo(str, (List) map.get(str)));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private TopicVO getTopicInfo(String str, List<PartitionInfo> list) {
        TopicVO topicVO = new TopicVO(str);
        TreeMap treeMap = new TreeMap();
        for (PartitionInfo partitionInfo : list) {
            TopicPartitionVO topicPartitionVO = new TopicPartitionVO(partitionInfo.partition());
            Set set = (Set) Arrays.stream(partitionInfo.inSyncReplicas()).map((v0) -> {
                return v0.id();
            }).collect(Collectors.toSet());
            Set set2 = (Set) Arrays.stream(partitionInfo.offlineReplicas()).map((v0) -> {
                return v0.id();
            }).collect(Collectors.toSet());
            for (Node node : partitionInfo.replicas()) {
                topicPartitionVO.addReplica(new TopicPartitionVO.PartitionReplica(Integer.valueOf(node.id()), set.contains(Integer.valueOf(node.id())), false, set2.contains(Integer.valueOf(node.id()))));
            }
            Node leader = partitionInfo.leader();
            if (leader != null) {
                topicPartitionVO.addReplica(new TopicPartitionVO.PartitionReplica(Integer.valueOf(leader.id()), true, true, false));
            }
            treeMap.put(Integer.valueOf(partitionInfo.partition()), topicPartitionVO);
        }
        topicVO.setPartitions(treeMap);
        return topicVO;
    }
}
