import ClusterParams from '@/models/ClusterParams'
import ClusterResources from '@/models/ClusterResources';
import SparkConfig from '@/models/SparkConfig';
import UnusedResources from '@/models/UnusedResources';

/**
 * Derives Spark sizing recommendations from a description of a cluster.
 *
 * The four model objects handed to the constructor are mutated in place:
 * `calculate()` reads `clusterParams` (and `sparkConfig.executorCores`) and
 * writes the derived values into `clusterResources`, `sparkConfig` and
 * `unusedResources`.
 *
 * Units, as established by the generated command: executor/driver memory is
 * emitted with a `g` suffix and memory overheads with an `m` suffix, so
 * memory figures are gigabytes and overhead figures are megabytes.
 * `clusterParams.memoryOverhead` is a percentage (it is divided by 100).
 */
export default class Calculations {
    private readonly clusterParams: ClusterParams;
    private readonly clusterResources: ClusterResources;
    private readonly sparkConfig: SparkConfig;
    private readonly unusedResources: UnusedResources;
    /** Lower bound, in megabytes, applied to every memory-overhead value. */
    private readonly MEMORY_OVERHEAD_MIN = 384;

    /**
     * @param clusterParams User-supplied cluster description (read only by this class).
     * @param clusterResources Receives the aggregated cluster resource figures.
     * @param sparkConfig Supplies `executorCores`; receives every other Spark setting.
     * @param unusedResources Receives the per-node leftover memory and cores.
     */
    constructor(
        clusterParams: ClusterParams,
        clusterResources: ClusterResources,
        sparkConfig: SparkConfig,
        unusedResources: UnusedResources
    ) {
        this.clusterParams = clusterParams;
        this.clusterResources = clusterResources;
        this.sparkConfig = sparkConfig;
        this.unusedResources = unusedResources;
    }

    /**
     * Runs every calculation stage in dependency order and returns the four
     * (mutated) model objects.
     *
     * The order matters: the Spark configuration is derived from the cluster
     * resources, and the unused resources are derived from both.
     *
     * @returns The same object instances that were passed to the constructor.
     */
    calculate(): {
        clusterParams: ClusterParams;
        clusterResources: ClusterResources;
        sparkConfig: SparkConfig;
        unusedResources: UnusedResources;
    } {
        this.calculateClusterResources();
        this.calculateSparkDefaultConfig();
        this.calculateUnusedResources();
        const { clusterParams,
            clusterResources,
            sparkConfig,
            unusedResources } = this;
        return {
            clusterParams,
            clusterResources,
            sparkConfig,
            unusedResources
        };
    }

    /** CLUSTER RESOURCES CALCULATION */
    private calculateClusterResources(): void {
        // totalMem must exist before overheadMemory, which must exist before
        // availableMemory; executors-per-node must exist before the
        // per-executor memory slice.
        this.calculateTotalMem();
        this.calculateOverHeadMemory();
        this.calculateAvailableCores();
        this.calculateAvailableMemory();
        this.calculateNumberOfExecutorsPerNode();
        this.calculateTotalMemPerExecutor();
    }

    /** Total cluster memory: memory per node times number of nodes. */
    private calculateTotalMem(): void {
        const { memoryPerNode, numberOfNodes } = this.clusterParams;
        this.clusterResources.totalMem = memoryPerNode * numberOfNodes;
    }

    /**
     * Cluster-wide overhead memory, rounded to whole gigabytes.
     *
     * The percentage share is converted to megabytes so it can be compared
     * against `MEMORY_OVERHEAD_MIN`, then converted back.
     */
    private calculateOverHeadMemory(): void {
        const { totalMem } = this.clusterResources;
        const { memoryOverhead } = this.clusterParams;

        this.clusterResources.overheadMemory =
            Math.round(Math.max((totalMem * (memoryOverhead / 100) * 1024), this.MEMORY_OVERHEAD_MIN) / 1024);
    }

    /**
     * Cores available to Spark across the cluster; one core per node is held
     * back (presumably for the OS / node daemons — confirm with the author).
     */
    private calculateAvailableCores(): void {
        const { numberOfCores, numberOfNodes } = this.clusterParams;
        this.clusterResources.availableCores = (numberOfCores - 1) * numberOfNodes;
    }

    /** Total memory minus overhead memory, rounded up. */
    private calculateAvailableMemory(): void {
        const { totalMem, overheadMemory } = this.clusterResources;
        this.clusterResources.availableMemory = Math.ceil(
            totalMem - overheadMemory
        );
    }

    /**
     * How many executors of `executorCores` cores fit on one node after one
     * core has been held back.
     *
     * The result is always a finite, non-negative integer: a non-positive (or
     * non-numeric) `executorCores`, or a node with no spare cores, yields 0
     * rather than `Infinity`, `NaN` or a negative count.
     */
    private calculateNumberOfExecutorsPerNode(): void {
        const { numberOfCores } = this.clusterParams;
        const { executorCores } = this.sparkConfig;
        const usableCores = numberOfCores - 1;
        // Guard the division: dividing by zero would report Infinity executors.
        const executorsThatFit = executorCores > 0
            ? Math.floor(usableCores / executorCores)
            : 0;
        this.clusterResources.numberOfExecutorsPerNode = Math.max(executorsThatFit, 0);
    }

    /**
     * Memory slice per executor: node memory minus one unit (presumably
     * reserved for the OS — confirm), split evenly between the node's executors.
     *
     * When no executor fits on a node the slice is computed for a single
     * executor instead of dividing by zero; this matches
     * `calculateExecutorInstances`, which always configures at least one
     * executor.
     */
    private calculateTotalMemPerExecutor(): void {
        const { memoryPerNode } = this.clusterParams;
        const { numberOfExecutorsPerNode } = this.clusterResources;
        const executorsSharingNode = Math.max(numberOfExecutorsPerNode, 1);
        this.clusterResources.totalMemPerExecutor = Math.floor((memoryPerNode - 1) / executorsSharingNode);
    }

    /** SPARK CONFIG CALCULATION */
    private calculateSparkDefaultConfig(): void {
        // executorMemory feeds driverMemory, which feeds maxResultSize and the
        // driver overhead; executorInstances feeds defaultParallelism.
        this.calculateDriverCores();
        this.calculateSparkExecutorMemory();
        this.calculateExecutorInstances();
        this.calculateDefaultParallelism();
        this.calculateDriverMemory();
        this.calculateDriverMaxResultSize();
        this.calculateDriverMemoryOverHead();
        this.calculateExecutorMemoryOverHead();
    }

    /**
     * Executor heap size: the per-executor slice with the overhead percentage
     * taken off, rounded down and never less than 1.
     */
    private calculateSparkExecutorMemory(): void {
        const { memoryOverhead } = this.clusterParams;
        const { totalMemPerExecutor } = this.clusterResources;
        this.sparkConfig.executorMemory = Math.max(
            Math.floor(totalMemPerExecutor * (1 - memoryOverhead / 100)),
            1
        );
    }

    /**
     * Executor count for the whole cluster, minus one (presumably leaving
     * room for the driver / application master — confirm), never below 1.
     */
    private calculateExecutorInstances(): void {
        const { numberOfNodes } = this.clusterParams;
        const { numberOfExecutorsPerNode } = this.clusterResources;
        const executorInstances = numberOfExecutorsPerNode * numberOfNodes - 1;
        this.sparkConfig.executorInstances = executorInstances < 1 ? 1 : executorInstances;
    }

    /** Total executor cores times `parallelismPerCore`, never below 2. */
    private calculateDefaultParallelism(): void {
        const { executorCores, executorInstances } = this.sparkConfig;
        const { parallelismPerCore } = this.clusterParams;
        this.sparkConfig.defaultParallelism =
            Math.max(executorCores * executorInstances * parallelismPerCore, 2);
    }

    /** The driver is given the same memory as one executor. */
    private calculateDriverMemory(): void {
        this.sparkConfig.driverMemory = this.sparkConfig.executorMemory;
    }

    /** The driver's max result size is set equal to the driver memory. */
    private calculateDriverMaxResultSize(): void {
        this.sparkConfig.driverMaxResultSize = this.sparkConfig.driverMemory;
    }

    /**
     * Driver memory overhead in megabytes: the overhead percentage of the
     * driver memory, rounded down and never below `MEMORY_OVERHEAD_MIN`.
     */
    private calculateDriverMemoryOverHead(): void {
        const { driverMemory } = this.sparkConfig;
        const { memoryOverhead } = this.clusterParams;
        this.sparkConfig.driverMemoryOverhead = Math.max(
            Math.floor(driverMemory * memoryOverhead / 100 * 1024),
            this.MEMORY_OVERHEAD_MIN
        );
    }

    /**
     * Executor memory overhead in megabytes: the overhead percentage of the
     * executor memory, rounded down and never below `MEMORY_OVERHEAD_MIN`.
     */
    private calculateExecutorMemoryOverHead(): void {
        const { executorMemory } = this.sparkConfig;
        const { memoryOverhead } = this.clusterParams;
        this.sparkConfig.executorMemoryOverhead = Math.max(
            Math.floor(executorMemory * memoryOverhead / 100 * 1024),
            this.MEMORY_OVERHEAD_MIN
        );
    }

    /** The driver is given the same number of cores as one executor. */
    private calculateDriverCores(): void {
        this.sparkConfig.driverCores = this.sparkConfig.executorCores;
    }

    /** UNUSED RESOURCES CALCULATION */
    private calculateUnusedResources(): void {
        this.calculateUnusedMemoryPerNode();
        this.calculateUnusedCoresPerNode();
    }

    /** Node memory not claimed by the executor heaps placed on that node. */
    private calculateUnusedMemoryPerNode(): void {
        const { memoryPerNode } = this.clusterParams;
        const { executorMemory } = this.sparkConfig;
        const { numberOfExecutorsPerNode } = this.clusterResources;
        this.unusedResources.unusedMemoryPerNode = memoryPerNode - executorMemory * numberOfExecutorsPerNode;
    }

    /** Node cores not claimed by the executors placed on that node. */
    private calculateUnusedCoresPerNode(): void {
        const { numberOfCores } = this.clusterParams;
        const { executorCores } = this.sparkConfig;
        const { numberOfExecutorsPerNode } = this.clusterResources;
        this.unusedResources.unusedCoresPerNode = numberOfCores - numberOfExecutorsPerNode * executorCores;
    }

    /**
     * Renders the current `sparkConfig` as a single-line `spark-submit`
     * command. Reads whatever values are currently stored, so it is only
     * meaningful once the configuration has been populated (e.g. by
     * `calculate()`).
     *
     * @returns The command line as a string.
     */
    public getFormattedCommand(): string {
        return `./bin/spark-submit --name "luminousmen app" --class <app class> --master yarn --deploy-mode cluster --num-executors ${this.sparkConfig.executorInstances} --executor-memory ${this.sparkConfig.executorMemory}g --executor-cores ${this.sparkConfig.executorCores} --driver-memory ${this.sparkConfig.driverMemory}g --driver-cores ${this.sparkConfig.driverCores} --conf spark.default.parallelism=${this.sparkConfig.defaultParallelism} --conf spark.driver.maxResultSize=${this.sparkConfig.driverMaxResultSize}g --conf spark.driver.memoryOverhead=${this.sparkConfig.driverMemoryOverhead}m --conf spark.executor.memoryOverhead=${this.sparkConfig.executorMemoryOverhead}m --conf spark.dynamicAllocation.enabled=false --conf spark.sql.adaptive.enabled=true <application jar>`;
    }

    /**
     * Renders the current `sparkConfig` as a `"Configurations": [...]`
     * fragment made of classification entries (yarn-site, spark,
     * spark-defaults, mapred-site, hadoop-env, spark-env). Only the
     * `spark-defaults` entry contains computed values; everything else is
     * fixed text.
     *
     * @returns The fragment as a multi-line string.
     */
    public getFormattedCommandAmazon(): string {
        return `"Configurations":[
    {
        "Classification": "yarn-site",
        "Properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.nodemanager.pmem-check-enabled": "false"
        }
    },
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "false"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "false",
            "spark.sql.adaptive.enabled": "true",
            "spark.driver.memory": "${this.sparkConfig.driverMemory}G",
            "spark.executor.memory": "${this.sparkConfig.executorMemory}G",
            "spark.executor.cores": "${this.sparkConfig.executorCores}",
            "spark.executor.instances": "${this.sparkConfig.executorInstances}",
            "spark.executor.memoryOverhead": "${this.sparkConfig.executorMemoryOverhead}M",
            "spark.driver.memoryOverhead": "${this.sparkConfig.driverMemoryOverhead}M",
            "spark.memory.fraction": "0.80",
            "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
            "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
            "spark.yarn.scheduler.reporterThread.maxFailures": "5",
            "spark.storage.level": "MEMORY_AND_DISK_SER",
            "spark.rdd.compress": "true",
            "spark.shuffle.compress": "true",
            "spark.shuffle.spill.compress": "true",
            "spark.default.parallelism": "${this.sparkConfig.defaultParallelism}"
        }
    },
    {
        "Classification": "mapred-site",
        "Properties": {
            "mapreduce.map.output.compress": "true"
        }
    },
    {
        "Classification": "hadoop-env",
        "Configurations": [{
            "Classification": "export",
            "Configurations": [],
            "Properties": {
                "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
            }
        }],
        "Properties": {}
    },
    {
        "Classification": "spark-env",
        "Configurations": [{
            "Classification": "export",
            "Properties": {
                "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
            }
        }],
        "Properties": {}
    }
]`;
    }

    /**
     * Weighted average of the unused-resource fractions of a node: the unused
     * core fraction counts twice, the unused memory fraction once.
     *
     * Reads `unusedResources`, so it reflects whatever was last stored there
     * (e.g. by `calculate()`). No guard is applied to the denominators: if
     * `memoryPerNode` or `numberOfCores` is 0 the result is not a finite number.
     *
     * @returns `(2 * unusedCoreFraction + unusedMemoryFraction) / 3`.
     */
    public calculateScore(): number {
        const memory =
            this.unusedResources.unusedMemoryPerNode /
            this.clusterParams.memoryPerNode;
        const cores =
            this.unusedResources.unusedCoresPerNode /
            this.clusterParams.numberOfCores;
        const result = (2 * cores + memory) / 3;
        return result;
    }
}

