/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams.topics;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.slf4j.Logger;

/**
 * Determines the number of partitions of the repartition topics used by a collection of
 * subtopologies.
 *
 * <p>Partition counts that are declared explicitly on a repartition source topic (a non-zero
 * {@code partitions()} value) are taken as given. For every other repartition sink topic, the
 * count is derived from the subtopology that writes to it: the largest partition count among
 * that subtopology's repartition source topics (where already known) and its source topics
 * (as reported by the partition count provider).
 */
public class RepartitionTopics {
    private final Logger log;
    private final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies;
    private final Function<String, OptionalInt> topicPartitionCountProvider;

    /**
     * Creates a new instance.
     *
     * @param logContext                  context used to create the logger of this class
     * @param subtopologies               the subtopologies whose repartition topics are inspected
     * @param topicPartitionCountProvider maps a topic name to its partition count; an empty
     *                                    result is treated as "count not available" for that topic
     */
    public RepartitionTopics(LogContext logContext, Collection<StreamsGroupTopologyValue.Subtopology> subtopologies, Function<String, OptionalInt> topicPartitionCountProvider) {
        this.log = logContext.logger(this.getClass());
        this.subtopologies = subtopologies;
        this.topicPartitionCountProvider = topicPartitionCountProvider;
    }

    /**
     * Computes the partition count of every repartition topic.
     *
     * @return a map from repartition topic name to its number of partitions
     * @throws IllegalStateException            if the provider reports no partition count for a
     *                                          source topic that is not a repartition source topic
     *                                          of the same subtopology
     * @throws StreamsInvalidTopologyException if the partition count of some repartition topic
     *                                          cannot be determined
     */
    public Map<String, Integer> setup() {
        // Gather, across all subtopologies, the source topics the provider knows nothing about.
        Set<String> unavailableSourceTopics = new HashSet<>();
        for (StreamsGroupTopologyValue.Subtopology subtopology : subtopologies) {
            unavailableSourceTopics.addAll(computeMissingExternalSourceTopics(subtopology));
        }
        if (!unavailableSourceTopics.isEmpty()) {
            throw new IllegalStateException(String.format("Missing source topics: %s", String.join(", ", unavailableSourceTopics)));
        }

        Map<String, Integer> partitionCounts = computeRepartitionTopicPartitionCount();

        // Every repartition source topic must have ended up with a partition count.
        for (StreamsGroupTopologyValue.Subtopology subtopology : subtopologies) {
            for (StreamsGroupTopologyValue.TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
                if (!partitionCounts.containsKey(repartitionSourceTopic.name())) {
                    throw new StreamsInvalidTopologyException("Failed to compute number of partitions for all repartition topics, because a repartition source topic is never used as a sink topic.");
                }
            }
        }
        return partitionCounts;
    }

    /**
     * Returns the source topics of the given subtopology that are neither one of its repartition
     * source topics nor known to the partition count provider.
     *
     * @param subtopology the subtopology to inspect
     * @return the names of the source topics without an available partition count
     */
    private Set<String> computeMissingExternalSourceTopics(StreamsGroupTopologyValue.Subtopology subtopology) {
        Set<String> candidates = new HashSet<>(subtopology.sourceTopics());
        // Names that are also repartition source topics of this subtopology are not checked.
        subtopology.repartitionSourceTopics().forEach(repartitionTopic -> candidates.remove(repartitionTopic.name()));
        // Whatever the provider can report a partition count for is not missing.
        candidates.removeIf(topic -> topicPartitionCountProvider.apply(topic).isPresent());
        return candidates;
    }

    /**
     * Resolves the partition counts of all repartition topics by repeatedly sweeping over the
     * subtopologies until no repartition sink topic is left without a count.
     *
     * @return a map from repartition topic name to its number of partitions
     * @throws StreamsInvalidTopologyException if a sweep leaves topics unresolved without having
     *                                          resolved any new topic
     */
    private Map<String, Integer> computeRepartitionTopicPartitionCount() {
        Map<String, Integer> resolvedCounts = new HashMap<>();

        // Seed with the repartition source topics that declare a non-zero partition count.
        for (StreamsGroupTopologyValue.Subtopology subtopology : subtopologies) {
            for (StreamsGroupTopologyValue.TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
                int declaredPartitions = repartitionSourceTopic.partitions();
                if (declaredPartitions != 0) {
                    resolvedCounts.put(repartitionSourceTopic.name(), declaredPartitions);
                }
            }
        }

        while (true) {
            boolean unresolvedLeft = false;
            boolean resolvedSomething = false;

            for (StreamsGroupTopologyValue.Subtopology subtopology : subtopologies) {
                for (String repartitionSinkTopic : subtopology.repartitionSinkTopics()) {
                    if (!resolvedCounts.containsKey(repartitionSinkTopic)) {
                        Integer numPartitions = computePartitionCount(resolvedCounts, subtopology);
                        if (numPartitions != null) {
                            log.trace("Determined number of partitions for {} to be {}", repartitionSinkTopic, numPartitions);
                            resolvedCounts.put(repartitionSinkTopic, numPartitions);
                            resolvedSomething = true;
                        } else {
                            log.trace("Unable to determine number of partitions for {}, another iteration is needed", repartitionSinkTopic);
                            unresolvedLeft = true;
                        }
                    }
                }
            }

            if (!unresolvedLeft) {
                return resolvedCounts;
            }
            // A sweep that resolved nothing while topics remain open would repeat forever.
            if (!resolvedSomething) {
                throw new StreamsInvalidTopologyException("Failed to compute number of partitions for all repartition topics. There may be loops in the topology that cannot be resolved.");
            }
        }
    }

    /**
     * Computes the largest partition count among the inputs of the given subtopology.
     *
     * @param repartitionTopicPartitionCounts the partition counts of repartition topics known so far
     * @param subtopology                     the subtopology whose inputs are examined
     * @return the largest partition count found among the subtopology's repartition source
     *         topics present in the given map and its source topics known to the provider, or
     *         {@code null} if none of them has a count
     */
    private Integer computePartitionCount(Map<String, Integer> repartitionTopicPartitionCounts, StreamsGroupTopologyValue.Subtopology subtopology) {
        Integer largestSoFar = null;

        for (StreamsGroupTopologyValue.TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
            Integer knownCount = repartitionTopicPartitionCounts.get(repartitionSourceTopic.name());
            if (knownCount != null && (largestSoFar == null || knownCount > largestSoFar)) {
                largestSoFar = knownCount;
            }
        }

        for (String externalSourceTopic : subtopology.sourceTopics()) {
            OptionalInt reportedCount = topicPartitionCountProvider.apply(externalSourceTopic);
            if (reportedCount.isPresent() && (largestSoFar == null || reportedCount.getAsInt() > largestSoFar)) {
                largestSoFar = reportedCount.getAsInt();
            }
        }

        return largestSoFar;
    }
}

