package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.ClusterUtil;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/emitter/AbstractEmitter.class */
public abstract class AbstractEmitter {
    private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000);
    private final RecordSerDe recordDeserializer;
    private long bytes = 0;
    private int records = 0;
    private long elapsed = 0;

    public AbstractEmitter(RecordSerDe recordSerDe) {
        this.recordDeserializer = recordSerDe;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerRecords<Bytes, Bytes> poll(FluxSink<TopicMessageEventDTO> fluxSink, Consumer<Bytes, Bytes> consumer) {
        Instant now = Instant.now();
        ConsumerRecords<Bytes, Bytes> poll = consumer.poll(POLL_TIMEOUT_MS);
        sendConsuming(fluxSink, poll, Duration.between(now, Instant.now()).toMillis());
        return poll;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FluxSink<TopicMessageEventDTO> sendMessage(FluxSink<TopicMessageEventDTO> fluxSink, ConsumerRecord<Bytes, Bytes> consumerRecord) {
        return fluxSink.next(new TopicMessageEventDTO().type(TopicMessageEventDTO.TypeEnum.MESSAGE).message(ClusterUtil.mapToTopicMessage(consumerRecord, this.recordDeserializer)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendPhase(FluxSink<TopicMessageEventDTO> fluxSink, String str) {
        fluxSink.next(new TopicMessageEventDTO().type(TopicMessageEventDTO.TypeEnum.PHASE).phase(new TopicMessagePhaseDTO().name(str)));
    }

    protected void sendConsuming(FluxSink<TopicMessageEventDTO> fluxSink, ConsumerRecords<Bytes, Bytes> consumerRecords, long j) {
        Iterator<ConsumerRecord<Bytes, Bytes>> it = consumerRecords.iterator();
        while (it.hasNext()) {
            for (Header header : it.next().headers()) {
                this.bytes += (header.key() != null ? header.key().getBytes().length : 0L) + (header.value() != null ? header.value().length : 0L);
            }
            this.bytes += r0.serializedKeySize() + r0.serializedValueSize();
        }
        this.records += consumerRecords.count();
        this.elapsed += j;
        fluxSink.next(new TopicMessageEventDTO().type(TopicMessageEventDTO.TypeEnum.CONSUMING).consuming(new TopicMessageConsumingDTO().bytesConsumed(Long.valueOf(this.bytes)).elapsedMs(Long.valueOf(this.elapsed)).isCancelled(Boolean.valueOf(fluxSink.isCancelled())).messagesConsumed(Integer.valueOf(this.records))));
    }
}
