package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.client.KsqlClient;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import java.util.List;
import java.util.Objects;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/service/KsqlService.class */
public class KsqlService {
    private final KsqlClient ksqlClient;
    private final List<BaseStrategy> ksqlStatementStrategies;

    public Mono<KsqlCommandResponseDTO> executeKsqlCommand(KafkaCluster kafkaCluster, Mono<KsqlCommandDTO> mono) {
        Mono flatMap = Mono.justOrEmpty(kafkaCluster).map((v0) -> {
            return v0.getKsqldbServer();
        }).onErrorResume(th -> {
            return Mono.error(th instanceof ClusterNotFoundException ? th : new KsqlDbNotFoundException());
        }).flatMap(str -> {
            return getStatementStrategyForKsqlCommand(mono).map(baseStrategy -> {
                return baseStrategy.host(str);
            });
        });
        KsqlClient ksqlClient = this.ksqlClient;
        Objects.requireNonNull(ksqlClient);
        return flatMap.flatMap(ksqlClient::execute);
    }

    private Mono<BaseStrategy> getStatementStrategyForKsqlCommand(Mono<KsqlCommandDTO> mono) {
        return mono.map(ksqlCommandDTO -> {
            return this.ksqlStatementStrategies.stream().filter(baseStrategy -> {
                return baseStrategy.test(ksqlCommandDTO.getKsql());
            }).map(baseStrategy2 -> {
                return baseStrategy2.ksqlCommand(ksqlCommandDTO);
            }).findFirst();
        }).flatMap(Mono::justOrEmpty).switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
    }

    public KsqlService(KsqlClient ksqlClient, List<BaseStrategy> list) {
        this.ksqlClient = ksqlClient;
        this.ksqlStatementStrategies = list;
    }
}
