package com.provectus.kafka.ui.service;

import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

/**
 * Resets the committed offsets of a consumer group for the partitions of a topic.
 *
 * <p>Every public operation first runs {@link #checkGroupCondition}: the group must be listed by
 * the cluster and be in the {@code DEAD} or {@code EMPTY} state. Only then are the target offsets
 * computed and handed to {@code ReactiveAdminClient.alterConsumerGroupOffsets}.
 */
@Component
public class OffsetsResetService {
    private static final Logger log = LoggerFactory.getLogger(OffsetsResetService.class);

    /** Consumer group states in which this service allows offsets to be reset. */
    private static final Set<ConsumerGroupState> RESETTABLE_STATES =
            Set.of(ConsumerGroupState.DEAD, ConsumerGroupState.EMPTY);

    private final AdminClientService adminClientService;

    /**
     * @param adminClientService source of the per-cluster {@code ReactiveAdminClient}
     */
    public OffsetsResetService(AdminClientService adminClientService) {
        this.adminClientService = adminClientService;
    }

    /**
     * Resets the group's offsets to the offsets listed for {@link OffsetSpec#earliest()}.
     *
     * @param cluster    cluster the group belongs to
     * @param group      consumer group id
     * @param topic      topic whose partitions are reset
     * @param partitions partition numbers to reset; {@code null} means the whole topic is queried
     * @return a {@code Mono} that completes when the offsets have been altered, or fails with
     *         {@code NotFoundException} / {@code ValidationException} from the group check
     */
    public Mono<Void> resetToEarliest(KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
        return resetToSpec(cluster, group, topic, partitions, OffsetSpec.earliest());
    }

    /**
     * Resets the group's offsets to the offsets listed for {@link OffsetSpec#latest()}.
     *
     * @param cluster    cluster the group belongs to
     * @param group      consumer group id
     * @param topic      topic whose partitions are reset
     * @param partitions partition numbers to reset; {@code null} means the whole topic is queried
     * @return a {@code Mono} that completes when the offsets have been altered, or fails with
     *         {@code NotFoundException} / {@code ValidationException} from the group check
     */
    public Mono<Void> resetToLatest(KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
        return resetToSpec(cluster, group, topic, partitions, OffsetSpec.latest());
    }

    /**
     * Resets the group's offsets to the offsets listed for the given timestamp. Partitions for
     * which the timestamp lookup returned no entry get the offset listed for
     * {@link OffsetSpec#latest()} instead (see {@link #editTsOffsets}).
     *
     * @param cluster         cluster the group belongs to
     * @param group           consumer group id
     * @param topic           topic whose partitions are reset
     * @param partitions      partition numbers to reset; {@code null} means the whole topic is queried
     * @param targetTimestamp value passed to {@link OffsetSpec#forTimestamp(long)}
     * @return a {@code Mono} that completes when the offsets have been altered, or fails with
     *         {@code NotFoundException} / {@code ValidationException} from the group check
     */
    public Mono<Void> resetToTimestamp(KafkaCluster cluster, String group, String topic, Collection<Integer> partitions, long targetTimestamp) {
        return checkGroupCondition(cluster, group).flatMap(client ->
                offsets(client, topic, partitions, OffsetSpec.forTimestamp(targetTimestamp))
                        // The two lookups must keep distinct names: the timestamp result is
                        // layered on top of the end offsets, not merged with itself.
                        .flatMap(foundOffsets ->
                                offsets(client, topic, partitions, OffsetSpec.latest())
                                        .map(endOffsets -> editTsOffsets(foundOffsets, endOffsets)))
                        .flatMap(offsetsToCommit -> resetOffsets(client, group, offsetsToCommit)));
    }

    /**
     * Resets the group's offsets to explicitly supplied values, clamping each value into the
     * partition's [earliest, latest] range (see {@link #editOffsetsBounds}).
     *
     * @param cluster       cluster the group belongs to
     * @param group         consumer group id
     * @param topic         topic whose partitions are reset
     * @param targetOffsets partition number to desired offset; must not be {@code null}
     * @return a {@code Mono} that completes when the offsets have been altered, or fails with
     *         {@code NotFoundException} / {@code ValidationException} from the group check
     * @throws NullPointerException if {@code targetOffsets} is {@code null}
     */
    public Mono<Void> resetToOffsets(KafkaCluster cluster, String group, String topic, Map<Integer, Long> targetOffsets) {
        Preconditions.checkNotNull(targetOffsets);
        Map<TopicPartition, Long> partitionOffsets = targetOffsets.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> new TopicPartition(topic, entry.getKey()),
                        Map.Entry::getValue));
        return checkGroupCondition(cluster, group).flatMap(client ->
                client.listOffsets(partitionOffsets.keySet(), OffsetSpec.earliest())
                        .flatMap(earliestOffsets ->
                                client.listOffsets(partitionOffsets.keySet(), OffsetSpec.latest())
                                        .map(latestOffsets ->
                                                editOffsetsBounds(partitionOffsets, earliestOffsets, latestOffsets))
                                        .flatMap(offsetsToCommit -> resetOffsets(client, group, offsetsToCommit))));
    }

    /** Shared flow for resets whose target is fully described by a single {@link OffsetSpec}. */
    private Mono<Void> resetToSpec(KafkaCluster cluster, String group, String topic,
                                   @Nullable Collection<Integer> partitions, OffsetSpec spec) {
        return checkGroupCondition(cluster, group).flatMap(client ->
                offsets(client, topic, partitions, spec)
                        .flatMap(offsetsToCommit -> resetOffsets(client, group, offsetsToCommit)));
    }

    /**
     * Lists offsets matching {@code spec}: by topic name when {@code partitions} is {@code null},
     * otherwise for exactly the given partition numbers of {@code topic}.
     */
    private Mono<Map<TopicPartition, Long>> offsets(ReactiveAdminClient client, String topic,
                                                    @Nullable Collection<Integer> partitions, OffsetSpec spec) {
        if (partitions == null) {
            return client.listOffsets(topic, spec);
        }
        Set<TopicPartition> topicPartitions = partitions.stream()
                .map(partition -> new TopicPartition(topic, partition))
                .collect(Collectors.toSet());
        return client.listOffsets(topicPartitions, spec);
    }

    /**
     * Obtains the admin client for {@code cluster} and verifies that {@code group} can be reset.
     *
     * @return the admin client when the group is listed, described, and in a resettable state;
     *         an error {@code Mono} with {@code ValidationException} when the group is in any
     *         other state; an error {@code Mono} with {@code NotFoundException} when the group is
     *         not listed or not present in the describe result
     */
    private Mono<ReactiveAdminClient> checkGroupCondition(KafkaCluster cluster, String group) {
        return this.adminClientService.get(cluster).flatMap(client ->
                client.listConsumerGroups()
                        .filter(groupIds -> groupIds.stream().anyMatch(id -> id.equals(group)))
                        .flatMap(groupIds -> client.describeConsumerGroups(List.of(group)))
                        .filter(descriptions -> descriptions.containsKey(group))
                        .map(descriptions -> descriptions.get(group))
                        .flatMap(description -> {
                            if (!RESETTABLE_STATES.contains(description.state())) {
                                return Mono.error(new ValidationException(String.format(
                                        "Group's offsets can be reset only if group is inactive, but group is in %s state",
                                        description.state())));
                            }
                            return Mono.just(client);
                        })
                        // Either filter above emptied the pipeline: the group is unknown.
                        .switchIfEmpty(Mono.error(new NotFoundException("Consumer group not found"))));
    }

    /**
     * Merges the two lookups: starts from {@code endOffsets} and overrides every partition that
     * has an entry in {@code foundTsOffsets}.
     */
    private Map<TopicPartition, Long> editTsOffsets(Map<TopicPartition, Long> foundTsOffsets,
                                                    Map<TopicPartition, Long> endOffsets) {
        Map<TopicPartition, Long> result = new HashMap<>(endOffsets);
        result.putAll(foundTsOffsets);
        return result;
    }

    /**
     * Clamps each requested offset into the [earliest, latest] range of its partition, logging a
     * warning whenever a value had to be adjusted.
     *
     * <p>NOTE: a partition of {@code offsetsToCheck} that is missing from a bound map it is
     * compared against fails with a {@code NullPointerException} on unboxing.
     */
    private Map<TopicPartition, Long> editOffsetsBounds(Map<TopicPartition, Long> offsetsToCheck,
                                                        Map<TopicPartition, Long> earliestOffsets,
                                                        Map<TopicPartition, Long> latestOffsets) {
        Map<TopicPartition, Long> result = new HashMap<>();
        offsetsToCheck.forEach((partition, offset) -> {
            Long earliest = earliestOffsets.get(partition);
            Long latest = latestOffsets.get(partition);
            if (earliest > offset) {
                log.warn("Offset for partition {} is lower than earliest offset, resetting to earliest", partition);
                result.put(partition, earliest);
            } else if (latest >= offset) {
                result.put(partition, offset);
            } else {
                log.warn("Offset for partition {} is greater than latest offset, resetting to latest", partition);
                result.put(partition, latest);
            }
        });
        return result;
    }

    /** Commits {@code offsets} for {@code group} through the admin client. */
    private Mono<Void> resetOffsets(ReactiveAdminClient client, String group, Map<TopicPartition, Long> offsets) {
        return client.alterConsumerGroupOffsets(group, offsets);
    }
}
