diff --git a/pom.xml b/pom.xml index dbe82b9ddb1..7c9cf2920ed 100644 --- a/pom.xml +++ b/pom.xml @@ -236,7 +236,7 @@ org.apache.sysds.performance.Main - SystemDS.jar SystemDS-tests.jar + SystemDS.jar ${project.build.directory}/${project.artifactId}-${project.version}-tests.jar diff --git a/src/test/java/org/apache/sysds/performance/Main.java b/src/test/java/org/apache/sysds/performance/Main.java index 17a87bf0c06..1e51a703bf7 100644 --- a/src/test/java/org/apache/sysds/performance/Main.java +++ b/src/test/java/org/apache/sysds/performance/Main.java @@ -19,17 +19,114 @@ package org.apache.sysds.performance; -import org.apache.sysds.performance.compression.SteamCompressTest; +import org.apache.sysds.performance.compression.IOBandwidth; +import org.apache.sysds.performance.compression.SchemaTest; +import org.apache.sysds.performance.compression.Serialize; +import org.apache.sysds.performance.compression.StreamCompress; +import org.apache.sysds.performance.generators.ConstMatrix; +import org.apache.sysds.performance.generators.GenMatrices; +import org.apache.sysds.runtime.util.CommonThreadPool; public class Main { + private static void exec(int prog, String[] args) throws InterruptedException, Exception { + switch(prog) { + case 1: + new StreamCompress(100, new GenMatrices(10000, 100, 32, 1.0)).run(); + break; + case 2: + new SchemaTest(100, new GenMatrices(10000, 1000, 32, 1.0)).run(); + break; + case 3: + new SchemaTest(100, new GenMatrices(1000, 1, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 10, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 100, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 32, 1.0)).run(); + break; + case 4: + new SchemaTest(100, new GenMatrices(1000, 1000, 1, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 2, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 4, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 8, 1.0)).run(); + new SchemaTest(100, new 
GenMatrices(1000, 1000, 16, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 32, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 64, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 128, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 256, 1.0)).run(); + new SchemaTest(100, new GenMatrices(1000, 1000, 512, 1.0)).run(); + break; + case 5: + new SchemaTest(100, new ConstMatrix(1000, 100, 32, 1.0)).runCom(); + break; + case 6: + new SchemaTest(100, new GenMatrices(1000, 1000, 32, 0.3)).run(); + break; + case 7: + new SchemaTest(100, new ConstMatrix(1000, 1000, 32, 0.3)).runCom(); + break; + case 8: + new IOBandwidth(100, new ConstMatrix(1000, 1000, 32, 1.0)).run(); + break; + case 9: + run9(args); + break; + case 10: + run10(args); + break; + case 11: + run11(args, -1); + break; + case 12: + run11(args, Integer.parseInt(args[7])); + break; + default: + break; + } + } + + private static void run9(String[] args) throws InterruptedException, Exception { + int rows = Integer.parseInt(args[1]); + int cols = Integer.parseInt(args[2]); + int unique = Integer.parseInt(args[3]); + double sparsity = Double.parseDouble(args[4]); + int k = Integer.parseInt(args[5]); + int n = Integer.parseInt(args[6]); + new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).run(); + } + + private static void run10(String[] args) throws InterruptedException, Exception { + int rows = Integer.parseInt(args[1]); + int cols = Integer.parseInt(args[2]); + int unique = Integer.parseInt(args[3]); + double sparsity = Double.parseDouble(args[4]); + int k = Integer.parseInt(args[5]); + int n = Integer.parseInt(args[6]); + new IOBandwidth(n, new ConstMatrix(rows, cols, unique, sparsity), k).runVector(); + } + + private static void run11(String[] args, int id) throws InterruptedException, Exception { + int rows = Integer.parseInt(args[1]); + int cols = Integer.parseInt(args[2]); + int unique = Integer.parseInt(args[3]); + double sparsity 
= Double.parseDouble(args[4]); + int k = Integer.parseInt(args[5]); + int n = Integer.parseInt(args[6]); + + Serialize s = new Serialize(n, new ConstMatrix(rows, cols, unique, sparsity), k); + + if(id == -1) + s.run(); + else + s.run(id); + } + public static void main(String[] args) { - try{ - SteamCompressTest.P1(); + try { + exec(Integer.parseInt(args[0]), args); + CommonThreadPool.get().shutdown(); } - catch(Exception e){ + catch(Exception e) { e.printStackTrace(); - } } } diff --git a/src/test/java/org/apache/sysds/performance/TimingUtils.java b/src/test/java/org/apache/sysds/performance/TimingUtils.java new file mode 100644 index 00000000000..ed10b0a1d0c --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/TimingUtils.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.performance; + +import java.util.Arrays; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; + +/** + * Util methods for the Performance suite + */ +public interface TimingUtils { + + /** A specification enum for the type of statistics to gather from the time measurements */ + public enum StatsType { + MEAN_STD; + } + + /** The specified measurement to use in this case. Can be set from any of the programs */ + public static StatsType st = StatsType.MEAN_STD; + + /** + * Time the given function call + * + * @param f The function to execute + * @return The time it took + */ + public static double time(F f) { + Timing time = new Timing(true); + f.run(); + return time.stop(); + } + + /** + * Time the function and print using the string given prepended. + * + * @param f The function to time + * @param p The print statement + */ + public static void time(F f, String p) { + Timing time = new Timing(true); + f.run(); + System.out.print(p); + System.out.println(time.stop()); + } + + /** + * Time the function given assuming that it should put result into the given time array at index i. 
+ * + * @param f The function to time + * @param times The time array to put the time result into + * @param i The index to put it into + */ + public static void time(F f, double[] times, int i) { + Timing time = new Timing(true); + f.run(); + times[i] = time.stop(); + } + + /** + * Time the given function a number of time using the generator to populate the input allocations without including + * it in the timing of the operation + * + * @param f The function to time + * @param rep The number of repetitions to make + * @param bq The generator for the input + * @return A list of the individual repetitions execution time + * @throws InterruptedException An exception in case the job gets interrupted + */ + public static double[] time(F f, int rep, IGenerate bq) throws InterruptedException { + double[] times = new double[rep]; + for(int i = 0; i < rep; i++) { + while(bq.isEmpty()) + Thread.sleep(bq.defaultWaitTime()); + time(f, times, i); + } + return times; + } + + /** + * Calculate the statistics of the times executed The default is to calculate the mean and standard deviation and + * return that as a string + * + * @param v The times observed + * @return The status string. 
+ */ + public static String stats(double[] v) { + switch(st) { + case MEAN_STD: + default: + return statsMeanSTD(v); + } + } + + private static String statsMeanSTD(double[] v) { + final int l = v.length; + final int remove = (int) Math.floor((double) l * 0.05); + Arrays.sort(v); + + double total = 0; + final int el = v.length - remove * 2; + for(int i = remove; i < l - remove; i++) + total += v[i]; + + double mean = total / el; + + double var = 0; + for(int i = remove; i < l - remove; i++) + var += Math.pow(Math.abs(v[i] - mean), 2); + + double std = Math.sqrt(var / el); + + return String.format("%8.3f+-%7.3f ms", mean, std); + } + + /** + * Interface method to enable timed calling from other Classes + */ + interface F { + void run(); + } +} diff --git a/src/test/java/org/apache/sysds/performance/Util.java b/src/test/java/org/apache/sysds/performance/Util.java deleted file mode 100644 index 9bb2f8f9b8e..00000000000 --- a/src/test/java/org/apache/sysds/performance/Util.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysds.performance; - -import java.util.Arrays; -import java.util.concurrent.BlockingQueue; - -import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; - -public interface Util { - public static double time(F f) { - Timing time = new Timing(true); - f.run(); - return time.stop(); - } - - public static void time(F f, String p) { - Timing time = new Timing(true); - f.run(); - System.out.print(p); - System.out.println(time.stop()); - } - - public static void time(F f, double[] times, int i) { - Timing time = new Timing(true); - f.run(); - times[i] = time.stop(); - } - - public static double[] time(F f, int rep, BlockingQueue bq) throws InterruptedException { - double[] times = new double[rep]; - for(int i = 0; i < rep; i++) { - while(bq.isEmpty()) - Thread.sleep(100); - Util.time(f, times, i); - } - return times; - } - - public static String stats(double[] v) { - - Arrays.sort(v); - final int l = v.length; - - double min = v[0]; - double max = v[l - 1]; - double q25 = v[(int) (l / 4)]; - double q50 = v[(int) (l / 2)]; - double q75 = v[(int) ((l / 4) * 3)]; - - return String.format("[%.3f, %.3f, %.3f, %.3f, %.3f]", min, q25, q50, q75, max); - } - - interface F { - void run(); - } -} diff --git a/src/test/java/org/apache/sysds/performance/compression/APerfTest.java b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java new file mode 100644 index 00000000000..de67e9ff7c7 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.compression; + +import java.util.ArrayList; + +import org.apache.sysds.performance.TimingUtils; +import org.apache.sysds.performance.TimingUtils.F; +import org.apache.sysds.performance.generators.IGenerate; + +public abstract class APerfTest { + + /** The Result array that all the results of the individual executions is producing */ + protected final ArrayList ret; + + /** A Task que that guarantee that the execution is not to long */ + protected final IGenerate gen; + + /** Default Repetitions */ + protected final int N; + + protected APerfTest(int N, IGenerate gen) { + ret = new ArrayList(N); + this.gen = gen; + this.N = N; + } + + protected void execute(F f, String name) throws InterruptedException { + warmup(f, 10); + gen.generate(N); + ret.clear(); + double[] times = TimingUtils.time(f, N, gen); + String retS = makeResString(times); + System.out.println(String.format("%35s, %s, %10s", name, TimingUtils.stats(times), retS)); + } + + protected void warmup(F f, int n) throws InterruptedException { + gen.generate(N); + ret.clear(); + } + + protected void execute(F f, String name, int N) throws InterruptedException { + gen.generate(N); + ret.clear(); + double[] times = TimingUtils.time(f, N, gen); + String retS = makeResString(times); + System.out.println(String.format("%35s, %s, %10s", name, TimingUtils.stats(times), retS)); + } + + protected abstract String makeResString(); + + protected String makeResString(double[] times){ + return makeResString(); + } + + @Override + public String toString() { + 
StringBuilder sb = new StringBuilder(); + sb.append(String.format("%20s ", this.getClass().getSimpleName())); + sb.append(" Repetitions: ").append(N).append(" "); + sb.append(gen); + return sb.toString(); + } +} diff --git a/src/test/java/org/apache/sysds/performance/compression/IOBandwidth.java b/src/test/java/org/apache/sysds/performance/compression/IOBandwidth.java new file mode 100644 index 00000000000..c09dddd9b41 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/IOBandwidth.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.performance.compression; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; +import org.apache.sysds.runtime.compress.colgroup.scheme.CompressionScheme; +import org.apache.sysds.runtime.compress.lib.CLALibScheme; +import org.apache.sysds.runtime.matrix.data.LibMatrixMult; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.test.TestUtils; + +public class IOBandwidth extends APerfTest { + + final int k; + + public IOBandwidth(int N, IGenerate gen) { + super(N, gen); + k = 1; + } + + public IOBandwidth(int N, IGenerate gen, int k) { + super(N, gen); + this.k = k; + } + + public void run() throws Exception, InterruptedException { + System.out.println(this); + warmup(() -> sumTask(k), N); + execute(() -> sumTask(k), "Sum"); + execute(() -> maxTask(k), "Max"); + final MatrixBlock v = genVector(); + execute(() -> matrixVector(v, k), "MV mult"); + + final CompressionScheme sch2 = CLALibScheme.getScheme(getC()); + execute(() -> updateAndApplyScheme(sch2, k), "Update&Apply Scheme"); + execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused"); + execute(() -> applyScheme(sch2, k), "Apply Scheme"); + execute(() -> fromEmptySchemeDoNotKeep(k), "Update&Apply from Empty"); + execute(() -> compressTask(k), "Normal Compression"); + + } + + public void runVector() throws Exception, InterruptedException { + System.out.println(this); + final MatrixBlock v = genVector(); + execute(() -> matrixVector(v, k), "MV mult"); + execute(() -> sumTask(k), "Sum"); + execute(() -> maxTask(k), "Max"); + } + + private void matrixVector(MatrixBlock v, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = LibMatrixMult.matrixMult(mb, v, k); + long out = 
r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void sumTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = mb.sum(k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void maxTask(int k){ + + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = mb.max(k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void compressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void applyScheme(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.encode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void updateAndApplyScheme(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + sch.update(mb, k); + MatrixBlock cmb = sch.encode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void updateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void fromEmptySchemeDoNotKeep(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + CompressionScheme sch = CLALibScheme.genScheme(CompressionType.EMPTY, mb.getNumColumns()); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private CompressedMatrixBlock getC() throws InterruptedException { + gen.generate(1); + MatrixBlock mb = gen.take(); + return (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(mb).getLeft(); + 
} + + private MatrixBlock genVector() throws InterruptedException { + gen.generate(1); + MatrixBlock mb = gen.take(); + MatrixBlock vector = TestUtils.generateTestMatrixBlock(mb.getNumColumns(), 1, -1.0, 1.0, 1.0, 324); + return vector; + } + + @Override + protected String makeResString() { + throw new RuntimeException("Do not call"); + } + + @Override + protected String makeResString(double[] times) { + double totalIn = 0; + double totalOut = 0; + double totalTime = 0.0; + for(int i = 0; i < ret.size(); i++) // set times + ret.get(i).time = times[i] / 1000; // ms to sec + + ret.sort(IOBandwidth::compare); + + final int l = ret.size(); + final int remove = (int) Math.floor((double) l * 0.05); + + final int el = l - remove * 2; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + totalIn += e.in; + totalOut += e.out; + totalTime += e.time; + } + + double bytePerMsIn = totalIn / totalTime; + double bytePerMsOut = totalOut / totalTime; + // double meanTime = totalTime / el; + + double varIn = 0; + double varOut = 0; + // double varTime = 0; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + varIn += Math.pow(e.in / e.time - bytePerMsIn, 2); + varOut += Math.pow(e.out / e.time - bytePerMsOut, 2); + } + + double stdIn = Math.sqrt(varIn / el); + double stdOut = Math.sqrt(varOut / el); + + return String.format("%12.0f+-%12.0f Byte/s, %12.0f+-%12.0f Byte/s", bytePerMsIn, stdIn, bytePerMsOut, stdOut); + } + + public static int compare(InOut a, InOut b) { + if(a.time == b.time) + return 0; + else if(a.time < b.time) + return -1; + else + return 1; + } + + @Override + public String toString() { + return super.toString() + " threads: " + k; + } + + protected class InOut { + protected long in; + protected long out; + protected double time; + + protected InOut(long in, long out) { + this.in = in; + this.out = out; + } + + } + +} diff --git a/src/test/java/org/apache/sysds/performance/compression/SchemaTest.java 
b/src/test/java/org/apache/sysds/performance/compression/SchemaTest.java new file mode 100644 index 00000000000..61309f41b3b --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/SchemaTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sysds.performance.compression; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; +import org.apache.sysds.runtime.compress.colgroup.scheme.CompressionScheme; +import org.apache.sysds.runtime.compress.lib.CLALibScheme; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class SchemaTest extends APerfTest { + + public SchemaTest(int N, IGenerate gen) { + super(N, gen); + } + + public void run() throws Exception, InterruptedException { + System.out.println(this); + execute(() -> sumTask(), "Sum Task -- Warmup"); + execute(() -> compressTask(), "Compress Normal 10 blocks"); + final CompressedMatrixBlock cmb = (CompressedMatrixBlock) ret.get(0); + final CompressionScheme sch = CLALibScheme.getScheme(cmb); + execute(() -> updateScheme(sch), "Update Scheme"); + execute(() -> applyScheme(sch), "Apply Scheme"); + final CompressionScheme sch2 = CLALibScheme.getScheme(cmb); + execute(() -> updateAndApplyScheme(sch2), "Update & Apply Scheme"); + execute(() -> fromEmptyScheme(), "From Empty Update & Apply Scheme"); + } + + public void runCom() throws Exception, InterruptedException { + execute(() -> compressTaskDoNotKeep(), "Compress Normal 10 blocks", 10); + for(int i = 0; i < 100; i++) + execute(() -> fromEmptySchemeDoNotKeep(), "From Empty Update & Apply Scheme", 10000); + + } + + protected String makeResString() { + return ""; + } + + private void sumTask() { + gen.take().sum(); + } + + private void compressTask() { + ret.add(CompressedMatrixBlockFactory.compress(gen.take()).getLeft()); + } + + private void compressTaskDoNotKeep() { + CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + } + + private void updateScheme(CompressionScheme sch) { + sch.update(gen.take()); + } + + private void 
applyScheme(CompressionScheme sch) { + ret.add(sch.encode(gen.take())); + } + + private void updateAndApplyScheme(CompressionScheme sch) { + MatrixBlock mb = gen.take(); + sch.update(mb); + ret.add(sch.encode(mb)); + } + + private void fromEmptyScheme() { + MatrixBlock mb = gen.take(); + CompressionScheme sch = CLALibScheme.genScheme(CompressionType.EMPTY, mb.getNumColumns()); + sch.update(mb); + ret.add(sch.encode(mb)); + } + + private void fromEmptySchemeDoNotKeep() { + MatrixBlock mb = gen.take(); + CompressionScheme sch = CLALibScheme.genScheme(CompressionType.EMPTY, mb.getNumColumns()); + + sch.update(mb); + sch.encode(mb); + } +} diff --git a/src/test/java/org/apache/sysds/performance/compression/Serialize.java b/src/test/java/org/apache/sysds/performance/compression/Serialize.java new file mode 100644 index 00000000000..17fc3feda0a --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/Serialize.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.performance.compression; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.colgroup.scheme.CompressionScheme; +import org.apache.sysds.runtime.compress.io.WriterCompressed; +import org.apache.sysds.runtime.compress.lib.CLALibScheme; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.WriterBinaryBlockParallel; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class Serialize extends APerfTest { + + static final String file = "./perftmp.bin"; + final int k; + + public Serialize(int N, IGenerate gen) { + super(N, gen); + k = 1; + } + + public Serialize(int N, IGenerate gen, int k) { + super(N, gen); + this.k = k; + } + + public void run() throws Exception, InterruptedException { + System.out.println(this); + warmup(() -> sumTask(k), N); + cleanup(); + execute(() -> writeUncompressed(k), "Serialize"); + execute(() -> diskUncompressed(k), "CustomDisk"); + cleanup(); + execute(() -> standardIO(k), "StandardDisk"); + cleanup(); + + execute(() -> compressTask(k), "Compress Normal"); + execute(() -> writeCompressTask(k), "Compress Normal Serialize"); + execute(() -> diskCompressTask(k), "Compress Normal CustomDisk"); + cleanup(); + execute(() -> standardCompressedIO(k), "Compress StandardIO"); + cleanup(); + + final CompressionScheme sch2 = CLALibScheme.getScheme(getC()); + execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused"); + execute(() -> writeUpdateAndApplySchemeFused(sch2, k), "Update&Apply 
Scheme Fused Serialize"); + cleanup(); + execute(() -> diskUpdateAndApplySchemeFused(sch2, k), "Update&Apply Scheme Fused Disk"); + cleanup(); + execute(() -> standardCompressedIOUpdateAndApply(sch2, k), "Update&Apply Standard IO"); + } + + public void run(int i) throws Exception, InterruptedException { + warmup(() -> sumTask(k), N); + final CompressionScheme sch = CLALibScheme.getScheme(getC()); + cleanup(); + switch(i) { + case 1: + execute(() -> writeUncompressed(k), "Serialize"); + break; + case 2: + execute(() -> diskUncompressed(k), "CustomDisk"); + break; + case 3: + execute(() -> standardIO(k), "StandardDisk"); + break; + case 4: + execute(() -> compressTask(k), "Compress Normal"); + break; + case 5: + execute(() -> writeCompressTask(k), "Compress Normal Serialize"); + break; + case 6: + execute(() -> diskCompressTask(k), "Compress Normal CustomDisk"); + break; + case 7: + execute(() -> standardCompressedIO(k), "Compress StandardIO"); + break; + case 8: + execute(() -> updateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused"); + break; + case 9: + execute(() -> writeUpdateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused Serialize"); + break; + case 10: + execute(() -> diskUpdateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused Disk"); + break; + case 11: + execute(() -> standardCompressedIOUpdateAndApply(sch, k), "Update&Apply Standard IO"); + break; + } + cleanup(); + } + + private void writeUncompressed(int k) { + MatrixBlock mb = gen.take(); + Sink o = serialize(mb); + ret.add(new InOut(mb.getInMemorySize(), o.size())); + } + + private void diskUncompressed(int k) { + MatrixBlock mb = gen.take(); + Disk o = serializeD(mb); + ret.add(new InOut(mb.getInMemorySize(), o.size())); + } + + private void standardIO(int k) { + try { + MatrixWriter w = new WriterBinaryBlockParallel(1); + MatrixBlock mb = gen.take(); + w.writeMatrixToHDFS(mb, file, mb.getNumRows(), mb.getNumColumns(), 1000, mb.getNonZeros(), false); + ret.add(new 
InOut(mb.getInMemorySize(), Files.size(Paths.get(file)))); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void compressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void writeCompressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + Sink o = serialize(cmb); + ret.add(new InOut(in, o.size())); + } + + private void diskCompressTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + Disk o = serializeD(cmb); + ret.add(new InOut(in, o.size())); + } + + private void standardCompressedIO(int k) { + try { + // MatrixWriter w = new WriterBinaryBlockParallel(1); + MatrixBlock mb = gen.take(); + WriterCompressed.writeCompressedMatrixToHDFS(mb, file); + ret.add(new InOut(mb.getInMemorySize(), Files.size(Paths.get(file)))); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void updateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + long out = cmb.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private void writeUpdateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + Sink o = serialize(cmb); + ret.add(new InOut(in, o.size())); + } + + private void diskUpdateAndApplySchemeFused(CompressionScheme sch, int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + Disk o = serializeD(cmb); + ret.add(new InOut(in, o.size())); + 
} + + private void standardCompressedIOUpdateAndApply(CompressionScheme sch, int k) { + try { + // MatrixWriter w = new WriterBinaryBlockParallel(1); + MatrixBlock mb = gen.take(); + MatrixBlock cmb = sch.updateAndEncode(mb, k); + WriterCompressed.writeCompressedMatrixToHDFS(cmb, file); + ret.add(new InOut(mb.getInMemorySize(), Files.size(Paths.get(file)))); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void sumTask(int k) { + MatrixBlock mb = gen.take(); + long in = mb.getInMemorySize(); + MatrixBlock r = mb.sum(k); + long out = r.getInMemorySize(); + ret.add(new InOut(in, out)); + } + + private CompressedMatrixBlock getC() throws InterruptedException { + gen.generate(1); + MatrixBlock mb = gen.take(); + return (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(mb).getLeft(); + } + + @Override + protected String makeResString() { + throw new RuntimeException("Do not call"); + } + + @Override + protected String makeResString(double[] times) { + double totalIn = 0; + double totalOut = 0; + double totalTime = 0.0; + for(int i = 0; i < ret.size(); i++) // set times + ret.get(i).time = times[i] / 1000; // ms to sec + + ret.sort(Serialize::compare); + + final int l = ret.size(); + final int remove = (int) Math.floor((double) l * 0.05); + + final int el = l - remove * 2; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + totalIn += e.in; + totalOut += e.out; + totalTime += e.time; + } + + double bytePerMsIn = totalIn / totalTime; + double bytePerMsOut = totalOut / totalTime; + // double meanTime = totalTime / el; + + double varIn = 0; + double varOut = 0; + // double varTime = 0; + + for(int i = remove; i < ret.size() - remove; i++) { + InOut e = ret.get(i); + varIn += Math.pow(e.in / e.time - bytePerMsIn, 2); + varOut += Math.pow(e.out / e.time - bytePerMsOut, 2); + } + + double stdIn = Math.sqrt(varIn / el); + double stdOut = Math.sqrt(varOut / el); + + return String.format("%12.0f+-%12.0f 
Byte/s, %12.0f+-%12.0f Byte/s", bytePerMsIn, stdIn, bytePerMsOut, stdOut); + } + + public static int compare(InOut a, InOut b) { + if(a.time == b.time) + return 0; + else if(a.time < b.time) + return -1; + else + return 1; + } + + public static Sink serialize(MatrixBlock mb) { + try { + Sink s = new Sink(); + DataOutputStream fos = new DataOutputStream(s); + mb.write(fos); + return s; + } + catch(IOException e) { + throw new RuntimeException(e); + } + } + + public static Disk serializeD(MatrixBlock mb) { + try { + Disk s = new Disk(); + DataOutputStream fos = new DataOutputStream(s); + mb.write(fos); + return s; + } + catch(IOException e) { + throw new RuntimeException(e); + } + } + + private static class Sink extends OutputStream { + long s = 0L; + + @Override + public void write(int b) throws IOException { + s++; + } + + @Override + public void write(byte[] b) throws IOException { + s += b.length; + } + + public long size() { + return s; + } + + } + + private static class Disk extends OutputStream { + final FileOutputStream writer; + final BufferedOutputStream buf; + long s = 0L; + + protected Disk() throws FileNotFoundException { + writer = new FileOutputStream(file); + buf = new BufferedOutputStream(writer, 4096); + } + + @Override + public void write(int b) throws IOException { + s++; + buf.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + s += b.length; + buf.write(b); + } + + public long size() { + try { + buf.close(); + writer.close(); + return s; + } + catch(Exception e) { + return s; + } + } + } + + private void cleanup() { + File f = new File(file); + if(f.exists()) { + if(f.isDirectory()) + deleteDirectory(f); + else + f.delete(); + } + } + + boolean deleteDirectory(File directoryToBeDeleted) { + File[] allContents = directoryToBeDeleted.listFiles(); + if(allContents != null) { + for(File file : allContents) { + deleteDirectory(file); + } + } + return directoryToBeDeleted.delete(); + } + + @Override + public String 
toString() { + return super.toString() + " threads: " + k; + } + + protected class InOut { + protected long in; + protected long out; + protected double time; + + protected InOut(long in, long out) { + this.in = in; + this.out = out; + } + + } + +} diff --git a/src/test/java/org/apache/sysds/performance/compression/SteamCompressTest.java b/src/test/java/org/apache/sysds/performance/compression/SteamCompressTest.java deleted file mode 100644 index e414e516320..00000000000 --- a/src/test/java/org/apache/sysds/performance/compression/SteamCompressTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysds.performance.compression; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.zip.Deflater; -import java.util.zip.DeflaterOutputStream; - -import org.apache.sysds.performance.Util; -import org.apache.sysds.performance.Util.F; -import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.util.CommonThreadPool; -import org.apache.sysds.test.TestUtils; - -public class SteamCompressTest { - - private static BlockingQueue tasks = new ArrayBlockingQueue<>(8); - private static ArrayList ret; - - public static void P1() throws Exception, InterruptedException { - System.out.println("Running Steam Compression Test"); - CommonThreadPool.get(2); - - execute(() -> sumTask(), "Sum Task -- Warmup"); - execute(() -> blockSizeTask(), "In Memory Block Size"); - execute(() -> writeSteam(), "Write Blocks Stream"); - execute(() -> writeSteamDeflaterOutputStreamDef(), "Write Stream Deflate"); - execute(() -> writeSteamDeflaterOutputStreamSpeed(), "Write Stream Deflate Speedy"); - execute(() -> compressTask(), "In Memory Compress Individual (CI)"); - execute(() -> writeStreamCompressTask(), "Write CI Stream"); - execute(() -> writeStreamCompressDeflaterOutputStreamTask(), "Write CI Deflate Stream"); - execute(() -> writeStreamCompressDeflaterOutputStreamTaskSpeedy(), "Write CI Deflate Stream Speedy"); - - } - - private static void execute(F f, String name) throws InterruptedException { - final int N = 100; - fillTasks(N); - if(ret == null) - ret = new ArrayList(); - else - ret.clear(); - double[] times = Util.time(f, N, tasks); - Double avgRes = ret.stream().mapToDouble(a -> a).average().getAsDouble(); - System.out.println(String.format("%35s, %50s, 
%10.2f", name, Util.stats(times), avgRes)); - - } - - private static void sumTask() { - try { - ret.add(tasks.take().sum()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed sum"); - } - } - - private static void blockSizeTask() { - try { - ret.add((double)tasks.take().getInMemorySize()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed sum"); - } - } - - private static void compressTask() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ret.add((double) mb.getInMemorySize()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeStreamCompressTask() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream fos = new DataOutputStream(bos); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeStreamCompressDeflaterOutputStreamTask() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeStreamCompressDeflaterOutputStreamTaskSpeedy() { - try { - MatrixBlock mb = CompressedMatrixBlockFactory.compress(tasks.take()).getLeft(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); - DataOutputStream fos = new 
DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeSteam() { - try { - MatrixBlock mb = tasks.take(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream fos = new DataOutputStream(bos); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("failed Write Stream"); - } - } - - private static void writeSteamDeflaterOutputStreamDef() { - try { - MatrixBlock mb = tasks.take(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void writeSteamDeflaterOutputStreamSpeed() { - try { - MatrixBlock mb = tasks.take(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); - DataOutputStream fos = new DataOutputStream(decorator); - mb.write(fos); - ret.add((double) bos.size()); - } - catch(Exception e) { - e.printStackTrace(); - throw new RuntimeException("Failed compress"); - } - } - - private static void fillTasks(int nBlocks) { - CompletableFuture.runAsync(() -> { - - for(int i = 0; i < nBlocks; i++) { - MatrixBlock mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1000, 100, 0, 32, 0.2, i)); - try { - tasks.put(mb); - } - catch(InterruptedException e) { - e.printStackTrace(); - } - } - }); - } -} diff --git a/src/test/java/org/apache/sysds/performance/compression/StreamCompress.java b/src/test/java/org/apache/sysds/performance/compression/StreamCompress.java new file mode 100644 index 
00000000000..97418a570ca --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/compression/StreamCompress.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.compression; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; + +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class StreamCompress extends APerfTest { + + public StreamCompress(int N, IGenerate gen) { + super(N, gen); + } + + public void run() throws Exception, InterruptedException, IOException { + System.out.println("Running Steam Compression Test"); + System.out.println(this); + + warmup(() -> sumTask(), 10); + execute(() -> blockSizeTask(), "In Memory Block Size"); + execute(() -> writeSteam(), "Write Blocks Stream"); + execute(() -> writeSteamDeflaterOutputStreamDef(), "Write Stream Deflate"); + execute(() -> writeSteamDeflaterOutputStreamSpeed(), "Write Stream Deflate Speedy"); + execute(() -> 
compressTask(), "In Memory Compress Individual (CI)"); + execute(() -> writeStreamCompressTask(), "Write CI Stream"); + execute(() -> writeStreamCompressDeflaterOutputStreamTask(), "Write CI Deflate Stream"); + execute(() -> writeStreamCompressDeflaterOutputStreamTaskSpeedy(), "Write CI Deflate Stream Speedy"); + + } + + @Override + protected String makeResString() { + Double avgRes = ret.stream().mapToDouble(a -> a).average().getAsDouble(); + return String.format("%10.2f", avgRes); + } + + private void sumTask() { + ret.add(gen.take().sum()); + } + + private void blockSizeTask() { + ret.add((double) gen.take().getInMemorySize()); + } + + private void compressTask() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ret.add((double) mb.getInMemorySize()); + + } + + private void writeStreamCompressTask() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream fos = new DataOutputStream(bos); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeStreamCompressDeflaterOutputStreamTask() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeStreamCompressDeflaterOutputStreamTaskSpeedy() { + + MatrixBlock mb = CompressedMatrixBlockFactory.compress(gen.take()).getLeft(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); + DataOutputStream fos = new 
DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeSteam() { + + MatrixBlock mb = gen.take(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream fos = new DataOutputStream(bos); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeSteamDeflaterOutputStreamDef() { + + MatrixBlock mb = gen.take(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + + private void writeSteamDeflaterOutputStreamSpeed() { + + MatrixBlock mb = gen.take(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DeflaterOutputStream decorator = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED)); + DataOutputStream fos = new DataOutputStream(decorator); + try { + mb.write(fos); + } + catch(IOException e) { + throw new RuntimeException(e); + } + ret.add((double) bos.size()); + + } + +} diff --git a/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java new file mode 100644 index 00000000000..f01d0a2075b --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.generators; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.functionobjects.ReduceAll; +import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType; +import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator; +import org.apache.sysds.test.TestUtils; + +public class ConstMatrix implements IGenerate { + + protected final MatrixBlock mb; + protected final int nVal; + + public ConstMatrix(MatrixBlock mb) { + this.mb = mb; + this.nVal = (int) LibMatrixCountDistinct + .estimateDistinctValues(mb, + new CountDistinctOperator(AUType.COUNT_DISTINCT, Types.Direction.RowCol, ReduceAll.getReduceAllFnObject())) + .getValue(0, 0); + } + + public ConstMatrix(int r, int c, int nVal, double s) { + this.mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(r, c, 0, nVal, s, 42)); + this.nVal = nVal; + } + + @Override + public MatrixBlock take() { + return mb; + } + + @Override + public void generate(int N) throws InterruptedException { + // do nothing + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getSimpleName()); + sb.append(" ( Rows:").append(mb.getNumRows()); + sb.append(", 
Cols:").append(mb.getNumColumns()); + sb.append(", Spar:").append(mb.getSparsity()); + sb.append(", Unique: ").append(nVal); + sb.append(")"); + return sb.toString(); + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public int defaultWaitTime() { + return 0; + } + +} diff --git a/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java b/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java new file mode 100644 index 00000000000..f96233ae6fe --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.performance.generators; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.test.TestUtils; + +public class GenMatrices implements IGenerate { + + /** A task queue that guarantees that the execution is not too long */ + protected final BlockingQueue tasks; + /** The number of rows in each task block */ + protected final int r; + /** The number of cols in each task block */ + protected final int c; + /** The maximum number of unique values */ + protected final int nVal; + /** The sparsity of the generated matrices */ + protected final double s; + /** The initial seed */ + protected final int seed; + + public GenMatrices(int r, int c, int nVal, double s) { + // Make a thread pool if not already there + CommonThreadPool.get(); + tasks = new ArrayBlockingQueue<>(8); + this.r = r; + this.c = c; + this.nVal = nVal; + this.s = s; + this.seed = 42; + } + + @Override + public MatrixBlock take() { + try { + return tasks.take(); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void generate(int N) throws InterruptedException { + CompletableFuture.runAsync(() -> { + try { + for(int i = 0; i < N; i++) { + tasks.put(TestUtils.ceil(TestUtils.generateTestMatrixBlock(r, c, 0, nVal, s, i + seed))); + } + } + catch(InterruptedException e) { + e.printStackTrace(); + } + }); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getSimpleName()); + sb.append(" rand(").append(r).append(", ").append(c).append(", ").append(nVal).append(", ").append(s).append(")"); + sb.append(" Seed: ").append(seed); + return sb.toString(); + } + + @Override + public boolean isEmpty() { + return tasks.isEmpty(); + } + + @Override + public int 
defaultWaitTime() { + return 100; + } + +} diff --git a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java new file mode 100644 index 00000000000..ee39590bf31 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.generators; + +/** + * Generator interface for task generation. + */ +public interface IGenerate { + + /** + * Validate if the generator is empty, and we have to wait for elements. 
+ * + * @return If the generator is empty + */ + public boolean isEmpty(); + + /** + * Default wait time for the generator to fill + * + * @return The wait time + */ + public int defaultWaitTime(); + + /** + * A blocking take operation that waits for the generator to fill that element + * + * @return A task element + */ + public T take(); + + /** + * A non-blocking async operation that generates elements for the task queue + * + * @param N The number of elements to create + * @throws InterruptedException An exception if the task is interrupted + */ + public void generate(int N) throws InterruptedException; + +}