package com.provectus.kafka.ui.serde;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.model.MessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;

/* loaded from: input_file:BOOT-INF/classes/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.class */
public class ProtobufFileRecordSerDe implements RecordSerDe {
    private final ProtobufSchema protobufSchema;
    private final ObjectMapper objectMapper;
    private final Path protobufSchemaPath;
    private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
    private final Map<String, Descriptors.Descriptor> messageDescriptorMap;
    private final Descriptors.Descriptor defaultMessageDescriptor;

    public ProtobufFileRecordSerDe(Path path, Map<String, String> map, String str, ObjectMapper objectMapper) throws IOException {
        this.objectMapper = objectMapper;
        this.protobufSchemaPath = path;
        Stream<String> lines = Files.lines(path);
        try {
            ProtobufSchema protobufSchema = new ProtobufSchema((String) lines.collect(Collectors.joining("\n")));
            if (str != null) {
                this.protobufSchema = protobufSchema.copy(str);
            } else {
                this.protobufSchema = protobufSchema;
            }
            this.messageDescriptorMap = new HashMap();
            if (map != null) {
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    this.messageDescriptorMap.put(entry.getKey(), (Descriptors.Descriptor) Objects.requireNonNull(this.protobufSchema.toDescriptor(entry.getValue()), "The given message type is not found in protobuf definition: " + entry.getValue()));
                }
            }
            this.defaultMessageDescriptor = (Descriptors.Descriptor) Objects.requireNonNull(this.protobufSchema.toDescriptor(), "The given message type is not found in protobuf definition: " + str);
            if (lines != null) {
                lines.close();
            }
        } catch (Throwable th) {
            if (lines != null) {
                try {
                    lines.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.provectus.kafka.ui.serde.RecordSerDe
    public RecordSerDe.DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
        try {
            RecordSerDe.DeserializedKeyValue.DeserializedKeyValueBuilder builder = RecordSerDe.DeserializedKeyValue.builder();
            if (consumerRecord.key() != null) {
                builder.key(new String(consumerRecord.key().get()));
                builder.keyFormat(MessageFormat.UNKNOWN);
            }
            if (consumerRecord.value() != null) {
                builder.value(parse(consumerRecord.value().get(), getDescriptor(consumerRecord.topic())));
                builder.valueFormat(MessageFormat.PROTOBUF);
            }
            return builder.build();
        } catch (Throwable th) {
            throw new RuntimeException("Failed to parse record from topic " + consumerRecord.topic(), th);
        }
    }

    private Descriptors.Descriptor getDescriptor(String str) {
        return this.messageDescriptorMap.getOrDefault(str, this.defaultMessageDescriptor);
    }

    private String parse(byte[] bArr, Descriptors.Descriptor descriptor) {
        return new String(ProtobufSchemaUtils.toJson(DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(bArr))));
    }

    @Override // com.provectus.kafka.ui.serde.RecordSerDe
    public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable String str2, @Nullable String str3, @Nullable Integer num) {
        if (str3 == null) {
            return new ProducerRecord<>(str, num, ((String) Objects.requireNonNull(str2)).getBytes(), null);
        }
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(getDescriptor(str));
        try {
            JsonFormat.parser().merge(str3, newBuilder);
            return new ProducerRecord<>(str, num, (byte[]) Optional.ofNullable(str2).map((v0) -> {
                return v0.getBytes();
            }).orElse(null), newBuilder.build().toByteArray());
        } catch (Throwable th) {
            throw new RuntimeException("Failed to merge record for topic " + str, th);
        }
    }

    @Override // com.provectus.kafka.ui.serde.RecordSerDe
    public TopicMessageSchemaDTO getTopicSchema(String str) {
        JsonSchema convert = this.schemaConverter.convert(this.protobufSchemaPath.toUri(), getDescriptor(str));
        MessageSchemaDTO schema = new MessageSchemaDTO().name(this.protobufSchema.fullName()).source(MessageSchemaDTO.SourceEnum.PROTO_FILE).schema(JsonSchema.stringSchema().toJson(this.objectMapper));
        return new TopicMessageSchemaDTO().key(schema).value(new MessageSchemaDTO().name(this.protobufSchema.fullName()).source(MessageSchemaDTO.SourceEnum.PROTO_FILE).schema(convert.toJson(this.objectMapper)));
    }
}
