package com.hsrj.platform.starter.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.hsrj.platform.starter.canal.client.exception.CanalClientException;
import com.hsrj.platform.starter.canal.client.transfer.TransponderFactory;
import com.hsrj.platform.starter.canal.config.CanalConfig;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/hsrj/platform/starter/canal/client/AbstractCanalClient.class */
public abstract class AbstractCanalClient implements CanalClient {
    private volatile boolean running;
    private CanalConfig canalConfig;
    protected final TransponderFactory factory;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractCanalClient(CanalConfig canalConfig, TransponderFactory transponderFactory) {
        Objects.requireNonNull(canalConfig, "canalConfig can not be null!");
        Objects.requireNonNull(canalConfig, "transponderFactory can not be null!");
        this.canalConfig = canalConfig;
        this.factory = transponderFactory;
    }

    @Override // com.hsrj.platform.starter.canal.client.CanalClient
    public void start() {
        for (Map.Entry<String, CanalConfig.Instance> entry : getConfig().entrySet()) {
            process(processInstanceEntry(entry), entry);
        }
    }

    protected abstract void process(CanalConnector canalConnector, Map.Entry<String, CanalConfig.Instance> entry);

    private CanalConnector processInstanceEntry(Map.Entry<String, CanalConfig.Instance> entry) {
        CanalConnector newSingleConnector;
        CanalConfig.Instance value = entry.getValue();
        if (value.isClusterEnabled()) {
            ArrayList arrayList = new ArrayList();
            for (String str : value.getZookeeperAddress()) {
                String[] split = str.split(":");
                if (split.length != 2) {
                    throw new CanalClientException("error parsing zookeeper address:" + str);
                }
                arrayList.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
            }
            newSingleConnector = CanalConnectors.newClusterConnector(arrayList, entry.getKey(), value.getUserName(), value.getPassword());
        } else {
            newSingleConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(value.getHost(), value.getPort()), entry.getKey(), value.getUserName(), value.getPassword());
        }
        newSingleConnector.connect();
        if (StringUtils.isEmpty(value.getFilter())) {
            newSingleConnector.subscribe();
        } else {
            newSingleConnector.subscribe(value.getFilter());
        }
        newSingleConnector.rollback();
        return newSingleConnector;
    }

    protected Map<String, CanalConfig.Instance> getConfig() {
        Map<String, CanalConfig.Instance> instances;
        CanalConfig canalConfig = this.canalConfig;
        if (canalConfig == null || (instances = canalConfig.getInstances()) == null || instances.isEmpty()) {
            throw new CanalClientException("can not get the configuration of canal client!");
        }
        return canalConfig.getInstances();
    }

    @Override // com.hsrj.platform.starter.canal.client.CanalClient
    public void stop() {
        setRunning(false);
    }

    @Override // com.hsrj.platform.starter.canal.client.CanalClient
    public boolean isRunning() {
        return this.running;
    }

    private void setRunning(boolean z) {
        this.running = z;
    }
}
