/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.CompositeStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SharedStateRegistry} implementation that tracks shared state handles by
 * {@link SharedStateRegistryKey}, together with the checkpoint that created each entry and the
 * latest checkpoint that referenced it. Handles that are no longer needed are discarded through
 * the configured {@link Executor}.
 *
 * <p>Thread-safety: the map of registered states, the {@code open} flag, the
 * {@code highestNotClaimedCheckpointID} and the mutable fields of every entry are only accessed
 * while holding the monitor of {@code registeredStates}.
 */
@Internal
public class SharedStateRegistryImpl implements SharedStateRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistryImpl.class);

    /** All registered state entries by key. Also serves as the lock object of this registry. */
    private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates = new HashMap<>();

    /** False once {@link #close()} was called; guarded by {@code registeredStates}. */
    private boolean open;

    /** Executor on which discarding of state handles is run. */
    private final Executor asyncDisposalExecutor;

    /**
     * Highest ID of a restored checkpoint that was not restored in {@link RestoreMode#CLAIM}
     * mode, or -1 if there is none. Entries created by checkpoints up to this ID are never
     * scheduled for deletion by this registry. Guarded by {@code registeredStates}.
     */
    private long highestNotClaimedCheckpointID = -1L;

    /** Creates a registry that discards state directly in the calling thread. */
    public SharedStateRegistryImpl() {
        this(Executors.directExecutor());
    }

    /**
     * Creates a registry that discards state using the given executor.
     *
     * @param asyncDisposalExecutor executor used to run state disposal, must not be null
     */
    public SharedStateRegistryImpl(Executor asyncDisposalExecutor) {
        this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
        this.open = true;
    }

    /**
     * Registers a reference to the given state under the given key for the given checkpoint.
     *
     * <p>If the key is unknown, a new entry is created (placeholders are rejected in that case).
     * If the key is known and the given handle differs from the registered one, exactly one of
     * the two is scheduled for deletion: the new handle when the existing entry is confirmed or
     * the new handle is a placeholder, otherwise the previously registered handle, which is then
     * replaced by the new one.
     *
     * @param registrationKey key under which the state is registered
     * @param state the state handle to register, must not be null
     * @param checkpointID ID of the checkpoint that references the state
     * @return the handle that is registered under the key after this call
     * @throws IllegalStateException if the registry is closed, or if a placeholder is registered
     *     under an unknown key
     */
    @Override
    public StreamStateHandle registerReference(SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID) {
        Preconditions.checkNotNull(state);
        StreamStateHandle scheduledStateDeletion = null;
        StreamStateHandle registeredHandle;
        synchronized (this.registeredStates) {
            Preconditions.checkState(this.open, "Attempt to register state to closed SharedStateRegistry.");
            SharedStateEntry entry = this.registeredStates.get(registrationKey);
            if (entry == null) {
                Preconditions.checkState(!this.isPlaceholder(state), "Attempt to reference unknown state: " + registrationKey);
                entry = new SharedStateEntry(state, checkpointID);
                this.registeredStates.put(registrationKey, entry);
                LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);
            } else {
                if (!Objects.equals(state, entry.stateHandle)) {
                    if (entry.confirmed || this.isPlaceholder(state)) {
                        // Keep the registered handle and drop the newly offered one.
                        scheduledStateDeletion = state;
                        LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to be an unnecessary copy of existing state {} and will be dropped.", registrationKey, state, entry.stateHandle);
                    } else {
                        // The registered handle is not part of a confirmed checkpoint yet:
                        // replace it with the new handle and drop the old one.
                        scheduledStateDeletion = entry.stateHandle;
                        entry.stateHandle = state;
                        LOG.trace("Identified duplicate state registration under key {}. Existing unconfirmed state {} is replaced by new state {} and will be dropped.", registrationKey, scheduledStateDeletion, state);
                    }
                }
                LOG.trace("Updating last checkpoint for {} from {} to {}", registrationKey, entry.lastUsedCheckpointID, checkpointID);
                entry.advanceLastUsingCheckpointID(checkpointID);
            }
            // Capture the result while still holding the lock; the entry's handle may be
            // replaced by a concurrent registration once the lock is released.
            registeredHandle = entry.stateHandle;
        }
        // Disposal is triggered outside of the lock.
        this.scheduleAsyncDelete(scheduledStateDeletion);
        return registeredHandle;
    }

    /**
     * Removes all entries whose last using checkpoint is lower than the given ID. The handle of
     * a removed entry is scheduled for deletion only if the entry was created by a checkpoint
     * newer than the highest not-claimed restored checkpoint.
     *
     * @param lowestCheckpointID entries last used by a checkpoint below this ID are removed
     */
    @Override
    public void unregisterUnusedState(long lowestCheckpointID) {
        LOG.debug("Discard state created before checkpoint {} and not used afterwards", lowestCheckpointID);
        ArrayList<StreamStateHandle> subsumed = new ArrayList<>();
        synchronized (this.registeredStates) {
            Iterator<SharedStateEntry> it = this.registeredStates.values().iterator();
            while (it.hasNext()) {
                SharedStateEntry entry = it.next();
                if (entry.lastUsedCheckpointID >= lowestCheckpointID) {
                    continue;
                }
                if (entry.createdByCheckpointID > this.highestNotClaimedCheckpointID) {
                    subsumed.add(entry.stateHandle);
                }
                it.remove();
            }
        }
        LOG.trace("Discard {} state asynchronously", subsumed.size());
        // Disposal is triggered outside of the lock.
        for (StreamStateHandle handle : subsumed) {
            this.scheduleAsyncDelete(handle);
        }
    }

    /**
     * Lets every given composite handle register its shared states with this registry while
     * holding the registry lock, so that the handles are registered as one unit.
     *
     * @param stateHandles the handles to register; {@code null} is ignored
     * @param checkpointID ID of the checkpoint the handles belong to
     */
    @Override
    public void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID) {
        if (stateHandles == null) {
            return;
        }
        synchronized (this.registeredStates) {
            for (CompositeStateHandle stateHandle : stateHandles) {
                stateHandle.registerSharedStates(this, checkpointID);
            }
        }
    }

    /**
     * Registers all operator states of a restored checkpoint. Unless the restore mode is
     * {@link RestoreMode#CLAIM}, the checkpoint is additionally remembered as not claimed.
     *
     * <p>Both steps happen under the registry lock, so a concurrent
     * {@link #unregisterUnusedState(long)} cannot observe the restored entries before the
     * not-claimed marker has been updated.
     *
     * @param checkpoint the restored checkpoint
     * @param mode the mode the checkpoint was restored with
     */
    @Override
    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode mode) {
        synchronized (this.registeredStates) {
            this.registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
            if (mode != RestoreMode.CLAIM) {
                this.highestNotClaimedCheckpointID = Math.max(this.highestNotClaimedCheckpointID, checkpoint.getCheckpointID());
            }
        }
    }

    /**
     * Marks every entry whose last using checkpoint equals the given ID as confirmed.
     *
     * @param checkpointId ID of the completed checkpoint
     */
    @Override
    public void checkpointCompleted(long checkpointId) {
        // The map is a plain HashMap that is mutated under this lock elsewhere; iterating it
        // without the lock could fail with a ConcurrentModificationException.
        synchronized (this.registeredStates) {
            for (SharedStateEntry entry : this.registeredStates.values()) {
                if (entry.lastUsedCheckpointID == checkpointId) {
                    entry.confirmed = true;
                }
            }
        }
    }

    @Override
    public String toString() {
        synchronized (this.registeredStates) {
            return "SharedStateRegistry{registeredStates=" + this.registeredStates + '}';
        }
    }

    /**
     * Hands the disposal of the given handle to the executor. {@code null} and placeholder
     * handles are ignored. If the executor rejects the task, the disposal runs in the calling
     * thread instead.
     */
    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
        if (streamStateHandle == null || this.isPlaceholder(streamStateHandle)) {
            return;
        }
        LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
        AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(streamStateHandle);
        try {
            this.asyncDisposalExecutor.execute(asyncDisposalRunnable);
        } catch (RejectedExecutionException ignored) {
            // The executor does not accept the task: dispose synchronously as a fallback.
            asyncDisposalRunnable.run();
        }
    }

    /** Returns whether the given handle is a {@link PlaceholderStreamStateHandle}. */
    private boolean isPlaceholder(StreamStateHandle stateHandle) {
        return stateHandle instanceof PlaceholderStreamStateHandle;
    }

    /** Marks the registry as closed; later calls to {@code registerReference} fail. */
    @Override
    public void close() {
        synchronized (this.registeredStates) {
            this.open = false;
        }
    }

    /**
     * Bookkeeping for one registered state handle. Instances are not synchronized themselves;
     * the enclosing registry accesses them under its lock.
     */
    private static final class SharedStateEntry {
        /** The handle currently registered for the key. */
        StreamStateHandle stateHandle;

        /** ID of the checkpoint that first registered the entry. */
        private final long createdByCheckpointID;

        /** Highest ID of a checkpoint that registered a reference to the entry. */
        private long lastUsedCheckpointID;

        /** Set once a checkpoint that last used this entry was reported as completed. */
        private boolean confirmed;

        SharedStateEntry(StreamStateHandle value, long checkpointID) {
            this.stateHandle = value;
            this.createdByCheckpointID = checkpointID;
            this.lastUsedCheckpointID = checkpointID;
        }

        @Override
        public String toString() {
            return "SharedStateEntry{stateHandle=" + this.stateHandle + ", createdByCheckpointID=" + this.createdByCheckpointID + ", lastUsedCheckpointID=" + this.lastUsedCheckpointID + '}';
        }

        /** Raises the last using checkpoint ID to the given ID; it is never lowered. */
        private void advanceLastUsingCheckpointID(long checkpointID) {
            this.lastUsedCheckpointID = Math.max(checkpointID, this.lastUsedCheckpointID);
        }
    }

    /** Runnable that discards a state object and logs, rather than propagates, failures. */
    private static final class AsyncDisposalRunnable implements Runnable {
        private final StateObject toDispose;

        public AsyncDisposalRunnable(StateObject toDispose) {
            this.toDispose = Preconditions.checkNotNull(toDispose);
        }

        @Override
        public void run() {
            try {
                this.toDispose.discardState();
            } catch (Exception e) {
                // Best effort: a failed disposal must not fail the caller.
                LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", this.toDispose, e);
            }
        }
    }
}

