package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.OffsetsSeek;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.class */
public class BackwardRecordEmitter extends AbstractEmitter implements Consumer<FluxSink<TopicMessageEventDTO>> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BackwardRecordEmitter.class);
    private final Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier;
    private final OffsetsSeekBackward offsetsSeek;

    public BackwardRecordEmitter(Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> function, OffsetsSeekBackward offsetsSeekBackward, RecordSerDe recordSerDe) {
        super(recordSerDe);
        this.offsetsSeek = offsetsSeekBackward;
        this.consumerSupplier = function;
    }

    @Override // java.util.function.Consumer
    public void accept(FluxSink<TopicMessageEventDTO> fluxSink) {
        try {
            KafkaConsumer<Bytes, Bytes> apply = this.consumerSupplier.apply(Map.of());
            try {
                List<TopicPartition> requestedPartitions = this.offsetsSeek.getRequestedPartitions(apply);
                sendPhase(fluxSink, "Request partitions");
                int msgsPerPartition = this.offsetsSeek.msgsPerPartition(requestedPartitions.size());
                KafkaConsumer<Bytes, Bytes> apply2 = this.consumerSupplier.apply(Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(msgsPerPartition)));
                try {
                    sendPhase(fluxSink, "Created consumer");
                    TreeMap treeMap = new TreeMap(Comparator.comparingInt((v0) -> {
                        return v0.partition();
                    }));
                    treeMap.putAll(this.offsetsSeek.getPartitionsOffsets(apply2));
                    sendPhase(fluxSink, "Requested partitions offsets");
                    log.debug("partition offsets: {}", treeMap);
                    OffsetsSeek.WaitingOffsets waitingOffsets = this.offsetsSeek.waitingOffsets(apply2, treeMap.keySet());
                    log.debug("waiting offsets {} {}", waitingOffsets.getBeginOffsets(), waitingOffsets.getEndOffsets());
                    while (!fluxSink.isCancelled() && !waitingOffsets.beginReached()) {
                        for (Map.Entry entry : treeMap.entrySet()) {
                            Long l = waitingOffsets.getBeginOffsets().get(Integer.valueOf(((TopicPartition) entry.getKey()).partition()));
                            if (l != null) {
                                apply2.assign(Collections.singleton((TopicPartition) entry.getKey()));
                                long max = Math.max(l.longValue(), ((Long) entry.getValue()).longValue() - msgsPerPartition);
                                log.debug("Polling {} from {}", entry.getKey(), Long.valueOf(max));
                                apply2.seek((TopicPartition) entry.getKey(), max);
                                sendPhase(fluxSink, String.format("Consuming partition: %s from %s", entry.getKey(), Long.valueOf(max)));
                                ConsumerRecords<Bytes, Bytes> poll = poll(fluxSink, apply2);
                                List<ConsumerRecord<Bytes, Bytes>> list = (List) poll.records((TopicPartition) entry.getKey()).stream().filter(consumerRecord -> {
                                    return consumerRecord.offset() < ((Long) treeMap.get(entry.getKey())).longValue();
                                }).collect(Collectors.toList());
                                Collections.reverse(list);
                                log.debug("{} records polled", Integer.valueOf(poll.count()));
                                log.debug("{} records sent", Integer.valueOf(list.size()));
                                if (poll.count() > 0 && list.isEmpty()) {
                                    waitingOffsets.markPolled(((TopicPartition) entry.getKey()).partition());
                                }
                                for (ConsumerRecord<Bytes, Bytes> consumerRecord2 : list) {
                                    if (fluxSink.isCancelled() || waitingOffsets.beginReached()) {
                                        log.info("Begin reached");
                                        break;
                                    } else {
                                        sendMessage(fluxSink, consumerRecord2);
                                        waitingOffsets.markPolled(consumerRecord2);
                                    }
                                }
                                treeMap.put((TopicPartition) entry.getKey(), Long.valueOf(Math.max(max, ((Long) entry.getValue()).longValue() - msgsPerPartition)));
                            }
                        }
                        if (waitingOffsets.beginReached()) {
                            log.info("begin reached after partitions");
                        } else if (fluxSink.isCancelled()) {
                            log.info("sink is cancelled after partitions");
                        }
                    }
                    fluxSink.complete();
                    log.info("Polling finished");
                    if (apply2 != null) {
                        apply2.close();
                    }
                    if (apply != null) {
                        apply.close();
                    }
                } catch (Throwable th) {
                    if (apply2 != null) {
                        try {
                            apply2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Error occurred while consuming records", (Throwable) e);
            fluxSink.error(e);
        }
    }
}
