package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.api.KsqlApi;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.KsqlResponseDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.service.KsqlService;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/controller/KsqlController.class */
public class KsqlController extends AbstractController implements KsqlApi {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KsqlController.class);
    private final KsqlService ksqlService;

    @Override // com.provectus.kafka.ui.api.KsqlApi
    public Mono<ResponseEntity<KsqlCommandResponseDTO>> executeKsqlCommand(String str, Mono<KsqlCommandDTO> mono, ServerWebExchange serverWebExchange) {
        return this.ksqlService.executeKsqlCommand(getCluster(str), mono).map((v0) -> {
            return ResponseEntity.ok(v0);
        });
    }

    @Override // com.provectus.kafka.ui.api.KsqlApi
    public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> executeKsql(String str, Mono<KsqlCommandDTO> mono, ServerWebExchange serverWebExchange) {
        return Mono.just(ResponseEntity.ok(mono.flux().flatMap(ksqlCommandDTO -> {
            return new KsqlApiClient(getCluster(str)).execute(ksqlCommandDTO.getKsql(), (Map) Optional.ofNullable(ksqlCommandDTO.getStreamsProperties()).orElse(Map.of()));
        }).map(ksqlResponseTable -> {
            return new KsqlResponseDTO().table(new KsqlTableResponseDTO().header(ksqlResponseTable.getHeader()).columnNames(ksqlResponseTable.getColumnNames()).values(ksqlResponseTable.getValues()));
        })));
    }

    public KsqlController(KsqlService ksqlService) {
        this.ksqlService = ksqlService;
    }
}
