package com.provectus.kafka.ui.serde.schemaregistry;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.MessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.class */
public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
    private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100;
    private final KafkaCluster cluster;
    private final Map<String, MessageFormatter> valueFormatMap = new ConcurrentHashMap();
    private final Map<String, MessageFormatter> keyFormatMap = new ConcurrentHashMap();

    @Nullable
    private final SchemaRegistryClient schemaRegistryClient;

    @Nullable
    private final AvroMessageFormatter avroFormatter;

    @Nullable
    private final ProtobufMessageFormatter protobufFormatter;

    @Nullable
    private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter;
    private final ObjectMapper objectMapper;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SchemaRegistryAwareRecordSerDe.class);
    private static final StringMessageFormatter stringFormatter = new StringMessageFormatter();
    private static final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
    private static final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();

    private SchemaRegistryClient createSchemaRegistryClient(KafkaCluster kafkaCluster) {
        if (kafkaCluster.getSchemaRegistry() == null) {
            throw new ValidationException("schemaRegistry is not specified");
        }
        List of = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
        HashMap hashMap = new HashMap();
        String username = kafkaCluster.getSchemaRegistry().getUsername();
        String password = kafkaCluster.getSchemaRegistry().getPassword();
        if (username != null && password != null) {
            hashMap.put("basic.auth.credentials.source", "USER_INFO");
            hashMap.put("basic.auth.user.info", username + ":" + password);
        } else {
            if (username != null) {
                throw new ValidationException("You specified username but do not specified password");
            }
            if (password != null) {
                throw new ValidationException("You specified password but do not specified username");
            }
        }
        return new CachedSchemaRegistryClient(kafkaCluster.getSchemaRegistry().getUrl(), 100, (List<SchemaProvider>) of, hashMap);
    }

    public SchemaRegistryAwareRecordSerDe(KafkaCluster kafkaCluster, ObjectMapper objectMapper) {
        this.cluster = kafkaCluster;
        this.objectMapper = objectMapper;
        this.schemaRegistryClient = kafkaCluster.getSchemaRegistry() != null ? createSchemaRegistryClient(kafkaCluster) : null;
        if (this.schemaRegistryClient != null) {
            this.avroFormatter = new AvroMessageFormatter(this.schemaRegistryClient);
            this.protobufFormatter = new ProtobufMessageFormatter(this.schemaRegistryClient);
            this.jsonSchemaMessageFormatter = new JsonSchemaMessageFormatter(this.schemaRegistryClient);
        } else {
            this.avroFormatter = null;
            this.protobufFormatter = null;
            this.jsonSchemaMessageFormatter = null;
        }
    }

    @Override // com.provectus.kafka.ui.serde.RecordSerDe
    public RecordSerDe.DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
        try {
            RecordSerDe.DeserializedKeyValue.DeserializedKeyValueBuilder builder = RecordSerDe.DeserializedKeyValue.builder();
            if (consumerRecord.key() != null) {
                MessageFormatter messageFormatter = getMessageFormatter(consumerRecord, true);
                builder.key(messageFormatter.format(consumerRecord.topic(), consumerRecord.key().get()));
                builder.keyFormat(messageFormatter.getFormat());
                builder.keySchemaId((String) getSchemaId(consumerRecord.key(), messageFormatter.getFormat()).map((v0) -> {
                    return String.valueOf(v0);
                }).orElse(null));
            }
            if (consumerRecord.value() != null) {
                MessageFormatter messageFormatter2 = getMessageFormatter(consumerRecord, false);
                builder.value(messageFormatter2.format(consumerRecord.topic(), consumerRecord.value().get()));
                builder.valueFormat(messageFormatter2.getFormat());
                builder.valueSchemaId((String) getSchemaId(consumerRecord.value(), messageFormatter2.getFormat()).map((v0) -> {
                    return String.valueOf(v0);
                }).orElse(null));
            }
            return builder.build();
        } catch (Throwable th) {
            throw new RuntimeException("Failed to parse record from topic " + consumerRecord.topic(), th);
        }
    }

    @Override // com.provectus.kafka.ui.serde.RecordSerDe
    public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable String str2, @Nullable String str3, @Nullable Integer num) {
        Optional<SchemaMetadata> schemaBySubject = getSchemaBySubject(str, true);
        Optional<SchemaMetadata> schemaBySubject2 = getSchemaBySubject(str, false);
        return new ProducerRecord<>(str, num, schemaBySubject.isPresent() ? serialize(schemaBySubject.get(), str, str2, true) : serialize(str2), schemaBySubject2.isPresent() ? serialize(schemaBySubject2.get(), str, str3, false) : serialize(str3));
    }

    private byte[] serialize(SchemaMetadata schemaMetadata, String str, String str2, boolean z) {
        MessageReader jsonSchemaMessageReader;
        if (str2 == null) {
            return null;
        }
        if (schemaMetadata.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
            jsonSchemaMessageReader = new ProtobufMessageReader(str, z, this.schemaRegistryClient, schemaMetadata);
        } else if (schemaMetadata.getSchemaType().equals(MessageFormat.AVRO.name())) {
            jsonSchemaMessageReader = new AvroMessageReader(str, z, this.schemaRegistryClient, schemaMetadata);
        } else {
            if (!schemaMetadata.getSchemaType().equals(MessageFormat.JSON.name())) {
                throw new IllegalStateException("Unsupported schema type: " + schemaMetadata.getSchemaType());
            }
            jsonSchemaMessageReader = new JsonSchemaMessageReader(str, z, this.schemaRegistryClient, schemaMetadata);
        }
        return jsonSchemaMessageReader.read(str2);
    }

    private byte[] serialize(String str) {
        if (str == null) {
            return null;
        }
        return str.getBytes();
    }

    @Override // com.provectus.kafka.ui.serde.RecordSerDe
    public TopicMessageSchemaDTO getTopicSchema(String str) {
        Optional<SchemaMetadata> schemaBySubject = getSchemaBySubject(str, false);
        Optional<SchemaMetadata> schemaBySubject2 = getSchemaBySubject(str, true);
        String str2 = (String) schemaBySubject.map(this::convertSchema).orElseGet(() -> {
            return JsonSchema.stringSchema().toJson(this.objectMapper);
        });
        MessageSchemaDTO schema = new MessageSchemaDTO().name((String) schemaBySubject2.map(schemaMetadata -> {
            return schemaSubject(str, true);
        }).orElse("unknown")).source(MessageSchemaDTO.SourceEnum.SCHEMA_REGISTRY).schema((String) schemaBySubject2.map(this::convertSchema).orElseGet(() -> {
            return JsonSchema.stringSchema().toJson(this.objectMapper);
        }));
        return new TopicMessageSchemaDTO().key(schema).value(new MessageSchemaDTO().name((String) schemaBySubject.map(schemaMetadata2 -> {
            return schemaSubject(str, false);
        }).orElse("unknown")).source(MessageSchemaDTO.SourceEnum.SCHEMA_REGISTRY).schema(str2));
    }

    private String convertSchema(SchemaMetadata schemaMetadata) {
        URI resolve = new URI(this.cluster.getSchemaRegistry().getFirstUrl()).resolve(Integer.toString(schemaMetadata.getId()));
        ParsedSchema schemaById = ((SchemaRegistryClient) Objects.requireNonNull(this.schemaRegistryClient)).getSchemaById(schemaMetadata.getId());
        return schemaMetadata.getSchemaType().equals(MessageFormat.PROTOBUF.name()) ? protoSchemaConverter.convert(resolve, ((ProtobufSchema) schemaById).toDescriptor()).toJson(this.objectMapper) : schemaMetadata.getSchemaType().equals(MessageFormat.AVRO.name()) ? avroSchemaConverter.convert(resolve, ((AvroSchema) schemaById).rawSchema()).toJson(this.objectMapper) : schemaMetadata.getSchemaType().equals(MessageFormat.JSON.name()) ? schemaMetadata.getSchema() : JsonSchema.stringSchema().toJson(this.objectMapper);
    }

    private MessageFormatter getMessageFormatter(ConsumerRecord<Bytes, Bytes> consumerRecord, boolean z) {
        return z ? this.keyFormatMap.computeIfAbsent(consumerRecord.topic(), str -> {
            return detectFormat(consumerRecord, true);
        }) : this.valueFormatMap.computeIfAbsent(consumerRecord.topic(), str2 -> {
            return detectFormat(consumerRecord, false);
        });
    }

    private MessageFormatter detectFormat(ConsumerRecord<Bytes, Bytes> consumerRecord, boolean z) {
        if (this.schemaRegistryClient != null) {
            try {
                Optional<String> or = getSchemaFromMessage(consumerRecord, z).or(() -> {
                    return getSchemaBySubject(consumerRecord.topic(), z).map((v0) -> {
                        return v0.getSchemaType();
                    });
                });
                if (or.isPresent()) {
                    if (or.get().equals(MessageFormat.PROTOBUF.name())) {
                        if (tryFormatter(this.protobufFormatter, consumerRecord, z).isPresent()) {
                            return this.protobufFormatter;
                        }
                    } else if (or.get().equals(MessageFormat.AVRO.name())) {
                        if (tryFormatter(this.avroFormatter, consumerRecord, z).isPresent()) {
                            return this.avroFormatter;
                        }
                    } else {
                        if (!or.get().equals(MessageFormat.JSON.name())) {
                            throw new IllegalStateException("Unsupported schema type: " + or.get());
                        }
                        if (tryFormatter(this.jsonSchemaMessageFormatter, consumerRecord, z).isPresent()) {
                            return this.jsonSchemaMessageFormatter;
                        }
                    }
                }
            } catch (Exception e) {
                log.warn("Failed to get Schema for topic {}", consumerRecord.topic(), e);
            }
        }
        return stringFormatter;
    }

    private Optional<MessageFormatter> tryFormatter(MessageFormatter messageFormatter, ConsumerRecord<Bytes, Bytes> consumerRecord, boolean z) {
        try {
            messageFormatter.format(consumerRecord.topic(), z ? consumerRecord.key().get() : consumerRecord.value().get());
            return Optional.of(messageFormatter);
        } catch (Throwable th) {
            log.warn("Failed to parse by {} from topic {}", messageFormatter.getClass(), consumerRecord.topic(), th);
            return Optional.empty();
        }
    }

    private Optional<String> getSchemaFromMessage(ConsumerRecord<Bytes, Bytes> consumerRecord, boolean z) {
        Optional<String> empty = Optional.empty();
        Bytes key = z ? consumerRecord.key() : consumerRecord.value();
        if (key != null) {
            ByteBuffer wrap = ByteBuffer.wrap(key.get());
            if (wrap.get() == 0) {
                int i = wrap.getInt();
                empty = Optional.ofNullable(this.schemaRegistryClient).flatMap(schemaRegistryClient -> {
                    return wrapClientCall(() -> {
                        return schemaRegistryClient.getSchemaById(i);
                    });
                }).map((v0) -> {
                    return v0.schemaType();
                });
            }
        }
        return empty;
    }

    private Optional<Integer> getSchemaId(Bytes bytes, MessageFormat messageFormat) {
        if (messageFormat != MessageFormat.AVRO && messageFormat != MessageFormat.PROTOBUF && messageFormat != MessageFormat.JSON) {
            return Optional.empty();
        }
        ByteBuffer wrap = ByteBuffer.wrap(bytes.get());
        return wrap.get() == 0 ? Optional.of(Integer.valueOf(wrap.getInt())) : Optional.empty();
    }

    private Optional<SchemaMetadata> getSchemaBySubject(String str, boolean z) {
        return Optional.ofNullable(this.schemaRegistryClient).flatMap(schemaRegistryClient -> {
            return wrapClientCall(() -> {
                return schemaRegistryClient.getLatestSchemaMetadata(schemaSubject(str, z));
            });
        });
    }

    private <T> Optional<T> wrapClientCall(Callable<T> callable) {
        try {
            return Optional.ofNullable(callable.call());
        } catch (RestClientException e) {
            if (e.getStatus() == 404) {
                return Optional.empty();
            }
            throw new RuntimeException("Error calling SchemaRegistryClient", e);
        }
    }

    private String schemaSubject(String str, boolean z) {
        return String.format(z ? this.cluster.getKeySchemaNameTemplate() : this.cluster.getSchemaNameTemplate(), str);
    }
}
