/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.storage.internals.log;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.generated.ProducerSnapshot;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.BatchMetadata;
import org.apache.kafka.storage.internals.log.CompletedTxn;
import org.apache.kafka.storage.internals.log.CorruptSnapshotException;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.ProducerAppendInfo;
import org.apache.kafka.storage.internals.log.ProducerStateEntry;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.apache.kafka.storage.internals.log.SnapshotFile;
import org.apache.kafka.storage.internals.log.TxnMetadata;
import org.apache.kafka.storage.internals.log.VerificationStateEntry;
import org.slf4j.Logger;

public class ProducerStateManager {
    /** Grace period, in milliseconds, that {@link #hasLateTransaction(long)} adds on top of the maximum transaction timeout. */
    public static final long LATE_TRANSACTION_BUFFER_MS = 300000L;

    // Byte layout of a snapshot file as used by readSnapshot/writeSnapshot: a 2-byte version at offset 0,
    // the CRC stored at offset 2, and the CRC-covered payload starting at offset 6.
    private static final int VERSION_OFFSET = 0;
    private static final int CRC_OFFSET = 2;
    private static final int PRODUCER_ENTRIES_OFFSET = 6;

    private final Logger log;
    private final TopicPartition topicPartition;
    private final int maxTransactionTimeoutMs;
    private final ProducerStateManagerConfig producerStateManagerConfig;
    private final Time time;

    // Producer state keyed by producer id.
    private final Map<Long, ProducerStateEntry> producers = new HashMap<>();
    // Verification state keyed by producer id.
    private final Map<Long, VerificationStateEntry> verificationStates = new HashMap<>();
    // Transactions that have started but not completed, keyed by their first offset.
    private final TreeMap<Long, TxnMetadata> ongoingTxns = new TreeMap<>();
    // Transactions moved here by completeTxn, keyed by their first offset; pruned by removeUnreplicatedTransactions.
    private final TreeMap<Long, TxnMetadata> unreplicatedTxns = new TreeMap<>();

    // Directory in which snapshot files are listed and created.
    private volatile File logDir;
    // Mirrors producers.size(); kept in a volatile field so producerIdCount() does not touch the map.
    private volatile int producerIdCount = 0;
    // Last timestamp of the producer owning the earliest ongoing transaction, or -1 when there is none.
    private volatile long oldestTxnLastTimestamp = -1L;
    // Known snapshot files keyed by snapshot offset.
    private ConcurrentSkipListMap<Long, SnapshotFile> snapshots;
    // End offset of the in-memory producer state.
    private long lastMapOffset = 0L;
    // Offset at which the in-memory state was last loaded from or written to a snapshot.
    private long lastSnapOffset = 0L;

    public ProducerStateManager(TopicPartition topicPartition, File logDir, int maxTransactionTimeoutMs, ProducerStateManagerConfig producerStateManagerConfig, Time time) throws IOException {
        this.topicPartition = topicPartition;
        this.logDir = logDir;
        this.maxTransactionTimeoutMs = maxTransactionTimeoutMs;
        this.producerStateManagerConfig = producerStateManagerConfig;
        this.time = time;
        this.log = new LogContext("[ProducerStateManager partition=" + String.valueOf(topicPartition) + "] ").logger(ProducerStateManager.class);
        this.snapshots = this.loadSnapshots();
    }

    /** @return the maximum transaction timeout, in milliseconds, supplied at construction */
    public int maxTransactionTimeoutMs() {
        return maxTransactionTimeoutMs;
    }

    /** @return the configuration object supplied at construction */
    public ProducerStateManagerConfig producerStateManagerConfig() {
        return producerStateManagerConfig;
    }

    /**
     * Reports whether the earliest ongoing transaction has been idle for longer than the maximum
     * transaction timeout plus {@link #LATE_TRANSACTION_BUFFER_MS}.
     *
     * @param currentTimeMs current time in milliseconds
     * @return {@code true} if a positive timestamp is recorded and it is older than the threshold
     */
    public boolean hasLateTransaction(long currentTimeMs) {
        // Read the volatile field once so both checks use the same value.
        long lastTimestamp = oldestTxnLastTimestamp;
        if (lastTimestamp <= 0L) {
            return false;
        }
        long lateThresholdMs = (long) maxTransactionTimeoutMs + LATE_TRANSACTION_BUFFER_MS;
        return currentTimeMs - lastTimestamp > lateThresholdMs;
    }

    /**
     * Clears all in-memory state and known snapshots (restarting at offset 0), then rebuilds the
     * snapshot index from the files present in the log directory.
     *
     * @throws IOException if deleting or listing snapshot files fails
     */
    public void truncateFullyAndReloadSnapshots() throws IOException {
        log.info("Reloading the producer state snapshots");
        truncateFullyAndStartAt(0L);
        snapshots = loadSnapshots();
    }

    /** @return the cached number of tracked producer ids */
    public int producerIdCount() {
        return producerIdCount;
    }

    /** Stores {@code entry} under {@code producerId} and refreshes the cached producer count. */
    private void addProducerId(long producerId, ProducerStateEntry entry) {
        producers.put(producerId, entry);
        producerIdCount = producers.size();
    }

    /** Drops every tracked producer and resets the cached producer count to zero. */
    private void clearProducerIds() {
        producers.clear();
        producerIdCount = 0;
    }

    /**
     * Returns the verification state for {@code producerId}, creating it (timestamped with the current
     * time) if none exists, and then offers {@code sequence}/{@code epoch} to the entry via
     * {@code maybeUpdateLowestSequenceAndEpoch}.
     *
     * @param producerId producer whose verification state is requested
     * @param sequence sequence number passed to the entry
     * @param epoch producer epoch passed to the entry
     * @param supportsEpochBump passed through to a newly created entry only
     * @return the existing or newly created entry
     */
    public VerificationStateEntry maybeCreateVerificationStateEntry(long producerId, int sequence, short epoch, boolean supportsEpochBump) {
        VerificationStateEntry entry = verificationStates.get(producerId);
        if (entry == null) {
            entry = new VerificationStateEntry(time.milliseconds(), sequence, epoch, supportsEpochBump);
            verificationStates.put(producerId, entry);
        }
        entry.maybeUpdateLowestSequenceAndEpoch(sequence, epoch);
        return entry;
    }

    /** @return the verification state stored for {@code producerId}, or {@code null} if there is none */
    public VerificationStateEntry verificationStateEntry(long producerId) {
        return verificationStates.get(producerId);
    }

    /** Removes the verification state stored for {@code producerId}, if any. */
    public void clearVerificationStateEntry(long producerId) {
        verificationStates.remove(producerId);
    }

    /**
     * Lists the snapshot files in the current log directory and indexes them by snapshot offset.
     *
     * @return a new map from snapshot offset to snapshot file
     * @throws IOException if the directory listing fails
     */
    private ConcurrentSkipListMap<Long, SnapshotFile> loadSnapshots() throws IOException {
        ConcurrentSkipListMap<Long, SnapshotFile> byOffset = new ConcurrentSkipListMap<>();
        listSnapshotFiles(logDir).forEach(found -> byOffset.put(found.offset, found));
        return byOffset;
    }

    /**
     * Re-lists the snapshot files on disk and deletes "stray" ones, i.e. those whose offset is not one
     * of {@code segmentBaseOffsets}. The stray snapshot with the largest offset is kept only if there
     * are no segment base offsets at all or if its offset is not below the largest segment base offset.
     * The surviving set replaces the current snapshot index.
     *
     * @param segmentBaseOffsets base offsets of the existing segments
     * @throws IOException if listing or deleting snapshot files fails
     */
    public void removeStraySnapshots(Collection<Long> segmentBaseOffsets) throws IOException {
        OptionalLong maxSegmentBaseOffset = segmentBaseOffsets.stream().mapToLong(Long::longValue).max();
        HashSet<Long> knownBaseOffsets = new HashSet<>(segmentBaseOffsets);

        ConcurrentSkipListMap<Long, SnapshotFile> reloaded = loadSnapshots();

        // Walk snapshots in ascending offset order; every stray except the last one seen is deleted.
        SnapshotFile latestStray = null;
        for (SnapshotFile candidate : reloaded.values()) {
            if (knownBaseOffsets.contains(candidate.offset)) {
                continue;
            }
            if (latestStray != null) {
                latestStray.deleteIfExists();
                reloaded.remove(latestStray.offset);
            }
            latestStray = candidate;
        }

        // The last stray is deleted as well when it lies below the largest segment base offset.
        if (latestStray != null
                && maxSegmentBaseOffset.isPresent()
                && latestStray.offset < maxSegmentBaseOffset.getAsLong()) {
            SnapshotFile removed = reloaded.remove(latestStray.offset);
            if (removed != null) {
                removed.deleteIfExists();
            }
        }

        this.snapshots = reloaded;
    }

    /**
     * Returns the first offset of the earliest transaction across the ongoing and unreplicated sets.
     *
     * @return the smaller of the two first offsets (the unreplicated one on a tie), whichever one
     *         exists if only one does, or empty if neither set has a transaction
     */
    public Optional<LogOffsetMetadata> firstUnstableOffset() {
        Map.Entry<Long, TxnMetadata> firstUnreplicated = unreplicatedTxns.firstEntry();
        Map.Entry<Long, TxnMetadata> firstOngoing = ongoingTxns.firstEntry();

        Optional<LogOffsetMetadata> unreplicatedFirstOffset = firstUnreplicated == null
                ? Optional.empty()
                : Optional.ofNullable(firstUnreplicated.getValue().firstOffset);
        Optional<LogOffsetMetadata> undecidedFirstOffset = firstOngoing == null
                ? Optional.empty()
                : Optional.ofNullable(firstOngoing.getValue().firstOffset);

        if (unreplicatedFirstOffset.isEmpty()) {
            return undecidedFirstOffset;
        }
        if (undecidedFirstOffset.isEmpty()) {
            return unreplicatedFirstOffset;
        }
        boolean undecidedIsEarlier =
                undecidedFirstOffset.get().messageOffset < unreplicatedFirstOffset.get().messageOffset;
        return undecidedIsEarlier ? undecidedFirstOffset : unreplicatedFirstOffset;
    }

    /** Drops unreplicated transactions whose last offset is below {@code highWatermark}. */
    public void onHighWatermarkUpdated(long highWatermark) {
        removeUnreplicatedTransactions(highWatermark);
    }

    /** @return the first offset of the earliest ongoing transaction, or empty if none is ongoing */
    public OptionalLong firstUndecidedOffset() {
        Map.Entry<Long, TxnMetadata> earliest = ongoingTxns.firstEntry();
        if (earliest == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(earliest.getValue().firstOffset.messageOffset);
    }

    /** @return the end offset currently recorded for the in-memory producer state */
    public long mapEndOffset() {
        return lastMapOffset;
    }

    /** @return a read-only view (not a copy) of the tracked producers keyed by producer id */
    public Map<Long, ProducerStateEntry> activeProducers() {
        return Collections.unmodifiableMap(producers);
    }

    /** @return {@code true} when no producers are tracked and no unreplicated transactions remain */
    public boolean isEmpty() {
        if (!producers.isEmpty()) {
            return false;
        }
        return unreplicatedTxns.isEmpty();
    }

    /**
     * Loads producer state from the newest readable snapshot. Snapshots that fail to load with a
     * {@link CorruptSnapshotException} are deleted and the next newest one is tried. If none can be
     * loaded, both tracked offsets are set to {@code logStartOffset}.
     *
     * @param logStartOffset fallback offset when no snapshot is usable
     * @param currentTime current time in milliseconds, used to skip expired producers
     * @throws IOException if reading or deleting a snapshot file fails
     */
    private void loadFromSnapshot(long logStartOffset, long currentTime) throws IOException {
        Optional<SnapshotFile> candidate = latestSnapshotFile();
        while (candidate.isPresent()) {
            SnapshotFile snapshot = candidate.get();
            try {
                log.info("Loading producer state from snapshot file '{}'", snapshot);
                // The whole file is parsed before any entry is applied, so a corrupt file leaves no partial state.
                for (ProducerStateEntry producerEntry : readSnapshot(snapshot.file())) {
                    if (!isProducerExpired(currentTime, producerEntry)) {
                        loadProducerEntry(producerEntry);
                    }
                }
                lastSnapOffset = snapshot.offset;
                lastMapOffset = snapshot.offset;
                updateOldestTxnTimestamp();
                return;
            } catch (CorruptSnapshotException e) {
                log.warn("Failed to load producer snapshot from '{}': {}", snapshot.file(), e.getMessage());
                removeAndDeleteSnapshot(snapshot.offset);
            }
            candidate = latestSnapshotFile();
        }

        lastSnapOffset = logStartOffset;
        lastMapOffset = logStartOffset;
    }

    /**
     * Registers {@code entry} under its producer id and, if it reports a current transaction first
     * offset, records that transaction as ongoing.
     *
     * @param entry producer state to register
     */
    public void loadProducerEntry(ProducerStateEntry entry) {
        long producerId = entry.producerId();
        addProducerId(producerId, entry);
        entry.currentTxnFirstOffset().ifPresent(firstOffset -> {
            TxnMetadata ongoing = new TxnMetadata(producerId, firstOffset);
            ongoingTxns.put(firstOffset, ongoing);
        });
    }

    /**
     * A producer is expired when it has no current transaction and its last timestamp is at least
     * the configured producer id expiration time in the past.
     */
    private boolean isProducerExpired(long currentTimeMs, ProducerStateEntry producerState) {
        if (!producerState.currentTxnFirstOffset().isEmpty()) {
            return false;
        }
        long idleMs = currentTimeMs - producerState.lastTimestamp();
        return idleMs >= (long) producerStateManagerConfig.producerIdExpirationMs();
    }

    /**
     * Removes expired producers and refreshes the cached producer count, then removes verification
     * entries whose timestamp is at least the configured producer id expiration time in the past.
     *
     * @param currentTimeMs current time in milliseconds
     */
    public void removeExpiredProducers(long currentTimeMs) {
        producers.values().removeIf(state -> isProducerExpired(currentTimeMs, state));
        producerIdCount = producers.size();

        verificationStates.values().removeIf(state -> {
            long ageMs = currentTimeMs - state.timestamp();
            return ageMs >= (long) producerStateManagerConfig.producerIdExpirationMs();
        });
    }

    /**
     * Deletes snapshots outside {@code (logStartOffset, logEndOffset]}. If {@code logEndOffset} differs
     * from the current map end offset, all in-memory state is cleared and reloaded from the newest
     * remaining snapshot; otherwise only the log-start-offset bookkeeping is applied.
     *
     * @param logStartOffset exclusive lower bound for retained snapshots
     * @param logEndOffset inclusive upper bound for retained snapshots
     * @param currentTimeMs current time in milliseconds, used to skip expired producers on reload
     * @throws IOException if deleting or reading snapshot files fails
     */
    public void truncateAndReload(long logStartOffset, long logEndOffset, long currentTimeMs) throws IOException {
        for (SnapshotFile snapshot : snapshots.values()) {
            boolean outOfRange = snapshot.offset > logEndOffset || snapshot.offset <= logStartOffset;
            if (outOfRange) {
                removeAndDeleteSnapshot(snapshot.offset);
            }
        }

        if (logEndOffset == mapEndOffset()) {
            onLogStartOffsetIncremented(logStartOffset);
            return;
        }

        clearProducerIds();
        ongoingTxns.clear();
        updateOldestTxnTimestamp();
        unreplicatedTxns.clear();
        loadFromSnapshot(logStartOffset, currentTimeMs);
    }

    /**
     * Builds a {@link ProducerAppendInfo} seeded with the producer's current entry (or an empty entry
     * if the producer is unknown) and its verification state, if any.
     *
     * @param producerId producer being appended for
     * @param origin origin of the append
     * @return a new append-info object for this partition
     */
    public ProducerAppendInfo prepareUpdate(long producerId, AppendOrigin origin) {
        ProducerStateEntry emptyEntry = ProducerStateEntry.empty(producerId);
        ProducerStateEntry currentEntry = lastEntry(producerId).orElse(emptyEntry);
        VerificationStateEntry verificationState = verificationStateEntry(producerId);
        return new ProducerAppendInfo(topicPartition, producerId, currentEntry, origin, verificationState);
    }

    /**
     * Applies the result of an append: merges the updated entry into the producer's existing state
     * (or registers it if the producer is new), records any transactions the append started as
     * ongoing, and refreshes the oldest-transaction timestamp.
     *
     * @param appendInfo append result to apply
     * @throws IllegalArgumentException if the producer id is -1
     */
    public void update(ProducerAppendInfo appendInfo) {
        long producerId = appendInfo.producerId();
        if (producerId == -1L) {
            // Fixed message: the original ran "partition" and the partition name together.
            throw new IllegalArgumentException("Invalid producer id " + producerId
                    + " passed to update for partition " + topicPartition);
        }
        log.trace("Updated producer {} state to {}", producerId, appendInfo);

        ProducerStateEntry updatedEntry = appendInfo.toEntry();
        ProducerStateEntry currentEntry = producers.get(producerId);
        if (currentEntry != null) {
            currentEntry.update(updatedEntry);
        } else {
            addProducerId(producerId, updatedEntry);
        }

        appendInfo.startedTransactions().forEach(txn -> ongoingTxns.put(txn.firstOffset.messageOffset, txn));
        updateOldestTxnTimestamp();
    }

    /**
     * Recomputes the cached timestamp for the earliest ongoing transaction: the last timestamp of the
     * producer that owns it, or -1 when there is no ongoing transaction or that producer is unknown.
     */
    private void updateOldestTxnTimestamp() {
        long timestamp = -1L;
        Map.Entry<Long, TxnMetadata> earliest = ongoingTxns.firstEntry();
        if (earliest != null) {
            ProducerStateEntry owner = producers.get(earliest.getValue().producerId);
            if (owner != null) {
                timestamp = owner.lastTimestamp();
            }
        }
        oldestTxnLastTimestamp = timestamp;
    }

    /** Sets the end offset recorded for the in-memory producer state. */
    public void updateMapEndOffset(long lastOffset) {
        lastMapOffset = lastOffset;
    }

    /** @return the state tracked for {@code producerId}, or empty if the producer is unknown */
    public Optional<ProducerStateEntry> lastEntry(long producerId) {
        ProducerStateEntry entry = producers.get(producerId);
        return Optional.ofNullable(entry);
    }

    /**
     * Takes a snapshot with {@code sync = true}, discarding the resulting file reference.
     *
     * @throws IOException if writing the snapshot fails
     */
    public void takeSnapshot() throws IOException {
        takeSnapshot(true);
    }

    /**
     * Writes the current producer state to a snapshot file named after the map end offset, but only
     * if that offset has advanced past the last snapshot offset.
     *
     * @param sync whether the written file is forced to storage before returning
     * @return the snapshot file that was written, or empty if no snapshot was needed
     * @throws IOException if writing the snapshot fails
     */
    public Optional<File> takeSnapshot(boolean sync) throws IOException {
        // Nothing new since the last snapshot: skip the write.
        if (lastMapOffset <= lastSnapOffset) {
            return Optional.empty();
        }

        SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
        long startMs = time.hiResClockMs();
        writeSnapshot(snapshotFile.file(), producers, sync);
        log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.",
                lastMapOffset, producers.size(), time.hiResClockMs() - startMs);

        snapshots.put(snapshotFile.offset, snapshotFile);
        lastSnapOffset = lastMapOffset;
        return Optional.of(snapshotFile.file());
    }

    /** Switches the log directory to {@code parentDir} and propagates it to every known snapshot file. */
    public void updateParentDir(File parentDir) {
        logDir = parentDir;
        for (SnapshotFile snapshotFile : snapshots.values()) {
            snapshotFile.updateParentDir(parentDir);
        }
    }

    /** @return the offset of the newest known snapshot, or empty if there are no snapshots */
    public OptionalLong latestSnapshotOffset() {
        Optional<SnapshotFile> latest = latestSnapshotFile();
        return latest.isPresent() ? OptionalLong.of(latest.get().offset) : OptionalLong.empty();
    }

    /** @return the offset of the oldest known snapshot, or empty if there are no snapshots */
    public OptionalLong oldestSnapshotOffset() {
        Optional<SnapshotFile> oldest = oldestSnapshotFile();
        return oldest.isPresent() ? OptionalLong.of(oldest.get().offset) : OptionalLong.empty();
    }

    /** @return the snapshot registered at exactly {@code offset}, or empty if there is none */
    public Optional<SnapshotFile> snapshotFileForOffset(long offset) {
        SnapshotFile snapshotFile = snapshots.get(offset);
        return Optional.ofNullable(snapshotFile);
    }

    /**
     * Reacts to a new log start offset: drops unreplicated transactions that end below it, raises the
     * map end offset to at least that offset, and resets the last snapshot offset to the newest known
     * snapshot (or to {@code logStartOffset} if there is none).
     */
    public void onLogStartOffsetIncremented(long logStartOffset) {
        removeUnreplicatedTransactions(logStartOffset);
        lastMapOffset = Math.max(lastMapOffset, logStartOffset);
        lastSnapOffset = latestSnapshotOffset().orElse(logStartOffset);
    }

    /** Removes unreplicated transactions that have a last offset and whose last offset is below {@code offset}. */
    private void removeUnreplicatedTransactions(long offset) {
        unreplicatedTxns.values().removeIf(txn -> {
            OptionalLong lastOffset = txn.lastOffset;
            return lastOffset.isPresent() && lastOffset.getAsLong() < offset;
        });
    }

    /**
     * Discards all producers, transactions and known snapshots (deleting the snapshot files), then
     * restarts tracking with the map end offset at {@code offset} and the last snapshot offset at 0.
     *
     * @param offset new map end offset
     * @throws IOException if deleting a snapshot file fails
     */
    public void truncateFullyAndStartAt(long offset) throws IOException {
        clearProducerIds();
        ongoingTxns.clear();
        unreplicatedTxns.clear();

        for (SnapshotFile known : snapshots.values()) {
            removeAndDeleteSnapshot(known.offset);
        }

        lastSnapOffset = 0L;
        lastMapOffset = offset;
        updateOldestTxnTimestamp();
    }

    /**
     * Computes the last stable offset as if {@code completedTxn} had finished: the first offset of the
     * earliest ongoing transaction from a different producer, or one past the completed transaction's
     * last offset if no such transaction exists.
     */
    public long lastStableOffset(CompletedTxn completedTxn) {
        Optional<TxnMetadata> nextIncomplete = findNextIncompleteTxn(completedTxn.producerId);
        if (nextIncomplete.isPresent()) {
            return nextIncomplete.get().firstOffset.messageOffset;
        }
        return completedTxn.lastOffset + 1L;
    }

    /** @return the earliest ongoing transaction that does not belong to {@code producerId}, if any */
    private Optional<TxnMetadata> findNextIncompleteTxn(long producerId) {
        return ongoingTxns.values().stream()
                .filter(txn -> txn.producerId != producerId)
                .findFirst();
    }

    /**
     * Moves a transaction from the ongoing set to the unreplicated set, stamping it with its last
     * offset, and refreshes the oldest-transaction timestamp.
     *
     * @param completedTxn the transaction that finished
     * @throws IllegalArgumentException if no ongoing transaction starts at the completed transaction's first offset
     */
    public void completeTxn(CompletedTxn completedTxn) {
        TxnMetadata finished = ongoingTxns.remove(completedTxn.firstOffset);
        if (finished == null) {
            throw new IllegalArgumentException("Attempted to complete transaction " + completedTxn
                    + " on partition " + topicPartition + " which was not started");
        }

        finished.lastOffset = OptionalLong.of(completedTxn.lastOffset);
        unreplicatedTxns.put(completedTxn.firstOffset, finished);
        updateOldestTxnTimestamp();
    }

    /**
     * Deletes every known snapshot whose offset lies in {@code [0, offset)}.
     *
     * @param offset exclusive upper bound of the snapshot offsets to delete
     * @throws IOException if deleting a snapshot file fails
     */
    public void deleteSnapshotsBefore(long offset) throws IOException {
        // Keys are passed as Long (not Object) so the call type-checks against subMap(K, K).
        // Removing from the map while iterating this view is supported by ConcurrentSkipListMap.
        for (SnapshotFile snapshot : snapshots.subMap(0L, offset).values()) {
            removeAndDeleteSnapshot(snapshot.offset);
        }
    }

    /** @return the file of the snapshot registered at exactly {@code offset}, or empty if there is none */
    public Optional<File> fetchSnapshot(long offset) {
        SnapshotFile snapshotFile = snapshots.get(offset);
        return Optional.ofNullable(snapshotFile).map(SnapshotFile::file);
    }

    /** @return the known snapshot with the smallest offset, if any */
    private Optional<SnapshotFile> oldestSnapshotFile() {
        Map.Entry<Long, SnapshotFile> first = snapshots.firstEntry();
        return Optional.ofNullable(first).map(Map.Entry::getValue);
    }

    /** @return the known snapshot with the largest offset, if any */
    private Optional<SnapshotFile> latestSnapshotFile() {
        Map.Entry<Long, SnapshotFile> last = snapshots.lastEntry();
        return Optional.ofNullable(last).map(Map.Entry::getValue);
    }

    /**
     * Unregisters the snapshot at {@code snapshotOffset} and deletes its file if it exists; does
     * nothing when no snapshot is registered at that offset.
     *
     * @throws IOException if deleting the file fails
     */
    private void removeAndDeleteSnapshot(long snapshotOffset) throws IOException {
        SnapshotFile removed = snapshots.remove(snapshotOffset);
        if (removed == null) {
            return;
        }
        removed.deleteIfExists();
    }

    /**
     * Unregisters the snapshot at {@code snapshotOffset} and renames its file via
     * {@code renameToDelete()}.
     *
     * @param snapshotOffset offset of the snapshot to remove
     * @return the renamed snapshot file; empty if no snapshot was registered at that offset or the
     *         file was already gone when the rename was attempted
     * @throws IOException if the rename fails for a reason other than the file being missing
     */
    public Optional<SnapshotFile> removeAndMarkSnapshotForDeletion(long snapshotOffset) throws IOException {
        SnapshotFile snapshotFile = snapshots.remove(snapshotOffset);
        if (snapshotFile == null) {
            return Optional.empty();
        }

        try {
            snapshotFile.renameToDelete();
        } catch (NoSuchFileException ex) {
            // A missing file is logged and treated as "nothing left to mark", not as an error.
            log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", snapshotFile.file().getAbsoluteFile());
            return Optional.empty();
        }
        return Optional.of(snapshotFile);
    }

    /**
     * Reads a producer snapshot file and converts its entries into {@link ProducerStateEntry} objects.
     * The file must start with a 2-byte version equal to 1, and the stored CRC must match the CRC32C
     * of all bytes from {@code PRODUCER_ENTRIES_OFFSET} to the end of the file.
     *
     * @param file snapshot file to read
     * @return one entry per producer stored in the snapshot, in file order
     * @throws IOException if the file cannot be read
     * @throws CorruptSnapshotException if the version is unknown, the content cannot be parsed, or
     *         the CRC does not match
     */
    public static List<ProducerStateEntry> readSnapshot(File file) throws IOException {
        byte[] buffer = Files.readAllBytes(file.toPath());
        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);

        ProducerSnapshot producerSnapshot;
        try {
            short version = byteBuffer.getShort();
            // Only snapshot version 1 is accepted.
            if (version != 1) {
                throw new CorruptSnapshotException("Snapshot contained an unknown file version " + version);
            }
            producerSnapshot = new ProducerSnapshot((Readable) new ByteBufferAccessor(byteBuffer), version);
        } catch (CorruptSnapshotException e) {
            // Already carries a precise message; do not relabel an unknown version as a schema failure.
            throw e;
        } catch (Exception e) {
            throw new CorruptSnapshotException("Snapshot failed schema validation: " + e.getMessage());
        }

        long crc = producerSnapshot.crc();
        long computedCrc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.length - PRODUCER_ENTRIES_OFFSET);
        if (crc != computedCrc) {
            throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no longer valid). Stored crc: " + crc + ". Computed crc: " + computedCrc);
        }

        List<ProducerSnapshot.ProducerEntry> producerEntries = producerSnapshot.producerEntries();
        List<ProducerStateEntry> entries = new ArrayList<>(producerEntries.size());
        for (ProducerSnapshot.ProducerEntry producerEntry : producerEntries) {
            long lastOffset = producerEntry.lastOffset();
            long timestamp = producerEntry.timestamp();
            long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();

            // Negative offsets are treated as "absent" (writeSnapshot stores -1 for a missing txn offset).
            OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0L
                    ? OptionalLong.of(currentTxnFirstOffset)
                    : OptionalLong.empty();
            Optional<BatchMetadata> batchMetadata = lastOffset >= 0L
                    ? Optional.of(new BatchMetadata(producerEntry.lastSequence(), lastOffset, producerEntry.offsetDelta(), timestamp))
                    : Optional.empty();

            entries.add(new ProducerStateEntry(
                    producerEntry.producerId(),
                    producerEntry.epoch(),
                    producerEntry.coordinatorEpoch(),
                    timestamp,
                    currentTxnFirstOffsetVal,
                    batchMetadata));
        }
        return entries;
    }

    /**
     * Serializes {@code entries} as a version-1 producer snapshot and writes it to {@code file},
     * replacing any previous content. The CRC32C of all bytes from {@code PRODUCER_ENTRIES_OFFSET}
     * onward is stored at {@code CRC_OFFSET}.
     *
     * @param file destination file; created if missing, truncated if present
     * @param entries producer state keyed by producer id
     * @param sync whether to force the written content and metadata to storage before returning
     * @throws IOException if opening, writing or forcing the file fails
     */
    public static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries, boolean sync) throws IOException {
        List<ProducerSnapshot.ProducerEntry> producerEntries = new ArrayList<>(entries.size());
        for (Map.Entry<Long, ProducerStateEntry> producerIdEntry : entries.entrySet()) {
            ProducerStateEntry entry = producerIdEntry.getValue();
            producerEntries.add(new ProducerSnapshot.ProducerEntry()
                    .setProducerId(producerIdEntry.getKey())
                    .setEpoch(entry.producerEpoch())
                    .setLastSequence(entry.lastSeq())
                    .setLastOffset(entry.lastDataOffset())
                    .setOffsetDelta(entry.lastOffsetDelta())
                    .setTimestamp(entry.lastTimestamp())
                    .setCoordinatorEpoch(entry.coordinatorEpoch())
                    // -1 encodes "no current transaction"; readSnapshot maps negatives back to empty.
                    .setCurrentTxnFirstOffset(entry.currentTxnFirstOffset().orElse(-1L)));
        }

        ProducerSnapshot producerSnapshot = new ProducerSnapshot();
        producerSnapshot.setProducerEntries(producerEntries);

        ByteBuffer buffer = MessageUtil.toVersionPrefixedByteBuffer((short) 1, (Message) producerSnapshot);
        long crc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.limit() - PRODUCER_ENTRIES_OFFSET);
        ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);

        // TRUNCATE_EXISTING: without it, overwriting a longer existing file would leave stale trailing
        // bytes, and readSnapshot (which checksums the whole file) would reject the result as corrupt.
        try (FileChannel fileChannel = FileChannel.open(file.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            Utils.writeFully(fileChannel, buffer);
            if (sync) {
                fileChannel.force(true);
            }
        }
    }

    /** @return {@code true} if {@code path} is a regular file whose name ends with {@code .snapshot} */
    private static boolean isSnapshotFile(Path path) {
        if (!Files.isRegularFile(path)) {
            return false;
        }
        String fileName = path.getFileName().toString();
        return fileName.endsWith(".snapshot");
    }

    /**
     * Lists the snapshot files directly inside {@code dir} (no recursion).
     *
     * @param dir directory to scan
     * @return one {@link SnapshotFile} per matching file, in directory-listing order; an empty list
     *         if {@code dir} does not exist or is not a directory
     * @throws IOException if the directory cannot be listed
     */
    public static List<SnapshotFile> listSnapshotFiles(File dir) throws IOException {
        if (!dir.exists() || !dir.isDirectory()) {
            return List.of();
        }
        // Files.list holds an open directory handle, so the stream must be closed.
        try (Stream<Path> paths = Files.list(dir.toPath())) {
            return paths
                    .filter(ProducerStateManager::isSnapshotFile)
                    .map(Path::toFile)
                    .map(SnapshotFile::new)
                    .toList();
        }
    }
}

