package org.eclipse.hono.cli.app;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.eclipse.hono.cli.AbstractCliClient;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Profile({"receiver-kafka"})
@Component
/* loaded from: input_file:BOOT-INF/classes/org/eclipse/hono/cli/app/KafkaReceiver.class */
public class KafkaReceiver extends AbstractCliClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";

    @Value("${tenant.id}")
    protected String tenantId;

    @Value("${message.type}")
    protected String messageType;

    @Value("${print.verbose:true}")
    protected Boolean isPrintVerbose;
    private KafkaConsumerConfigProperties config;

    @Autowired
    public void setConfig(KafkaConsumerConfigProperties kafkaConsumerConfigProperties) {
        this.config = kafkaConsumerConfigProperties;
    }

    @PostConstruct
    Future<Void> start() {
        return createConsumer().onComplete2(this::handleCreateConsumerStatus);
    }

    private Future<Void> createConsumer() {
        if (Strings.isNullOrEmpty(this.tenantId)) {
            return Future.failedFuture("tenant id is not set");
        }
        Set<String> topics = getTopics();
        if (topics.isEmpty()) {
            return Future.failedFuture(String.format("Invalid message type [\"%s\"]. Valid types are \"telemetry\", \"event\" or \"all\"", this.messageType));
        }
        KafkaConsumer create = KafkaConsumer.create(this.vertx, this.config.getConsumerConfig("cli"), String.class, Buffer.class);
        create.handler2(this::logMessage);
        Promise promise = Promise.promise();
        create.subscribe(topics, promise);
        return promise.future();
    }

    private Set<String> getTopics() {
        HashSet hashSet = new HashSet();
        if (this.messageType.equals("telemetry") || this.messageType.equals(TYPE_ALL)) {
            hashSet.add(new HonoTopic(HonoTopic.Type.TELEMETRY, this.tenantId).toString());
        }
        if (this.messageType.equals("event") || this.messageType.equals(TYPE_ALL)) {
            hashSet.add(new HonoTopic(HonoTopic.Type.EVENT, this.tenantId).toString());
        }
        return hashSet;
    }

    private void handleCreateConsumerStatus(AsyncResult<Void> asyncResult) {
        if (asyncResult.succeeded()) {
            this.log.info("Receiver [tenant: {}, mode: {}] subscribed successfully, hit ctrl-c to exit", this.tenantId, this.messageType);
        } else {
            this.log.error("Error occurred during initialization of receiver: {}", asyncResult.cause().getMessage());
            this.vertx.close();
        }
    }

    private void logMessage(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        if (this.isPrintVerbose.booleanValue()) {
            logVerbosely(kafkaConsumerRecord);
        } else {
            logBriefly(kafkaConsumerRecord);
        }
    }

    private void logVerbosely(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        long timestamp = kafkaConsumerRecord.timestamp();
        LocalDateTime ofInstant = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
        StringBuilder sb = new StringBuilder();
        kafkaConsumerRecord.headers().forEach(kafkaHeader -> {
            sb.append("    ");
            sb.append(kafkaHeader.key());
            sb.append("=");
            sb.append(kafkaHeader.value());
            sb.append("\n");
        });
        this.log.info("topic: {}, partition: {}, offset: {}, timestamp: {} ({})\n  Headers:\n{}  Key:\n    {}\n  Value:\n    {}", kafkaConsumerRecord.topic(), Integer.valueOf(kafkaConsumerRecord.partition()), Long.valueOf(kafkaConsumerRecord.offset()), Long.valueOf(timestamp), ofInstant, sb.toString(), kafkaConsumerRecord.key(), kafkaConsumerRecord.value());
    }

    private void logBriefly(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        this.log.info("received {} message [device: {}, content-type: {}]: {}", kafkaConsumerRecord.topic().equals(new HonoTopic(HonoTopic.Type.EVENT, this.tenantId).toString()) ? "event" : "telemetry", kafkaConsumerRecord.key(), (String) kafkaConsumerRecord.headers().stream().filter(kafkaHeader -> {
            return kafkaHeader.key().equals(MessageHelper.SYS_PROPERTY_CONTENT_TYPE);
        }).findAny().map(kafkaHeader2 -> {
            return kafkaHeader2.value().toString();
        }).orElse(""), kafkaConsumerRecord.value());
    }
}
