package com.alicloud.openservices.tablestore.tunnel.pipeline;

import com.alicloud.openservices.tablestore.core.protocol.ResponseFactory;
import com.alicloud.openservices.tablestore.model.tunnel.internal.CheckpointResponse;
import com.alicloud.openservices.tablestore.model.tunnel.internal.ReadRecordsRequest;
import com.alicloud.openservices.tablestore.model.tunnel.internal.ReadRecordsResponse;
import com.alicloud.openservices.tablestore.tunnel.worker.ChannelConnect;
import com.alicloud.openservices.tablestore.tunnel.worker.ChannelConnectStatus;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alicloud/openservices/tablestore/tunnel/pipeline/ProcessDataPipeline.class */
public class ProcessDataPipeline implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessDataPipeline.class);
    private final ChannelConnect connect;
    private volatile boolean started = false;
    private final ThreadPoolExecutor readRecordsExecutor;
    private final ThreadPoolExecutor processRecordsExecutor;
    private final ExecutorService pipelineHelperExecutor;
    private ProcessDataBackoff backoff;
    private Pipeline<ReadRecordsRequest, CheckpointResponse> pipeline;
    private static final int COUNT_BAR = 500;
    private static final int SIZE_BAR = 921600;

    public ProcessDataPipeline(ChannelConnect channelConnect, ExecutorService executorService, ThreadPoolExecutor threadPoolExecutor, ThreadPoolExecutor threadPoolExecutor2) {
        this.connect = channelConnect;
        this.pipelineHelperExecutor = executorService;
        this.readRecordsExecutor = threadPoolExecutor;
        this.processRecordsExecutor = threadPoolExecutor2;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (!this.started) {
            LOG.info("Initial process data pipeline.");
            this.pipeline = buildPipeline();
            this.pipeline.init(new ProcessDataPipelineContext(this.connect));
            this.started = true;
        }
        this.pipeline.process(new ReadRecordsRequest(this.connect.getTunnelId(), this.connect.getClientId(), this.connect.getChannelId(), this.connect.getToken()));
    }

    private Pipeline<ReadRecordsRequest, CheckpointResponse> buildPipeline() {
        Pipeline<ReadRecordsRequest, CheckpointResponse> pipeline = new Pipeline<>(this.pipelineHelperExecutor);
        pipeline.addExecutorForStage(createReadRecordsStage(), this.readRecordsExecutor);
        pipeline.addExecutorForStage(createProcessRecordsStage(), this.processRecordsExecutor);
        return pipeline;
    }

    private Stage<ReadRecordsRequest, ProcessRecordsInput> createReadRecordsStage() {
        return new AbstractStage<ReadRecordsRequest, ProcessRecordsInput>() { // from class: com.alicloud.openservices.tablestore.tunnel.pipeline.ProcessDataPipeline.1
            @Override // com.alicloud.openservices.tablestore.tunnel.pipeline.AbstractStage
            public ProcessRecordsInput doProcess(ReadRecordsRequest readRecordsRequest) throws StageException {
                if (ProcessDataPipeline.this.connect.getStatus() != ChannelConnectStatus.RUNNING) {
                    throw new StageException(this, readRecordsRequest, "Channel is not running.");
                }
                if (ProcessDataPipeline.this.connect.getToken() == null || ResponseFactory.FINISH_TAG.equals(ProcessDataPipeline.this.connect.getToken())) {
                    ProcessDataPipeline.LOG.info("Channel is finished, channel will be closed.");
                    ProcessDataPipeline.this.connect.close(true);
                    throw new StageException(this, readRecordsRequest, "Channel connect is finished.");
                }
                try {
                    ProcessDataPipeline.LOG.debug("Begin read records, connect: {}", ProcessDataPipeline.this.connect);
                    long currentTimeMillis = System.currentTimeMillis();
                    ReadRecordsResponse readRecords = ProcessDataPipeline.this.connect.getClient().readRecords(readRecordsRequest);
                    ProcessDataPipeline.LOG.info("GetRecords, Num: {}, Channel connect: {}, Latency: {} ms, Next Token: {}", new Object[]{Integer.valueOf(readRecords.getRecords().size()), ProcessDataPipeline.this.connect, Long.valueOf(System.currentTimeMillis() - currentTimeMillis), readRecords.getNextToken()});
                    if (ProcessDataPipeline.this.backoff != null) {
                        if (ProcessDataPipeline.this.checkDataEnough(readRecords.getRecords().size(), readRecords.getMemoizedSerializedSize())) {
                            ProcessDataPipeline.LOG.debug("Backoff is reset");
                            ProcessDataPipeline.this.backoff.reset();
                        } else {
                            ProcessDataPipeline.LOG.debug("Data is not full, sleep {} msec.", Long.valueOf(ProcessDataPipeline.this.backoff.nextBackOffMillis()));
                            Thread.sleep(ProcessDataPipeline.this.backoff.nextBackOffMillis());
                        }
                    }
                    return new ProcessRecordsInput(readRecords.getRecords(), readRecords.getNextToken(), readRecords.getRequestId());
                } catch (Exception e) {
                    throw new StageException(this, readRecordsRequest, e.getMessage(), e);
                }
            }
        };
    }

    private Stage<ProcessRecordsInput, Boolean> createProcessRecordsStage() {
        return new AbstractStage<ProcessRecordsInput, Boolean>() { // from class: com.alicloud.openservices.tablestore.tunnel.pipeline.ProcessDataPipeline.2
            @Override // com.alicloud.openservices.tablestore.tunnel.pipeline.AbstractStage
            public Boolean doProcess(ProcessRecordsInput processRecordsInput) throws StageException {
                if (ProcessDataPipeline.this.connect.getStatus() != ChannelConnectStatus.RUNNING) {
                    throw new StageException(this, processRecordsInput, "Channel is not running.");
                }
                try {
                    ProcessDataPipeline.this.connect.getProcessor().process(processRecordsInput);
                    ProcessDataPipeline.this.connect.setToken(processRecordsInput.getNextToken());
                    ProcessDataPipeline.LOG.info("Continue run pipeline, connect: {}", ProcessDataPipeline.this.connect);
                    ProcessDataPipeline.this.connect.getChannelExecutorService().submit(ProcessDataPipeline.this.connect.getProcessPipeline());
                    return true;
                } catch (Exception e) {
                    throw new StageException(this, processRecordsInput, e.getMessage(), e);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean checkDataEnough(int i, int i2) {
        return i > 500 || i2 > SIZE_BAR;
    }

    public ProcessDataBackoff getBackoff() {
        return this.backoff;
    }

    public void setBackoff(ProcessDataBackoff processDataBackoff) {
        this.backoff = processDataBackoff;
    }
}
