/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
import org.apache.flink.connector.file.src.impl.FileSourceReader;
import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

/**
 * Base class for file sources that read records of type {@code T} from splits of type
 * {@code SplitT}.
 *
 * <p>The source combines three pluggable pieces:
 *
 * <ul>
 *   <li>a {@link FileEnumerator.Provider} that creates the enumerator used to turn the input
 *       paths into splits,
 *   <li>a {@link FileSplitAssigner.Provider} that creates the assigner holding the pending splits,
 *   <li>a {@link BulkFormat} that is handed to the {@link FileSourceReader}.
 * </ul>
 *
 * <p>The source is {@link Boundedness#BOUNDED} when no {@link ContinuousEnumerationSettings} are
 * configured and {@link Boundedness#CONTINUOUS_UNBOUNDED} otherwise.
 *
 * @param <T> the type of the records produced by this source
 * @param <SplitT> the type of the splits handled by this source
 */
@PublicEvolving
public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {

    private static final long serialVersionUID = 1L;

    private final Path[] inputPaths;
    private final FileEnumerator.Provider enumeratorFactory;
    private final FileSplitAssigner.Provider assignerFactory;
    private final BulkFormat<T, SplitT> readerFormat;

    /** {@code null} means the source processes a static file set (bounded mode). */
    @Nullable
    private final ContinuousEnumerationSettings continuousEnumerationSettings;

    /**
     * Creates the source.
     *
     * @param inputPaths the paths to read; must be non-null and contain at least one element
     * @param fileEnumerator factory for the file enumerator; must not be null
     * @param splitAssigner factory for the split assigner; must not be null
     * @param readerFormat the format passed to the reader; must not be null
     * @param continuousEnumerationSettings settings for continuous discovery, or {@code null} for
     *     a bounded source
     * @throws NullPointerException if any non-nullable argument is null
     * @throws IllegalArgumentException if {@code inputPaths} is empty
     */
    protected AbstractFileSource(Path[] inputPaths, FileEnumerator.Provider fileEnumerator, FileSplitAssigner.Provider splitAssigner, BulkFormat<T, SplitT> readerFormat, @Nullable ContinuousEnumerationSettings continuousEnumerationSettings) {
        // Explicit null check so the failure names the offending argument instead of surfacing
        // as a bare NullPointerException from the length access below.
        Preconditions.checkNotNull(inputPaths, "inputPaths");
        Preconditions.checkArgument(inputPaths.length > 0, "inputPaths must not be empty");
        this.inputPaths = inputPaths;
        this.enumeratorFactory = Preconditions.checkNotNull(fileEnumerator, "fileEnumerator");
        this.assignerFactory = Preconditions.checkNotNull(splitAssigner, "splitAssigner");
        this.readerFormat = Preconditions.checkNotNull(readerFormat, "readerFormat");
        this.continuousEnumerationSettings = continuousEnumerationSettings;
    }

    /** Returns the factory used to create the split assigner. */
    public FileSplitAssigner.Provider getAssignerFactory() {
        return this.assignerFactory;
    }

    /** Returns the continuous enumeration settings, or {@code null} if none were configured. */
    @Nullable
    public ContinuousEnumerationSettings getContinuousEnumerationSettings() {
        return this.continuousEnumerationSettings;
    }

    /**
     * Returns {@link Boundedness#BOUNDED} when no continuous enumeration settings are set, and
     * {@link Boundedness#CONTINUOUS_UNBOUNDED} otherwise.
     */
    @Override
    public Boundedness getBoundedness() {
        return this.continuousEnumerationSettings == null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Creates a {@link FileSourceReader} from the given context, this source's reader format, and
     * the configuration obtained from the context.
     */
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
        return new FileSourceReader<T, SplitT>(readerContext, this.readerFormat, readerContext.getConfiguration());
    }

    /**
     * Creates a fresh split enumerator. The splits are obtained by running a newly created
     * {@link FileEnumerator} over the input paths, passing the context's current parallelism.
     *
     * @throws FlinkRuntimeException if enumerating the splits fails with an {@link IOException}
     */
    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) {
        final FileEnumerator enumerator = this.enumeratorFactory.create();
        final Collection<FileSourceSplit> splits;
        try {
            splits = enumerator.enumerateSplits(this.inputPaths, enumContext.currentParallelism());
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Could not enumerate file splits", e);
        }
        // No checkpoint exists yet, so there is no recorded set of already processed paths.
        return this.createSplitEnumerator(enumContext, enumerator, splits, null);
    }

    /**
     * Re-creates the split enumerator from a checkpoint, using the checkpoint's pending splits
     * and its set of already processed paths.
     */
    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, PendingSplitsCheckpoint<SplitT> checkpoint) {
        final FileEnumerator enumerator = this.enumeratorFactory.create();

        // The enumerator and assigner code works non-generically on the base split type, so the
        // checkpointed splits are viewed as a collection of FileSourceSplit.
        @SuppressWarnings("unchecked")
        final Collection<FileSourceSplit> splits = (Collection<FileSourceSplit>) checkpoint.getSplits();

        return this.createSplitEnumerator(enumContext, enumerator, splits, checkpoint.getAlreadyProcessedPaths());
    }

    /** Returns the serializer for the splits of this source; supplied by the subclass. */
    @Override
    public abstract SimpleVersionedSerializer<SplitT> getSplitSerializer();

    /**
     * Returns a new {@link PendingSplitsCheckpointSerializer} that wraps the serializer returned
     * by {@link #getSplitSerializer()}.
     */
    @Override
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<SplitT>> getEnumeratorCheckpointSerializer() {
        return new PendingSplitsCheckpointSerializer<SplitT>(this.getSplitSerializer());
    }

    /** Returns the produced type as reported by the reader format. */
    @Override
    public TypeInformation<T> getProducedType() {
        return this.readerFormat.getProducedType();
    }

    /**
     * Builds either a {@link StaticFileSplitEnumerator} (no continuous settings) or a
     * {@link ContinuousFileSplitEnumerator} (continuous settings present) over the given splits.
     *
     * @param context the enumerator context supplied by the runtime
     * @param enumerator the file enumerator, used only in the continuous case
     * @param splits the initial splits handed to the split assigner
     * @param alreadyProcessedPaths the paths already processed, or {@code null} to derive them
     *     from {@code splits}
     */
    private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createSplitEnumerator(SplitEnumeratorContext<SplitT> context, FileEnumerator enumerator, Collection<FileSourceSplit> splits, @Nullable Collection<Path> alreadyProcessedPaths) {
        // The concrete enumerators are written against the base split type; the context is
        // therefore viewed as a context for FileSourceSplit.
        @SuppressWarnings("unchecked")
        final SplitEnumeratorContext<FileSourceSplit> fileSplitContext = (SplitEnumeratorContext<FileSourceSplit>) context;

        final FileSplitAssigner splitAssigner = this.assignerFactory.create(splits);

        if (this.continuousEnumerationSettings == null) {
            // Bounded case.
            return this.castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner));
        }

        // Unbounded case: without a checkpointed set, the paths of the initial splits count as
        // already processed.
        final Collection<Path> processedPaths = alreadyProcessedPaths != null
                ? alreadyProcessedPaths
                : AbstractFileSource.splitsToPaths(splits);

        return this.castGeneric(new ContinuousFileSplitEnumerator(fileSplitContext, enumerator, splitAssigner, this.inputPaths, processedPaths, this.continuousEnumerationSettings.getDiscoveryInterval().toMillis()));
    }

    /**
     * Converts an enumerator typed on {@link FileSourceSplit} back to one typed on
     * {@code SplitT}. The intermediate wildcard cast is required because the two parameterized
     * types cannot be cast to each other directly.
     */
    @SuppressWarnings("unchecked")
    private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> castGeneric(SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>> enumerator) {
        return (SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>) (SplitEnumerator<?, ?>) enumerator;
    }

    /** Collects the paths of the given splits into a new {@link HashSet}. */
    private static Collection<Path> splitsToPaths(Collection<FileSourceSplit> splits) {
        return splits.stream().map(FileSourceSplit::path).collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Base builder for subclasses of {@link AbstractFileSource}. Holds the input paths, the
     * reader format, the enumerator and assigner factories, and the optional continuous
     * enumeration settings that a concrete {@link #build()} implementation can read.
     *
     * @param <T> the type of the records produced by the built source
     * @param <SplitT> the type of the splits handled by the built source
     * @param <SELF> the concrete builder type, returned from the fluent setters
     */
    protected static abstract class AbstractFileSourceBuilder<T, SplitT extends FileSourceSplit, SELF extends AbstractFileSourceBuilder<T, SplitT, SELF>> {
        protected final Path[] inputPaths;
        protected final BulkFormat<T, SplitT> readerFormat;
        protected FileEnumerator.Provider fileEnumerator;
        protected FileSplitAssigner.Provider splitAssigner;

        /** {@code null} until {@link #monitorContinuously(Duration)} is called. */
        @Nullable
        protected ContinuousEnumerationSettings continuousSourceSettings;

        /**
         * Creates the builder.
         *
         * @param inputPaths the paths to read; must not be null
         * @param readerFormat the reader format; must not be null
         * @param defaultFileEnumerator the enumerator factory used unless overridden via
         *     {@link #setFileEnumerator(FileEnumerator.Provider)}; not validated here
         * @param defaultSplitAssigner the assigner factory used unless overridden via
         *     {@link #setSplitAssigner(FileSplitAssigner.Provider)}; not validated here
         */
        protected AbstractFileSourceBuilder(Path[] inputPaths, BulkFormat<T, SplitT> readerFormat, FileEnumerator.Provider defaultFileEnumerator, FileSplitAssigner.Provider defaultSplitAssigner) {
            this.inputPaths = Preconditions.checkNotNull(inputPaths);
            this.readerFormat = Preconditions.checkNotNull(readerFormat);
            this.fileEnumerator = defaultFileEnumerator;
            this.splitAssigner = defaultSplitAssigner;
        }

        /** Creates the file source from the state held by this builder. */
        public abstract AbstractFileSource<T, SplitT> build();

        /**
         * Stores continuous enumeration settings with the given discovery interval.
         *
         * @param discoveryInterval the discovery interval; must be non-null and strictly positive
         * @return this builder
         * @throws NullPointerException if {@code discoveryInterval} is null
         * @throws IllegalArgumentException if {@code discoveryInterval} is zero or negative
         */
        public SELF monitorContinuously(Duration discoveryInterval) {
            Preconditions.checkNotNull(discoveryInterval, "discoveryInterval");
            Preconditions.checkArgument(!discoveryInterval.isNegative() && !discoveryInterval.isZero(), "discoveryInterval must be > 0");
            this.continuousSourceSettings = new ContinuousEnumerationSettings(discoveryInterval);
            return this.self();
        }

        /**
         * Clears any continuous enumeration settings previously stored on this builder.
         *
         * @return this builder
         */
        public SELF processStaticFileSet() {
            this.continuousSourceSettings = null;
            return this.self();
        }

        /**
         * Replaces the file enumerator factory.
         *
         * @param fileEnumerator the new factory; must not be null
         * @return this builder
         */
        public SELF setFileEnumerator(FileEnumerator.Provider fileEnumerator) {
            this.fileEnumerator = Preconditions.checkNotNull(fileEnumerator);
            return this.self();
        }

        /**
         * Replaces the split assigner factory.
         *
         * @param splitAssigner the new factory; must not be null
         * @return this builder
         */
        public SELF setSplitAssigner(FileSplitAssigner.Provider splitAssigner) {
            this.splitAssigner = Preconditions.checkNotNull(splitAssigner);
            return this.self();
        }

        /** Returns this builder typed as {@code SELF}, as declared by the self-bounded type parameter. */
        @SuppressWarnings("unchecked")
        private SELF self() {
            return (SELF) this;
        }
    }
}

