package com.hsrj.platform.starter.canal.client.transfer;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.hsrj.platform.starter.canal.client.ListenerPoint;
import com.hsrj.platform.starter.canal.config.CanalConfig;
import com.hsrj.platform.starter.canal.event.CanalEventListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hsrj/platform/starter/canal/client/transfer/AbstractMessageTransponder.class */
/**
 * Base {@link MessageTransponder} that polls a {@link CanalConnector} in a loop and hands every
 * non-empty batch to {@link #distributeEvent(Message)}.
 *
 * <p>The loop in {@link #run()} keeps going until {@link #stop()} is called, the executing thread
 * is interrupted, or the retry budget read from the instance configuration is used up by
 * {@link CanalClientException}s.
 */
public abstract class AbstractMessageTransponder implements MessageTransponder {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageTransponder.class);

    /** Connector the batches are fetched from, acknowledged on and rolled back on. */
    private final CanalConnector connector;

    /** Per-instance settings (batch size, acquire interval, retry count). */
    protected final CanalConfig.Instance config;

    /** Key of the configuration entry this transponder was created for. */
    protected final String destination;

    /** Copy of the interface-based listeners passed to the constructor. */
    protected final List<CanalEventListener> listeners = new ArrayList<>();

    /** Copy of the listener points passed to the constructor. */
    protected final List<ListenerPoint> annoListeners = new ArrayList<>();

    /** Loop flag; volatile so that a {@link #stop()} from another thread is seen by {@link #run()}. */
    private volatile boolean running = true;

    /**
     * Creates a transponder bound to one connector and one configuration entry.
     *
     * @param canalConnector connector to poll; must not be {@code null}
     * @param entry          configuration entry; its key becomes {@link #destination} and its
     *                       value becomes {@link #config}; must not be {@code null}
     * @param list           listeners to copy into {@link #listeners}; {@code null} is treated as empty
     * @param list2          listener points to copy into {@link #annoListeners}; {@code null} is
     *                       treated as empty
     * @throws NullPointerException if {@code canalConnector} or {@code entry} is {@code null}
     */
    public AbstractMessageTransponder(CanalConnector canalConnector, Map.Entry<String, CanalConfig.Instance> entry, List<CanalEventListener> list, List<ListenerPoint> list2) {
        Objects.requireNonNull(canalConnector, "connector can not be null!");
        Objects.requireNonNull(entry, "config can not be null!");
        this.connector = canalConnector;
        this.destination = entry.getKey();
        this.config = entry.getValue();
        if (list != null) {
            this.listeners.addAll(list);
        }
        if (list2 != null) {
            this.annoListeners.addAll(list2);
        }
    }

    /**
     * Polls the connector until stopped or interrupted.
     *
     * <p>Each iteration fetches one batch without acknowledging it. An empty batch (id {@code -1}
     * or no entries) makes the thread sleep for the configured acquire interval; any other batch
     * is passed to {@link #distributeEvent(Message)}. The batch id is acknowledged afterwards in
     * both cases.
     *
     * <p>A {@link CanalClientException} consumes one retry and is followed by a sleep of the
     * acquire interval. An interruption zeroes the retry budget, so the loop ends after the
     * current iteration; the thread's interrupt status is restored for the caller. Any other
     * exception propagates out of this method.
     */
    @Override
    public void run() {
        int retryCount = this.config.getRetryCount();
        final long acquireInterval = this.config.getAcquireInterval();
        final String name = Thread.currentThread().getName();
        while (this.running && !Thread.currentThread().isInterrupted()) {
            try {
                Message message = this.connector.getWithoutAck(this.config.getBatchSize());
                long batchId = message.getId();
                int size = message.getEntries().size();
                logger.debug("{}: Get message from canal server >>>>> size:{}", name, size);
                if (batchId == -1 || size == 0) {
                    logger.debug("{}: Empty message... sleep for {} millis", name, acquireInterval);
                    Thread.sleep(acquireInterval);
                } else {
                    distributeEvent(message);
                }
                this.connector.ack(batchId);
                logger.debug("{}: Ack message. batchId:{}", name, batchId);
            } catch (CanalClientException e) {
                retryCount--;
                logger.error(name + ": Error occurred!! ", e);
                try {
                    // Back off before the next attempt.
                    Thread.sleep(acquireInterval);
                } catch (InterruptedException interrupted) {
                    retryCount = 0;
                    // Keep the interruption visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException e) {
                retryCount = 0;
                try {
                    this.connector.rollback();
                } finally {
                    // Restore the flag only after the rollback attempt so that the rollback
                    // itself does not run on an already-interrupted thread.
                    Thread.currentThread().interrupt();
                }
            } finally {
                // Runs on every path, including exceptions that propagate out of the loop.
                if (retryCount <= 0) {
                    stop();
                    logger.info("{}: Topping the client.. ", Thread.currentThread().getName());
                }
            }
        }
        stop();
        logger.info("{}: client stopped. ", Thread.currentThread().getName());
    }

    /**
     * Dispatches one fetched batch. Called from {@link #run()} only for batches whose id is not
     * {@code -1} and that contain at least one entry.
     *
     * @param message the batch returned by the connector
     */
    protected abstract void distributeEvent(Message message);

    /** Clears the running flag so that the loop in {@link #run()} exits at its next check. */
    void stop() {
        this.running = false;
    }
}
