Skip to content

Commit

Permalink
0.9.2 release
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Aug 25, 2022
2 parents dce532a + 8c8a0fa commit 57c7de7
Show file tree
Hide file tree
Showing 52 changed files with 429 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,4 @@ $RECYCLE.BIN/
# Node
node_modules
package-lock.json
/src/jmh/java/generated/
24 changes: 17 additions & 7 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>

group = "io.libp2p"
version = "0.9.1-RELEASE"
version = "0.9.2-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand All @@ -26,6 +26,7 @@ plugins {
id("maven-publish")
id("org.jetbrains.dokka").version("1.6.10")
id("org.jmailen.kotlinter").version("3.8.0")
id("java-test-fixtures")
}

repositories {
Expand All @@ -36,6 +37,13 @@ repositories {

val log4j2Version = "2.17.1"

sourceSets.create("jmh") {
compileClasspath += sourceSets["main"].runtimeClasspath
compileClasspath += sourceSets["testFixtures"].runtimeClasspath
runtimeClasspath += sourceSets["main"].runtimeClasspath
runtimeClasspath += sourceSets["testFixtures"].runtimeClasspath
}

dependencies {
api("io.netty:netty-all:4.1.69.Final")
api("com.google.protobuf:protobuf-java:3.19.2")
Expand All @@ -53,6 +61,9 @@ dependencies {
implementation("org.apache.logging.log4j:log4j-core:${log4j2Version}")
implementation("javax.xml.bind:jaxb-api:2.3.1")

testFixturesImplementation("org.apache.logging.log4j:log4j-api:${log4j2Version}")
testFixturesImplementation("com.google.guava:guava:31.0.1-jre")

testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.2")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.8.2")
Expand All @@ -61,14 +72,13 @@ dependencies {
testImplementation("org.mockito:mockito-junit-jupiter:4.2.0")
testImplementation("org.assertj:assertj-core:3.22.0")

"jmhImplementation"("org.openjdk.jmh:jmh-core:1.35")
"jmhAnnotationProcessor"("org.openjdk.jmh:jmh-generator-annprocess:1.35")
}

sourceSets {
main {
proto {
srcDir("src/main/proto")
}
}
task<JavaExec>("jmh") {
mainClass.set("org.openjdk.jmh.Main")
classpath = sourceSets["jmh"].compileClasspath + sourceSets["jmh"].runtimeClasspath
}

protobuf {
Expand Down
116 changes: 116 additions & 0 deletions src/jmh/java/io/libp2p/pubsub/gossip/GossipScoreBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.libp2p.pubsub.gossip;

import io.libp2p.core.PeerId;
import io.libp2p.core.multiformats.Multiaddr;
import io.libp2p.core.multiformats.Protocol;
import io.libp2p.pubsub.DefaultPubsubMessage;
import io.libp2p.tools.schedulers.ControlledExecutorServiceImpl;
import io.libp2p.tools.schedulers.TimeController;
import io.libp2p.tools.schedulers.TimeControllerImpl;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import pubsub.pb.Rpc;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@State(Scope.Thread)
@Fork(5)
@Warmup(iterations = 5, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
public class GossipScoreBenchmark {

private final int peerCount = 5000;
private final int connectedCount = 2000;
private final int topicCount = 128;

private final List<String> topics = IntStream
.range(0, topicCount)
.mapToObj(i -> "Topic-" + i)
.collect(Collectors.toList());

private final List<PeerId> peerIds = Stream.generate(PeerId::random).limit(peerCount).collect(Collectors.toList());
private final List<Multiaddr> peerAddresses = IntStream
.range(0, peerCount)
.mapToObj(idx ->
Multiaddr.empty()
.withComponent(Protocol.IP4, new byte[]{(byte) (idx >>> 8 & 0xFF), (byte) (idx & 0xFF), 0, 0})
.withComponent(Protocol.TCP, new byte[]{0x23, 0x28}))
.collect(Collectors.toList());

private final TimeController timeController = new TimeControllerImpl();
private final ControlledExecutorServiceImpl controlledExecutor = new ControlledExecutorServiceImpl();
private final GossipScoreParams gossipScoreParams;
private final DefaultGossipScore score;

public GossipScoreBenchmark() {
Map<String, GossipTopicScoreParams> topicParamMap = topics.stream()
.collect(Collectors.toMap(Function.identity(), __ -> new GossipTopicScoreParams()));
GossipTopicsScoreParams gossipTopicsScoreParams = new GossipTopicsScoreParams(new GossipTopicScoreParams(), topicParamMap);

gossipScoreParams = new GossipScoreParams(new GossipPeerScoreParams(), gossipTopicsScoreParams, 0, 0, 0, 0, 0);
controlledExecutor.setTimeController(timeController);
score = new DefaultGossipScore(gossipScoreParams, controlledExecutor, timeController::getTime);

for (int i = 0; i < peerCount; i++) {
PeerId peerId = peerIds.get(i);
score.notifyConnected(peerId, peerAddresses.get(i));
for (String topic : topics) {
notifyUnseenMessage(peerId, topic);
}
}

for (int i = connectedCount; i < peerCount; i++) {
score.notifyDisconnected(peerIds.get(i));
}
}

private void notifyUnseenMessage(PeerId peerId, String topic) {
Rpc.Message message = Rpc.Message.newBuilder()
.addTopicIDs(topic)
.build();
score.notifyUnseenValidMessage(peerId, new DefaultPubsubMessage(message));
}

@Benchmark
public void scoresDelay0(Blackhole bh) {
for (int i = 0; i < connectedCount; i++) {
double s = score.score(peerIds.get(i));
bh.consume(s);
}
}

@Benchmark
public void scoresDelay100(Blackhole bh) {
timeController.addTime(100);

for (int i = 0; i < connectedCount; i++) {
double s = score.score(peerIds.get(i));
bh.consume(s);
}
}

@Benchmark
public void scoresDelay10000(Blackhole bh) {
timeController.addTime(10000);

for (int i = 0; i < connectedCount; i++) {
double s = score.score(peerIds.get(i));
bh.consume(s);
}
}

/**
* Uncomment for debugging
*/
// public static void main(String[] args) {
// GossipScoreBenchmark benchmark = new GossipScoreBenchmark();
// Blackhole blackhole = new Blackhole("Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.");
// benchmark.scoresDelay0(blackhole);
// }
}
90 changes: 90 additions & 0 deletions src/main/kotlin/io/libp2p/etc/types/MultiBiMap.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.libp2p.etc.types

fun <TValue1, TValue2> mutableMultiBiMap(): MutableMultiBiMap<TValue1, TValue2> = HashSetMultiBiMap()

/**
* Associates values of type [TFirst] to a set of values of type [TSecond]
* and back: associates values of type [TSecond] to a set of values of type [TFirst]
*/
interface MultiBiMap<TFirst, TSecond> {

fun getByFirst(first: TFirst): Set<TSecond>
fun getBySecond(second: TSecond): Set<TFirst>

fun valuesFirst(): Set<TFirst>
fun valuesSecond(): Set<TSecond>

fun asFirstToSecondMap(): Map<TFirst, Set<TSecond>> = valuesFirst().associateWith { getByFirst(it) }
fun asSecondToFirstMap(): Map<TSecond, Set<TFirst>> = valuesSecond().associateWith { getBySecond(it) }
}

interface MutableMultiBiMap<TFirst, TSecond> : MultiBiMap<TFirst, TSecond> {

fun add(first: TFirst, second: TSecond)

fun remove(first: TFirst, second: TSecond)
fun removeAllByFirst(first: TFirst)
fun removeAllBySecond(second: TSecond)
}

internal class HashSetMultiBiMap<TFirst, TSecond> : MutableMultiBiMap<TFirst, TSecond> {
private val firstToSecondMap = mutableMapOf<TFirst, MutableSet<TSecond>>()
private val secondToFirstMap = mutableMapOf<TSecond, MutableSet<TFirst>>()

override fun getByFirst(first: TFirst): Set<TSecond> = firstToSecondMap[first] ?: emptySet()
override fun getBySecond(second: TSecond): Set<TFirst> = secondToFirstMap[second] ?: emptySet()
override fun valuesFirst(): Set<TFirst> = firstToSecondMap.keys
override fun valuesSecond(): Set<TSecond> = secondToFirstMap.keys
override fun asFirstToSecondMap(): Map<TFirst, Set<TSecond>> = firstToSecondMap
override fun asSecondToFirstMap(): Map<TSecond, Set<TFirst>> = secondToFirstMap

override fun add(first: TFirst, second: TSecond) {
firstToSecondMap.computeIfAbsent(first) { mutableSetOf() } += second
secondToFirstMap.computeIfAbsent(second) { mutableSetOf() } += first
}

private fun removeFromFirstToSecondMap(first: TFirst, second: TSecond) {
firstToSecondMap.compute(first) { _, curSecondValues ->
if (curSecondValues != null) {
curSecondValues -= second
if (curSecondValues.isNotEmpty()) {
curSecondValues
} else {
null
}
} else {
null
}
}
}

private fun removeFromSecondToFirstMap(first: TFirst, second: TSecond) {
secondToFirstMap.compute(second) { _, curFirstValues ->
if (curFirstValues != null) {
curFirstValues -= first
if (curFirstValues.isNotEmpty()) {
curFirstValues
} else {
null
}
} else {
null
}
}
}

override fun remove(first: TFirst, second: TSecond) {
removeFromFirstToSecondMap(first, second)
removeFromSecondToFirstMap(first, second)
}

override fun removeAllByFirst(first: TFirst) {
val removedSecondValues = firstToSecondMap.remove(first) ?: emptySet()
removedSecondValues.forEach { removeFromSecondToFirstMap(first, it) }
}

override fun removeAllBySecond(second: TSecond) {
val removedFirstValues = secondToFirstMap.remove(second) ?: emptySet()
removedFirstValues.forEach { removeFromFirstToSecondMap(it, second) }
}
}
2 changes: 1 addition & 1 deletion src/main/kotlin/io/libp2p/etc/util/P2PService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ abstract class P2PService(
}

protected open fun streamDisconnected(stream: StreamHandler) {
val peerHandler = stream.getPeerHandler()
if (stream.aborted) return
val peerHandler = stream.getPeerHandler()
activePeersMutable -= peerHandler
if (peersMutable.remove(peerHandler)) {
onPeerDisconnected(peerHandler)
Expand Down
32 changes: 10 additions & 22 deletions src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@ import io.libp2p.core.BadPeerException
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.MultiSet
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.copy
import io.libp2p.etc.types.forward
import io.libp2p.etc.types.thenApplyAll
import io.libp2p.etc.types.toWBytes
import io.libp2p.etc.types.*
import io.libp2p.etc.util.P2PServiceSemiDuplex
import io.libp2p.etc.util.netty.protobuf.LimitedProtobufVarint32FrameDecoder
import io.netty.channel.ChannelHandler
Expand Down Expand Up @@ -51,7 +46,7 @@ abstract class AbstractRouter(

protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") }

protected open val peerTopics = MultiSet<PeerHandler, Topic>()
protected open val peersTopics = mutableMultiBiMap<PeerHandler, Topic>()
protected open val subscribedTopics = linkedSetOf<Topic>()
protected open val pendingRpcParts = PendingRpcPartsMap<RpcPartsQueue> { DefaultRpcPartsQueue() }
protected open val pendingMessagePromises = MultiSet<PeerHandler, CompletableFuture<Unit>>()
Expand Down Expand Up @@ -176,7 +171,8 @@ abstract class AbstractRouter(

try {
val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) }
subscriptionFilter.filterIncomingSubscriptions(subscriptions, peerTopics[peer])
subscriptionFilter
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
.forEach { handleMessageSubscriptions(peer, it) }
} catch (e: Exception) {
logger.debug("Subscription filter error, ignoring message from peer $peer", e)
Expand Down Expand Up @@ -274,7 +270,7 @@ abstract class AbstractRouter(

override fun onPeerDisconnected(peer: PeerHandler) {
super.onPeerDisconnected(peer)
peerTopics.removeAll(peer)
peersTopics.removeAllByFirst(peer)
}

override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) {
Expand All @@ -293,19 +289,15 @@ abstract class AbstractRouter(

private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
if (msg.subscribe) {
peerTopics[peer] += msg.topic
peersTopics.add(peer, msg.topic)
} else {
peerTopics[peer] -= msg.topic
peersTopics.remove(peer, msg.topic)
}
}

protected fun getTopicsPeers(topics: Collection<String>) =
activePeers.filter { topics.intersect(peerTopics[it]).isNotEmpty() }
protected fun getTopicPeers(topic: Topic) = peersTopics.getBySecond(topic)

protected fun getTopicPeers(topic: String) =
activePeers.filter { topic in peerTopics[it] }

override fun subscribe(vararg topics: String) {
override fun subscribe(vararg topics: Topic) {
runOnEventThread {
topics.forEach(::subscribe)
flushAllPending()
Expand All @@ -331,11 +323,7 @@ abstract class AbstractRouter(

override fun getPeerTopics(): CompletableFuture<Map<PeerId, Set<Topic>>> {
return submitOnEventThread {
val topicsByPeerId = hashMapOf<PeerId, Set<Topic>>()
peerTopics.forEach { entry ->
topicsByPeerId[entry.key.peerId] = HashSet(entry.value)
}
topicsByPeerId
peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId }
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThread
}

private fun broadcast(msg: PubsubMessage, receivedFrom: PeerHandler?): CompletableFuture<Unit> {
val sentFutures = getTopicsPeers(msg.topics)
val peers = msg.topics
.map { getTopicPeers(it) }
.reduce { p1, p2 -> p1 + p2 }
val sentFutures = peers
.filter { it != receivedFrom }
.map { submitPublishMessage(it, msg) }
return anyComplete(sentFutures)
Expand Down
Loading

0 comments on commit 57c7de7

Please sign in to comment.