package org.eclipse.hono.client.kafka;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.6.1.jar:org/eclipse/hono/client/kafka/CachingKafkaProducerFactory.class */
public class CachingKafkaProducerFactory<K, V> implements KafkaProducerFactory<K, V> {
    private final Map<String, KafkaProducer<K, V>> activeProducers = new HashMap();
    private final BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier;

    public CachingKafkaProducerFactory(BiFunction<String, Map<String, String>, KafkaProducer<K, V>> biFunction) {
        this.producerInstanceSupplier = biFunction;
    }

    @Override // org.eclipse.hono.client.kafka.KafkaProducerFactory
    public KafkaProducer<K, V> getOrCreateProducer(String str, KafkaProducerConfigProperties kafkaProducerConfigProperties) {
        this.activeProducers.computeIfAbsent(str, str2 -> {
            KafkaProducer<K, V> apply = this.producerInstanceSupplier.apply(str, kafkaProducerConfigProperties.getProducerConfig(str));
            return apply.exceptionHandler(getExceptionHandler(str2, apply));
        });
        return this.activeProducers.get(str);
    }

    private Handler<Throwable> getExceptionHandler(String str, KafkaProducer<K, V> kafkaProducer) {
        return th -> {
            if (isFatalError(th)) {
                this.activeProducers.remove(str);
                kafkaProducer.close();
            }
        };
    }

    public Optional<KafkaProducer<K, V>> getProducer(String str) {
        return Optional.ofNullable(this.activeProducers.get(str));
    }

    @Override // org.eclipse.hono.client.kafka.KafkaProducerFactory
    public Future<Void> closeProducer(String str) {
        KafkaProducer<K, V> remove = this.activeProducers.remove(str);
        if (remove == null) {
            return Future.succeededFuture();
        }
        Promise promise = Promise.promise();
        remove.close(promise);
        return promise.future();
    }

    public static boolean isFatalError(Throwable th) {
        return (th instanceof ProducerFencedException) || (th instanceof OutOfOrderSequenceException) || (th instanceof AuthorizationException) || (th instanceof UnsupportedVersionException) || (th instanceof UnsupportedForMessageFormatException);
    }
}
