package com.alicloud.openservices.tablestore;

import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.core.utils.ParamChecker;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.ConsumedCapacity;
import com.alicloud.openservices.tablestore.model.DescribeTableRequest;
import com.alicloud.openservices.tablestore.model.DescribeTableResponse;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.writer.Bucket;
import com.alicloud.openservices.tablestore.writer.Group;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;
import com.alicloud.openservices.tablestore.writer.WriterStatistics;
import com.alicloud.openservices.tablestore.writer.config.BucketConfig;
import com.alicloud.openservices.tablestore.writer.dispatch.BaseDispatcher;
import com.alicloud.openservices.tablestore.writer.dispatch.HashPartitionKeyDispatcher;
import com.alicloud.openservices.tablestore.writer.dispatch.HashPrimaryKeyDispatcher;
import com.alicloud.openservices.tablestore.writer.dispatch.RoundRobinDispatcher;
import com.alicloud.openservices.tablestore.writer.handle.WriterHandleStatistics;
import com.alicloud.openservices.tablestore.writer.retry.CertainCodeNotRetryStrategy;
import com.alicloud.openservices.tablestore.writer.retry.CertainCodeRetryStrategy;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alicloud/openservices/tablestore/DefaultTableStoreWriter.class */
/**
 * Default {@link TableStoreWriter} implementation.
 *
 * <p>On construction the writer describes the target table, creates
 * {@code writerConfig.getBucketCount()} {@link Bucket}s and a {@link BaseDispatcher}
 * chosen by {@code writerConfig.getDispatchMode()}, and starts two periodic timers:
 * one that signals every bucket to flush and one that logs buffer/dispatch counters.
 * Each added {@link RowChange} is routed to the bucket picked by the dispatcher.
 *
 * <p>The writer either uses a caller-supplied client and executor (first constructor)
 * or creates its own {@link AsyncClient} and callback thread pool (other constructors);
 * only self-created resources are shut down by {@link #close()}.
 */
public class DefaultTableStoreWriter implements TableStoreWriter {
    /** Number of threads in the pool that runs the periodic flush and log timers. */
    private static final int SCHEDULED_CORE_POOL_SIZE = 2;

    /** Logger; deliberately keyed on the {@link TableStoreWriter} interface name. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TableStoreWriter.class);

    private final AsyncClientInterface ots;
    private final Executor executor;
    private final WriterConfig writerConfig;
    private TableStoreCallback<RowChange, ConsumedCapacity> callback;
    private TableStoreCallback<RowChange, RowWriteResult> resultCallback;
    private final String tableName;
    private TableMeta tableMeta;
    private Bucket[] buckets;
    private final WriterHandleStatistics writerStatistics;
    private BaseDispatcher dispatcher;
    private final Semaphore semaphore;

    /** {@code true} when this writer created {@link #ots} and {@link #executor} itself and must shut them down. */
    private final boolean isInnerConstruct;

    /** Starts from the writer config and is forced to {@code false} when the table reports index metadata. */
    private boolean allowDuplicatePkInBatchRequest = true;

    private final AtomicBoolean closed = new AtomicBoolean(false);

    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(SCHEDULED_CORE_POOL_SIZE, new ThreadFactory() {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable runnable) {
            // The original name contained a literal, never-substituted "%d".
            return new Thread(runnable, "writer-scheduled-pool-" + this.counter.getAndIncrement());
        }
    });

    /**
     * Creates a writer on top of a caller-owned client and executor. Neither is shut down by {@link #close()}.
     *
     * @param asyncClientInterface client used to describe the table and handed to every bucket; must not be null
     * @param str                  table name; must not be null or empty
     * @param writerConfig         writer configuration; must not be null
     * @param tableStoreCallback   optional per-row callback receiving the consumed capacity; may be null
     * @param executor             executor handed to every bucket; must not be null
     * @throws ClientException if initialization (describe table, bucket/dispatcher/timer setup) fails
     */
    public DefaultTableStoreWriter(AsyncClientInterface asyncClientInterface, String str, WriterConfig writerConfig, TableStoreCallback<RowChange, ConsumedCapacity> tableStoreCallback, Executor executor) {
        Preconditions.checkNotNull(asyncClientInterface, "The ots client can not be null.");
        Preconditions.checkArgument(str != null && !str.isEmpty(), "The table name can not be null or empty.");
        Preconditions.checkNotNull(executor, "The executor service can not be null.");
        Preconditions.checkNotNull(writerConfig, "The writer config can not be null.");
        this.writerStatistics = new WriterHandleStatistics();
        this.ots = asyncClientInterface;
        this.tableName = str;
        this.writerConfig = writerConfig;
        this.callback = tableStoreCallback;
        this.resultCallback = createResultCallback(tableStoreCallback);
        this.executor = executor;
        this.allowDuplicatePkInBatchRequest = writerConfig.isAllowDuplicatedRowInBatchRequest();
        this.semaphore = new Semaphore(writerConfig.getConcurrency());
        this.isInnerConstruct = false;
        initialize();
    }

    /**
     * Creates a writer that owns its client and callback thread pool. The client configuration
     * is derived from {@code writerConfig} (max connections and retry strategy).
     *
     * @param str                first argument passed to {@link AsyncClient} (presumably the endpoint; confirm)
     * @param serviceCredentials source of the access key id, access key secret and security token
     * @param str2               fourth argument passed to {@link AsyncClient} (presumably the instance name; confirm)
     * @param str3               table name; must not be null or empty
     * @param writerConfig       writer configuration; must not be null
     * @param tableStoreCallback optional per-row result callback; may be null
     * @throws ClientException if initialization fails; self-created resources are released first
     */
    public DefaultTableStoreWriter(String str, ServiceCredentials serviceCredentials, String str2, String str3, WriterConfig writerConfig, TableStoreCallback<RowChange, RowWriteResult> tableStoreCallback) {
        this(str, serviceCredentials, str2, str3, writerConfig, createClientConfiguration(writerConfig), tableStoreCallback);
    }

    /**
     * Creates a writer that owns its client and callback thread pool, using the supplied client configuration.
     *
     * @param str                 first argument passed to {@link AsyncClient} (presumably the endpoint; confirm)
     * @param serviceCredentials  source of the access key id, access key secret and security token
     * @param str2                fourth argument passed to {@link AsyncClient} (presumably the instance name; confirm)
     * @param str3                table name; must not be null or empty
     * @param writerConfig        writer configuration; must not be null
     * @param clientConfiguration configuration handed to the created {@link AsyncClient}
     * @param tableStoreCallback  optional per-row result callback; may be null
     * @throws ClientException if initialization fails; self-created resources are released first
     */
    public DefaultTableStoreWriter(String str, ServiceCredentials serviceCredentials, String str2, String str3, WriterConfig writerConfig, ClientConfiguration clientConfiguration, TableStoreCallback<RowChange, RowWriteResult> tableStoreCallback) {
        Preconditions.checkArgument(str3 != null && !str3.isEmpty(), "The table name can not be null or empty.");
        Preconditions.checkNotNull(writerConfig, "The writer config can not be null.");
        Preconditions.checkNotNull(serviceCredentials, "The service credentials can not be null.");
        this.writerStatistics = new WriterHandleStatistics();
        this.ots = new AsyncClient(str, serviceCredentials.getAccessKeyId(), serviceCredentials.getAccessKeySecret(), str2, clientConfiguration, serviceCredentials.getSecurityToken());
        this.tableName = str3;
        this.writerConfig = writerConfig;
        this.callback = null;
        this.resultCallback = tableStoreCallback;
        this.executor = createThreadPool(writerConfig);
        this.allowDuplicatePkInBatchRequest = writerConfig.isAllowDuplicatedRowInBatchRequest();
        this.semaphore = new Semaphore(writerConfig.getConcurrency());
        this.isInnerConstruct = true;
        initialize();
    }

    /** Builds the client configuration used when the caller does not supply one. */
    private static ClientConfiguration createClientConfiguration(WriterConfig writerConfig) {
        Preconditions.checkNotNull(writerConfig, "The writer config can not be null.");
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(writerConfig.getClientMaxConnections());
        switch (writerConfig.getWriterRetryStrategy()) {
            case CERTAIN_ERROR_CODE_NOT_RETRY:
                clientConfiguration.setRetryStrategy(new CertainCodeNotRetryStrategy());
                break;
            case CERTAIN_ERROR_CODE_RETRY:
            default:
                clientConfiguration.setRetryStrategy(new CertainCodeRetryStrategy());
                break;
        }
        return clientConfiguration;
    }

    /**
     * Describes the table, then builds the dispatcher and buckets and starts both timers.
     *
     * @throws ClientException on any failure; a {@link ClientException} raised inside is rethrown as-is,
     *                         anything else is wrapped
     */
    private void initialize() {
        LOGGER.info("Start initialize ots writer, table name: {}.", this.tableName);
        DescribeTableRequest describeTableRequest = new DescribeTableRequest();
        describeTableRequest.setTableName(this.tableName);
        try {
            DescribeTableResponse describeTableResponse = this.ots.describeTable(describeTableRequest, null).get();
            if (describeTableResponse.getIndexMeta() != null && describeTableResponse.getIndexMeta().size() > 0) {
                this.allowDuplicatePkInBatchRequest = false;
                LOGGER.info("Table [{}] has globalIndex, allowDuplicatePkInBatchRequest will be overwrite by [false]", this.tableName);
            }
            this.tableMeta = describeTableResponse.getTableMeta();
            LOGGER.info("End initialize with table meta: {}.", this.tableMeta);

            int bucketCount = this.writerConfig.getBucketCount();
            // Resolve the dispatcher first so an unsupported mode fails before any bucket exists.
            this.dispatcher = createDispatcher(bucketCount);
            Bucket[] created = new Bucket[bucketCount];
            for (int i = 0; i < bucketCount; i++) {
                BucketConfig bucketConfig = new BucketConfig(i, this.tableMeta.getTableName(), this.writerConfig.getWriteMode(), this.allowDuplicatePkInBatchRequest);
                created[i] = new Bucket(bucketConfig, this.ots, this.writerConfig, this.resultCallback, this.executor, this.writerStatistics, this.semaphore);
            }
            this.buckets = created;

            startFlushTimer(this.writerConfig.getFlushInterval());
            startLogTimer(this.writerConfig.getLogInterval());
        } catch (Exception e) {
            // The constructor is about to fail, so nobody will be able to call close().
            releaseResourcesAfterFailedInit();
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (e instanceof ClientException) {
                throw (ClientException) e;
            }
            throw new ClientException(e);
        }
    }

    /** Creates the dispatcher matching the configured dispatch mode. */
    private BaseDispatcher createDispatcher(int bucketCount) {
        switch (this.writerConfig.getDispatchMode()) {
            case HASH_PARTITION_KEY:
                return new HashPartitionKeyDispatcher(bucketCount);
            case ROUND_ROBIN:
                return new RoundRobinDispatcher(bucketCount);
            case HASH_PRIMARY_KEY:
                return new HashPrimaryKeyDispatcher(bucketCount);
            default:
                throw new ClientException(String.format("The dispatch mode [%s] not supported", this.writerConfig.getDispatchMode()));
        }
    }

    /** Stops the timer pool and, when this writer created them, the client and callback pool. */
    private void releaseResourcesAfterFailedInit() {
        this.scheduledExecutorService.shutdownNow();
        if (this.isInnerConstruct) {
            try {
                this.ots.shutdown();
            } catch (RuntimeException shutdownFailure) {
                // Do not let a cleanup problem mask the original initialization failure.
                LOGGER.warn("Failed to shutdown ots client after initialization failure.", shutdownFailure);
            }
            ((ExecutorService) this.executor).shutdown();
        }
    }

    /**
     * Creates the fixed-size callback pool used when the writer owns its executor.
     * The queue is bounded; when it is full the submitting thread runs the task itself.
     */
    private ExecutorService createThreadPool(WriterConfig writerConfig) {
        int callbackThreadCount = writerConfig.getCallbackThreadCount();
        int callbackThreadPoolQueueSize = writerConfig.getCallbackThreadPoolQueueSize();
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "writer-callback-" + this.counter.getAndIncrement());
            }
        };
        return new ThreadPoolExecutor(callbackThreadCount, callbackThreadCount, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(callbackThreadPoolQueueSize), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Adapts a consumed-capacity callback to the row-write-result callback used by the buckets.
     *
     * @return the adapter, or {@code null} when {@code tableStoreCallback} is {@code null}
     */
    private TableStoreCallback<RowChange, RowWriteResult> createResultCallback(final TableStoreCallback<RowChange, ConsumedCapacity> tableStoreCallback) {
        if (tableStoreCallback == null) {
            return null;
        }
        return new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
                tableStoreCallback.onCompleted(rowChange, rowWriteResult.getConsumedCapacity());
            }

            @Override
            public void onFailed(RowChange rowChange, Exception exc) {
                tableStoreCallback.onFailed(rowChange, exc);
            }
        };
    }

    /** Validates the row against the table meta when schema checking is enabled in the config. */
    private void checkSchemaIfEnabled(RowChange rowChange) {
        if (this.writerConfig.isEnableSchemaCheck()) {
            ParamChecker.checkRowChange(this.tableMeta, rowChange, this.writerConfig);
        }
    }

    /**
     * Retries {@link #addRowChangeInternal} every millisecond until the bucket accepts the row.
     * An interrupt does not abort the wait (as before), but the thread's interrupt status is
     * restored on exit instead of being lost.
     */
    private void addRowChangeBlocking(RowChange rowChange, Group group) {
        boolean interrupted = false;
        try {
            while (!addRowChangeInternal(rowChange, group)) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Adds one row, waiting until its bucket accepts it.
     *
     * @throws ClientException if the writer is closed
     */
    @Override
    public void addRowChange(RowChange rowChange) {
        checkSchemaIfEnabled(rowChange);
        addRowChangeBlocking(rowChange, new Group(1));
    }

    /**
     * Adds one row, waiting until its bucket accepts it.
     *
     * @return the future of the single-row group the row was added with
     * @throws ClientException if the writer is closed
     */
    @Override
    public Future<WriterResult> addRowChangeWithFuture(RowChange rowChange) {
        checkSchemaIfEnabled(rowChange);
        Group group = new Group(1);
        addRowChangeBlocking(rowChange, group);
        return group.getFuture();
    }

    /**
     * Attempts to add one row without waiting.
     *
     * @return whatever the target bucket's {@code addRowChange} returns ({@code false} makes the blocking variants retry)
     * @throws ClientException if the writer is closed
     */
    @Override
    public boolean tryAddRowChange(RowChange rowChange) {
        checkSchemaIfEnabled(rowChange);
        return addRowChangeInternal(rowChange, new Group(1));
    }

    /** Routes the row to the bucket selected by the dispatcher; fails fast once the writer is closed. */
    private boolean addRowChangeInternal(RowChange rowChange, Group group) {
        if (this.closed.get()) {
            throw new ClientException("The writer has been closed.");
        }
        int bucketIndex = this.dispatcher.getDispatchIndex(rowChange);
        return this.buckets[bucketIndex].addRowChange(rowChange, group);
    }

    /**
     * Schedules a periodic flush signal for all buckets.
     *
     * @param i flush period in milliseconds
     */
    public void startFlushTimer(int i) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    DefaultTableStoreWriter.this.triggerFlush();
                } catch (RuntimeException e) {
                    // An exception escaping a fixed-rate task cancels all later runs; log and keep the timer alive.
                    LOGGER.error("Periodic flush failed.", e);
                }
            }
        }, 0L, i, TimeUnit.MILLISECONDS);
    }

    /** Schedules periodic debug logging of ring-buffer remaining capacity and per-bucket dispatch counts. */
    private void startLogTimer(int i) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (!LOGGER.isDebugEnabled()) {
                    return;
                }
                try {
                    StringBuilder ringBufferLine = new StringBuilder("RingBuffer Remain: ");
                    for (Bucket bucket : DefaultTableStoreWriter.this.buckets) {
                        ringBufferLine.append(bucket.getRingBuffer().remainingCapacity());
                        ringBufferLine.append(", ");
                    }
                    LOGGER.debug(ringBufferLine.toString());

                    StringBuilder dispatchLine = new StringBuilder("Dispatcher Count: ");
                    for (AtomicLong rowCount : DefaultTableStoreWriter.this.dispatcher.getBucketDispatchRowCount()) {
                        dispatchLine.append(rowCount.get());
                        dispatchLine.append(", ");
                    }
                    LOGGER.debug(dispatchLine.toString());
                } catch (RuntimeException e) {
                    // Keep the fixed-rate task scheduled even if one round of logging fails.
                    LOGGER.error("Periodic writer logging failed.", e);
                }
            }
        }, 0L, i, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds every row, collecting the ones rejected with a {@link ClientException}.
     *
     * @param list  rows to add
     * @param list2 output list; cleared first, then filled with the rows that failed
     * @throws ClientException if at least one row failed
     */
    @Override
    public void addRowChange(List<RowChange> list, List<RowChange> list2) throws ClientException {
        list2.clear();
        for (RowChange rowChange : list) {
            try {
                addRowChange(rowChange);
            } catch (ClientException e) {
                list2.add(rowChange);
            }
        }
        if (!list2.isEmpty()) {
            throw new ClientException("There is dirty rows.");
        }
    }

    /**
     * Adds every row under one shared group. A row that fails with a {@link ClientException}
     * (schema check or closed writer) is recorded on the group via {@code failedOneRow}
     * and the remaining rows are still processed.
     *
     * @return the future of the shared group
     */
    @Override
    public Future<WriterResult> addRowChangeWithFuture(List<RowChange> list) throws ClientException {
        Group group = new Group(list.size());
        for (RowChange rowChange : list) {
            try {
                checkSchemaIfEnabled(rowChange);
                addRowChangeBlocking(rowChange, group);
            } catch (ClientException e) {
                group.failedOneRow(rowChange, e);
            }
        }
        return group.getFuture();
    }

    /** Replaces the consumed-capacity callback and pushes its adapted form to every bucket. */
    @Override
    public void setCallback(TableStoreCallback<RowChange, ConsumedCapacity> tableStoreCallback) {
        this.callback = tableStoreCallback;
        this.resultCallback = createResultCallback(tableStoreCallback);
        for (Bucket bucket : this.buckets) {
            bucket.setResultCallback(this.resultCallback);
        }
    }

    /** Replaces the result callback on every bucket and clears the consumed-capacity callback. */
    @Override
    public void setResultCallback(TableStoreCallback<RowChange, RowWriteResult> tableStoreCallback) {
        this.callback = null;
        this.resultCallback = tableStoreCallback;
        for (Bucket bucket : this.buckets) {
            bucket.setResultCallback(tableStoreCallback);
        }
    }

    @Override
    public TableStoreCallback<RowChange, ConsumedCapacity> getCallback() {
        return this.callback;
    }

    @Override
    public TableStoreCallback<RowChange, RowWriteResult> getResultCallback() {
        return this.resultCallback;
    }

    @Override
    public WriterConfig getWriterConfig() {
        return this.writerConfig;
    }

    @Override
    public WriterStatistics getWriterStatistics() {
        return this.writerStatistics;
    }

    /**
     * Hands a fresh latch, sized to the bucket count, to every bucket as a flush signal and logs the statistics.
     * Kept public because the decompiled class exposes it that way (originally private).
     *
     * @return the latch given to the buckets; {@link #flush()} waits on it
     */
    public CountDownLatch triggerFlush() {
        CountDownLatch countDownLatch = new CountDownLatch(this.writerConfig.getBucketCount());
        for (Bucket bucket : this.buckets) {
            bucket.addSignal(countDownLatch);
        }
        LOGGER.info("WriterStatistics: {}", this.writerStatistics);
        return countDownLatch;
    }

    /**
     * Signals every bucket to flush and waits on the resulting latch.
     *
     * @throws ClientException if the writer is closed, or if the wait is interrupted
     *                         (the interrupt status is restored before throwing)
     */
    @Override
    public synchronized void flush() throws ClientException {
        LOGGER.debug("trigger flush and waiting.");
        if (this.closed.get()) {
            throw new ClientException("The writer has been closed.");
        }
        try {
            triggerFlush().await();
            LOGGER.debug("user trigger flush finished.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientException(e);
        }
    }

    /**
     * Flushes, stops the timers, closes every bucket and, when the writer created them itself,
     * shuts down the client and callback pool.
     *
     * @throws ClientException if the writer is already closed or the final flush fails
     */
    @Override
    public synchronized void close() {
        if (this.closed.get()) {
            throw new ClientException("The writer has already been closed.");
        }
        flush();
        this.scheduledExecutorService.shutdown();
        for (Bucket bucket : this.buckets) {
            bucket.close();
            LOGGER.debug("bucket [{}] is closed.", bucket.getId());
        }
        if (this.isInnerConstruct) {
            this.ots.shutdown();
            // The executor is always the pool from createThreadPool when isInnerConstruct is true.
            ((ExecutorService) this.executor).shutdown();
        }
        this.closed.set(true);
    }
}
