/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.FailureResultUtil;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

class StopWithSavepoint
extends StateWithExecutionGraph {
    private final Context context;
    private final CompletableFuture<String> operationFuture;
    private final CheckpointScheduling checkpointScheduling;
    private boolean hasFullyFinished = false;
    @Nullable
    private String savepoint = null;
    @Nullable
    private Throwable operationFailureCause;

    StopWithSavepoint(Context context, ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, Logger logger, ClassLoader userCodeClassLoader, CompletableFuture<String> savepointFuture, List<ExceptionHistoryEntry> failureCollection) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger, userCodeClassLoader, failureCollection);
        this.context = context;
        this.checkpointScheduling = checkpointScheduling;
        this.operationFuture = new CompletableFuture();
        FutureUtils.assertNoException(savepointFuture.handle((savepointLocation, throwable) -> {
            context.runIfState(this, () -> this.handleSavepointCompletion((String)savepointLocation, (Throwable)throwable), Duration.ZERO);
            return null;
        }));
    }

    private void handleSavepointCompletion(@Nullable String savepoint, @Nullable Throwable throwable) {
        if (this.hasFullyFinished) {
            Preconditions.checkState(throwable == null, "A savepoint should never fail after a job has been terminated via stop-with-savepoint.");
            this.completeOperationAndGoToFinished(savepoint);
        } else if (throwable != null) {
            this.operationFailureCause = throwable;
            this.checkpointScheduling.startCheckpointScheduler();
            this.context.goToExecuting(this.getExecutionGraph(), this.getExecutionGraphHandler(), this.getOperatorCoordinatorHandler(), this.getFailures());
        } else {
            this.savepoint = savepoint;
        }
    }

    @Override
    public void onLeave(Class<? extends State> newState) {
        this.operationFuture.completeExceptionally(new FlinkException("Stop with savepoint operation could not be completed.", this.operationFailureCause));
        super.onLeave(newState);
    }

    @Override
    public void cancel() {
        this.context.goToCanceling(this.getExecutionGraph(), this.getExecutionGraphHandler(), this.getOperatorCoordinatorHandler(), this.getFailures());
    }

    @Override
    public JobStatus getJobStatus() {
        return JobStatus.RUNNING;
    }

    @Override
    void onFailure(Throwable cause) {
        this.operationFailureCause = cause;
        if (this.savepoint == null) {
            FailureResultUtil.restartOrFail(this.context.howToHandleFailure(cause), this.context, this);
        } else {
            StopWithSavepointStoppingException ex = new StopWithSavepointStoppingException(this.savepoint, this.getJobId(), cause);
            this.operationFuture.completeExceptionally(ex);
            FailureResultUtil.restartOrFail(this.context.howToHandleFailure(ex), this.context, this);
        }
    }

    @Override
    void onGloballyTerminalState(JobStatus globallyTerminalState) {
        if (globallyTerminalState == JobStatus.FINISHED) {
            if (this.savepoint == null) {
                this.hasFullyFinished = true;
            } else {
                this.completeOperationAndGoToFinished(this.savepoint);
            }
        } else {
            this.handleGlobalFailure(new FlinkException("Job did not reach the FINISHED state while performing stop-with-savepoint."));
        }
    }

    private void completeOperationAndGoToFinished(String savepoint) {
        this.operationFuture.complete(savepoint);
        this.context.goToFinished(ArchivedExecutionGraph.createFrom(this.getExecutionGraph()));
    }

    CompletableFuture<String> getOperationFuture() {
        return this.operationFuture;
    }

    static class Factory
    implements StateFactory<StopWithSavepoint> {
        private final Context context;
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final CheckpointScheduling checkpointScheduling;
        private final Logger logger;
        private final ClassLoader userCodeClassLoader;
        private final CompletableFuture<String> savepointFuture;
        private final List<ExceptionHistoryEntry> failureCollection;

        Factory(Context context, ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, Logger logger, ClassLoader userCodeClassLoader, CompletableFuture<String> savepointFuture, List<ExceptionHistoryEntry> failureCollection) {
            this.context = context;
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.checkpointScheduling = checkpointScheduling;
            this.logger = logger;
            this.userCodeClassLoader = userCodeClassLoader;
            this.savepointFuture = savepointFuture;
            this.failureCollection = failureCollection;
        }

        @Override
        public Class<StopWithSavepoint> getStateClass() {
            return StopWithSavepoint.class;
        }

        @Override
        public StopWithSavepoint getState() {
            return new StopWithSavepoint(this.context, this.executionGraph, this.executionGraphHandler, this.operatorCoordinatorHandler, this.checkpointScheduling, this.logger, this.userCodeClassLoader, this.savepointFuture, this.failureCollection);
        }
    }

    static interface Context
    extends StateWithExecutionGraph.Context,
    StateTransitions.ToCancelling,
    StateTransitions.ToExecuting,
    StateTransitions.ToFailing,
    StateTransitions.ToRestarting {
        public FailureResult howToHandleFailure(Throwable var1);

        public ScheduledFuture<?> runIfState(State var1, Runnable var2, Duration var3);
    }
}

