package com.provectus.kafka.ui.service;

import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.Feature;
import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.service.MetricsCache;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/service/TopicsService.class */
public class TopicsService {
    private static final Integer DEFAULT_PAGE_SIZE = 25;
    private final AdminClientService adminClientService;
    private final ClusterMapper clusterMapper;
    private final DeserializationService deserializationService;
    private final MetricsCache metricsCache;

    @VisibleForTesting
    /* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/service/TopicsService$Pagination.class */
    static final class Pagination {
        private final ReactiveAdminClient adminClient;
        private final MetricsCache.Metrics metrics;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/service/TopicsService$Pagination$Page.class */
        public static final class Page {
            private final List<String> topics;
            private final int totalPages;

            public Page(List<String> list, int i) {
                this.topics = list;
                this.totalPages = i;
            }

            public List<String> getTopics() {
                return this.topics;
            }

            public int getTotalPages() {
                return this.totalPages;
            }

            public boolean equals(Object obj) {
                if (obj == this) {
                    return true;
                }
                if (!(obj instanceof Page)) {
                    return false;
                }
                Page page = (Page) obj;
                if (getTotalPages() != page.getTotalPages()) {
                    return false;
                }
                List<String> topics = getTopics();
                List<String> topics2 = page.getTopics();
                return topics == null ? topics2 == null : topics.equals(topics2);
            }

            public int hashCode() {
                int totalPages = (1 * 59) + getTotalPages();
                List<String> topics = getTopics();
                return (totalPages * 59) + (topics == null ? 43 : topics.hashCode());
            }

            public String toString() {
                return "TopicsService.Pagination.Page(topics=" + getTopics() + ", totalPages=" + getTotalPages() + ")";
            }
        }

        Mono<Page> getPage(Optional<Integer> optional, Optional<Integer> optional2, Optional<Boolean> optional3, Optional<String> optional4, Optional<TopicColumnsToSortDTO> optional5) {
            return geTopicsForPagination().map(list -> {
                Predicate predicate = num -> {
                    return num.intValue() > 0;
                };
                int intValue = ((Integer) optional2.filter(predicate).orElse(TopicsService.DEFAULT_PAGE_SIZE)).intValue();
                int intValue2 = (((Integer) optional.filter(predicate).orElse(1)).intValue() - 1) * intValue;
                List list = (List) list.stream().filter(internalTopic -> {
                    return !internalTopic.isInternal() || ((Boolean) optional3.map(bool -> {
                        return Boolean.valueOf(internalTopic.isInternal() == bool.booleanValue());
                    }).orElse(true)).booleanValue();
                }).filter(internalTopic2 -> {
                    return ((Boolean) optional4.map(str -> {
                        return Boolean.valueOf(StringUtils.containsIgnoreCase(internalTopic2.getName(), str));
                    }).orElse(true)).booleanValue();
                }).sorted(getComparatorForTopic(optional5)).collect(Collectors.toList());
                return new Page((List) list.stream().skip(intValue2).limit(intValue).map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList()), (list.size() / intValue) + (list.size() % intValue == 0 ? 0 : 1));
            });
        }

        private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSortDTO> optional) {
            Comparator<InternalTopic> comparing = Comparator.comparing((v0) -> {
                return v0.getName();
            });
            if (optional.isEmpty()) {
                return comparing;
            }
            switch (optional.get()) {
                case TOTAL_PARTITIONS:
                    return Comparator.comparing((v0) -> {
                        return v0.getPartitionCount();
                    });
                case OUT_OF_SYNC_REPLICAS:
                    return Comparator.comparing(internalTopic -> {
                        return Integer.valueOf(internalTopic.getReplicas() - internalTopic.getInSyncReplicas());
                    });
                case REPLICATION_FACTOR:
                    return Comparator.comparing((v0) -> {
                        return v0.getReplicationFactor();
                    });
                case NAME:
                default:
                    return comparing;
            }
        }

        private Mono<List<String>> filterExisting(Collection<String> collection) {
            return this.adminClient.listTopics(true).map(set -> {
                Stream stream = set.stream();
                Objects.requireNonNull(collection);
                return (List) stream.filter((v1) -> {
                    return r1.contains(v1);
                }).collect(Collectors.toList());
            });
        }

        private Mono<List<InternalTopic>> geTopicsForPagination() {
            return filterExisting(this.metrics.getTopicDescriptions().keySet()).map(list -> {
                return (List) list.stream().map(str -> {
                    return InternalTopic.from(this.metrics.getTopicDescriptions().get(str), this.metrics.getTopicConfigs().getOrDefault(str, List.of()), InternalPartitionsOffsets.empty(), this.metrics.getJmxMetrics(), this.metrics.getLogDirInfo());
                }).collect(Collectors.toList());
            });
        }

        public Pagination(ReactiveAdminClient reactiveAdminClient, MetricsCache.Metrics metrics) {
            this.adminClient = reactiveAdminClient;
            this.metrics = metrics;
        }

        public ReactiveAdminClient getAdminClient() {
            return this.adminClient;
        }

        public MetricsCache.Metrics getMetrics() {
            return this.metrics;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Pagination)) {
                return false;
            }
            Pagination pagination = (Pagination) obj;
            ReactiveAdminClient adminClient = getAdminClient();
            ReactiveAdminClient adminClient2 = pagination.getAdminClient();
            if (adminClient == null) {
                if (adminClient2 != null) {
                    return false;
                }
            } else if (!adminClient.equals(adminClient2)) {
                return false;
            }
            MetricsCache.Metrics metrics = getMetrics();
            MetricsCache.Metrics metrics2 = pagination.getMetrics();
            return metrics == null ? metrics2 == null : metrics.equals(metrics2);
        }

        public int hashCode() {
            ReactiveAdminClient adminClient = getAdminClient();
            int hashCode = (1 * 59) + (adminClient == null ? 43 : adminClient.hashCode());
            MetricsCache.Metrics metrics = getMetrics();
            return (hashCode * 59) + (metrics == null ? 43 : metrics.hashCode());
        }

        public String toString() {
            return "TopicsService.Pagination(adminClient=" + getAdminClient() + ", metrics=" + getMetrics() + ")";
        }
    }

    public Mono<TopicsResponseDTO> getTopics(KafkaCluster kafkaCluster, Optional<Integer> optional, Optional<Integer> optional2, Optional<Boolean> optional3, Optional<String> optional4, Optional<TopicColumnsToSortDTO> optional5) {
        return this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
            return new Pagination(reactiveAdminClient, this.metricsCache.get(kafkaCluster)).getPage(optional, optional2, optional3, optional4, optional5).flatMap(page -> {
                return loadTopics(kafkaCluster, page.getTopics()).map(list -> {
                    TopicsResponseDTO topicsResponseDTO = new TopicsResponseDTO();
                    Stream stream = list.stream();
                    ClusterMapper clusterMapper = this.clusterMapper;
                    Objects.requireNonNull(clusterMapper);
                    return topicsResponseDTO.topics((List) stream.map(clusterMapper::toTopic).collect(Collectors.toList())).pageCount(Integer.valueOf(page.getTotalPages()));
                });
            });
        });
    }

    private Mono<List<InternalTopic>> loadTopics(KafkaCluster kafkaCluster, List<String> list) {
        return list.isEmpty() ? Mono.just(List.of()) : this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
            return reactiveAdminClient.describeTopics(list).zipWith(reactiveAdminClient.getTopicsConfig(list), (map, map2) -> {
                this.metricsCache.update(kafkaCluster, map, map2);
                return getPartitionOffsets(map, reactiveAdminClient).map(internalPartitionsOffsets -> {
                    MetricsCache.Metrics metrics = this.metricsCache.get(kafkaCluster);
                    return createList(list, map, map2, internalPartitionsOffsets, metrics.getJmxMetrics(), metrics.getLogDirInfo());
                });
            });
        }).flatMap(Function.identity());
    }

    private Mono<InternalTopic> loadTopic(KafkaCluster kafkaCluster, String str) {
        return loadTopics(kafkaCluster, List.of(str)).map(list -> {
            return (InternalTopic) list.stream().findFirst().orElseThrow(TopicNotFoundException::new);
        });
    }

    private List<InternalTopic> createList(List<String> list, Map<String, TopicDescription> map, Map<String, List<ConfigEntry>> map2, InternalPartitionsOffsets internalPartitionsOffsets, JmxClusterUtil.JmxMetrics jmxMetrics, InternalLogDirStats internalLogDirStats) {
        Stream<String> stream = list.stream();
        Objects.requireNonNull(map);
        return (List) stream.filter((v1) -> {
            return r1.containsKey(v1);
        }).map(str -> {
            return InternalTopic.from((TopicDescription) map.get(str), (List) map2.getOrDefault(str, List.of()), internalPartitionsOffsets, jmxMetrics, internalLogDirStats);
        }).collect(Collectors.toList());
    }

    private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription> map, ReactiveAdminClient reactiveAdminClient) {
        List list = (List) map.values().stream().flatMap(topicDescription -> {
            return topicDescription.partitions().stream().map(topicPartitionInfo -> {
                return new TopicPartition(topicDescription.name(), topicPartitionInfo.partition());
            });
        }).collect(Collectors.toList());
        return reactiveAdminClient.listOffsets(list, OffsetSpec.earliest()).zipWith(reactiveAdminClient.listOffsets(list, OffsetSpec.latest()), (map2, map3) -> {
            return (Map) list.stream().filter(topicPartition -> {
                return map2.containsKey(topicPartition) && map3.containsKey(topicPartition);
            }).map(topicPartition2 -> {
                return Map.entry(topicPartition2, new InternalPartitionsOffsets.Offsets((Long) map2.get(topicPartition2), (Long) map3.get(topicPartition2)));
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
        }).map(InternalPartitionsOffsets::new);
    }

    public Mono<TopicDetailsDTO> getTopicDetails(KafkaCluster kafkaCluster, String str) {
        Mono<InternalTopic> loadTopic = loadTopic(kafkaCluster, str);
        ClusterMapper clusterMapper = this.clusterMapper;
        Objects.requireNonNull(clusterMapper);
        return loadTopic.map(clusterMapper::toTopicDetails);
    }

    public Mono<List<TopicConfigDTO>> getTopicConfigs(KafkaCluster kafkaCluster, String str) {
        return this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
            return reactiveAdminClient.getTopicsConfig(List.of(str));
        }).map(map -> {
            return (List) map.values().stream().findFirst().orElseThrow(TopicNotFoundException::new);
        }).map(list -> {
            Stream map2 = list.stream().map(InternalTopicConfig::from);
            ClusterMapper clusterMapper = this.clusterMapper;
            Objects.requireNonNull(clusterMapper);
            return (List) map2.map(clusterMapper::toTopicConfig).collect(Collectors.toList());
        });
    }

    private Mono<InternalTopic> createTopic(KafkaCluster kafkaCluster, ReactiveAdminClient reactiveAdminClient, Mono<TopicCreationDTO> mono) {
        return mono.flatMap(topicCreationDTO -> {
            return reactiveAdminClient.createTopic(topicCreationDTO.getName(), topicCreationDTO.getPartitions().intValue(), topicCreationDTO.getReplicationFactor().shortValue(), topicCreationDTO.getConfigs()).thenReturn(topicCreationDTO);
        }).onErrorResume(th -> {
            return Mono.error(new TopicMetadataException(th.getMessage()));
        }).flatMap(topicCreationDTO2 -> {
            return loadTopic(kafkaCluster, topicCreationDTO2.getName());
        });
    }

    public Mono<TopicDTO> createTopic(KafkaCluster kafkaCluster, Mono<TopicCreationDTO> mono) {
        Mono<R> flatMap = this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
            return createTopic(kafkaCluster, reactiveAdminClient, mono);
        });
        ClusterMapper clusterMapper = this.clusterMapper;
        Objects.requireNonNull(clusterMapper);
        return flatMap.map(clusterMapper::toTopic);
    }

    private Mono<InternalTopic> updateTopic(KafkaCluster kafkaCluster, String str, TopicUpdateDTO topicUpdateDTO) {
        return this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
            return reactiveAdminClient.updateTopicConfig(str, topicUpdateDTO.getConfigs()).then(loadTopic(kafkaCluster, str));
        });
    }

    public Mono<TopicDTO> updateTopic(KafkaCluster kafkaCluster, String str, Mono<TopicUpdateDTO> mono) {
        Mono<R> flatMap = mono.flatMap(topicUpdateDTO -> {
            return updateTopic(kafkaCluster, str, topicUpdateDTO);
        });
        ClusterMapper clusterMapper = this.clusterMapper;
        Objects.requireNonNull(clusterMapper);
        return flatMap.map(clusterMapper::toTopic);
    }

    private Mono<InternalTopic> changeReplicationFactor(KafkaCluster kafkaCluster, ReactiveAdminClient reactiveAdminClient, String str, Map<TopicPartition, Optional<NewPartitionReassignment>> map) {
        return reactiveAdminClient.alterPartitionReassignments(map).then(loadTopic(kafkaCluster, str));
    }

    public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(KafkaCluster kafkaCluster, String str, ReplicationFactorChangeDTO replicationFactorChangeDTO) {
        return loadTopic(kafkaCluster, str).flatMap(internalTopic -> {
            return this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
                Integer valueOf = Integer.valueOf(internalTopic.getReplicationFactor());
                Integer totalReplicationFactor = replicationFactorChangeDTO.getTotalReplicationFactor();
                Integer valueOf2 = Integer.valueOf(this.metricsCache.get(kafkaCluster).getClusterDescription().getNodes().size());
                return totalReplicationFactor.equals(valueOf) ? Mono.error(new ValidationException(String.format("Topic already has replicationFactor %s.", valueOf))) : totalReplicationFactor.intValue() > valueOf2.intValue() ? Mono.error(new ValidationException(String.format("Requested replication factor %s more than brokers count %s.", totalReplicationFactor, valueOf2))) : changeReplicationFactor(kafkaCluster, reactiveAdminClient, str, getPartitionsReassignments(kafkaCluster, internalTopic, replicationFactorChangeDTO));
            }).map(internalTopic -> {
                return new ReplicationFactorChangeResponseDTO().topicName(internalTopic.getName()).totalReplicationFactor(Integer.valueOf(internalTopic.getReplicationFactor()));
            });
        });
    }

    private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(KafkaCluster kafkaCluster, InternalTopic internalTopic, ReplicationFactorChangeDTO replicationFactorChangeDTO) {
        Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(internalTopic);
        Map<Integer, Integer> brokersMap = getBrokersMap(kafkaCluster, currentAssignment);
        int replicationFactor = internalTopic.getReplicationFactor();
        if (replicationFactorChangeDTO.getTotalReplicationFactor().intValue() > replicationFactor) {
            for (List<Integer> list : currentAssignment.values()) {
                for (Integer num : (List) brokersMap.entrySet().stream().sorted(Map.Entry.comparingByValue()).map((v0) -> {
                    return v0.getKey();
                }).collect(Collectors.toList())) {
                    if (!list.contains(num)) {
                        list.add(num);
                        brokersMap.merge(num, 1, (v0, v1) -> {
                            return Integer.sum(v0, v1);
                        });
                    }
                    if (list.size() == replicationFactorChangeDTO.getTotalReplicationFactor().intValue()) {
                        break;
                    }
                }
                if (list.size() != replicationFactorChangeDTO.getTotalReplicationFactor().intValue()) {
                    throw new ValidationException("Something went wrong during adding replicas");
                }
            }
        } else {
            if (replicationFactorChangeDTO.getTotalReplicationFactor().intValue() >= replicationFactor) {
                throw new ValidationException("Replication factor already equals requested");
            }
            for (Map.Entry<Integer, List<Integer>> entry : currentAssignment.entrySet()) {
                Integer key = entry.getKey();
                List<Integer> value = entry.getValue();
                for (Integer num2 : (List) brokersMap.entrySet().stream().sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())).map((v0) -> {
                    return v0.getKey();
                }).collect(Collectors.toList())) {
                    if (!internalTopic.getPartitions().get(key).getLeader().equals(num2)) {
                        value.remove(num2);
                        brokersMap.merge(num2, -1, (v0, v1) -> {
                            return Integer.sum(v0, v1);
                        });
                    }
                    if (value.size() == replicationFactorChangeDTO.getTotalReplicationFactor().intValue()) {
                        break;
                    }
                }
                if (value.size() != replicationFactorChangeDTO.getTotalReplicationFactor().intValue()) {
                    throw new ValidationException("Something went wrong during removing replicas");
                }
            }
        }
        return (Map) currentAssignment.entrySet().stream().collect(Collectors.toMap(entry2 -> {
            return new TopicPartition(internalTopic.getName(), ((Integer) entry2.getKey()).intValue());
        }, entry3 -> {
            return Optional.of(new NewPartitionReassignment((List) entry3.getValue()));
        }));
    }

    private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic internalTopic) {
        return (Map) internalTopic.getPartitions().values().stream().collect(Collectors.toMap((v0) -> {
            return v0.getPartition();
        }, internalPartition -> {
            return (List) internalPartition.getReplicas().stream().map((v0) -> {
                return v0.getBroker();
            }).collect(Collectors.toList());
        }));
    }

    private Map<Integer, Integer> getBrokersMap(KafkaCluster kafkaCluster, Map<Integer, List<Integer>> map) {
        Map<Integer, Integer> map2 = (Map) this.metricsCache.get(kafkaCluster).getClusterDescription().getNodes().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return 0;
        }));
        map.values().forEach(list -> {
            list.forEach(num3 -> {
                map2.put(num3, Integer.valueOf(((Integer) map2.get(num3)).intValue() + 1));
            });
        });
        return map2;
    }

    public Mono<PartitionsIncreaseResponseDTO> increaseTopicPartitions(KafkaCluster kafkaCluster, String str, PartitionsIncreaseDTO partitionsIncreaseDTO) {
        return loadTopic(kafkaCluster, str).flatMap(internalTopic -> {
            return this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
                Integer valueOf = Integer.valueOf(internalTopic.getPartitionCount());
                Integer totalPartitionsCount = partitionsIncreaseDTO.getTotalPartitionsCount();
                return totalPartitionsCount.intValue() < valueOf.intValue() ? Mono.error(new ValidationException(String.format("Topic currently has %s partitions, which is higher than the requested %s.", valueOf, totalPartitionsCount))) : totalPartitionsCount.equals(valueOf) ? Mono.error(new ValidationException(String.format("Topic already has %s partitions.", valueOf))) : reactiveAdminClient.createPartitions(Collections.singletonMap(str, NewPartitions.increaseTo(partitionsIncreaseDTO.getTotalPartitionsCount().intValue()))).then(loadTopic(kafkaCluster, str));
            }).map(internalTopic -> {
                return new PartitionsIncreaseResponseDTO().topicName(internalTopic.getName()).totalPartitionsCount(Integer.valueOf(internalTopic.getPartitionCount()));
            });
        });
    }

    public Mono<Void> deleteTopic(KafkaCluster kafkaCluster, String str) {
        return this.metricsCache.get(kafkaCluster).getFeatures().contains(Feature.TOPIC_DELETION) ? this.adminClientService.get(kafkaCluster).flatMap(reactiveAdminClient -> {
            return reactiveAdminClient.deleteTopic(str);
        }).doOnSuccess(r7 -> {
            this.metricsCache.onTopicDelete(kafkaCluster, str);
        }) : Mono.error(new ValidationException("Topic deletion restricted"));
    }

    public TopicMessageSchemaDTO getTopicSchema(KafkaCluster kafkaCluster, String str) {
        if (this.metricsCache.get(kafkaCluster).getTopicDescriptions().containsKey(str)) {
            return this.deserializationService.getRecordDeserializerForCluster(kafkaCluster).getTopicSchema(str);
        }
        throw new TopicNotFoundException();
    }

    public TopicsService(AdminClientService adminClientService, ClusterMapper clusterMapper, DeserializationService deserializationService, MetricsCache metricsCache) {
        this.adminClientService = adminClientService;
        this.clusterMapper = clusterMapper;
        this.deserializationService = deserializationService;
        this.metricsCache = metricsCache;
    }
}
