package com.hs.platformservice.esdataconsumer;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hs/platformservice/esdataconsumer/ConsumerRunnable.class */
/**
 * Kafka consumer worker: polls one topic in a loop, parses every record value as a JSON
 * object and passes each non-empty batch to {@code EsDataImportHandle.importDataToEs}.
 *
 * <p>{@link #run()} only returns (or propagates an error) after a failure inside the loop;
 * the underlying consumer is closed on every such exit path.
 */
public class ConsumerRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerRunnable.class);

    /** Timeout, in milliseconds, passed to each {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 200L;

    private final KafkaConsumer<String, String> consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a String/String consumer with auto-commit enabled (5000 ms commit interval,
     * 30000 ms session timeout) and subscribes it to a single topic.
     *
     * @param bootstrapServers value of the {@code bootstrap.servers} consumer property
     * @param groupId value of the {@code group.id} consumer property
     * @param topic name of the topic to subscribe to
     */
    public ConsumerRunnable(String bootstrapServers, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls until a failure occurs. Each poll result is logged record by record, converted
     * to JSON objects and imported as one batch; a failed import is logged and the loop
     * continues. Records whose value cannot be parsed are logged and skipped.
     *
     * <p>An {@link Exception} escaping the loop is logged with its stack trace and ends the
     * method normally; any other {@link Throwable} propagates to the caller. In both cases
     * the consumer is closed first.
     */
    @Override // java.lang.Runnable
    public void run() {
        final String threadName = Thread.currentThread().getName();
        try {
            while (true) {
                ConsumerRecords<String, String> records = this.consumer.poll(POLL_TIMEOUT_MS);
                // NOTE(review): element type assumed to match the parameter of
                // EsDataImportHandle.importDataToEs, whose signature is not visible here - confirm.
                ArrayList<JSONObject> batch = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    log.info("{} consumed {}th message with offset: {} key:{} value:{}",
                            threadName, record.partition(), record.offset(), record.key(), record.value());
                    JSONObject document = parseValue(record);
                    if (document != null) {
                        batch.add(document);
                    }
                }
                if (!batch.isEmpty() && !EsDataImportHandle.importDataToEs(batch)) {
                    log.error("Consumer thread import data to ES failed");
                }
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is kept, not just its toString().
            log.error("Consumer thread is abnormal: {}", e.toString(), e);
        } finally {
            // Single shutdown path for both the Exception and the Error case.
            this.consumer.close();
            log.error("Shut down consumer thread:{}", threadName);
        }
    }

    /**
     * Parses the value of one record as a JSON object without letting a bad record abort
     * the polling loop.
     *
     * @param record the consumed record whose value is parsed
     * @return the parsed object, or {@code null} when parsing failed or produced no object
     */
    private static JSONObject parseValue(ConsumerRecord<String, String> record) {
        try {
            JSONObject parsed = JSONObject.parseObject(record.value());
            if (parsed == null) {
                // Presumably happens for a null/empty value - verify against the fastjson version in use.
                log.warn("Skipping record without a JSON object value, partition {} offset {}",
                        record.partition(), record.offset());
            }
            return parsed;
        } catch (RuntimeException e) {
            // Parse failures are unchecked; one malformed message must not stop the worker.
            log.error("Skipping record whose value is not a valid JSON object, partition {} offset {}",
                    record.partition(), record.offset(), e);
            return null;
        }
    }
}
