package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.OffsetsSeek;
import java.time.Duration;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.class */
public class ForwardRecordEmitter extends AbstractEmitter implements Consumer<FluxSink<TopicMessageEventDTO>> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ForwardRecordEmitter.class);
    private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000);
    private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
    private final OffsetsSeek offsetsSeek;

    public ForwardRecordEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> supplier, OffsetsSeek offsetsSeek, RecordSerDe recordSerDe) {
        super(recordSerDe);
        this.consumerSupplier = supplier;
        this.offsetsSeek = offsetsSeek;
    }

    @Override // java.util.function.Consumer
    public void accept(FluxSink<TopicMessageEventDTO> fluxSink) {
        try {
            KafkaConsumer<Bytes, Bytes> kafkaConsumer = this.consumerSupplier.get();
            try {
                sendPhase(fluxSink, "Assigning partitions");
                OffsetsSeek.WaitingOffsets assignAndSeek = this.offsetsSeek.assignAndSeek(kafkaConsumer);
                while (!fluxSink.isCancelled() && !assignAndSeek.endReached()) {
                    sendPhase(fluxSink, "Polling");
                    ConsumerRecords<Bytes, Bytes> poll = poll(fluxSink, kafkaConsumer);
                    log.info("{} records polled", Integer.valueOf(poll.count()));
                    Iterator<ConsumerRecord<Bytes, Bytes>> it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord<Bytes, Bytes> next = it.next();
                        if (!fluxSink.isCancelled() && !assignAndSeek.endReached()) {
                            sendMessage(fluxSink, next);
                            assignAndSeek.markPolled(next);
                        }
                    }
                }
                fluxSink.complete();
                log.info("Polling finished");
                if (kafkaConsumer != null) {
                    kafkaConsumer.close();
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("Error occurred while consuming records", (Throwable) e);
            fluxSink.error(e);
        }
    }
}
