package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.api.ConsumerGroupsApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerGroupDTO;
import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
import com.provectus.kafka.ui.model.ConsumerGroupOffsetsResetDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.OffsetsResetService;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/controller/ConsumerGroupsController.class */
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerGroupsController.class);
    private final ConsumerGroupService consumerGroupService;
    private final OffsetsResetService offsetsResetService;

    @Override // com.provectus.kafka.ui.api.ConsumerGroupsApi
    public Mono<ResponseEntity<Void>> deleteConsumerGroup(String str, String str2, ServerWebExchange serverWebExchange) {
        return this.consumerGroupService.deleteConsumerGroupById(getCluster(str), str2).map((v0) -> {
            return ResponseEntity.ok(v0);
        });
    }

    @Override // com.provectus.kafka.ui.api.ConsumerGroupsApi
    public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String str, String str2, ServerWebExchange serverWebExchange) {
        return this.consumerGroupService.getConsumerGroupDetail(getCluster(str), str2).map((v0) -> {
            return ResponseEntity.ok(v0);
        });
    }

    @Override // com.provectus.kafka.ui.api.ConsumerGroupsApi
    public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getConsumerGroups(String str, ServerWebExchange serverWebExchange) {
        return this.consumerGroupService.getConsumerGroups(getCluster(str)).map((v0) -> {
            return Flux.fromIterable(v0);
        }).map((v0) -> {
            return ResponseEntity.ok(v0);
        }).switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }

    @Override // com.provectus.kafka.ui.api.ConsumerGroupsApi
    public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String str, String str2, ServerWebExchange serverWebExchange) {
        return this.consumerGroupService.getConsumerGroups(getCluster(str), Optional.of(str2)).map((v0) -> {
            return Flux.fromIterable(v0);
        }).map((v0) -> {
            return ResponseEntity.ok(v0);
        }).switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }

    @Override // com.provectus.kafka.ui.api.ConsumerGroupsApi
    public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String str, String str2, Mono<ConsumerGroupOffsetsResetDTO> mono, ServerWebExchange serverWebExchange) {
        return mono.flatMap(consumerGroupOffsetsResetDTO -> {
            KafkaCluster cluster = getCluster(str);
            switch (consumerGroupOffsetsResetDTO.getResetType()) {
                case EARLIEST:
                    return this.offsetsResetService.resetToEarliest(cluster, str2, consumerGroupOffsetsResetDTO.getTopic(), consumerGroupOffsetsResetDTO.getPartitions());
                case LATEST:
                    return this.offsetsResetService.resetToLatest(cluster, str2, consumerGroupOffsetsResetDTO.getTopic(), consumerGroupOffsetsResetDTO.getPartitions());
                case TIMESTAMP:
                    return consumerGroupOffsetsResetDTO.getResetToTimestamp() == null ? Mono.error(new ValidationException("resetToTimestamp is required when TIMESTAMP reset type used")) : this.offsetsResetService.resetToTimestamp(cluster, str2, consumerGroupOffsetsResetDTO.getTopic(), consumerGroupOffsetsResetDTO.getPartitions(), consumerGroupOffsetsResetDTO.getResetToTimestamp().longValue());
                case OFFSET:
                    if (CollectionUtils.isEmpty(consumerGroupOffsetsResetDTO.getPartitionsOffsets())) {
                        return Mono.error(new ValidationException("partitionsOffsets is required when OFFSET reset type used"));
                    }
                    return this.offsetsResetService.resetToOffsets(cluster, str2, consumerGroupOffsetsResetDTO.getTopic(), (Map) consumerGroupOffsetsResetDTO.getPartitionsOffsets().stream().collect(Collectors.toMap((v0) -> {
                        return v0.getPartition();
                    }, (v0) -> {
                        return v0.getOffset();
                    })));
                default:
                    return Mono.error(new ValidationException("Unknown resetType " + consumerGroupOffsetsResetDTO.getResetType()));
            }
        }).thenReturn(ResponseEntity.ok().build());
    }

    public ConsumerGroupsController(ConsumerGroupService consumerGroupService, OffsetsResetService offsetsResetService) {
        this.consumerGroupService = consumerGroupService;
        this.offsetsResetService = offsetsResetService;
    }
}
