/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Scanner;
import java.util.SplittableRandom;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.ThroughputThrottler;
import org.apache.kafka.tools.ToolsUtils;

public class ProducerPerformance {
    public static final String DEFAULT_TRANSACTION_ID_PREFIX = "performance-producer-";
    public static final long DEFAULT_TRANSACTION_DURATION_MS = 3000L;
    Callback cb;
    Stats stats;
    Stats steadyStateStats;

    public static void main(String[] args) throws Exception {
        ProducerPerformance perf = new ProducerPerformance();
        perf.start(args);
    }

    void start(String[] args) throws IOException {
        ArgumentParser parser = ProducerPerformance.argParser();
        try {
            ConfigPostProcessor config = new ConfigPostProcessor(parser, args);
            KafkaProducer<byte[], byte[]> producer = this.createKafkaProducer(config.producerProps);
            if (config.transactionsEnabled) {
                producer.initTransactions();
            }
            byte[] payload = null;
            if (config.recordSize != null) {
                payload = new byte[config.recordSize.intValue()];
            }
            SplittableRandom random = new SplittableRandom(0L);
            if (config.warmupRecords > 0L) {
                System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
            }
            boolean isSteadyState = false;
            this.stats = new Stats(config.numRecords, config.reportingInterval, isSteadyState);
            long startMs = System.currentTimeMillis();
            ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs);
            int currentTransactionSize = 0;
            long transactionStartTime = 0L;
            for (long i = 0L; i < config.numRecords; ++i) {
                payload = ProducerPerformance.generateRandomPayload(config.recordSize, config.payloadByteList, payload, random, config.payloadMonotonic, i);
                if (config.transactionsEnabled && currentTransactionSize == 0) {
                    producer.beginTransaction();
                    transactionStartTime = System.currentTimeMillis();
                }
                ProducerRecord record = new ProducerRecord(config.topicName, (Object)payload);
                long sendStartMs = System.currentTimeMillis();
                isSteadyState = config.warmupRecords > 0L;
                if (isSteadyState && i == config.warmupRecords) {
                    this.steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.reportingInterval, isSteadyState);
                    this.stats.suppressPrinting();
                }
                this.cb = new PerfCallback(sendStartMs, payload.length, this.stats, this.steadyStateStats);
                producer.send(record, this.cb);
                ++currentTransactionSize;
                if (config.transactionsEnabled && config.transactionDurationMs <= sendStartMs - transactionStartTime) {
                    producer.commitTransaction();
                    currentTransactionSize = 0;
                }
                if (!throttler.shouldThrottle(i, sendStartMs)) continue;
                throttler.throttle();
            }
            if (config.transactionsEnabled && currentTransactionSize != 0) {
                producer.commitTransaction();
            }
            if (!config.shouldPrintMetrics) {
                producer.close();
                this.stats.printTotal();
                if (this.steadyStateStats != null) {
                    this.steadyStateStats.printTotal();
                }
            } else {
                producer.flush();
                this.stats.printTotal();
                if (this.steadyStateStats != null) {
                    this.steadyStateStats.printTotal();
                }
                ToolsUtils.printMetrics(producer.metrics());
                producer.close();
            }
        }
        catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit((int)0);
            }
            parser.handleError(e);
            Exit.exit((int)1);
        }
    }

    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
        return new KafkaProducer(props);
    }

    static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) {
        if (!payloadByteList.isEmpty()) {
            payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
        } else if (recordSize != null) {
            for (int j = 0; j < payload.length; ++j) {
                payload[j] = (byte)(random.nextInt(26) + 65);
            }
        } else if (payloadMonotonic) {
            payload = Long.toString(recordValue).getBytes(StandardCharsets.UTF_8);
        } else {
            throw new IllegalArgumentException("No payload file, record size or payload-monotonic option provided.");
        }
        return payload;
    }

    static Properties readProps(List<String> producerProps, String producerConfig) throws IOException {
        Properties props = new Properties();
        if (producerConfig != null) {
            props.putAll((Map<?, ?>)Utils.loadProps((String)producerConfig));
        }
        if (producerProps != null) {
            for (String prop : producerProps) {
                String[] pieces = prop.split("=");
                if (pieces.length != 2) {
                    throw new IllegalArgumentException("Invalid property: " + prop);
                }
                props.put(pieces[0], pieces[1]);
            }
        }
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        if (props.getProperty("client.id") == null) {
            props.put("client.id", "perf-producer-client");
        }
        return props;
    }

    static List<byte[]> readPayloadFile(String payloadFilePath, String payloadDelimiter) throws IOException {
        ArrayList<byte[]> payloadByteList = new ArrayList<byte[]>();
        if (payloadFilePath != null) {
            Path path = Paths.get(payloadFilePath, new String[0]);
            System.out.println("Reading payloads from: " + String.valueOf(path.toAbsolutePath()));
            if (Files.notExists(path, new LinkOption[0]) || Files.size(path) == 0L) {
                throw new IllegalArgumentException("File does not exist or empty file provided.");
            }
            try (Scanner payLoadScanner = new Scanner(path, StandardCharsets.UTF_8);){
                payLoadScanner.useDelimiter(payloadDelimiter);
                while (payLoadScanner.hasNext()) {
                    byte[] payloadBytes = payLoadScanner.next().getBytes(StandardCharsets.UTF_8);
                    payloadByteList.add(payloadBytes);
                }
            }
            System.out.println("Number of records read: " + payloadByteList.size());
        }
        return payloadByteList;
    }

    static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser((String)"kafka-producer-perf-test").defaultHelp(true).description("This tool is used to verify the producer performance. To enable transactions, you can specify a transactional id or set a transaction duration using --transaction-duration-ms. There are three ways to specify the transactional id: set transactional.id=<id> via --command-property, set transactional.id=<id> in the config file via --command-config, or use --transactional-id <id>.");
        parser.addArgument(new String[]{"--bootstrap-server"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"BOOTSTRAP-SERVER"}).dest("bootstrapServer").help("The server(s) to connect to. This config takes precedence over bootstrap.servers specified via --command-property or --command-config.");
        MutuallyExclusiveGroup payloadOptions = parser.addMutuallyExclusiveGroup().required(true).description("Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic.");
        parser.addArgument(new String[]{"--topic"}).action((ArgumentAction)Arguments.store()).required(true).type(String.class).metavar(new String[]{"TOPIC"}).help("Produce records to this topic.");
        parser.addArgument(new String[]{"--num-records"}).action((ArgumentAction)Arguments.store()).required(true).type(Long.class).metavar(new String[]{"NUM-RECORDS"}).dest("numRecords").help("Number of records to produce.");
        payloadOptions.addArgument(new String[]{"--record-size"}).action((ArgumentAction)Arguments.store()).required(false).type(Integer.class).metavar(new String[]{"RECORD-SIZE"}).dest("recordSize").help("Record size in bytes. Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic.");
        payloadOptions.addArgument(new String[]{"--payload-file"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"PAYLOAD-FILE"}).dest("payloadFile").help("File to read the record payloads from. This works only for UTF-8 encoded text files. Payloads will be read from this file and a payload will be randomly selected when sending records. Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic.");
        payloadOptions.addArgument(new String[]{"--payload-monotonic"}).action((ArgumentAction)Arguments.storeTrue()).type(Boolean.class).metavar(new String[]{"PAYLOAD-MONOTONIC"}).dest("payloadMonotonic").help("Payload is a monotonically increasing integer. Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic.");
        parser.addArgument(new String[]{"--payload-delimiter"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"PAYLOAD-DELIMITER"}).dest("payloadDelimiter").setDefault((Object)"\\n").help("Provides the delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if --payload-file is not provided.");
        parser.addArgument(new String[]{"--throughput"}).action((ArgumentAction)Arguments.store()).required(true).type(Double.class).metavar(new String[]{"THROUGHPUT"}).help("Throttle maximum record throughput to *approximately* THROUGHPUT records/sec. Set this to -1 to disable throttling.");
        parser.addArgument(new String[]{"--producer-props"}).nargs("+").required(false).metavar(new String[]{"PROP-NAME=PROP-VALUE"}).type(String.class).dest("producerConfig").help("(DEPRECATED) Kafka producer related configuration properties like client.id. These configs take precedence over those passed via --command-config or --producer.config. This option will be removed in a future version. Use --command-property instead.");
        parser.addArgument(new String[]{"--command-property"}).nargs("+").required(false).metavar(new String[]{"PROP-NAME=PROP-VALUE"}).type(String.class).dest("commandProperties").help("Kafka producer related configuration properties like client.id. These configs take precedence over those passed via --command-config or --producer.config.");
        parser.addArgument(new String[]{"--producer.config"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"CONFIG-FILE"}).dest("producerConfigFile").help("(DEPRECATED) Producer config properties file. This option will be removed in a future version. Use --command-config instead.");
        parser.addArgument(new String[]{"--command-config"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"CONFIG-FILE"}).dest("commandConfigFile").help("Producer config properties file.");
        parser.addArgument(new String[]{"--print-metrics"}).action((ArgumentAction)Arguments.storeTrue()).type(Boolean.class).metavar(new String[]{"PRINT-METRICS"}).dest("printMetrics").help("Print out metrics at the end of the test.");
        parser.addArgument(new String[]{"--transactional-id"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"TRANSACTIONAL-ID"}).dest("transactionalId").help("The transactional id to use. This config takes precedence over the transactional.id specified via --command-property or --command-config. Note that if the transactional id is not specified while --transaction-duration-ms is provided, the default value for the transactional id will be performance-producer- followed by a random uuid.");
        parser.addArgument(new String[]{"--transaction-duration-ms"}).action((ArgumentAction)Arguments.store()).required(false).type(Long.class).metavar(new String[]{"TRANSACTION-DURATION"}).dest("transactionDurationMs").help("The maximum duration of each transaction. The commitTransaction will be called after this time has elapsed. The value should be greater than 0. If the transactional id is specified via --command-property, --command-config or --transactional-id but --transaction-duration-ms is not specified, the default value will be 3000.");
        parser.addArgument(new String[]{"--warmup-records"}).action((ArgumentAction)Arguments.store()).required(false).type(Long.class).metavar(new String[]{"WARMUP-RECORDS"}).dest("warmupRecords").setDefault((Object)0L).help("The number of records to treat as warmup. These initial records will not be included in steady-state statistics. An additional summary line will be printed describing the steady-state statistics.");
        parser.addArgument(new String[]{"--reporting-interval"}).action((ArgumentAction)Arguments.store()).required(false).type(Long.class).metavar(new String[]{"INTERVAL-MS"}).dest("reportingInterval").setDefault((Object)5000L).help("Interval in milliseconds at which to print progress info.");
        return parser;
    }

    static final class ConfigPostProcessor {
        final String bootstrapServer;
        final String topicName;
        final long numRecords;
        final long warmupRecords;
        final Integer recordSize;
        final double throughput;
        final boolean payloadMonotonic;
        final Properties producerProps;
        final boolean shouldPrintMetrics;
        final Long transactionDurationMs;
        final boolean transactionsEnabled;
        final List<byte[]> payloadByteList;
        final long reportingInterval;

        public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOException, ArgumentParserException {
            Namespace namespace = parser.parseArgs(args);
            this.bootstrapServer = namespace.getString("bootstrapServer");
            this.topicName = namespace.getString("topic");
            this.numRecords = namespace.getLong("numRecords");
            this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0L);
            this.recordSize = namespace.getInt("recordSize");
            this.throughput = namespace.getDouble("throughput");
            this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
            this.shouldPrintMetrics = namespace.getBoolean("printMetrics");
            this.reportingInterval = namespace.getLong("reportingInterval");
            List producerConfigs = namespace.getList("producerConfig");
            String producerConfigFile = namespace.getString("producerConfigFile");
            List commandProperties = namespace.getList("commandProperties");
            String commandConfigFile = namespace.getString("commandConfigFile");
            String payloadFilePath = namespace.getString("payloadFile");
            Long transactionDurationMsArg = namespace.getLong("transactionDurationMs");
            String transactionIdArg = namespace.getString("transactionalId");
            if (this.numRecords <= 0L) {
                throw new ArgumentParserException("--num-records should be greater than zero.", parser);
            }
            if (this.warmupRecords >= this.numRecords) {
                throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
            }
            if (this.recordSize != null && this.recordSize <= 0) {
                throw new ArgumentParserException("--record-size should be greater than zero.", parser);
            }
            if (this.bootstrapServer == null && commandProperties == null && producerConfigs == null && producerConfigFile == null && commandConfigFile == null) {
                throw new ArgumentParserException("At least one of --bootstrap-server, --command-property, --producer-props, --producer.config or --command-config must be specified.", parser);
            }
            if (commandProperties != null && producerConfigs != null) {
                throw new ArgumentParserException("--command-property and --producer-props cannot be specified together.", parser);
            }
            if (commandConfigFile != null && producerConfigFile != null) {
                throw new ArgumentParserException("--command-config and --producer.config cannot be specified together.", parser);
            }
            if (transactionDurationMsArg != null && transactionDurationMsArg <= 0L) {
                throw new ArgumentParserException("--transaction-duration-ms should be greater than zero.", parser);
            }
            if (this.reportingInterval <= 0L) {
                throw new ArgumentParserException("--reporting-interval should be greater than zero.", parser);
            }
            String payloadDelimiter = namespace.getString("payloadDelimiter").equals("\\n") ? "\n" : namespace.getString("payloadDelimiter");
            this.payloadByteList = ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
            if (producerConfigs != null) {
                System.out.println("Option --producer-props has been deprecated and will be removed in a future version. Use --command-property instead.");
                commandProperties = producerConfigs;
            }
            if (producerConfigFile != null) {
                System.out.println("Option --producer.config has been deprecated and will be removed in a future version. Use --command-config instead.");
                commandConfigFile = producerConfigFile;
            }
            this.producerProps = ProducerPerformance.readProps(commandProperties, commandConfigFile);
            if (this.bootstrapServer != null) {
                this.producerProps.put("bootstrap.servers", this.bootstrapServer);
            }
            boolean bl = this.transactionsEnabled = transactionDurationMsArg != null || transactionIdArg != null || this.producerProps.containsKey("transactional.id");
            if (this.transactionsEnabled) {
                Optional<String> txIdInProps = Optional.ofNullable(this.producerProps.get("transactional.id")).map(Object::toString);
                String transactionId = Optional.ofNullable(transactionIdArg).orElse(txIdInProps.orElse(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX + Uuid.randomUuid().toString()));
                this.producerProps.put("transactional.id", transactionId);
                if (transactionDurationMsArg == null) {
                    transactionDurationMsArg = 3000L;
                }
            }
            this.transactionDurationMs = transactionDurationMsArg;
        }
    }

    static class Stats {
        private final long start = System.currentTimeMillis();
        private final int[] latencies;
        private final long sampling;
        private final long reportingInterval;
        private long iteration = 0L;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long windowStart = System.currentTimeMillis();
        private final boolean isSteadyState;
        private boolean suppressPrint;

        public Stats(long numRecords, long reportingInterval, boolean isSteadyState) {
            this.sampling = numRecords / Math.min(numRecords, 500000L);
            this.latencies = new int[(int)(numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.windowCount = 0L;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0L;
            this.windowBytes = 0L;
            this.totalLatency = 0L;
            this.reportingInterval = reportingInterval;
            this.isSteadyState = isSteadyState;
            this.suppressPrint = false;
        }

        public void record(int latency, int bytes, long time) {
            ++this.count;
            this.bytes += (long)bytes;
            this.totalLatency += (long)latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            ++this.windowCount;
            this.windowBytes += (long)bytes;
            this.windowTotalLatency += (long)latency;
            this.windowMaxLatency = Math.max(this.windowMaxLatency, latency);
            if (this.iteration % this.sampling == 0L) {
                this.latencies[this.index] = latency;
                ++this.index;
            }
            if (time - this.windowStart >= this.reportingInterval) {
                if (this.isSteadyState && this.count == this.windowCount) {
                    System.out.println("In steady state.");
                }
                if (!this.suppressPrint) {
                    this.printWindow();
                }
                this.newWindow();
            }
            ++this.iteration;
        }

        public long totalCount() {
            return this.count;
        }

        public long currentWindowCount() {
            return this.windowCount;
        }

        public long iteration() {
            return this.iteration;
        }

        public long bytes() {
            return this.bytes;
        }

        public int index() {
            return this.index;
        }

        public void printWindow() {
            long elapsed = System.currentTimeMillis() - this.windowStart;
            double recsPerSec = 1000.0 * (double)this.windowCount / (double)elapsed;
            double mbPerSec = 1000.0 * (double)this.windowBytes / (double)elapsed / 1048576.0;
            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n", this.windowCount, recsPerSec, mbPerSec, (double)this.windowTotalLatency / (double)this.windowCount, (double)this.windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0L;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0L;
            this.windowBytes = 0L;
        }

        public void printTotal() {
            long elapsed = System.currentTimeMillis() - this.start;
            double recsPerSec = 1000.0 * (double)this.count / (double)elapsed;
            double mbPerSec = 1000.0 * (double)this.bytes / (double)elapsed / 1048576.0;
            int[] percs = Stats.percentiles(this.latencies, this.index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", this.count, this.isSteadyState ? " steady state" : "", recsPerSec, mbPerSec, (double)this.totalLatency / (double)this.count, (double)this.maxLatency, percs[0], percs[1], percs[2], percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double ... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; ++i) {
                int index = (int)(percentiles[i] * (double)size);
                values[i] = latencies[index];
            }
            return values;
        }

        public void suppressPrinting() {
            this.suppressPrint = true;
        }
    }

    static final class PerfCallback
    implements Callback {
        private final long start;
        private final int bytes;
        private final Stats stats;
        private final Stats steadyStateStats;

        public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
            this.start = start;
            this.stats = stats;
            this.steadyStateStats = steadyStateStats;
            this.bytes = bytes;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int)(now - this.start);
            if (exception == null) {
                this.stats.record(latency, this.bytes, now);
                if (this.steadyStateStats != null) {
                    this.steadyStateStats.record(latency, this.bytes, now);
                }
            }
            if (exception != null) {
                exception.printStackTrace();
            }
        }
    }
}

