/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.athena.client.results;

import com.amazon.athena.client.error.QueryResultException;
import com.amazon.athena.client.results.AsyncQueryResults;
import com.amazon.athena.client.results.ResultFormatHelper;
import com.amazon.athena.client.results.ResultParserFactory;
import com.amazon.athena.client.results.S3UriHelper;
import com.amazon.athena.client.results.parsing.ResultRowsParser;
import com.amazon.athena.jdbc.support.UncheckedParseException;
import com.amazon.athena.logging.AthenaLogger;
import io.netty.handler.timeout.ReadTimeoutException;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.ResponsePublisher;
import software.amazon.awssdk.services.athena.model.QueryExecution;
import software.amazon.awssdk.services.athena.model.ResultSetMetadata;
import software.amazon.awssdk.services.athena.model.StatementType;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.Pair;
import software.amazon.awssdk.utils.internal.async.EmptySubscription;

/**
 * {@link AsyncQueryResults} that downloads a query execution's output object from S3 and
 * publishes the parsed rows to its subscribers.
 *
 * <p>Every call to {@link #subscribe(Subscriber)} issues its own {@code GetObject} request and
 * keeps its own parsing, demand and restart state, so subscriptions do not influence each other.
 */
class S3V2StreamingQueryResults
implements AsyncQueryResults {
    private static final AthenaLogger logger = AthenaLogger.of(S3V2StreamingQueryResults.class);
    /** DML substatement types for which a missing output object (HTTP 404) is not treated as an error. */
    private static final Set<String> DML_SUBSTATEMENT_TYPES_WITHOUT_OUTPUT_OBJECTS = new HashSet<>(Arrays.asList("DELETE", "INSERT", "MERGE", "UNLOAD", "UPDATE", "VACUUM_TABLE"));
    private final S3AsyncClient s3Client;
    private final QueryExecution queryExecution;
    private final String queryExecutionId;
    private final ResultSetMetadata resultSetMetadata;
    private final long updateCount;
    private final ResultParserFactory resultParserFactory;
    private final String bucket;
    private final String key;

    /**
     * Creates results backed by the output location of the given query execution.
     *
     * @param s3Client client used to download the output object
     * @param queryExecution the query execution whose output is read
     * @param resultSetMetadata metadata of the result; its column count is handed to the row parser
     * @param updateCount update count to report, {@code null} is reported as {@code 0}
     * @param resultParserFactory factory for the row parser used by each subscription
     * @throws IllegalArgumentException if the output location cannot be split into bucket and key
     */
    S3V2StreamingQueryResults(S3AsyncClient s3Client, QueryExecution queryExecution, ResultSetMetadata resultSetMetadata, Long updateCount, ResultParserFactory resultParserFactory) {
        this.s3Client = s3Client;
        this.queryExecution = queryExecution;
        // Read from the parameter instead of the overridable accessor: the object is still being constructed.
        this.queryExecutionId = queryExecution.queryExecutionId();
        this.resultSetMetadata = resultSetMetadata;
        this.updateCount = updateCount == null ? 0L : updateCount;
        this.resultParserFactory = resultParserFactory;
        String outputLocation = queryExecution.resultConfiguration().outputLocation();
        Optional<Pair<String, String>> bucketAndKey = S3UriHelper.toBucketAndKey(outputLocation);
        if (!bucketAndKey.isPresent()) {
            throw new IllegalArgumentException(String.format("Invalid output location: \"%s\"", outputLocation));
        }
        Pair<String, String> pair = bucketAndKey.get();
        this.bucket = pair.left();
        this.key = pair.right();
    }

    @Override
    public QueryExecution queryExecution() {
        return this.queryExecution;
    }

    @Override
    public ResultSetMetadata resultSetMetadata() {
        return this.resultSetMetadata;
    }

    @Override
    public long updateCount() {
        return this.updateCount;
    }

    /**
     * Starts a new download of the output object and delivers its rows to {@code subscriber}.
     *
     * @param subscriber receiver of the parsed rows
     */
    @Override
    public void subscribe(Subscriber<? super String[]> subscriber) {
        new AutoRestartingHeaderSkippingSubscriber(subscriber).start();
    }

    /**
     * Adds {@code delta} to {@code current}, clamping to {@link Long#MAX_VALUE} /
     * {@link Long#MIN_VALUE} instead of wrapping around on overflow.
     */
    private static long saturatingAdd(long current, long delta) {
        long sum = current + delta;
        // Overflow happened iff both operands have the same sign and the sum has the opposite one.
        boolean overflowed = ((current ^ sum) & (delta ^ sum)) < 0L;
        if (!overflowed) {
            return sum;
        }
        return delta > 0L ? Long.MAX_VALUE : Long.MIN_VALUE;
    }

    /**
     * Sits between the S3 response publisher (as its {@link Subscriber}) and the downstream
     * subscriber (as its {@link Subscription}).
     *
     * <p>It drops the first row unless the result is plain text, tracks outstanding demand, and
     * re-requests the object from the last recorded offset when reading fails with a
     * {@link ReadTimeoutException}.
     */
    private class AutoRestartingHeaderSkippingSubscriber
    implements Subscriber<String[]>,
    Subscription {
        private final Subscriber<? super String[]> actualSubscriber;
        private final ResultRowsParser parser;
        /** Reused holder for the rows parsed out of the chunk currently being processed. */
        private final List<String[]> tempRowBuffer;
        /** Most recent upstream subscription; {@code null} until the first response has been subscribed to. */
        private Subscription currentSubscription;
        private boolean hasSkippedHeaders;
        /** Byte offset sent in the {@code Range} header when the download is restarted. */
        private long restartOffset;
        /** Rows requested by the downstream subscriber that have not been delivered yet. */
        private long requestCountBalance;
        /** Set when a timeout occurred without outstanding demand; the next {@link #request(long)} restarts the download. */
        private boolean needsRestart;
        /** Set by {@link #cancel()}; read from upstream callbacks, which may run on other threads. */
        private volatile boolean cancelled;

        private AutoRestartingHeaderSkippingSubscriber(Subscriber<? super String[]> actualSubscriber) {
            this.actualSubscriber = actualSubscriber;
            this.parser = S3V2StreamingQueryResults.this.resultParserFactory.createS3ResultRowsParser(S3V2StreamingQueryResults.this.queryExecution(), S3V2StreamingQueryResults.this.resultSetMetadata().columnInfo().size());
            this.tempRowBuffer = new ArrayList<>();
            // Plain-text results start in the "already skipped" state, so their first row is delivered.
            this.hasSkippedHeaders = ResultFormatHelper.isPlainTextResult(S3V2StreamingQueryResults.this.queryExecution);
            this.restartOffset = 0L;
            this.currentSubscription = null;
            this.requestCountBalance = 0L;
            this.needsRestart = false;
            this.cancelled = false;
        }

        /** Requests the output object, from {@code restartOffset} onwards if that is non-zero. */
        private void start() {
            if (this.cancelled) {
                // A cancelled subscription must not trigger another download.
                return;
            }
            logger.debug("Query execution {} requesting \"s3://{}/{}\" from offset {}", queryExecutionId, bucket, key, this.restartOffset);
            GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder().bucket(bucket).key(key);
            if (this.restartOffset > 0L) {
                requestBuilder.range(String.format("bytes=%d-", this.restartOffset));
            }
            this.needsRestart = false;
            s3Client.getObject(requestBuilder.build(), AsyncResponseTransformer.toPublisher()).whenComplete(this::onGetObjectResponse);
        }

        /**
         * Completion callback of the {@code GetObject} call: subscribes to the response body on
         * success, otherwise terminates the downstream subscriber.
         */
        private void onGetObjectResponse(ResponsePublisher<GetObjectResponse> response, Throwable error) {
            if (error == null) {
                // Subscribe even after cancellation: onSubscribe() then cancels the new upstream
                // instead of leaving the response unconsumed.
                response.flatMapIterable(this::onResponseBytes).subscribe(this);
                return;
            }
            if (this.cancelled) {
                // Only reachable for a failed restart; the subscriber no longer wants signals.
                return;
            }
            // On a failed restart the subscriber already holds a subscription and onSubscribe
            // must not be signalled a second time.
            boolean alreadySubscribed = this.currentSubscription != null;
            if (this.isExpectedMissingQueryResults(error)) {
                logger.debug("Query execution {} output location \"s3://{}/{}\" does not exist (this is expected for statements without query results, like CTAS, UNLOAD, UPDATE, etc.)", queryExecutionId, bucket, key);
                if (!alreadySubscribed) {
                    this.actualSubscriber.onSubscribe(new EmptySubscription(this.actualSubscriber));
                }
                this.actualSubscriber.onComplete();
            } else {
                logger.warn("Query execution {} failed loading \"s3://{}/{}\": {}", queryExecutionId, bucket, key, error.getMessage(), error);
                if (!alreadySubscribed) {
                    this.actualSubscriber.onSubscribe(new EmptySubscription(this.actualSubscriber));
                }
                this.actualSubscriber.onError(new QueryResultException(queryExecution, String.format("Could not load query results: %s", error.getMessage()), error));
            }
        }

        /**
         * Parses one chunk of the response body into rows.
         *
         * @return the reused row buffer holding the rows completed by this chunk
         * @throws UncheckedParseException if the parser rejects the chunk
         */
        private Iterable<String[]> onResponseBytes(ByteBuffer chunk) {
            try {
                this.tempRowBuffer.clear();
                this.parser.parse(chunk, this.tempRowBuffer::add);
                // Advance by the chunk's position after parsing (presumably the bytes the parser consumed).
                this.restartOffset += (long)chunk.position();
                return this.tempRowBuffer;
            }
            catch (ParseException pe) {
                logger.warn("Query execution {} failed processing results: {}", queryExecutionId, pe.getMessage(), pe);
                throw new UncheckedParseException(pe);
            }
        }

        /**
         * Receives the upstream subscription. The first one is announced to the downstream
         * subscriber; later ones (restarts) immediately re-request the outstanding demand.
         */
        @Override
        public void onSubscribe(Subscription actualSubscription) {
            boolean isRestart = this.currentSubscription != null;
            this.currentSubscription = actualSubscription;
            if (this.cancelled) {
                // Cancelled while the restart was in flight: drop the new upstream right away.
                actualSubscription.cancel();
                return;
            }
            if (isRestart) {
                // A negative balance is treated as unbounded demand.
                this.currentSubscription.request(this.requestCountBalance < 0L ? Long.MAX_VALUE : this.requestCountBalance);
            } else {
                this.actualSubscriber.onSubscribe(this);
            }
        }

        @Override
        public void onNext(String[] row) {
            if (this.hasSkippedHeaders) {
                --this.requestCountBalance;
                this.actualSubscriber.onNext(row);
            } else {
                // The first row is dropped; request one more so the subscriber still gets what it asked for.
                this.hasSkippedHeaders = true;
                this.currentSubscription.request(1L);
            }
        }

        /**
         * Restarts the download on a {@link ReadTimeoutException} (immediately if rows are still
         * owed, otherwise on the next request); any other error terminates the downstream subscriber.
         */
        @Override
        public void onError(Throwable error) {
            if (error instanceof ReadTimeoutException) {
                if (this.cancelled) {
                    // Nobody is listening any more, so there is nothing to resume.
                    return;
                }
                if (this.requestCountBalance > 0L) {
                    logger.warn("Query execution {} timed out reading results, will restart at {}", queryExecutionId, this.restartOffset);
                    this.start();
                } else {
                    this.needsRestart = true;
                    logger.warn("Query execution {} timed out reading results, will restart at {} when more items are requested", queryExecutionId, this.restartOffset);
                }
            } else {
                Throwable actualError = error;
                if (error instanceof UncheckedParseException) {
                    // Report the wrapped ParseException rather than the unchecked wrapper.
                    actualError = actualError.getCause();
                }
                String message = String.format("Could not process query results: %s", actualError.getMessage());
                this.actualSubscriber.onError(new QueryResultException(queryExecution, message, actualError));
            }
        }

        /** Returns whether {@code responseError} is an S3 404 for a statement that has no result object. */
        private boolean isExpectedMissingQueryResults(Throwable responseError) {
            return responseError instanceof S3Exception && ((S3Exception)responseError).statusCode() == 404 && this.isStatementWithoutQueryResults(queryExecution);
        }

        /** Returns whether the statement is DDL, or DML with one of the listed substatement types. */
        private boolean isStatementWithoutQueryResults(QueryExecution queryExecution) {
            return queryExecution.statementType() == StatementType.DDL || queryExecution.statementType() == StatementType.DML && DML_SUBSTATEMENT_TYPES_WITHOUT_OUTPUT_OBJECTS.contains(queryExecution.substatementType());
        }

        /** Lets the parser emit whatever it still holds, then completes the downstream subscriber. */
        @Override
        public void onComplete() {
            try {
                this.tempRowBuffer.clear();
                this.parser.finish(this.actualSubscriber::onNext);
                this.actualSubscriber.onComplete();
                logger.info("Query execution {} finished reading results", queryExecutionId);
            }
            catch (ParseException e) {
                this.onError(e);
            }
        }

        /**
         * Records the additional demand and either forwards it upstream or, if a timeout left the
         * download waiting for demand, restarts the download. Ignored after {@link #cancel()}.
         */
        @Override
        public void request(long itemCount) {
            if (this.cancelled) {
                return;
            }
            // Saturate instead of wrapping: a wrapped (negative) balance would look like "no
            // demand" to the timeout handling in onError().
            this.requestCountBalance = saturatingAdd(this.requestCountBalance, itemCount);
            if (this.needsRestart) {
                this.start();
            } else {
                this.currentSubscription.request(itemCount);
            }
        }

        /** Cancels the upstream subscription and prevents any later restart. */
        @Override
        public void cancel() {
            this.cancelled = true;
            Subscription upstream = this.currentSubscription;
            if (upstream != null) {
                upstream.cancel();
            }
        }
    }
}

