package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.exception.DuplicateEntityException;
import com.provectus.kafka.ui.exception.SchemaNotFoundException;
import com.provectus.kafka.ui.exception.SchemaTypeIsNotSupportedException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.InternalSchemaRegistry;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
import com.provectus.kafka.ui.model.schemaregistry.ErrorResponse;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
import java.util.Formatter;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive client for the schema registry attached to a {@link KafkaCluster}.
 *
 * <p>All calls go through the injected {@link WebClient} against the registry's first configured
 * URL ({@code /subjects}, {@code /config} and {@code /compatibility} endpoints). HTTP basic auth
 * is applied when both a username and a password are configured on the registry.
 *
 * <p>Every method returns a cold publisher: nothing is sent until the result is subscribed to.
 */
@Service
public class SchemaRegistryService {
    private static final Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);

    public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s";
    public static final String NO_SUCH_SCHEMA = "No such schema %s";

    private static final String URL_SUBJECTS = "/subjects";
    private static final String URL_SUBJECT = "/subjects/{schemaName}";
    private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
    private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
    private static final String URL_CONFIG = "/config";
    private static final String URL_CONFIG_SUBJECT = "/config/{schemaName}";
    private static final String URL_COMPATIBILITY_LATEST =
            "/compatibility/subjects/{schemaName}/versions/latest";
    private static final String LATEST = "latest";
    private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";

    private final ClusterMapper mapper;
    private final WebClient webClient;

    /**
     * Creates the service.
     *
     * @param clusterMapper mapper used to convert registry responses into API DTOs
     * @param webClient     HTTP client used for every registry call
     */
    public SchemaRegistryService(ClusterMapper clusterMapper, WebClient webClient) {
        this.mapper = clusterMapper;
        this.webClient = webClient;
    }

    /**
     * Fetches the latest version of every subject known to the registry.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @return one element per subject, each carrying its latest schema version
     */
    public Flux<SchemaSubjectDTO> getAllLatestVersionSchemas(KafkaCluster kafkaCluster) {
        return getAllSubjectNames(kafkaCluster)
                .flatMapMany(Flux::fromArray)
                .flatMap(subject -> getLatestSchemaVersionBySubject(kafkaCluster, subject));
    }

    /**
     * Lists the names of all subjects via {@code GET /subjects}.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @return the subject names; any error is logged and then propagated
     */
    public Mono<String[]> getAllSubjectNames(KafkaCluster kafkaCluster) {
        return configuredWebClient(kafkaCluster, HttpMethod.GET, URL_SUBJECTS)
                .retrieve()
                .bodyToMono(String[].class)
                .doOnError(error -> log.error("Unexpected error", error));
    }

    /**
     * Fetches every registered version of a subject.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @param schemaName   subject name
     * @return one element per version; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Flux<SchemaSubjectDTO> getAllVersionsBySubject(KafkaCluster kafkaCluster, String schemaName) {
        return getSubjectVersions(kafkaCluster, schemaName)
                .flatMap(version -> getSchemaSubjectByVersion(kafkaCluster, schemaName, version));
    }

    /** Lists the version numbers of a subject; HTTP 404 becomes {@link SchemaNotFoundException}. */
    private Flux<Integer> getSubjectVersions(KafkaCluster kafkaCluster, String schemaName) {
        return configuredWebClient(kafkaCluster, HttpMethod.GET, URL_SUBJECT_VERSIONS, schemaName)
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals,
                        throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
                .bodyToFlux(Integer.class);
    }

    /**
     * Fetches a specific version of a subject.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @param schemaName   subject name
     * @param version      version number; it is rendered with {@link String#valueOf(Object)}
     * @return the schema subject; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<SchemaSubjectDTO> getSchemaSubjectByVersion(KafkaCluster kafkaCluster, String schemaName,
                                                            Integer version) {
        return getSchemaSubject(kafkaCluster, schemaName, String.valueOf(version));
    }

    /**
     * Fetches the {@code latest} version of a subject.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @param schemaName   subject name
     * @return the schema subject; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<SchemaSubjectDTO> getLatestSchemaVersionBySubject(KafkaCluster kafkaCluster, String schemaName) {
        return getSchemaSubject(kafkaCluster, schemaName, LATEST);
    }

    /**
     * Loads one subject version and enriches it with a schema type default and a compatibility level.
     *
     * <p>NOTE(review): the schema is zipped with the compatibility lookup, so if neither the
     * subject-level nor the global compatibility level can be resolved the whole result is empty.
     */
    private Mono<SchemaSubjectDTO> getSchemaSubject(KafkaCluster kafkaCluster, String schemaName,
                                                    String version) {
        return configuredWebClient(kafkaCluster, HttpMethod.GET, URL_SUBJECT_BY_VERSION, schemaName, version)
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals,
                        throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)))
                .bodyToMono(SchemaSubjectDTO.class)
                .map(this::withSchemaType)
                .zipWith(getSchemaCompatibilityInfoOrGlobal(kafkaCluster, schemaName))
                .map(schemaAndCompatibility -> {
                    SchemaSubjectDTO schema = schemaAndCompatibility.getT1();
                    CompatibilityLevelDTO compatibility = schemaAndCompatibility.getT2();
                    schema.setCompatibilityLevel(compatibility.getCompatibility().getValue());
                    return schema;
                });
    }

    /** Defaults the schema type to {@link SchemaTypeDTO#AVRO} when the response carries none. */
    @NotNull
    private SchemaSubjectDTO withSchemaType(SchemaSubjectDTO schemaSubjectDTO) {
        return schemaSubjectDTO.schemaType(
                Optional.ofNullable(schemaSubjectDTO.getSchemaType()).orElse(SchemaTypeDTO.AVRO));
    }

    /**
     * Deletes a specific version of a subject.
     *
     * @param kafkaCluster cluster whose schema registry is called
     * @param schemaName   subject name
     * @param version      version number to delete
     * @return the bodiless response; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(KafkaCluster kafkaCluster, String schemaName,
                                                                   Integer version) {
        return deleteSchemaSubject(kafkaCluster, schemaName, String.valueOf(version));
    }

    /**
     * Deletes the {@code latest} version of a subject.
     *
     * @param kafkaCluster cluster whose schema registry is called
     * @param schemaName   subject name
     * @return the bodiless response; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(KafkaCluster kafkaCluster, String schemaName) {
        return deleteSchemaSubject(kafkaCluster, schemaName, LATEST);
    }

    /** Issues {@code DELETE /subjects/{schemaName}/versions/{version}}. */
    private Mono<ResponseEntity<Void>> deleteSchemaSubject(KafkaCluster kafkaCluster, String schemaName,
                                                           String version) {
        return configuredWebClient(kafkaCluster, HttpMethod.DELETE, URL_SUBJECT_BY_VERSION, schemaName, version)
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals,
                        throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)))
                .toBodilessEntity();
    }

    /**
     * Deletes a subject with all of its versions via {@code DELETE /subjects/{schemaName}}.
     *
     * @param kafkaCluster cluster whose schema registry is called
     * @param schemaName   subject name
     * @return the bodiless response; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(KafkaCluster kafkaCluster, String schemaName) {
        return configuredWebClient(kafkaCluster, HttpMethod.DELETE, URL_SUBJECT, schemaName)
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals,
                        throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
                .toBodilessEntity();
    }

    /**
     * Registers a new schema version after checking that the same schema is not registered yet.
     *
     * @param kafkaCluster     cluster whose schema registry is called
     * @param newSchemaSubject subject name, schema text and schema type to register
     * @return the latest version of the subject after registration; fails with
     *         {@link DuplicateEntityException}, {@link SchemaTypeIsNotSupportedException} or
     *         {@link UnprocessableEntityException} as reported by the helper calls
     */
    public Mono<SchemaSubjectDTO> registerNewSchema(KafkaCluster kafkaCluster,
                                                    Mono<NewSchemaSubjectDTO> newSchemaSubject) {
        return newSchemaSubject.flatMap(request -> {
            // AVRO is sent as a null schema type; presumably so that registries which do not
            // recognise the schemaType field still accept AVRO schemas — TODO confirm.
            Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(
                    request.getSchema(),
                    SchemaTypeDTO.AVRO == request.getSchemaType() ? null : request.getSchemaType()));
            String subject = request.getSubject();
            InternalSchemaRegistry schemaRegistry = kafkaCluster.getSchemaRegistry();
            return checkSchemaOnDuplicate(subject, newSchema, schemaRegistry)
                    .flatMap(notDuplicate -> submitNewSchema(subject, newSchema, schemaRegistry))
                    .flatMap(registered -> getLatestSchemaVersionBySubject(kafkaCluster, subject));
        });
    }

    /** Posts the schema to {@code /subjects/{schemaName}/versions}; HTTP 422 is mapped to an error. */
    @NotNull
    private Mono<SubjectIdResponse> submitNewSchema(String subject, Mono<InternalNewSchema> newSchema,
                                                    InternalSchemaRegistry schemaRegistry) {
        return configuredWebClient(schemaRegistry, HttpMethod.POST, URL_SUBJECT_VERSIONS, subject)
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromPublisher(newSchema, InternalNewSchema.class))
                .retrieve()
                .onStatus(HttpStatus.UNPROCESSABLE_ENTITY::equals, this::unprocessableEntityError)
                .bodyToMono(SubjectIdResponse.class);
    }

    /**
     * Posts the schema to {@code /subjects/{schemaName}} to find out whether it is already registered.
     *
     * <p>A response carrying a non-null id is treated as a duplicate and fails with
     * {@link DuplicateEntityException}. HTTP 404 is deliberately not turned into an error: its
     * handler returns an empty publisher and the body is decoded like any other response.
     * NOTE(review): this relies on the 404 body decoding into a DTO without an id — confirm.
     */
    @NotNull
    private Mono<SchemaSubjectDTO> checkSchemaOnDuplicate(String subject, Mono<InternalNewSchema> newSchema,
                                                          InternalSchemaRegistry schemaRegistry) {
        return configuredWebClient(schemaRegistry, HttpMethod.POST, URL_SUBJECT, subject)
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromPublisher(newSchema, InternalNewSchema.class))
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals, response -> Mono.empty())
                .onStatus(HttpStatus.UNPROCESSABLE_ENTITY::equals, this::unprocessableEntityError)
                .bodyToMono(SchemaSubjectDTO.class)
                .filter(existing -> Objects.isNull(existing.getId()))
                // Deferred so the exception is only created when a duplicate is actually detected.
                .switchIfEmpty(Mono.defer(
                        () -> Mono.error(new DuplicateEntityException("Such schema already exists"))));
    }

    /**
     * Converts an HTTP 422 registry response into the matching domain exception:
     * {@link SchemaTypeIsNotSupportedException} when the registry complains about the
     * {@code schemaType} field, {@link UnprocessableEntityException} otherwise.
     */
    private Mono<? extends Throwable> unprocessableEntityError(ClientResponse response) {
        return response.bodyToMono(ErrorResponse.class).flatMap(errorResponse ->
                Mono.error(isUnrecognizedFieldSchemaTypeMessage(errorResponse.getMessage())
                        ? new SchemaTypeIsNotSupportedException()
                        : new UnprocessableEntityException(errorResponse.getMessage())));
    }

    /** Builds a status handler that fails with {@link SchemaNotFoundException} carrying {@code message}. */
    @NotNull
    private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(String message) {
        return response -> Mono.error(new SchemaNotFoundException(message));
    }

    /**
     * Updates the compatibility level of a subject, or the global level when {@code schemaName} is null.
     *
     * @param kafkaCluster       cluster whose schema registry is called
     * @param schemaName         subject name, or {@code null} to target {@code /config}
     * @param compatibilityLevel new compatibility level, sent as the JSON request body
     * @return completes when the registry accepted the update; fails with
     *         {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<Void> updateSchemaCompatibility(KafkaCluster kafkaCluster, String schemaName,
                                                Mono<CompatibilityLevelDTO> compatibilityLevel) {
        String uriTemplate = Objects.isNull(schemaName) ? URL_CONFIG : URL_CONFIG_SUBJECT;
        return configuredWebClient(kafkaCluster, HttpMethod.PUT, uriTemplate, schemaName)
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals,
                        throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
                .bodyToMono(Void.class);
    }

    /**
     * Updates the global compatibility level.
     *
     * @param kafkaCluster       cluster whose schema registry is called
     * @param compatibilityLevel new compatibility level
     * @return completes when the registry accepted the update
     */
    public Mono<Void> updateSchemaCompatibility(KafkaCluster kafkaCluster,
                                                Mono<CompatibilityLevelDTO> compatibilityLevel) {
        return updateSchemaCompatibility(kafkaCluster, null, compatibilityLevel);
    }

    /**
     * Reads the compatibility level of a subject, or the global level when {@code schemaName} is null.
     *
     * <p>This lookup is best-effort: any failure resolves to an empty result instead of an error,
     * which is what lets callers fall back to the global level.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @param schemaName   subject name, or {@code null} to read {@code /config}
     * @return the compatibility level, or empty when it could not be read
     */
    public Mono<CompatibilityLevelDTO> getSchemaCompatibilityLevel(KafkaCluster kafkaCluster, String schemaName) {
        String uriTemplate = Objects.isNull(schemaName) ? URL_CONFIG : URL_CONFIG_SUBJECT;
        return configuredWebClient(kafkaCluster, HttpMethod.GET, uriTemplate, schemaName)
                .retrieve()
                .bodyToMono(InternalCompatibilityLevel.class)
                .map(mapper::toCompatibilityLevel)
                .onErrorResume(error -> {
                    // Still swallowed on purpose, but leave a trace for troubleshooting.
                    log.debug("Could not read compatibility level for schema {}", schemaName, error);
                    return Mono.empty();
                });
    }

    /**
     * Reads the global compatibility level.
     *
     * @param kafkaCluster cluster whose schema registry is queried
     * @return the global compatibility level, or empty when it could not be read
     */
    public Mono<CompatibilityLevelDTO> getGlobalSchemaCompatibilityLevel(KafkaCluster kafkaCluster) {
        return getSchemaCompatibilityLevel(kafkaCluster, null);
    }

    /** Reads the subject-level compatibility and falls back to the global one when it is empty. */
    private Mono<CompatibilityLevelDTO> getSchemaCompatibilityInfoOrGlobal(KafkaCluster kafkaCluster,
                                                                           String schemaName) {
        return getSchemaCompatibilityLevel(kafkaCluster, schemaName)
                .switchIfEmpty(getGlobalSchemaCompatibilityLevel(kafkaCluster));
    }

    /**
     * Checks a candidate schema against the latest registered version of a subject.
     *
     * @param kafkaCluster     cluster whose schema registry is called
     * @param schemaName       subject name
     * @param newSchemaSubject candidate schema, sent as the JSON request body
     * @return the compatibility verdict; fails with {@link SchemaNotFoundException} on HTTP 404
     */
    public Mono<CompatibilityCheckResponseDTO> checksSchemaCompatibility(KafkaCluster kafkaCluster,
                                                                         String schemaName,
                                                                         Mono<NewSchemaSubjectDTO> newSchemaSubject) {
        return configuredWebClient(kafkaCluster, HttpMethod.POST, URL_COMPATIBILITY_LATEST, schemaName)
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
                .retrieve()
                .onStatus(HttpStatus.NOT_FOUND::equals,
                        throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
                .bodyToMono(InternalCompatibilityCheck.class)
                .map(mapper::toCompatibilityCheckResponse);
    }

    /**
     * Formats {@code str} with {@code objArr} using {@link String#format(String, Object...)}.
     *
     * @param str    format string
     * @param objArr format arguments
     * @return the formatted message
     */
    public String formatted(String str, Object... objArr) {
        return String.format(str, objArr);
    }

    /**
     * Adds a basic auth header when both credentials are configured.
     *
     * @throws ValidationException when only one of username and password is configured
     */
    private void setBasicAuthIfEnabled(InternalSchemaRegistry schemaRegistry, HttpHeaders httpHeaders) {
        String username = schemaRegistry.getUsername();
        String password = schemaRegistry.getPassword();
        if (username != null && password != null) {
            httpHeaders.setBasicAuth(username, password);
            return;
        }
        if (username != null) {
            throw new ValidationException("You specified username but do not specified password");
        }
        if (password != null) {
            throw new ValidationException("You specified password but do not specified username");
        }
    }

    /** Convenience overload that resolves the schema registry from the cluster. */
    private WebClient.RequestBodySpec configuredWebClient(KafkaCluster kafkaCluster, HttpMethod httpMethod,
                                                          String uriTemplate, Object... uriVariables) {
        return configuredWebClient(kafkaCluster.getSchemaRegistry(), httpMethod, uriTemplate, uriVariables);
    }

    /**
     * Starts a request against the registry's first URL with auth headers applied.
     *
     * @param schemaRegistry registry connection settings
     * @param httpMethod     HTTP method to use
     * @param uriTemplate    path template appended to the registry URL
     * @param uriVariables   values for the template placeholders
     */
    private WebClient.RequestBodySpec configuredWebClient(InternalSchemaRegistry schemaRegistry,
                                                          HttpMethod httpMethod, String uriTemplate,
                                                          Object... uriVariables) {
        return webClient.method(httpMethod)
                .uri(schemaRegistry.getFirstUrl() + uriTemplate, uriVariables)
                .headers(httpHeaders -> setBasicAuthIfEnabled(schemaRegistry, httpHeaders));
    }

    /** Tells whether a registry error message reports the {@code schemaType} field as unrecognized. */
    private boolean isUnrecognizedFieldSchemaTypeMessage(String message) {
        // The error payload may carry no message at all; treat that as "not this error".
        return message != null && message.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
    }
}
