package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.TopicsService;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import javax.validation.Valid;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/controller/MessagesController.class */
public class MessagesController extends AbstractController implements MessagesApi {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MessagesController.class);
    private final MessagesService messagesService;
    private final TopicsService topicsService;

    @Override // com.provectus.kafka.ui.api.MessagesApi
    public Mono<ResponseEntity<Void>> deleteTopicMessages(String str, String str2, @Valid List<Integer> list, ServerWebExchange serverWebExchange) {
        return this.messagesService.deleteTopicMessages(getCluster(str), str2, (List) Optional.ofNullable(list).orElse(List.of())).thenReturn(ResponseEntity.ok().build());
    }

    @Override // com.provectus.kafka.ui.api.MessagesApi
    public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String str, String str2, @Valid SeekTypeDTO seekTypeDTO, @Valid List<String> list, @Valid Integer num, @Valid String str3, @Valid SeekDirectionDTO seekDirectionDTO, ServerWebExchange serverWebExchange) {
        return parseConsumerPosition(str2, seekTypeDTO, list, seekDirectionDTO).map(consumerPosition -> {
            return ResponseEntity.ok(this.messagesService.loadMessages(getCluster(str), str2, consumerPosition, str3, num));
        });
    }

    @Override // com.provectus.kafka.ui.api.MessagesApi
    public Mono<ResponseEntity<TopicMessageSchemaDTO>> getTopicSchema(String str, String str2, ServerWebExchange serverWebExchange) {
        return Mono.just(this.topicsService.getTopicSchema(getCluster(str), str2)).map((v0) -> {
            return ResponseEntity.ok(v0);
        });
    }

    @Override // com.provectus.kafka.ui.api.MessagesApi
    public Mono<ResponseEntity<Void>> sendTopicMessages(String str, String str2, @Valid Mono<CreateTopicMessageDTO> mono, ServerWebExchange serverWebExchange) {
        return mono.flatMap(createTopicMessageDTO -> {
            return this.messagesService.sendMessage(getCluster(str), str2, createTopicMessageDTO).then();
        }).map((v0) -> {
            return ResponseEntity.ok(v0);
        });
    }

    private Mono<ConsumerPosition> parseConsumerPosition(String str, SeekTypeDTO seekTypeDTO, List<String> list, SeekDirectionDTO seekDirectionDTO) {
        return Mono.justOrEmpty(list).defaultIfEmpty(Collections.emptyList()).flatMapIterable(Function.identity()).map(str2 -> {
            String[] split = str2.split("::");
            if (split.length != 2) {
                throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
            }
            return Pair.of(new TopicPartition(str, Integer.parseInt(split[0])), Long.valueOf(Long.parseLong(split[1])));
        }).collectMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }).map(map -> {
            return new ConsumerPosition(seekTypeDTO != null ? seekTypeDTO : SeekTypeDTO.BEGINNING, map, seekDirectionDTO);
        });
    }

    public MessagesController(MessagesService messagesService, TopicsService topicsService) {
        this.messagesService = messagesService;
        this.topicsService = topicsService;
    }
}
