package com.provectus.kafka.ui.client;

import com.provectus.kafka.ui.connect.ApiClient;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.connect.model.Connector;
import com.provectus.kafka.ui.connect.model.NewConnector;
import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
import com.provectus.kafka.ui.exception.ValidationException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.class */
/**
 * {@link KafkaConnectClientApi} variant that re-issues requests answered with an
 * HTTP 409 (Conflict) response and reports HTTP 400 / HTTP 500 responses of the
 * connector create and config-update calls as {@link ValidationException}.
 */
public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
    private static final Logger log = LoggerFactory.getLogger(RetryingKafkaConnectClient.class);

    /** Maximum number of retry attempts after a conflict response. */
    private static final int MAX_RETRIES = 5;

    /** Fixed pause between two consecutive retry attempts. */
    private static final Duration RETRIES_DELAY = Duration.ofMillis(200);

    /**
     * Builds a client backed by a {@link RetryingApiClient} configured with the given base path.
     *
     * @param str base path handed to {@code ApiClient#setBasePath}
     */
    public RetryingKafkaConnectClient(String str) {
        super(new RetryingApiClient().setBasePath(str));
    }

    /**
     * Creates the retry policy shared by all requests: only
     * {@link WebClientResponseException.Conflict} errors are retried, at most
     * {@link #MAX_RETRIES} times with {@link #RETRIES_DELAY} between attempts.
     * When the attempts are used up, the last conflict is wrapped in a
     * {@link KafkaConnectConflictReponseException}.
     */
    private static Retry conflictCodeRetry() {
        return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY)
                .filter(WebClientResponseException.Conflict.class::isInstance)
                // The filter above lets only Conflict errors reach the retry logic,
                // so the failure that exhausts the retries can be cast safely.
                .onRetryExhaustedThrow((spec, lastSignal) ->
                        new KafkaConnectConflictReponseException(
                                (WebClientResponseException.Conflict) lastSignal.failure()));
    }

    /** Applies the conflict retry policy to a single-value publisher. */
    private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
        Retry policy = conflictCodeRetry();
        return publisher.retryWhen(policy);
    }

    /** Applies the conflict retry policy to a multi-value publisher. */
    private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
        Retry policy = conflictCodeRetry();
        return publisher.retryWhen(policy);
    }

    /**
     * Replaces {@link WebClientResponseException.BadRequest} and
     * {@link WebClientResponseException.InternalServerError} errors with a
     * {@link ValidationException} carrying the message "Invalid configuration".
     * Any other error is propagated unchanged.
     */
    private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
        return publisher
                .onErrorMap(WebClientResponseException.BadRequest.class,
                        ex -> new ValidationException("Invalid configuration"))
                .onErrorMap(WebClientResponseException.InternalServerError.class,
                        ex -> new ValidationException("Invalid configuration"));
    }

    /**
     * Delegates to the parent implementation and applies
     * {@link #withBadRequestErrorHandling(Mono)} to its result.
     */
    @Override // com.provectus.kafka.ui.connect.api.KafkaConnectClientApi
    public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
        Mono<Connector> created = super.createConnector(newConnector);
        return withBadRequestErrorHandling(created);
    }

    /**
     * Delegates to the parent implementation and applies
     * {@link #withBadRequestErrorHandling(Mono)} to its result.
     */
    @Override // com.provectus.kafka.ui.connect.api.KafkaConnectClientApi
    public Mono<Connector> setConnectorConfig(String str, Map<String, Object> map) throws RestClientException {
        Mono<Connector> updated = super.setConnectorConfig(str, map);
        return withBadRequestErrorHandling(updated);
    }

    /**
     * {@link ApiClient} whose two invocation entry points wrap the publisher
     * returned by the parent with the conflict retry policy. All arguments are
     * forwarded to the parent implementation unchanged.
     */
    private static class RetryingApiClient extends ApiClient {
        private RetryingApiClient() {
        }

        @Override // com.provectus.kafka.ui.connect.ApiClient
        public <T> Mono<T> invokeAPI(String str, HttpMethod httpMethod, Map<String, Object> map, MultiValueMap<String, String> multiValueMap, Object obj, HttpHeaders httpHeaders, MultiValueMap<String, String> multiValueMap2, MultiValueMap<String, Object> multiValueMap3, List<MediaType> list, MediaType mediaType, String[] strArr, ParameterizedTypeReference<T> parameterizedTypeReference) throws RestClientException {
            Mono<T> response = super.invokeAPI(str, httpMethod, map, multiValueMap, obj, httpHeaders, multiValueMap2, multiValueMap3, list, mediaType, strArr, parameterizedTypeReference);
            return RetryingKafkaConnectClient.withRetryOnConflict(response);
        }

        @Override // com.provectus.kafka.ui.connect.ApiClient
        public <T> Flux<T> invokeFluxAPI(String str, HttpMethod httpMethod, Map<String, Object> map, MultiValueMap<String, String> multiValueMap, Object obj, HttpHeaders httpHeaders, MultiValueMap<String, String> multiValueMap2, MultiValueMap<String, Object> multiValueMap3, List<MediaType> list, MediaType mediaType, String[] strArr, ParameterizedTypeReference<T> parameterizedTypeReference) throws RestClientException {
            Flux<T> response = super.invokeFluxAPI(str, httpMethod, map, multiValueMap, obj, httpHeaders, multiValueMap2, multiValueMap3, list, mediaType, strArr, parameterizedTypeReference);
            return RetryingKafkaConnectClient.withRetryOnConflict(response);
        }
    }
}
