package com.provectus.kafka.ui.service;

import com.google.common.util.concurrent.Uninterruptibles;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.util.MapUtil;
import com.provectus.kafka.ui.util.NumberUtil;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/service/ReactiveAdminClient.class */
public class ReactiveAdminClient implements Closeable {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ReactiveAdminClient.class);

    /** Kafka admin client that every operation delegates to; closed by {@code close()}. */
    private final AdminClient client;

    /** Cluster version string handed to the constructor; returned by {@code getVersion()}. */
    private final String version;

    /** Config-update features handed to the constructor; consulted by {@code updateTopicConfig}. */
    private final Set<SupportedFeature> features;

    /**
     * Immutable holder for the cluster metadata assembled by {@code describeCluster()}:
     * controller node, cluster id, nodes and the authorized ACL operations.
     *
     * <p>The collections are stored by reference; no defensive copy is made.
     */
    public static final class ClusterDescription {

        @Nullable
        private final Node controller;
        private final String clusterId;
        private final Collection<Node> nodes;
        private final Set<AclOperation> authorizedOperations;

        /**
         * @param node controller node, may be {@code null}
         * @param str cluster id
         * @param collection nodes of the cluster
         * @param set ACL operations authorized on the cluster
         */
        public ClusterDescription(@Nullable Node node, String str, Collection<Node> collection, Set<AclOperation> set) {
            controller = node;
            clusterId = str;
            nodes = collection;
            authorizedOperations = set;
        }

        /** @return the controller node, or {@code null} when none was supplied */
        @Nullable
        public Node getController() {
            return controller;
        }

        /** @return the cluster id */
        public String getClusterId() {
            return clusterId;
        }

        /** @return the cluster nodes */
        public Collection<Node> getNodes() {
            return nodes;
        }

        /** @return the authorized ACL operations */
        public Set<AclOperation> getAuthorizedOperations() {
            return authorizedOperations;
        }

        /** Field-by-field equality; two {@code null} fields are considered equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ClusterDescription)) {
                return false;
            }
            ClusterDescription other = (ClusterDescription) obj;
            return sameValue(controller, other.controller)
                    && sameValue(clusterId, other.clusterId)
                    && sameValue(nodes, other.nodes)
                    && sameValue(authorizedOperations, other.authorizedOperations);
        }

        /** Combines the four fields with multiplier 59, using 43 for a {@code null} field. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + fieldHash(controller);
            result = result * 59 + fieldHash(clusterId);
            result = result * 59 + fieldHash(nodes);
            result = result * 59 + fieldHash(authorizedOperations);
            return result;
        }

        @Override
        public String toString() {
            return new StringBuilder("ReactiveAdminClient.ClusterDescription(controller=")
                    .append(controller)
                    .append(", clusterId=").append(clusterId)
                    .append(", nodes=").append(nodes)
                    .append(", authorizedOperations=").append(authorizedOperations)
                    .append(")")
                    .toString();
        }

        /** Null-safe equality: both null, or {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash of a single field; {@code null} contributes the constant 43. */
        private static int fieldHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Mechanisms this client can use to update topic configuration.
     *
     * <p>Exactly one value is selected per client from the cluster version in
     * {@code getSupportedUpdateFeatureForVersion}, and {@code updateTopicConfig} branches on it.
     *
     * <p>Decompiler note (JADX): the access modifier of this enum was changed from private.
     * Loaded from BOOT-INF/classes/com/provectus/kafka/ui/service/ReactiveAdminClient$SupportedFeature.class.
     */
    public enum SupportedFeature {
        /** Selected for cluster versions above 2.3; topic updates go through incremental alter-configs. */
        INCREMENTAL_ALTER_CONFIGS,
        /** Selected for cluster versions up to and including 2.3; topic updates go through alter-configs. */
        ALTER_CONFIGS
    }

    /**
     * Creates a reactive wrapper around the given admin client.
     *
     * <p>The cluster version is looked up first; it determines the single
     * {@link SupportedFeature} the new instance is configured with.
     *
     * @param adminClient Kafka admin client to wrap
     * @return a Mono emitting the configured client once the version lookup completes
     */
    public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
        return getClusterVersionImpl(adminClient).map(clusterVersion -> {
            Set<SupportedFeature> supported = Set.of(getSupportedUpdateFeatureForVersion(clusterVersion));
            return new ReactiveAdminClient(adminClient, clusterVersion, supported);
        });
    }

    /**
     * Picks the config-update mechanism for a cluster version string.
     *
     * @param clusterVersion version string, parsed with {@code NumberUtil.parserClusterVersion}
     * @return {@code ALTER_CONFIGS} when the parsed version is at most 2.3,
     *         otherwise {@code INCREMENTAL_ALTER_CONFIGS}
     */
    private static SupportedFeature getSupportedUpdateFeatureForVersion(String clusterVersion) {
        if (NumberUtil.parserClusterVersion(clusterVersion) <= 2.3f) {
            return SupportedFeature.ALTER_CONFIGS;
        }
        return SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
    }

    /**
     * Adapts a {@link KafkaFuture} to a {@link Mono}.
     *
     * <p>The completion callback is registered when the Mono is subscribed to. A failure
     * is forwarded as an error signal; otherwise the result is passed to
     * {@code MonoSink.success}, which for a {@code null} result (e.g. {@code Void}
     * futures) completes the Mono without emitting a value.
     *
     * @param kafkaFuture future to adapt
     * @param <T> result type of the future
     * @return a Mono mirroring the outcome of the future
     */
    public static <T> Mono<T> toMono(KafkaFuture<T> kafkaFuture) {
        return Mono.create(sink -> kafkaFuture.whenComplete((result, failure) -> {
            if (failure == null) {
                sink.success(result);
                return;
            }
            sink.error(failure);
        }));
    }

    /**
     * Lists topic names.
     *
     * @param z value passed to {@code ListTopicsOptions.listInternal}, i.e. whether internal
     *          topics are listed as well
     * @return a Mono emitting the set of topic names
     */
    public Mono<Set<String>> listTopics(boolean z) {
        ListTopicsOptions options = new ListTopicsOptions().listInternal(z);
        return toMono(client.listTopics(options).names());
    }

    /**
     * Deletes a single topic.
     *
     * @param str name of the topic to delete
     * @return a Mono that completes when the delete request has finished
     */
    public Mono<Void> deleteTopic(String str) {
        List<String> topics = List.of(str);
        return toMono(client.deleteTopics(topics).all());
    }

    /** @return the cluster version string this client was constructed with */
    public String getVersion() {
        return version;
    }

    /**
     * Loads the configuration of every topic, internal ones included.
     *
     * @return a Mono emitting config entries keyed by topic name
     */
    public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
        return listTopics(true).flatMap(this::getTopicsConfig);
    }

    /**
     * Loads the configuration (synonyms included) of the given topics.
     *
     * <p>Per-topic lookups are combined with {@code toMonoWithExceptionFilter}, passing
     * {@link UnknownTopicOrPartitionException} as the failure type to leave out of the result
     * instead of failing the whole call.
     *
     * @param collection names of the topics to describe
     * @return a Mono emitting an immutable copy of the config entries per topic name
     */
    public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> collection) {
        List<ConfigResource> resources = collection.stream()
                .map(topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic))
                .collect(Collectors.toList());
        DescribeConfigsOptions options = new DescribeConfigsOptions().includeSynonyms(true);

        return toMonoWithExceptionFilter(
                client.describeConfigs(resources, options).values(),
                UnknownTopicOrPartitionException.class)
                .map(configs -> configs.entrySet().stream()
                        .collect(Collectors.toMap(
                                e -> e.getKey().name(),
                                e -> List.copyOf(e.getValue().entries()))));
    }

    /**
     * Loads the configuration of the given brokers.
     *
     * @param list broker ids to describe
     * @return a Mono emitting a mutable list of config entries per broker id; the id is parsed
     *         back from the name of each returned config resource
     */
    public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> list) {
        List<ConfigResource> resources = list.stream()
                .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
                .collect(Collectors.toList());

        return toMono(client.describeConfigs(resources).all())
                .map(configs -> configs.entrySet().stream()
                        .collect(Collectors.toMap(
                                e -> Integer.valueOf(e.getKey().name()),
                                e -> new ArrayList<>(e.getValue().entries()))));
    }

    /**
     * Describes every topic, internal ones included.
     *
     * @return a Mono emitting topic descriptions keyed by topic name
     */
    public Mono<Map<String, TopicDescription>> describeTopics() {
        return listTopics(true).flatMap(this::describeTopics);
    }

    /**
     * Describes the given topics.
     *
     * <p>Per-topic lookups are combined with {@code toMonoWithExceptionFilter}, passing
     * {@link UnknownTopicOrPartitionException} as the failure type to leave out of the result.
     *
     * @param collection names of the topics to describe
     * @return a Mono emitting topic descriptions keyed by topic name
     */
    public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> collection) {
        Map<String, KafkaFuture<TopicDescription>> perTopic = client.describeTopics(collection).values();
        return toMonoWithExceptionFilter(perTopic, UnknownTopicOrPartitionException.class);
    }

    /**
     * Combines a map of per-key futures into one Mono of per-key results, tolerating
     * failures of one specific exception type.
     *
     * <p>Keys whose future fails with an instance of {@code tolerated} (or of one of its
     * subclasses) are simply left out of the result. Any other failure fails the returned
     * Mono. The result is emitted once every future has either succeeded or failed with a
     * tolerated exception.
     *
     * @param futures futures to await, keyed by an arbitrary key
     * @param tolerated exception type that must not fail the whole operation
     * @param <K> key type
     * @param <V> value type of the futures
     * @return a Mono emitting the successfully resolved values by key; an empty immutable map
     *         when {@code futures} is empty
     */
    private <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> futures,
                                                             Class<? extends KafkaException> tolerated) {
        if (futures.isEmpty()) {
            return Mono.just(Map.of());
        }
        // Snapshot the entries so the expected count cannot drift if the caller's map changes.
        List<Map.Entry<K, KafkaFuture<V>>> pending = new ArrayList<>(futures.entrySet());
        return Mono.create(sink -> {
            AtomicInteger settled = new AtomicInteger();
            Map<K, V> collected = new ConcurrentHashMap<>();
            for (Map.Entry<K, KafkaFuture<V>> entry : pending) {
                toMono(entry.getValue()).subscribe(
                        value -> {
                            collected.put(entry.getKey(), value);
                            if (settled.incrementAndGet() == pending.size()) {
                                sink.success(collected);
                            }
                        },
                        failure -> {
                            // isInstance tolerates exactly `tolerated` and its subclasses. The former
                            // failure.getClass().isAssignableFrom(tolerated) check was inverted and
                            // also swallowed supertypes such as a plain KafkaException.
                            if (!tolerated.isInstance(failure)) {
                                sink.error(failure);
                            } else if (settled.incrementAndGet() == pending.size()) {
                                sink.success(collected);
                            }
                        });
            }
        });
    }

    /**
     * Describes the log directories of every node reported by {@code describeCluster()}.
     *
     * @return a Mono emitting log-dir info keyed by broker id, then by log directory
     */
    public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
        return describeCluster()
                .map(cluster -> cluster.getNodes().stream()
                        .map(Node::id)
                        .collect(Collectors.toList()))
                .flatMap(this::describeLogDirs);
    }

    /**
     * Describes the log directories of the given brokers.
     *
     * @param collection ids of the brokers to query
     * @return a Mono emitting log-dir info keyed by broker id, then by log directory
     */
    public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(Collection<Integer> collection) {
        KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> logDirs =
                client.describeLogDirs(collection).all();
        return toMono(logDirs);
    }

    /**
     * Fetches the controller, cluster id, nodes and authorized operations of the cluster.
     *
     * <p>The describe request is issued immediately when this method is called; the returned
     * Mono only attaches to its completion. All four futures are awaited together, so the
     * individual {@code getUninterruptibly} reads happen after they have completed.
     *
     * @return a Mono emitting the assembled {@link ClusterDescription}, or an error if any
     *         of the underlying futures failed
     */
    public Mono<ClusterDescription> describeCluster() {
        DescribeClusterResult result = client.describeCluster();
        KafkaFuture<Void> allDone = KafkaFuture.allOf(
                result.nodes(), result.clusterId(), result.controller(), result.authorizedOperations());
        return Mono.create(sink -> allDone.whenComplete((ignored, failure) -> {
            if (failure != null) {
                sink.error(failure);
                return;
            }
            try {
                sink.success(new ClusterDescription(
                        Uninterruptibles.getUninterruptibly(result.controller()),
                        Uninterruptibles.getUninterruptibly(result.clusterId()),
                        Uninterruptibles.getUninterruptibly(result.nodes()),
                        Uninterruptibles.getUninterruptibly(result.authorizedOperations())));
            } catch (ExecutionException e) {
                // Not expected because allDone already succeeded, but never leave the sink
                // unsignalled: a swallowed exception here would make the Mono hang forever.
                Throwable cause = e.getCause();
                sink.error(cause != null ? cause : e);
            }
        }));
    }

    /**
     * Determines the cluster version from the controller broker's configuration.
     *
     * <p>The controller node is resolved first, then its broker config is read. The value of
     * the first entry whose name contains {@code inter.broker.protocol.version} is returned;
     * when no such entry exists the result is {@code "1.0-UNKNOWN"}.
     *
     * @param adminClient client used for the lookups
     * @return a Mono emitting the version string
     */
    private static Mono<String> getClusterVersionImpl(AdminClient adminClient) {
        return toMono(adminClient.describeCluster().controller()).flatMap(controller -> {
            ConfigResource broker =
                    new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(controller.id()));
            KafkaFuture<String> version = adminClient.describeConfigs(List.of(broker)).all()
                    .thenApply(configs -> configs.values().stream()
                            .map(Config::entries)
                            .flatMap(Collection::stream)
                            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
                            .findFirst()
                            .map(ConfigEntry::value)
                            .orElse("1.0-UNKNOWN"));
            return toMono(version);
        });
    }

    /**
     * Deletes the given consumer groups, translating Kafka errors to application exceptions.
     *
     * @param collection ids of the consumer groups to delete
     * @return a Mono completing on success; fails with {@link NotFoundException} when Kafka
     *         reports {@link GroupIdNotFoundException}, and with
     *         {@link IllegalEntityStateException} when it reports {@link GroupNotEmptyException}
     */
    public Mono<Void> deleteConsumerGroups(Collection<String> collection) {
        Mono<Void> deletion = toMono(client.deleteConsumerGroups(collection).all());
        return deletion
                .onErrorResume(GroupIdNotFoundException.class,
                        e -> Mono.error(new NotFoundException("The group id does not exist")))
                .onErrorResume(GroupNotEmptyException.class,
                        e -> Mono.error(new IllegalEntityStateException("The group is not empty")));
    }

    /**
     * Creates a topic.
     *
     * @param str topic name
     * @param i number of partitions
     * @param s replication factor
     * @param map topic-level configs to apply
     * @return a Mono that completes when the create request has finished
     */
    public Mono<Void> createTopic(String str, int i, short s, Map<String, String> map) {
        NewTopic topic = new NewTopic(str, i, s).configs(map);
        return toMono(client.createTopics(List.of(topic)).all());
    }

    /**
     * Submits partition reassignments to the admin client.
     *
     * @param map requested reassignment per topic partition, passed through unchanged
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> map) {
        KafkaFuture<Void> done = client.alterPartitionReassignments(map).all();
        return toMono(done);
    }

    /**
     * Submits partition-count changes to the admin client.
     *
     * @param map requested {@link NewPartitions} per topic name, passed through unchanged
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> createPartitions(Map<String, NewPartitions> map) {
        KafkaFuture<Void> done = client.createPartitions(map).all();
        return toMono(done);
    }

    /**
     * Updates the configuration of a topic, using incremental alter-configs when this
     * client's feature set contains {@code INCREMENTAL_ALTER_CONFIGS} and the plain
     * alter-configs call otherwise.
     *
     * @param str topic name
     * @param map config names and the values to set
     * @return a Mono that completes when the update has finished
     */
    public Mono<Void> updateTopicConfig(String str, Map<String, String> map) {
        if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
            return incrementalAlterConfig(str, map);
        }
        return alterConfig(str, map);
    }

    /**
     * Lists the ids of all consumer groups.
     *
     * @return a Mono emitting the group ids
     */
    public Mono<List<String>> listConsumerGroups() {
        return toMono(client.listConsumerGroups().all())
                .map(listings -> listings.stream()
                        .map(listing -> listing.groupId())
                        .collect(Collectors.toList()));
    }

    /**
     * Describes the given consumer groups.
     *
     * @param list ids of the groups to describe
     * @return a Mono emitting group descriptions keyed by group id
     */
    public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(List<String> list) {
        KafkaFuture<Map<String, ConsumerGroupDescription>> descriptions =
                client.describeConsumerGroups(list).all();
        return toMono(descriptions);
    }

    /**
     * Lists the committed offsets of a consumer group.
     *
     * @param str consumer group id
     * @return a Mono emitting offsets per topic partition, post-processed by
     *         {@code MapUtil.removeNullValues}
     */
    public Mono<Map<TopicPartition, OffsetAndMetadata>> listConsumerGroupOffsets(String str) {
        Mono<Map<TopicPartition, OffsetAndMetadata>> offsets =
                toMono(client.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata());
        return offsets.map(MapUtil::removeNullValues);
    }

    /**
     * Sets committed offsets for a consumer group.
     *
     * @param str consumer group id
     * @param map offset to commit per topic partition; each value is wrapped in an
     *            {@link OffsetAndMetadata}
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> alterConsumerGroupOffsets(String str, Map<TopicPartition, Long> map) {
        Map<TopicPartition, OffsetAndMetadata> offsets = map.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> new OffsetAndMetadata(e.getValue())));
        return toMono(client.alterConsumerGroupOffsets(str, offsets).all());
    }

    /**
     * Lists offsets for every partition of a topic.
     *
     * @param str topic name
     * @param offsetSpec which offset to look up for each partition
     * @return a Mono emitting the offset per topic partition
     */
    public Mono<Map<TopicPartition, Long>> listOffsets(String str, OffsetSpec offsetSpec) {
        return topicPartitions(str).flatMap(partitions -> listOffsets(partitions, offsetSpec));
    }

    /**
     * Lists offsets for the given partitions, applying the same spec to each of them.
     *
     * @param collection partitions to query
     * @param offsetSpec which offset to look up for each partition
     * @return a Mono emitting the offset per topic partition; partitions whose reported
     *         offset is negative are left out
     */
    public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> collection, OffsetSpec offsetSpec) {
        Map<TopicPartition, OffsetSpec> request = collection.stream()
                .collect(Collectors.toMap(partition -> partition, partition -> offsetSpec));

        return toMono(client.listOffsets(request).all())
                .map(found -> found.entrySet().stream()
                        // a negative offset means no offset was resolved for the partition
                        .filter(e -> e.getValue().offset() >= 0)
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().offset())));
    }

    /**
     * Resolves the partitions of a topic.
     *
     * @param topic topic name
     * @return a Mono emitting one {@link TopicPartition} per partition of the first (only)
     *         description returned for the topic
     */
    private Mono<Set<TopicPartition>> topicPartitions(String topic) {
        return toMono(client.describeTopics(List.of(topic)).all())
                .map(descriptions -> descriptions.values().stream()
                        .findFirst()
                        .stream()
                        .flatMap(description -> description.partitions().stream())
                        .map(info -> new TopicPartition(topic, info.partition()))
                        .collect(Collectors.toSet()));
    }

    /**
     * Sets a single broker config entry through an incremental alter-configs {@code SET}
     * operation.
     *
     * @param num broker id
     * @param str config name
     * @param str2 config value to set
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> updateBrokerConfigByName(Integer num, String str, String str2) {
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(num));
        AlterConfigOp setOp = new AlterConfigOp(new ConfigEntry(str, str2), AlterConfigOp.OpType.SET);
        return toMono(client.incrementalAlterConfigs(Map.of(broker, List.of(setOp))).all());
    }

    /**
     * Deletes records located before the given offsets.
     *
     * @param map per topic partition, the offset before which records are deleted
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> deleteRecords(Map<TopicPartition, Long> map) {
        Map<TopicPartition, RecordsToDelete> boundaries = map.entrySet().stream()
                .map(e -> Map.entry(e.getKey(), RecordsToDelete.beforeOffset(e.getValue())))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return toMono(client.deleteRecords(boundaries).all());
    }

    /**
     * Submits replica log-directory changes to the admin client.
     *
     * @param map target log directory per replica, passed through unchanged
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> map) {
        KafkaFuture<Void> done = client.alterReplicaLogDirs(map).all();
        return toMono(done);
    }

    /**
     * Updates topic configs via incremental alter-configs, one {@code SET} operation per entry.
     *
     * @param topicName topic to update
     * @param configs config names and the values to set
     * @return a Mono that completes when the request has finished
     */
    private Mono<Void> incrementalAlterConfig(String topicName, Map<String, String> configs) {
        List<AlterConfigOp> operations = configs.entrySet().stream()
                .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET))
                .collect(Collectors.toList());
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        return toMono(client.incrementalAlterConfigs(Map.of(topic, operations)).all());
    }

    /**
     * Updates topic configs via the plain alter-configs call, sending the given entries as
     * one {@link Config}.
     *
     * @param topicName topic to update
     * @param configs config names and the values to set
     * @return a Mono that completes when the request has finished
     */
    private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
        List<ConfigEntry> entries = configs.entrySet().stream()
                .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
        Config topicConfig = new Config(entries);
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        return toMono(client.alterConfigs(Map.of(topic, topicConfig)).all());
    }

    /** Closes the wrapped admin client. */
    @Override
    public void close() {
        client.close();
    }

    /**
     * Wraps an admin client with an already known version and feature set. The static
     * {@code create} factory derives both values from the cluster.
     *
     * @param adminClient Kafka admin client to delegate to
     * @param str cluster version string
     * @param set config-update features available on the cluster
     */
    public ReactiveAdminClient(AdminClient adminClient, String str, Set<SupportedFeature> set) {
        client = adminClient;
        version = str;
        features = set;
    }
}
