package com.provectus.kafka.ui.service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.client.KafkaConnectClients;
import com.provectus.kafka.ui.connect.model.Connector;
import com.provectus.kafka.ui.connect.model.ConnectorPlugin;
import com.provectus.kafka.ui.connect.model.ConnectorStatus;
import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
import com.provectus.kafka.ui.connect.model.ConnectorTask;
import com.provectus.kafka.ui.connect.model.ConnectorTopics;
import com.provectus.kafka.ui.connect.model.TaskStatus;
import com.provectus.kafka.ui.exception.ConnectNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
import com.provectus.kafka.ui.model.ConnectDTO;
import com.provectus.kafka.ui.model.ConnectorActionDTO;
import com.provectus.kafka.ui.model.ConnectorDTO;
import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
import com.provectus.kafka.ui.model.ConnectorPluginDTO;
import com.provectus.kafka.ui.model.ConnectorStateDTO;
import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KafkaConnectCluster;
import com.provectus.kafka.ui.model.NewConnectorDTO;
import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Service
/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/service/KafkaConnectService.class */
public class KafkaConnectService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaConnectService.class);
    private final ClusterMapper clusterMapper;
    private final KafkaConnectMapper kafkaConnectMapper;
    private final ObjectMapper objectMapper;
    private final KafkaConfigSanitizer kafkaConfigSanitizer;

    public Mono<Flux<ConnectDTO>> getConnects(KafkaCluster kafkaCluster) {
        Stream<KafkaConnectCluster> stream = kafkaCluster.getKafkaConnect().stream();
        ClusterMapper clusterMapper = this.clusterMapper;
        Objects.requireNonNull(clusterMapper);
        return Mono.just(Flux.fromIterable((Iterable) stream.map(clusterMapper::toKafkaConnect).collect(Collectors.toList())));
    }

    public Flux<FullConnectorInfoDTO> getAllConnectors(KafkaCluster kafkaCluster, String str) {
        Flux flatMap = getConnects(kafkaCluster).flatMapMany(Function.identity()).flatMap(connectDTO -> {
            return getConnectorNames(kafkaCluster, connectDTO.getName());
        }).flatMap(tuple2 -> {
            return getConnector(kafkaCluster, (String) tuple2.getT1(), (String) tuple2.getT2());
        }).flatMap(connectorDTO -> {
            return getConnectorConfig(kafkaCluster, connectorDTO.getConnect(), connectorDTO.getName()).map(map -> {
                return InternalConnectInfo.builder().connector(connectorDTO).config(map).build();
            });
        }).flatMap(internalConnectInfo -> {
            ConnectorDTO connector = internalConnectInfo.getConnector();
            return getConnectorTasks(kafkaCluster, connector.getConnect(), connector.getName()).collectList().map(list -> {
                return InternalConnectInfo.builder().connector(connector).config(internalConnectInfo.getConfig()).tasks(list).build();
            });
        }).flatMap(internalConnectInfo2 -> {
            ConnectorDTO connector = internalConnectInfo2.getConnector();
            return getConnectorTopics(kafkaCluster, connector.getConnect(), connector.getName()).map(connectorTopics -> {
                return InternalConnectInfo.builder().connector(connector).config(internalConnectInfo2.getConfig()).tasks(internalConnectInfo2.getTasks()).topics(connectorTopics.getTopics()).build();
            });
        });
        KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
        Objects.requireNonNull(kafkaConnectMapper);
        return flatMap.map(kafkaConnectMapper::fullConnectorInfoFromTuple).filter(matchesSearchTerm(str));
    }

    private Predicate<FullConnectorInfoDTO> matchesSearchTerm(String str) {
        return fullConnectorInfoDTO -> {
            return getSearchValues(fullConnectorInfoDTO).anyMatch(str2 -> {
                return str2.contains(StringUtils.defaultString(str, "").toUpperCase());
            });
        };
    }

    private Stream<String> getSearchValues(FullConnectorInfoDTO fullConnectorInfoDTO) {
        return Stream.of((Object[]) new String[]{fullConnectorInfoDTO.getName(), fullConnectorInfoDTO.getStatus().getState().getValue(), fullConnectorInfoDTO.getType().getValue()}).map((v0) -> {
            return v0.toUpperCase();
        });
    }

    private Mono<ConnectorTopics> getConnectorTopics(KafkaCluster kafkaCluster, String str, String str2) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            return KafkaConnectClients.withBaseUrl(str3).getConnectorTopics(str2).map(map -> {
                return (ConnectorTopics) map.get(str2);
            });
        });
    }

    private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster kafkaCluster, String str) {
        return getConnectors(kafkaCluster, str).collectList().map(list -> {
            return (String) list.get(0);
        }).map(this::parseToList).flatMapMany((v0) -> {
            return Flux.fromIterable(v0);
        }).map(str2 -> {
            return Tuples.of(str, str2);
        });
    }

    private List<String> parseToList(String str) {
        return (List) this.objectMapper.readValue(str, new TypeReference<List<String>>() { // from class: com.provectus.kafka.ui.service.KafkaConnectService.1
        });
    }

    public Flux<String> getConnectors(KafkaCluster kafkaCluster, String str) {
        return getConnectAddress(kafkaCluster, str).flatMapMany(str2 -> {
            return KafkaConnectClients.withBaseUrl(str2).getConnectors(null).doOnError(th -> {
                log.error("Unexpected error upon getting connectors", th);
            });
        });
    }

    public Mono<ConnectorDTO> createConnector(KafkaCluster kafkaCluster, String str, Mono<NewConnectorDTO> mono) {
        return getConnectAddress(kafkaCluster, str).flatMap(str2 -> {
            Mono flatMap = mono.flatMap(newConnectorDTO -> {
                return connectorExists(kafkaCluster, str, newConnectorDTO.getName()).map(bool -> {
                    if (bool.booleanValue()) {
                        throw new ValidationException(String.format("Connector with name %s already exists", newConnectorDTO.getName()));
                    }
                    return newConnectorDTO;
                });
            });
            KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
            Objects.requireNonNull(kafkaConnectMapper);
            return flatMap.map(kafkaConnectMapper::toClient).flatMap(newConnector -> {
                return KafkaConnectClients.withBaseUrl(str2).createConnector(newConnector);
            }).flatMap(connector -> {
                return getConnector(kafkaCluster, str, connector.getName());
            });
        });
    }

    private Mono<Boolean> connectorExists(KafkaCluster kafkaCluster, String str, String str2) {
        return getConnectorNames(kafkaCluster, str).map((v0) -> {
            return v0.getT2();
        }).collectList().map(list -> {
            return Boolean.valueOf(list.contains(str2));
        });
    }

    public Mono<ConnectorDTO> getConnector(KafkaCluster kafkaCluster, String str, String str2) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            Mono<Connector> connector = KafkaConnectClients.withBaseUrl(str3).getConnector(str2);
            KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
            Objects.requireNonNull(kafkaConnectMapper);
            return connector.map(kafkaConnectMapper::fromClient).flatMap(connectorDTO -> {
                return KafkaConnectClients.withBaseUrl(str3).getConnectorStatus(connectorDTO.getName()).onErrorResume(WebClientResponseException.NotFound.class, notFound -> {
                    return emptyStatus(str2);
                }).map(connectorStatus -> {
                    ConnectorStatusConnector connector2 = connectorStatus.getConnector();
                    ConnectorDTO connectorDTO = (ConnectorDTO) new ConnectorDTO().connect(str).status(this.kafkaConnectMapper.fromClient(connector2)).type(connectorDTO.getType()).tasks(connectorDTO.getTasks()).name(connectorDTO.getName()).config((Map) connectorDTO.getConfig().entrySet().stream().collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, entry -> {
                        return this.kafkaConfigSanitizer.sanitize((String) entry.getKey(), entry.getValue());
                    })));
                    if (connectorStatus.getTasks() != null) {
                        Stream<R> map = connectorStatus.getTasks().stream().map((v0) -> {
                            return v0.getState();
                        });
                        TaskStatus.StateEnum stateEnum = TaskStatus.StateEnum.FAILED;
                        Objects.requireNonNull(stateEnum);
                        if (map.anyMatch((v1) -> {
                            return r1.equals(v1);
                        })) {
                            connectorDTO.getStatus().state(ConnectorStateDTO.TASK_FAILED);
                        }
                    }
                    return connectorDTO;
                });
            });
        });
    }

    private Mono<ConnectorStatus> emptyStatus(String str) {
        return Mono.just(new ConnectorStatus().name(str).tasks(List.of()).connector(new ConnectorStatusConnector().state(ConnectorStatusConnector.StateEnum.UNASSIGNED)));
    }

    public Mono<Map<String, Object>> getConnectorConfig(KafkaCluster kafkaCluster, String str, String str2) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            return KafkaConnectClients.withBaseUrl(str3).getConnectorConfig(str2);
        }).map(map -> {
            HashMap hashMap = new HashMap();
            map.forEach((str4, obj) -> {
                hashMap.put(str4, this.kafkaConfigSanitizer.sanitize(str4, obj));
            });
            return hashMap;
        });
    }

    public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster kafkaCluster, String str, String str2, Mono<Object> mono) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            Mono flatMap = mono.flatMap(obj -> {
                return KafkaConnectClients.withBaseUrl(str3).setConnectorConfig(str2, (Map) obj);
            });
            KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
            Objects.requireNonNull(kafkaConnectMapper);
            return flatMap.map(kafkaConnectMapper::fromClient);
        });
    }

    public Mono<Void> deleteConnector(KafkaCluster kafkaCluster, String str, String str2) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            return KafkaConnectClients.withBaseUrl(str3).deleteConnector(str2);
        });
    }

    public Mono<Void> updateConnectorState(KafkaCluster kafkaCluster, String str, String str2, ConnectorActionDTO connectorActionDTO) {
        Function function;
        switch (connectorActionDTO) {
            case RESTART:
                function = str3 -> {
                    return KafkaConnectClients.withBaseUrl(str3).restartConnector(str2);
                };
                break;
            case PAUSE:
                function = str4 -> {
                    return KafkaConnectClients.withBaseUrl(str4).pauseConnector(str2);
                };
                break;
            case RESUME:
                function = str5 -> {
                    return KafkaConnectClients.withBaseUrl(str5).resumeConnector(str2);
                };
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + connectorActionDTO);
        }
        return getConnectAddress(kafkaCluster, str).flatMap(function);
    }

    public Flux<TaskDTO> getConnectorTasks(KafkaCluster kafkaCluster, String str, String str2) {
        return getConnectAddress(kafkaCluster, str).flatMapMany(str3 -> {
            Flux<ConnectorTask> onErrorResume = KafkaConnectClients.withBaseUrl(str3).getConnectorTasks(str2).onErrorResume(WebClientResponseException.NotFound.class, notFound -> {
                return Flux.empty();
            });
            KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
            Objects.requireNonNull(kafkaConnectMapper);
            return onErrorResume.map(kafkaConnectMapper::fromClient).flatMap(taskDTO -> {
                Mono<TaskStatus> onErrorResume2 = KafkaConnectClients.withBaseUrl(str3).getConnectorTaskStatus(str2, taskDTO.getId().getTask()).onErrorResume(WebClientResponseException.NotFound.class, notFound2 -> {
                    return Mono.empty();
                });
                KafkaConnectMapper kafkaConnectMapper2 = this.kafkaConnectMapper;
                Objects.requireNonNull(kafkaConnectMapper2);
                Mono<R> map = onErrorResume2.map(kafkaConnectMapper2::fromClient);
                Objects.requireNonNull(taskDTO);
                return map.map(taskDTO::status);
            });
        });
    }

    public Mono<Void> restartConnectorTask(KafkaCluster kafkaCluster, String str, String str2, Integer num) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            return KafkaConnectClients.withBaseUrl(str3).restartConnectorTask(str2, num);
        });
    }

    public Mono<Flux<ConnectorPluginDTO>> getConnectorPlugins(KafkaCluster kafkaCluster, String str) {
        return Mono.just(getConnectAddress(kafkaCluster, str).flatMapMany(str2 -> {
            Flux<ConnectorPlugin> connectorPlugins = KafkaConnectClients.withBaseUrl(str2).getConnectorPlugins();
            KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
            Objects.requireNonNull(kafkaConnectMapper);
            return connectorPlugins.map(kafkaConnectMapper::fromClient);
        }));
    }

    public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(KafkaCluster kafkaCluster, String str, String str2, Mono<Object> mono) {
        return getConnectAddress(kafkaCluster, str).flatMap(str3 -> {
            Mono flatMap = mono.flatMap(obj -> {
                return KafkaConnectClients.withBaseUrl(str3).validateConnectorPluginConfig(str2, (Map) obj);
            });
            KafkaConnectMapper kafkaConnectMapper = this.kafkaConnectMapper;
            Objects.requireNonNull(kafkaConnectMapper);
            return flatMap.map(kafkaConnectMapper::fromClient);
        });
    }

    private Mono<String> getConnectAddress(KafkaCluster kafkaCluster, String str) {
        return Mono.justOrEmpty((Optional) kafkaCluster.getKafkaConnect().stream().filter(kafkaConnectCluster -> {
            return kafkaConnectCluster.getName().equals(str);
        }).findFirst().map((v0) -> {
            return v0.getAddress();
        })).switchIfEmpty(Mono.error((Supplier<? extends Throwable>) ConnectNotFoundException::new));
    }

    public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper, ObjectMapper objectMapper, KafkaConfigSanitizer kafkaConfigSanitizer) {
        this.clusterMapper = clusterMapper;
        this.kafkaConnectMapper = kafkaConnectMapper;
        this.objectMapper = objectMapper;
        this.kafkaConfigSanitizer = kafkaConfigSanitizer;
    }
}
