Skip to content

Commit

Permalink
update rocketmq engine
Browse files Browse the repository at this point in the history
  • Loading branch information
guyinyou committed Nov 16, 2023
1 parent 0a68a61 commit 71b6ff2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
2 changes: 1 addition & 1 deletion driver-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<artifactId>driver-rocketmq</artifactId>

<properties>
<rocketmq.version>4.9.3</rocketmq.version>
<rocketmq.version>5.1.4</rocketmq.version>
</properties>

<dependencies>
Expand Down
10 changes: 10 additions & 0 deletions driver-rocketmq/rocketmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,15 @@ driverClass: io.openmessaging.benchmark.driver.rocketmq.RocketMQBenchmarkDriver
clusterName: DefaultCluster
namesrvAddr: 127.0.0.1:9876
vipChannelEnabled: false

batchCQ: true
autoBatch: true
# batchMaxBytes: 32768
# batchMaxDelayMs: 10
# totalBatchMaxBytes: 33554432

enableBackpressure: true
backpressureConcurrency: 1024

accessKey:
secretKey:
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import io.openmessaging.benchmark.driver.rocketmq.client.RocketMQClientConfig;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
Expand All @@ -37,12 +39,18 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +88,28 @@ public String getTopicNamePrefix() {
return "RocketMQ-Benchmark";
}

Map<String, Set<String>> cachedBrokerAddr = new ConcurrentHashMap<>();

int fetchCnt = 0;

private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt,
final String clusterName) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
Set<String> brokerList = cachedBrokerAddr.get(clusterName);
if (brokerList == null) {
brokerList =
CommandUtil.fetchMasterAndSlaveAddrByClusterName(
adminExt, this.rmqClientConfig.clusterName);
cachedBrokerAddr.put(clusterName, brokerList);
if (brokerList.isEmpty()) {
throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName);
}
}
if (fetchCnt++ % 100 == 0) {
log.info("fetch brokerAddr count: " + fetchCnt);
}
return brokerList;
}

@Override
public CompletableFuture<Void> createTopic(final String topic, final int partitions) {
return CompletableFuture.runAsync(
Expand All @@ -90,10 +120,13 @@ public CompletableFuture<Void> createTopic(final String topic, final int partiti
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}

try {
Set<String> brokerList =
CommandUtil.fetchMasterAddrByClusterName(
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
Expand Down Expand Up @@ -130,6 +163,26 @@ public CompletableFuture<BenchmarkProducer> createProducer(final String topic) {
if (null != this.rmqClientConfig.compressMsgBodyOverHowmuch) {
rmqProducer.setCompressMsgBodyOverHowmuch(this.rmqClientConfig.compressMsgBodyOverHowmuch);
}

if (null != this.rmqClientConfig.autoBatch) {
rmqProducer.setAutoBatch(this.rmqClientConfig.autoBatch);
}
if (null != this.rmqClientConfig.batchMaxBytes) {
rmqProducer.batchMaxBytes(this.rmqClientConfig.batchMaxBytes);
}
if (null != this.rmqClientConfig.batchMaxDelayMs) {
rmqProducer.batchMaxDelayMs(this.rmqClientConfig.batchMaxDelayMs);
}
if (null != this.rmqClientConfig.totalBatchMaxBytes) {
rmqProducer.totalBatchMaxBytes(this.rmqClientConfig.totalBatchMaxBytes);
}
if (null != this.rmqClientConfig.enableBackpressure) {
rmqProducer.setEnableBackpressureForAsyncMode(this.rmqClientConfig.enableBackpressure);
}
if (null != this.rmqClientConfig.backpressureConcurrency) {
rmqProducer.setBackPressureForAsyncSendNum(this.rmqClientConfig.backpressureConcurrency);
}

try {
rmqProducer.start();
} catch (MQClientException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ public class RocketMQClientConfig {
public Boolean vipChannelEnabled;
public Integer maxMessageSize;
public Integer compressMsgBodyOverHowmuch;
public Boolean batchCQ;
public Boolean autoBatch;
public Integer batchMaxBytes;
public Integer batchMaxDelayMs;
public Integer totalBatchMaxBytes;

public Boolean enableBackpressure;
public Integer backpressureConcurrency;
public String accessKey;
public String secretKey;
}

0 comments on commit 71b6ff2

Please sign in to comment.