From 478e50ff37e79592ce13cd1101b0b13de1f0924e Mon Sep 17 00:00:00 2001 From: franz1981 Date: Wed, 4 Nov 2020 15:13:47 +0100 Subject: [PATCH] PROTON-2287 Improve Symbol decoding cache --- .../proton/codec/CompositeReadableBuffer.java | 100 +++++++++++--- .../apache/qpid/proton/codec/DecoderImpl.java | 14 +- .../org/apache/qpid/proton/codec/Hashing.java | 125 ++++++++++++++++++ .../qpid/proton/codec/ReadableBuffer.java | 4 +- .../apache/qpid/proton/codec/SymbolType.java | 4 +- tests/performance-jmh/pom.xml | 2 +- ...ompositeReadableBufferEqualsBenchmark.java | 15 +-- .../qpid/proton/hash/BytesHashBenchmark.java | 102 ++++++++++++++ 8 files changed, 333 insertions(+), 33 deletions(-) create mode 100644 proton-j/src/main/java/org/apache/qpid/proton/codec/Hashing.java create mode 100644 tests/performance-jmh/src/main/java/org/apache/qpid/proton/hash/BytesHashBenchmark.java diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java index abbb40b76..eb617756a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java @@ -145,6 +145,13 @@ public byte get(int index) { throw new IndexOutOfBoundsException("The given index is not valid: " + index); } + return _get(index); + } + + /** + * Unchecked ie no bound-checks get + */ + private byte _get(int index) { byte result = 0; if (index == position) { @@ -813,8 +820,8 @@ public int hashCode() { int remaining = remaining(); if (currentArrayIndex < 0 || remaining <= currentArray.length - currentOffset) { - while (remaining > 0) { - hash = 31 * hash + currentArray[currentOffset + --remaining]; + if (remaining > 0) { + hash = Hashing.byteBufferCompatibleHashCode(currentArray, currentOffset, currentOffset + remaining); } } else { hash = hashCodeFromComponents(); @@ -875,7 +882,7 @@ public boolean equals(Object other) { return true; } - if (hasArray() || remaining <= currentArray.length - currentOffset) { + if (remaining <= currentArray.length - currentOffset || hasArray()) { // Either there is only one array, or the span to compare is within a single chunk of this buffer, // allowing the compare to directly access the underlying array instead of using slower get methods. return equals(currentArray, currentOffset, remaining, buffer); @@ -885,6 +892,40 @@ public boolean equals(Object other) { } private static boolean equals(byte[] buffer, int start, int length, ReadableBuffer other) { + if (other.hasArray()) { + // fast-path: jdk 11 has a vectorized Arrays::equals for ranged comparisons, but + // sadly JDK 8 nope so let's try to save at least bound checks + final int otherStart = other.arrayOffset() + other.position(); + return equals(buffer, start, other.array(), otherStart, length); + } else if (other instanceof ByteBufferReader) { + return rawEquals(buffer, start, length, other.byteBuffer()); + } + return rawEquals(buffer, start, length, other); + } + + private static boolean uncheckedEquals(byte[] buffer, int start, int length, CompositeReadableBuffer other) { + final int position = other.position(); + for (int i = 0; i < length; i++) { + if (buffer[start + i] != other._get(position + i)) { + return false; + } + } + return true; + } + + private static boolean uncheckedEquals(CompositeReadableBuffer buffer, ByteBuffer other, int length) { + assert buffer.remaining() >= length; + final int otherPosition = other.position(); + final int bufferPosition = buffer.position(); + for (int i = 0; i < length; i++) { + if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) { + return false; + } + } + return true; + } + + private static boolean rawEquals(byte[] buffer, int start, int length, ByteBuffer other) { final int position = other.position(); for (int i = 0; i < length; i++) { if (buffer[start + i] != other.get(position + i)) { @@ -894,18 +935,47 @@ private static boolean equals(byte[] buffer, int start, int length, ReadableBuff return true; } - private static boolean equals(ReadableBuffer buffer, ReadableBuffer other) { - final int origPos = buffer.position(); - try { - for (int i = other.position(); buffer.hasRemaining(); i++) { - if (!equals(buffer.get(), other.get(i))) { - return false; - } + private static boolean rawEquals(byte[] buffer, int start, int length, ReadableBuffer other) { + final int position = other.position(); + for (int i = 0; i < length; i++) { + if (buffer[start + i] != other.get(position + i)) { + return false; + } + } + return true; + } + + private static boolean equals(byte[] a, int aStart, byte[] b, int bStart, int length) { + for (int i = 0; i < length; i++) { + if (a[aStart + i] != b[bStart + i]) { + return false; } - return true; - } finally { - buffer.position(origPos); } + return true; + } + + private static boolean equals(CompositeReadableBuffer buffer, ReadableBuffer other) { + final int bufferRemaining = buffer.remaining(); + if (other.hasArray()) { + final int otherStart = other.arrayOffset() + other.position(); + // check if otherEnd is beyond other limits, because the underline array is just limited by the capacity + if (other.limit() < otherStart + bufferRemaining) { + throw new BufferUnderflowException(); + } + return uncheckedEquals(other.array(), otherStart, bufferRemaining, buffer); + } + if (other instanceof ByteBufferReader) { + return uncheckedEquals(buffer, other.byteBuffer(), bufferRemaining); + } + // slow path + final int bufferPosition = buffer.position(); + final int otherPosition = other.position(); + for (int i = 0; i < bufferRemaining; i++) { + if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) { + return false; + } + } + return true; } @Override @@ -923,10 +993,6 @@ public String toString() { return builder.toString(); } - private static boolean equals(byte x, byte y) { - return x == y; - } - private void maybeMoveToNextArray() { if (currentArray.length == currentOffset) { if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() - 1)) { diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java index 12be52617..848383aee 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java @@ -1075,8 +1075,18 @@ void readRaw(final byte[] data, final int offset, final int length) V readRaw(TypeDecoder decoder, int size) { - V decode = decoder.decode(this, _buffer.slice().limit(size)); - _buffer.position(_buffer.position()+size); + final int originalLimit = _buffer.limit(); + final int originalPosition = _buffer.position(); + final V decode; + try { + decode = decoder.decode(this, _buffer.limit(originalPosition + size)); + } catch (Throwable t) { + _buffer.position(originalPosition); + throw t; + } finally { + _buffer.limit(originalLimit); + } + _buffer.position(originalPosition + size); return decode; } diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/Hashing.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/Hashing.java new file mode 100644 index 000000000..9cf0bca8f --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/Hashing.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.codec; + +import java.nio.ByteBuffer; + +public class Hashing { + + private Hashing() { + + } + + // this const propagation should be already handled by the JIT + // but we do this to make it more readable + private static final int PRIME_1 = 31; + private static final int PRIME_2 = PRIME_1 * PRIME_1; + private static final int PRIME_3 = PRIME_2 * PRIME_1; + private static final int PRIME_4 = PRIME_3 * PRIME_1; + private static final int PRIME_5 = PRIME_4 * PRIME_1; + private static final int PRIME_6 = PRIME_5 * PRIME_1; + private static final int PRIME_7 = PRIME_6 * PRIME_1; + private static final int PRIME_8 = PRIME_7 * PRIME_1; + + public static int byteBufferCompatibleHashCode(ByteBuffer byteBuffer) { + if (byteBuffer.hasArray()) { + final int arrayOffset = byteBuffer.arrayOffset(); + final int arrayPosition = arrayOffset + byteBuffer.position(); + final int arrayLimit = arrayOffset + byteBuffer.limit(); + return byteBufferCompatibleHashCode(byteBuffer.array(), arrayPosition, arrayLimit); + } + // direct ByteBuffers does have some heavy-weight bound checks and memory barriers that + // we just hope JIT to be better then us! + return byteBuffer.hashCode(); + } + + public static int byteBufferCompatibleHashCode(byte[] bytes, int position, int limit) { + int h = 1; + int remaining = limit - position; + if (remaining == 0) { + return h; + } + int index = limit - 1; + // unrolled version + final int bytesCount = remaining & 7; + if (bytesCount > 0) { + assert h == 1; + h = unrolledHashCode(bytes, index, bytesCount, 1); + index -= bytesCount; + } + final long longsCount = remaining >>> 3; + // let's break the data dependency of each per element hash code + // and save bound checks by manual unrolling 8 ops at time + for (int i = 0; i < longsCount; i++) { + final byte b7 = bytes[index]; + final byte b6 = bytes[index - 1]; + final byte b5 = bytes[index - 2]; + final byte b4 = bytes[index - 3]; + final byte b3 = bytes[index - 4]; + final byte b2 = bytes[index - 5]; + final byte b1 = bytes[index - 6]; + final byte b0 = bytes[index - 7]; + h = PRIME_8 * h + + PRIME_7 * b7 + + PRIME_6 * b6 + + PRIME_5 * b5 + + PRIME_4 * b4 + + PRIME_3 * b3 + + PRIME_2 * b2 + + PRIME_1 * b1 + + b0; + index -= Long.BYTES; + } + return h; + } + + private static int unrolledHashCode(byte[] bytes, int index, int bytesCount, int h) { + // there is still the hash data dependency but is more friendly + // then a plain loop, given that we know no loop is needed here + assert bytesCount > 0 && bytesCount < 8; + h = PRIME_1 * h + bytes[index]; + if (bytesCount == 1) { + return h; + } + h = PRIME_1 * h + bytes[index - 1]; + if (bytesCount == 2) { + return h; + } + h = PRIME_1 * h + bytes[index - 2]; + if (bytesCount == 3) { + return h; + } + h = PRIME_1 * h + bytes[index - 3]; + if (bytesCount == 4) { + return h; + } + h = PRIME_1 * h + bytes[index - 4]; + if (bytesCount == 5) { + return h; + } + h = PRIME_1 * h + bytes[index - 5]; + if (bytesCount == 6) { + return h; + } + h = PRIME_1 * h + bytes[index - 6]; + return h; + } +} diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java index 05cbb9743..a6eb81add 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java @@ -330,7 +330,7 @@ public interface ReadableBuffer { final class ByteBufferReader implements ReadableBuffer { - private ByteBuffer buffer; + private final ByteBuffer buffer; public static ByteBufferReader allocate(int size) { ByteBuffer allocated = ByteBuffer.allocate(size); @@ -522,7 +522,7 @@ public String toString() { @Override public int hashCode() { - return buffer.hashCode(); + return Hashing.byteBufferCompatibleHashCode(buffer); } @Override diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java index 6c89cba63..70649b3f1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java @@ -35,7 +35,7 @@ public class SymbolType extends AbstractPrimitiveType private final SymbolEncoding _shortSymbolEncoding; private final Map _symbolCache = new HashMap(); - private DecoderImpl.TypeDecoder _symbolCreator = + private final DecoderImpl.TypeDecoder _symbolCreator = new DecoderImpl.TypeDecoder() { @Override @@ -44,7 +44,7 @@ public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer) Symbol symbol = _symbolCache.get(buffer); if (symbol == null) { - byte[] bytes = new byte[buffer.limit()]; + byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String str = new String(bytes, ASCII_CHARSET); diff --git a/tests/performance-jmh/pom.xml b/tests/performance-jmh/pom.xml index 400c2bfd0..2d5817910 100644 --- a/tests/performance-jmh/pom.xml +++ b/tests/performance-jmh/pom.xml @@ -30,7 +30,7 @@ Proton-J JMH Performance Tests - 1.19 + 1.25.2 diff --git a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferEqualsBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferEqualsBenchmark.java index dc6b0eff3..b7376bd1c 100644 --- a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferEqualsBenchmark.java +++ b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferEqualsBenchmark.java @@ -23,6 +23,7 @@ import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; @@ -43,15 +44,16 @@ @State(Scope.Benchmark) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) -@Warmup(iterations = 5, time = 1) -@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS) +@Fork(2) public class CompositeReadableBufferEqualsBenchmark { private CompositeReadableBuffer composite; - @Param({"8", "64", "1024"}) + @Param({"8", "16", "64"}) private int size; private ReadableBuffer.ByteBufferReader bufferReader; - @Param({"false", "true"}) + @Param({ "false", "true" }) private boolean direct; @Param({"1", "2"}) private int chunks; @@ -97,11 +99,6 @@ public static void main(String[] args) throws RunnerException { public static void runBenchmark(Class benchmarkClass) throws RunnerException { final Options opt = new OptionsBuilder() .include(benchmarkClass.getSimpleName()) - .addProfiler(GCProfiler.class) - .shouldDoGC(true) - .warmupIterations(5) - .measurementIterations(5) - .forks(1) .build(); new Runner(opt).run(); } diff --git a/tests/performance-jmh/src/main/java/org/apache/qpid/proton/hash/BytesHashBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/hash/BytesHashBenchmark.java new file mode 100644 index 000000000..d56b29d13 --- /dev/null +++ b/tests/performance-jmh/src/main/java/org/apache/qpid/proton/hash/BytesHashBenchmark.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.qpid.proton.hash; + +import org.apache.qpid.proton.codec.Hashing; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.nio.ByteBuffer; +import java.util.SplittableRandom; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@Fork(2) +public class BytesHashBenchmark { + + @Param({ "7", "24", "32" }) + int size; + + @Param({ "6", "12" }) + int logPermutations; + + @Param({ "1" }) + int seed; + + int permutations; + + ByteBuffer[] data; + private int i; + + @Param({ "false", "true" }) + private boolean direct; + + @Setup(Level.Trial) + public void init() { + SplittableRandom random = new SplittableRandom(seed); + permutations = 1 << logPermutations; + this.data = new ByteBuffer[permutations]; + for (int i = 0; i < permutations; ++i) { + data[i] = direct? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); + final int limit = random.nextInt(Math.max(0, size - 8), size); + for (int j = 0; j < limit; j++) { + int value = random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1); + data[i].put(j, (byte) value); + } + data[i].limit(limit); + } + } + + @Benchmark + public int vanillaHashCode() { + return getData().hashCode(); + } + + @Benchmark + public int protonHashCode() { + return Hashing.byteBufferCompatibleHashCode(getData()); + } + + private ByteBuffer getData() { + return data[i++ & (permutations - 1)]; + } + + public static void main(String[] args) throws RunnerException { + runBenchmark(BytesHashBenchmark.class); + } + + public static void runBenchmark(Class benchmarkClass) throws RunnerException { + final Options opt = new OptionsBuilder() + .include(benchmarkClass.getSimpleName()) + .build(); + new Runner(opt).run(); + } +}