package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.exception.InvalidRequestApiException;
import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
import com.provectus.kafka.ui.model.BrokerConfigDTO;
import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
import com.provectus.kafka.ui.model.BrokerMetricsDTO;
import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Broker-level operations for a Kafka cluster: listing brokers, reading and updating broker
 * configuration, inspecting and moving replica log directories, and exposing cached broker metrics.
 *
 * <p>Remote calls go through the {@link ReactiveAdminClient} obtained from
 * {@link AdminClientService}; the set of known broker nodes and the broker metrics are read from
 * {@link MetricsCache}.
 */
@Service
public class BrokerService {
    private static final Logger log = LoggerFactory.getLogger(BrokerService.class);

    private final MetricsCache metricsCache;
    private final AdminClientService adminClientService;
    private final DescribeLogDirsMapper describeLogDirsMapper;
    private final ClusterMapper clusterMapper;

    public BrokerService(MetricsCache metricsCache, AdminClientService adminClientService, DescribeLogDirsMapper describeLogDirsMapper, ClusterMapper clusterMapper) {
        this.metricsCache = metricsCache;
        this.adminClientService = adminClientService;
        this.describeLogDirsMapper = describeLogDirsMapper;
        this.clusterMapper = clusterMapper;
    }

    /**
     * Loads the configuration entries of the given brokers through the cluster's admin client.
     *
     * @param kafkaCluster cluster to query
     * @param brokerIds    ids of the brokers whose configuration is requested
     * @return the admin client's result, keyed by broker id
     */
    private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(KafkaCluster kafkaCluster, List<Integer> brokerIds) {
        return adminClientService.get(kafkaCluster)
                .flatMap(adminClient -> adminClient.loadBrokersConfig(brokerIds));
    }

    /**
     * Loads the configuration entries of a single broker.
     *
     * <p>The request is made for exactly one broker id and the first value of the returned map is
     * used as that broker's configuration.
     *
     * @param kafkaCluster cluster to query
     * @param brokerId     id of the broker
     * @return the broker's configuration entries; fails with {@link NotFoundException} when the
     *     returned map is empty
     */
    private Mono<List<ConfigEntry>> loadBrokersConfig(KafkaCluster kafkaCluster, Integer brokerId) {
        return loadBrokersConfig(kafkaCluster, Collections.singletonList(brokerId))
                .map(configsByBroker -> configsByBroker.values().stream()
                        .findFirst()
                        .orElseThrow(() -> new NotFoundException(String.format("Config for broker %s not found", brokerId))));
    }

    /**
     * Streams the internal configuration model of one broker.
     *
     * <p>The broker id is first checked against the nodes of the cached cluster description, so an
     * unknown id is rejected without calling the admin client.
     *
     * @param kafkaCluster cluster to query
     * @param brokerId     id of the broker
     * @return the broker's configuration entries, or a {@link NotFoundException} error when the id
     *     matches no cached node
     */
    private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster kafkaCluster, Integer brokerId) {
        boolean brokerUnknown = metricsCache.get(kafkaCluster).getClusterDescription().getNodes().stream()
                .noneMatch(node -> node.id() == brokerId.intValue());
        if (brokerUnknown) {
            return Flux.error(new NotFoundException(String.format("Broker with id %s not found", brokerId)));
        }
        return loadBrokersConfig(kafkaCluster, brokerId)
                .map(entries -> entries.stream()
                        .map(InternalBrokerConfig::from)
                        .collect(Collectors.toList()))
                .flatMapMany(Flux::fromIterable);
    }

    /**
     * Lists the brokers reported by the admin client's cluster description.
     *
     * @param kafkaCluster cluster to query
     * @return one {@link BrokerDTO} per node, carrying the node's id and host
     */
    public Flux<BrokerDTO> getBrokers(KafkaCluster kafkaCluster) {
        return adminClientService.get(kafkaCluster)
                .flatMap(ReactiveAdminClient::describeCluster)
                .map(description -> description.getNodes().stream()
                        .map(node -> {
                            BrokerDTO broker = new BrokerDTO();
                            broker.setId(node.id());
                            broker.setHost(node.host());
                            return broker;
                        })
                        .collect(Collectors.toList()))
                .flatMapMany(Flux::fromIterable);
    }

    /**
     * Returns the controller node taken from the admin client's cluster description.
     *
     * @param kafkaCluster cluster to query
     * @return the controller node
     */
    public Mono<Node> getController(KafkaCluster kafkaCluster) {
        return adminClientService.get(kafkaCluster)
                .flatMap(ReactiveAdminClient::describeCluster)
                .map(description -> description.getController());
    }

    /**
     * Moves a replica hosted on the given broker to another log directory.
     *
     * @param kafkaCluster          cluster to operate on
     * @param num                   id of the broker hosting the replica
     * @param brokerLogdirUpdateDTO topic, partition and target log directory
     * @return completion signal; see {@link #updateBrokerLogDir(ReactiveAdminClient,
     *     BrokerLogdirUpdateDTO, Integer)} for the error mapping
     */
    public Mono<Void> updateBrokerLogDir(KafkaCluster kafkaCluster, Integer num, BrokerLogdirUpdateDTO brokerLogdirUpdateDTO) {
        return adminClientService.get(kafkaCluster)
                .flatMap(adminClient -> updateBrokerLogDir(adminClient, brokerLogdirUpdateDTO, num));
    }

    /**
     * Issues the replica log-dir change and translates Kafka errors into API exceptions.
     *
     * <p>{@link UnknownTopicOrPartitionException} becomes {@link TopicOrPartitionNotFoundException}
     * and {@link LogDirNotFoundException} becomes {@link LogDirNotFoundApiException}. The logging
     * hook sits after both translations, so every error leaving this chain is logged, the
     * translated ones included.
     *
     * @param adminClient admin client of the target cluster
     * @param update      topic, partition and target log directory
     * @param brokerId    id of the broker hosting the replica
     * @return completion signal of the alter operation
     */
    private Mono<Void> updateBrokerLogDir(ReactiveAdminClient adminClient, BrokerLogdirUpdateDTO update, Integer brokerId) {
        TopicPartitionReplica replica =
                new TopicPartitionReplica(update.getTopic(), update.getPartition().intValue(), brokerId.intValue());
        return adminClient.alterReplicaLogDirs(Map.of(replica, update.getLogDir()))
                .onErrorResume(UnknownTopicOrPartitionException.class,
                        e -> Mono.error(new TopicOrPartitionNotFoundException()))
                .onErrorResume(LogDirNotFoundException.class,
                        e -> Mono.error(new LogDirNotFoundApiException()))
                .doOnError(e -> log.error("Unexpected error", e));
    }

    /**
     * Updates a single configuration property of a broker.
     *
     * <p>An {@link InvalidRequestException} is translated into an
     * {@link InvalidRequestApiException} carrying the same message. Every error leaving the chain
     * is logged.
     *
     * @param kafkaCluster cluster to operate on
     * @param num          id of the broker
     * @param str          name of the configuration property
     * @param str2         new value of the property
     * @return completion signal of the update
     */
    public Mono<Void> updateBrokerConfigByName(KafkaCluster kafkaCluster, Integer num, String str, String str2) {
        return adminClientService.get(kafkaCluster)
                .flatMap(adminClient -> adminClient.updateBrokerConfigByName(num, str, str2))
                .onErrorResume(InvalidRequestException.class,
                        e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
                .doOnError(e -> log.error("Unexpected error", e));
    }

    /**
     * Describes the log directories of the cluster's brokers.
     *
     * <p>Candidate brokers are the nodes of the cached cluster description. When
     * {@code requestedBrokerIds} is non-null and non-empty, only candidates contained in it are
     * queried; otherwise all candidates are queried. A {@link TimeoutException} is logged and
     * replaced by an empty map instead of failing the caller.
     *
     * @param kafkaCluster       cluster to query
     * @param requestedBrokerIds optional restriction of broker ids; {@code null} or empty means all
     * @return log-dir information per broker id and log-dir path
     */
    private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(KafkaCluster kafkaCluster, List<Integer> requestedBrokerIds) {
        return adminClientService.get(kafkaCluster)
                .flatMap(adminClient -> {
                    boolean restrictToRequested = requestedBrokerIds != null && !requestedBrokerIds.isEmpty();
                    // Filter while collecting rather than mutating the collected list afterwards:
                    // Collectors.toList() makes no promise that its result is modifiable.
                    List<Integer> brokerIds = metricsCache.get(kafkaCluster).getClusterDescription().getNodes().stream()
                            .map(node -> node.id())
                            .filter(id -> !restrictToRequested || requestedBrokerIds.contains(id))
                            .collect(Collectors.toList());
                    return adminClient.describeLogDirs(brokerIds);
                })
                .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
                    log.error("Error during fetching log dirs", e);
                    Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> noLogDirs = new HashMap<>();
                    return Mono.just(noLogDirs);
                });
    }

    /**
     * Streams log-dir descriptions of the requested brokers, converted by
     * {@link DescribeLogDirsMapper}.
     *
     * @param kafkaCluster cluster to query
     * @param list         broker ids to include; {@code null} or empty selects every known broker
     * @return the mapped log-dir descriptions
     */
    public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(KafkaCluster kafkaCluster, List<Integer> list) {
        return getClusterLogDirs(kafkaCluster, list)
                .map(describeLogDirsMapper::toBrokerLogDirsList)
                .flatMapMany(Flux::fromIterable);
    }

    /**
     * Streams the configuration of one broker as API DTOs.
     *
     * @param kafkaCluster cluster to query
     * @param num          id of the broker
     * @return the broker's configuration entries, or a {@link NotFoundException} error when the
     *     broker id is not among the cached cluster nodes
     */
    public Flux<BrokerConfigDTO> getBrokerConfig(KafkaCluster kafkaCluster, Integer num) {
        return getBrokersConfig(kafkaCluster, num)
                .map(clusterMapper::toBrokerConfig);
    }

    /**
     * Returns the cached metrics of one broker.
     *
     * @param kafkaCluster cluster whose cached metrics are read
     * @param num          id of the broker
     * @return the broker's metrics, or an empty {@link Mono} when the cache holds no entry for the id
     */
    public Mono<BrokerMetricsDTO> getBrokerMetrics(KafkaCluster kafkaCluster, Integer num) {
        return Mono.justOrEmpty(metricsCache.get(kafkaCluster).getJmxMetrics().getInternalBrokerMetrics().get(num))
                .map(clusterMapper::toBrokerMetrics);
    }
}
