package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.FilterTopicMessageEvents;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Service for working with the messages of a Kafka topic: deleting records,
 * producing a single record, and streaming records back to the caller.
 */
@Service
public class MessagesService {
    private static final Logger log = LoggerFactory.getLogger(MessagesService.class);

    /** Upper bound applied to a caller-supplied record limit in {@link #loadMessages}. */
    private static final int MAX_LOAD_RECORD_LIMIT = 100;

    /** Record limit used by {@link #loadMessages} when the caller passes {@code null}. */
    private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;

    private final AdminClientService adminClientService;
    private final DeserializationService deserializationService;
    private final ConsumerGroupService consumerGroupService;
    private final MetricsCache metricsCache;

    public MessagesService(AdminClientService adminClientService, DeserializationService deserializationService, ConsumerGroupService consumerGroupService, MetricsCache metricsCache) {
        this.adminClientService = adminClientService;
        this.deserializationService = deserializationService;
        this.consumerGroupService = consumerGroupService;
        this.metricsCache = metricsCache;
    }

    /**
     * Deletes the records of a topic up to the latest offset of each selected partition.
     *
     * @param cluster             cluster that owns the topic
     * @param topicName           topic whose records are deleted
     * @param partitionsToInclude partitions to purge; {@code null} or empty selects every partition
     * @return a {@link Mono} that completes when the admin client's delete call completes
     * @throws TopicNotFoundException thrown synchronously when the topic is absent from the metrics cache
     */
    public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
        if (!this.metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) {
            throw new TopicNotFoundException();
        }
        return offsetsForDeletion(cluster, topicName, partitionsToInclude)
            .flatMap(offsets -> this.adminClientService.get(cluster)
                .flatMap(adminClient -> adminClient.deleteRecords(offsets)));
    }

    /**
     * Resolves, per partition, the offset to delete up to (the latest offset).
     * Partitions whose latest offset equals their earliest offset are skipped
     * because they hold no records.
     */
    private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
        // A missing list is treated like an empty one: no filter, i.e. all partitions.
        final boolean includeAll = partitionsToInclude == null || partitionsToInclude.isEmpty();
        return this.adminClientService.get(cluster).flatMap(adminClient ->
            adminClient.listOffsets(topicName, OffsetSpec.earliest())
                .zipWith(adminClient.listOffsets(topicName, OffsetSpec.latest()),
                    (earliest, latest) -> latest.entrySet().stream()
                        .filter(entry -> includeAll || partitionsToInclude.contains(entry.getKey().partition()))
                        // Only partitions that actually contain records need a delete request.
                        .filter(entry -> !entry.getValue().equals(earliest.get(entry.getKey())))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
    }

    /**
     * Produces a single record to the given topic.
     *
     * <p>A dedicated producer is created for the call and closed before this
     * method returns. NOTE(review): per the Kafka client documentation
     * {@code KafkaProducer.close()} waits for outstanding sends, so this call is
     * expected to block the calling thread until the send finishes - confirm
     * that callers invoke it from a thread where blocking is acceptable.
     *
     * @param cluster cluster to produce to; its properties seed the producer configuration
     * @param topic   destination topic
     * @param msg     key, content, headers and optional target partition of the record
     * @return a {@link Mono} backed by the future completed from the producer's send callback
     * @throws ValidationException    if a partition is given that is negative or not below the topic's partition count
     * @throws TopicNotFoundException if a partition is given but the topic is absent from the metrics cache
     */
    public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic, CreateTopicMessageDTO msg) {
        validatePartition(cluster, topic, msg.getPartition());

        RecordSerDe serde = this.deserializationService.getRecordDeserializerForCluster(cluster);

        Properties properties = new Properties();
        properties.putAll(cluster.getProperties());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<byte[], byte[]> serialized = serde.serialize(
                topic,
                msg.getKey().orElse(null),
                msg.getContent().orElse(null),
                msg.getPartition());
            // The serde result carries no headers, so rebuild the record with the requested ones.
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                serialized.topic(),
                serialized.partition(),
                serialized.key(),
                serialized.value(),
                createHeaders(msg.getHeaders()));

            CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    result.completeExceptionally(exception);
                } else {
                    result.complete(metadata);
                }
            });
            return Mono.fromFuture(result);
        }
    }

    /**
     * Checks an explicitly requested partition against the cached topic description.
     * A {@code null} partition means "let the producer choose" and is not validated.
     */
    private void validatePartition(KafkaCluster cluster, String topic, @Nullable Integer partition) {
        if (partition == null) {
            return;
        }
        int partitionCount = Optional.ofNullable(this.metricsCache.get(cluster).getTopicDescriptions().get(topic))
            .map(description -> description.partitions().size())
            .orElseThrow(TopicNotFoundException::new);
        if (partition < 0 || partition >= partitionCount) {
            throw new ValidationException("Invalid partition");
        }
    }

    /**
     * Converts a string map into Kafka record headers.
     *
     * @param headers header names mapped to values; may be {@code null}
     * @return the headers, empty when {@code headers} is {@code null}; values are
     *         encoded as UTF-8 and a {@code null} value is kept as a {@code null} header value
     */
    private Iterable<Header> createHeaders(@Nullable Map<String, String> headers) {
        RecordHeaders recordHeaders = new RecordHeaders();
        if (headers != null) {
            // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
            headers.forEach((key, value) -> recordHeaders.add(
                new RecordHeader(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8))));
        }
        return recordHeaders;
    }

    /**
     * Streams message events from a topic, reading forward or backward depending
     * on the seek direction of {@code consumerPosition}.
     *
     * @param cluster          cluster to read from
     * @param topic            topic to read
     * @param consumerPosition seek direction and position passed to the offsets-seek helpers
     * @param query            substring that a message's key or content must contain; {@code null} or empty disables filtering
     * @param limit            requested record limit, capped at {@value #MAX_LOAD_RECORD_LIMIT};
     *                         {@code null} selects {@value #DEFAULT_LOAD_RECORD_LIMIT}
     * @return a shared {@link Flux} of events, subscribed on the elastic scheduler
     */
    public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, String query, Integer limit) {
        int recordsLimit = Optional.ofNullable(limit)
            .map(requested -> Math.min(requested, MAX_LOAD_RECORD_LIMIT))
            .orElse(DEFAULT_LOAD_RECORD_LIMIT);
        RecordSerDe serde = this.deserializationService.getRecordDeserializerForCluster(cluster);

        return Flux.create(consumerPosition.getSeekDirection().equals(SeekDirectionDTO.FORWARD)
                ? new ForwardRecordEmitter(
                    () -> this.consumerGroupService.createConsumer(cluster),
                    new OffsetsSeekForward(topic, consumerPosition),
                    serde)
                : new BackwardRecordEmitter(
                    props -> this.consumerGroupService.createConsumer(cluster, props),
                    new OffsetsSeekBackward(topic, consumerPosition, recordsLimit),
                    serde))
            .filter(event -> filterTopicMessage(event, query))
            .takeWhile(new FilterTopicMessageEvents(recordsLimit))
            // NOTE(review): Schedulers.elastic() is deprecated in newer Reactor releases in
            // favour of boundedElastic() - switch once the project's Reactor version is confirmed.
            .subscribeOn(Schedulers.elastic())
            .share();
    }

    /**
     * Decides whether an event passes the text filter. Events that are not of
     * type {@code MESSAGE} always pass, as does everything when {@code query} is empty.
     */
    private boolean filterTopicMessage(TopicMessageEventDTO event, String query) {
        if (StringUtils.isEmpty(query) || !event.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {
            return true;
        }
        TopicMessageDTO message = event.getMessage();
        boolean keyMatches = !StringUtils.isEmpty(message.getKey()) && message.getKey().contains(query);
        return keyMatches
            || (!StringUtils.isEmpty(message.getContent()) && message.getContent().contains(query));
    }
}
