/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.athena.client.results;

import com.amazon.athena.client.error.QueryResultException;
import com.amazon.athena.client.results.PaginatingAsyncQueryResultsBase;
import com.amazon.athena.client.results.ResultParserFactory;
import com.amazon.athena.client.results.S3UriHelper;
import com.amazon.athena.client.results.parsing.ResultRowsParser;
import com.amazon.athena.logging.AthenaLogger;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.athena.model.QueryExecution;
import software.amazon.awssdk.services.athena.model.ResultSetMetadata;
import software.amazon.awssdk.services.athena.model.StatementType;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.Pair;

class S3StreamingQueryResults
extends PaginatingAsyncQueryResultsBase {
    private static final AthenaLogger logger = AthenaLogger.of(S3StreamingQueryResults.class);
    private static final int DEFAULT_MIN_CHUNK_SIZE = 262144;
    private static final int DEFAULT_MAX_CHUNK_SIZE = 0xA00000;
    private static final Pattern CONTENT_RANGE_PATTERN = Pattern.compile("\\Abytes (\\d+)-(\\d+)/(\\d+)\\z");
    private final S3AsyncClient s3Client;
    private final String bucket;
    private final String key;
    private final int fetchSize;
    private final int minChunkSize;
    private final int maxChunkSize;
    private final ResultParserFactory resultParserFactory;
    private int nextChunkSize;

    S3StreamingQueryResults(S3AsyncClient s3Client, Executor executor, QueryExecution queryExecution, ResultSetMetadata resultSetMetadata, Long updateCount, int fetchSize, ResultParserFactory resultParserFactory) {
        this(s3Client, executor, queryExecution, resultSetMetadata, updateCount, fetchSize, 262144, 0xA00000, resultParserFactory);
    }

    S3StreamingQueryResults(S3AsyncClient s3Client, Executor executor, QueryExecution queryExecution, ResultSetMetadata resultSetMetadata, Long updateCount, int fetchSize, int minChunkSize, int maxChunkSize, ResultParserFactory resultParserFactory) {
        super(executor, queryExecution, resultSetMetadata, updateCount, Collections.emptyList());
        this.s3Client = s3Client;
        this.fetchSize = fetchSize;
        this.minChunkSize = minChunkSize;
        this.maxChunkSize = maxChunkSize;
        this.nextChunkSize = minChunkSize;
        this.resultParserFactory = resultParserFactory;
        String outputLocation = this.queryExecution().resultConfiguration().outputLocation();
        Optional<Pair<String, String>> bucketAndKey = S3UriHelper.toBucketAndKey(outputLocation);
        if (!bucketAndKey.isPresent()) {
            throw new IllegalArgumentException(String.format("Invalid output location: \"%s\"", this.queryExecution().resultConfiguration().outputLocation()));
        }
        Pair<String, String> pair = bucketAndKey.get();
        this.bucket = pair.left();
        this.key = pair.right();
    }

    @Override
    protected PaginatingAsyncQueryResultsBase.PaginationController startPagination() {
        return new S3StreamingQueryResultsPaginationControl();
    }

    private class S3StreamingQueryResultsPaginationControl
    implements PaginatingAsyncQueryResultsBase.PaginationController {
        private final ResultRowsParser resultParser;
        private long offset;
        private long objectSize;
        private boolean hasNextPage;
        private final Set<String> DML_SUBSTATEMENT_TYPES_WITHOUT_OUTPUT_OBJECTS = new HashSet<String>(Arrays.asList("DELETE", "INSERT", "MERGE", "UNLOAD", "UPDATE", "VACUUM_TABLE"));

        private S3StreamingQueryResultsPaginationControl() {
            this.resultParser = S3StreamingQueryResults.this.resultParserFactory.createS3ResultRowsParser(S3StreamingQueryResults.this.queryExecution(), S3StreamingQueryResults.this.resultSetMetadata().columnInfo().size());
            this.offset = 0L;
            this.objectSize = 0L;
            this.hasNextPage = true;
        }

        @Override
        public boolean hasNextPage() {
            return this.hasNextPage;
        }

        @Override
        public void loadNextPage(Consumer<String[]> addRows, Consumer<Throwable> reportError, Runnable done) {
            String range = String.format("bytes=%d-%d", this.offset, this.offset + (long)S3StreamingQueryResults.this.nextChunkSize - 1L);
            GetObjectRequest request = (GetObjectRequest)GetObjectRequest.builder().bucket(S3StreamingQueryResults.this.bucket).key(S3StreamingQueryResults.this.key).range(range).build();
            String queryExecutionId = S3StreamingQueryResults.this.queryExecution().queryExecutionId();
            logger.debug("Query execution {} loading range {} from \"s3://{}/{}\"", queryExecutionId, range, S3StreamingQueryResults.this.bucket, S3StreamingQueryResults.this.key);
            long loadingAt = System.nanoTime();
            S3StreamingQueryResults.this.s3Client.getObject(request, AsyncResponseTransformer.toBytes()).whenComplete((responseBytes, e) -> {
                try {
                    if (e == null) {
                        long loadedAt = System.nanoTime();
                        ByteBuffer buffer = responseBytes.asByteBuffer();
                        logger.debug("Query execution {} loaded range {} ({} bytes) from \"s3://{}/{}\"", queryExecutionId, ((GetObjectResponse)responseBytes.response()).contentRange(), buffer.capacity(), S3StreamingQueryResults.this.bucket, S3StreamingQueryResults.this.key);
                        this.updateState((GetObjectResponse)responseBytes.response(), reportError);
                        int rowCount = this.processChunk(responseBytes.asByteBuffer(), !this.hasNextPage, addRows, reportError);
                        if (this.hasNextPage) {
                            logger.trace("Query execution {} processed chunk, now at offset {}, next chunk size is {}", queryExecutionId, this.offset, S3StreamingQueryResults.this.nextChunkSize);
                        } else {
                            logger.trace("Query execution {} processed chunk, now at offset {}, all chunks loaded", queryExecutionId, this.offset);
                        }
                        this.updateNextChunkSize(rowCount);
                        long processedAt = System.nanoTime();
                        logger.info("Query execution {} loaded and processed {} rows from {} bytes in {} ms ({} ms loading, {} ms processing, {} of {} bytes loaded)", queryExecutionId, rowCount, buffer.capacity(), TimeUnit.NANOSECONDS.toMillis(processedAt - loadingAt), TimeUnit.NANOSECONDS.toMillis(loadedAt - loadingAt), TimeUnit.NANOSECONDS.toMillis(processedAt - loadedAt), this.offset, this.objectSize);
                    } else if (this.isEmptyObjectError((Throwable)e, request)) {
                        logger.debug("Query execution {} output location \"s3://{}/{}\" was empty", queryExecutionId, S3StreamingQueryResults.this.bucket, S3StreamingQueryResults.this.key);
                        this.handleEmptyObject(reportError);
                    } else if (this.isExpectedMissingQueryResults((Throwable)e, S3StreamingQueryResults.this.queryExecution())) {
                        logger.debug("Query execution {} output location \"s3://{}/{}\" does not exist (this is expected for statements without query results, like CTAS, UNLOAD, UPDATE, etc.)", S3StreamingQueryResults.this.queryExecution().queryExecutionId(), S3StreamingQueryResults.this.bucket, S3StreamingQueryResults.this.key);
                        this.handleExpectedMissingObject(reportError);
                    } else {
                        Throwable unwrappedError = this.unwrapCompletionException((Throwable)e);
                        logger.warn(String.format("Query execution %s failed loading result range: %s", queryExecutionId, unwrappedError.getMessage()), unwrappedError);
                        reportError.accept(new QueryResultException(S3StreamingQueryResults.this.queryExecution(), String.format("Could not load query results: %s", unwrappedError.getMessage()), unwrappedError));
                    }
                }
                catch (RuntimeException re) {
                    logger.warn(String.format("Query execution %s failed processing result range: %s", queryExecutionId, re.getMessage()), new Object[0]);
                    reportError.accept(new QueryResultException(S3StreamingQueryResults.this.queryExecution(), String.format("Could not process query results: %s", re.getMessage()), re));
                }
                finally {
                    done.run();
                }
            });
        }

        private Throwable unwrapCompletionException(Throwable t) {
            return t instanceof CompletionException ? t.getCause() : t;
        }

        private int processChunk(ByteBuffer chunk, boolean isLastChunk, Consumer<String[]> addRows, Consumer<Throwable> reportError) {
            try {
                int rowCount = this.resultParser.parse(chunk, addRows);
                if (isLastChunk) {
                    rowCount += this.resultParser.finish(addRows);
                }
                return rowCount;
            }
            catch (ParseException pe) {
                logger.warn(String.format("Query execution %s failed processing results: %s", S3StreamingQueryResults.this.queryExecution().queryExecutionId(), pe.getMessage()), pe);
                reportError.accept(new QueryResultException(S3StreamingQueryResults.this.queryExecution(), String.format("Could not process query results: %s", pe.getMessage()), pe));
                return 0;
            }
        }

        private void updateState(GetObjectResponse response, Consumer<Throwable> reportError) {
            Matcher matcher = CONTENT_RANGE_PATTERN.matcher(response.contentRange());
            if (matcher.matches()) {
                long firstByte = Long.parseLong(matcher.group(1));
                long lastByte = Long.parseLong(matcher.group(2));
                this.objectSize = Long.parseLong(matcher.group(3));
                long contentLength = lastByte - firstByte + 1L;
                this.hasNextPage = lastByte < this.objectSize - 1L;
                this.offset += contentLength;
            } else {
                logger.warn("Query execution {} failed processing results: could not parse content range header (\"{}\")", S3StreamingQueryResults.this.queryExecution().queryExecutionId(), response.contentRange());
                reportError.accept(new QueryResultException(S3StreamingQueryResults.this.queryExecution(), String.format("Could not process query results: could not parse content range header (\"%s\")", response.contentRange())));
            }
        }

        private void updateNextChunkSize(int rowCount) {
            if (rowCount > 0) {
                double avgRowSize = (double)S3StreamingQueryResults.this.nextChunkSize / (double)rowCount;
                S3StreamingQueryResults.this.nextChunkSize = (int)(avgRowSize * (double)S3StreamingQueryResults.this.fetchSize);
            } else {
                S3StreamingQueryResults.this.nextChunkSize = S3StreamingQueryResults.this.nextChunkSize * 2;
            }
            S3StreamingQueryResults.this.nextChunkSize = Math.max(S3StreamingQueryResults.this.minChunkSize, Math.min(S3StreamingQueryResults.this.nextChunkSize, S3StreamingQueryResults.this.maxChunkSize));
        }

        private boolean isEmptyObjectError(Throwable rawResponseError, GetObjectRequest request) {
            Throwable responseError = this.unwrapCompletionException(rawResponseError);
            return responseError instanceof S3Exception && ((S3Exception)responseError).statusCode() == 416 && request.range().startsWith("bytes=0");
        }

        private boolean isExpectedMissingQueryResults(Throwable rawResponseError, QueryExecution queryExecution) {
            Throwable responseError = this.unwrapCompletionException(rawResponseError);
            return responseError instanceof S3Exception && ((S3Exception)responseError).statusCode() == 404 && this.isStatementWithoutQueryResults(queryExecution);
        }

        private boolean isStatementWithoutQueryResults(QueryExecution queryExecution) {
            return queryExecution.statementType() == StatementType.DDL || queryExecution.statementType() == StatementType.DML && this.DML_SUBSTATEMENT_TYPES_WITHOUT_OUTPUT_OBJECTS.contains(queryExecution.substatementType());
        }

        private void handleEmptyObject(Consumer<Throwable> reportError) {
            GetObjectResponse emptyObjectResponse = (GetObjectResponse)GetObjectResponse.builder().contentRange("bytes 0-0/0").build();
            this.updateState(emptyObjectResponse, reportError);
        }

        private void handleExpectedMissingObject(Consumer<Throwable> reportError) {
            this.handleEmptyObject(reportError);
        }
    }
}

