/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.PostVersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshot;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.util.Preconditions;

@Internal
public class InternalTimerServiceSerializationProxy<K>
extends PostVersionedIOReadableWritable {
    public static final int VERSION = 2;
    private final InternalTimeServiceManagerImpl<K> timerServicesManager;
    private ClassLoader userCodeClassLoader;
    private final int keyGroupIdx;

    public InternalTimerServiceSerializationProxy(InternalTimeServiceManagerImpl<K> timerServicesManager, ClassLoader userCodeClassLoader, int keyGroupIdx) {
        this.timerServicesManager = Preconditions.checkNotNull(timerServicesManager);
        this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
        this.keyGroupIdx = keyGroupIdx;
    }

    public InternalTimerServiceSerializationProxy(InternalTimeServiceManagerImpl<K> timerServicesManager, int keyGroupIdx) {
        this.timerServicesManager = Preconditions.checkNotNull(timerServicesManager);
        this.keyGroupIdx = keyGroupIdx;
    }

    @Override
    public int getVersion() {
        return 2;
    }

    @Override
    public int[] getCompatibleVersions() {
        return new int[]{2, 1};
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        super.write(out);
        Map<String, InternalTimerServiceImpl<K, ?>> registeredTimerServices = this.timerServicesManager.getRegisteredTimerServices();
        out.writeInt(registeredTimerServices.size());
        for (Map.Entry<String, InternalTimerServiceImpl<K, ?>> entry : registeredTimerServices.entrySet()) {
            String serviceName = entry.getKey();
            InternalTimerServiceImpl<K, ?> timerService = entry.getValue();
            out.writeUTF(serviceName);
            InternalTimersSnapshotReaderWriters.getWriterForVersion(2, timerService.snapshotTimersForKeyGroup(this.keyGroupIdx), timerService.getKeySerializer(), timerService.getNamespaceSerializer()).writeTimersSnapshot(out);
        }
    }

    @Override
    protected void read(DataInputView in, boolean wasVersioned) throws IOException {
        int noOfTimerServices = in.readInt();
        for (int i = 0; i < noOfTimerServices; ++i) {
            String serviceName = in.readUTF();
            int readerVersion = wasVersioned ? this.getReadVersion() : Integer.MIN_VALUE;
            InternalTimersSnapshot restoredTimersSnapshot = InternalTimersSnapshotReaderWriters.getReaderForVersion(readerVersion, this.userCodeClassLoader).readTimersSnapshot(in);
            InternalTimerServiceImpl timerService = this.registerOrGetTimerService(serviceName, restoredTimersSnapshot);
            timerService.restoreTimersForKeyGroup(restoredTimersSnapshot, this.keyGroupIdx);
        }
    }

    private <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String serviceName, InternalTimersSnapshot<?, ?> restoredTimersSnapshot) {
        TypeSerializer<?> keySerializer = restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer();
        TypeSerializer<?> namespaceSerializer = restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer();
        TimerSerializer timerSerializer = new TimerSerializer(keySerializer, namespaceSerializer);
        return this.timerServicesManager.registerOrGetTimerService(serviceName, timerSerializer);
    }
}

