Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support exchange properties #646

Merged
merged 16 commits into from
Oct 17, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.amqp;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,15 +28,20 @@ public abstract class AbstractAmqpExchange implements AmqpExchange {
protected Set<AmqpQueue> queues;
protected boolean durable;
protected boolean autoDelete;
protected boolean internal;
protected Map<String, Object> arguments;
public static final String DEFAULT_EXCHANGE_DURABLE = "aop.direct.durable";

protected AbstractAmqpExchange(String exchangeName, AmqpExchange.Type exchangeType,
Set<AmqpQueue> queues, boolean durable, boolean autoDelete) {
Set<AmqpQueue> queues, boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments) {
this.exchangeName = exchangeName;
this.exchangeType = exchangeType;
this.queues = queues;
this.durable = durable;
this.autoDelete = autoDelete;
this.internal = internal;
this.arguments = arguments;
}

@Override
Expand Down Expand Up @@ -69,6 +75,16 @@ public boolean getAutoDelete() {
return autoDelete;
}

@Override
public boolean getInternal() {
return internal;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public AmqpExchange.Type getType() {
return exchangeType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,17 @@ public void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type,
type, passive, durable, autoDelete, internal, nowait, arguments);
}

this.exchangeService.exchangeDeclare(connection.getNamespaceName(), exchange.toString(), type.toString(),
passive, durable, autoDelete, internal, arguments).thenAccept(__ -> {
this.exchangeService.exchangeDeclare(connection.getNamespaceName(), exchange.toString(),
type != null ? type.toString() : null,
passive, durable, autoDelete, internal, FieldTable.convertToMap(arguments)).thenAccept(__ -> {
if (!nowait) {
connection.writeFrame(
connection.getMethodRegistry().createExchangeDeclareOkBody().generateFrame(channelId));
}
}).exceptionally(t -> {
log.error("Failed to declare exchange {} in vhost {}. type: {}, passive: {}, durable: {}, "
+ "autoDelete: {}, nowait: {}", type, passive, durable, autoDelete, nowait,
exchange, connection.getNamespaceName(), t);
+ "autoDelete: {}, nowait: {}", exchange, connection.getNamespaceName(), type, passive,
durable, autoDelete, nowait, t);
handleAoPException(t);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.amqp;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -68,6 +69,10 @@ public static Type value(String type) {

boolean getAutoDelete();

boolean getInternal();

Map<String, Object> getArguments();

Topic getTopic();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import io.streamnative.pulsar.handlers.amqp.common.exception.EmptyLookupResultException;
import io.streamnative.pulsar.handlers.amqp.common.exception.NamespaceNotFoundException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Topic;
Expand All @@ -43,10 +46,15 @@ public AmqpTopicManager(PulsarService pulsarService) {
}

public Topic getOrCreateTopic(String topicName, boolean createIfMissing) {
return getTopic(topicName, createIfMissing).join();
return getTopic(topicName, createIfMissing, null).join();
}

public CompletableFuture<Topic> getTopic(String topicName, boolean createIfMissing) {
public Topic getOrCreateTopic(String topicName, boolean createIfMissing, Map<String, String> properties) {
return getTopic(topicName, createIfMissing, properties).join();
}

public CompletableFuture<Topic> getTopic(String topicName, boolean createIfMissing,
Map<String, String> properties) {
CompletableFuture<Topic> topicCompletableFuture = new CompletableFuture<>();
if (null == pulsarService) {
log.error("PulsarService is not set.");
Expand All @@ -57,26 +65,16 @@ public CompletableFuture<Topic> getTopic(String topicName, boolean createIfMissi
// Check the namespace first to make sure the namespace is existing.
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
.thenCompose(policies -> {
if (!policies.isPresent()) {
if (policies.isEmpty()) {
return FutureUtil.failedFuture(new NamespaceNotFoundException(tpName.getNamespaceObject()));
}
// setup ownership of service unit to this broker
return pulsarService.getNamespaceService().getBrokerServiceUrlAsync(
tpName, LookupOptions.builder().authoritative(true).build());
})
.thenCompose(lookupOp -> {
if (!lookupOp.isPresent()) {
return FutureUtil.failedFuture(new EmptyLookupResultException(tpName));
}

if (log.isDebugEnabled()) {
log.debug("Get broker service url for {}. lookupResult: {}",
topicName, lookupOp.get().getLookupData().getBrokerUrl());
}
return pulsarService.getBrokerService().getTopic(topicName, createIfMissing);
})
.thenCompose(lookupOp -> createOrGetTopic(lookupOp, topicName, createIfMissing, properties))
.thenAccept(topicOp -> {
if (!topicOp.isPresent()) {
if (topicOp.isEmpty()) {
log.error("Get empty topic for name {}", topicName);
topicCompletableFuture.complete(null);
return;
Expand Down Expand Up @@ -106,6 +104,25 @@ public CompletableFuture<Topic> getTopic(String topicName, boolean createIfMissi
return topicCompletableFuture;
}

private CompletableFuture<Optional<Topic>> createOrGetTopic(Optional<LookupResult> lookupOp,
String topicName,
boolean createIfMissing,
Map<String, String> properties) {
if (lookupOp.isEmpty()) {
return FutureUtil.failedFuture(new EmptyLookupResultException(topicName));
}

if (log.isDebugEnabled()) {
log.debug("Get broker service url for {}. lookupResult: {}",
topicName, lookupOp.get().getLookupData().getBrokerUrl());
}
if (properties != null) {
return pulsarService.getBrokerService().getTopic(topicName, createIfMissing, properties);
} else {
return pulsarService.getBrokerService().getTopic(topicName, createIfMissing);
}
}

private boolean checkTopicIsFenced(Topic topic, CompletableFuture<Topic> topicCompletableFuture) {
try {
if (fenceField == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@

package io.streamnative.pulsar.handlers.amqp;

import static io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange.ARGUMENTS;
import static io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange.AUTO_DELETE;
import static io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange.DURABLE;
import static io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange.INTERNAL;
import static io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange.TYPE;

import io.streamnative.pulsar.handlers.amqp.common.exception.AoPException;
import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
import io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -26,6 +36,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.qpid.server.protocol.ErrorCodes;

/**
* Container for all exchanges in the broker.
Expand All @@ -44,6 +55,14 @@ protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pul
@Getter
private Map<NamespaceName, Map<String, CompletableFuture<AmqpExchange>>> exchangeMap = new ConcurrentHashMap<>();


public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceName,
String exchangeName,
boolean createIfMissing,
String exchangeType) {
return asyncGetExchange(namespaceName, exchangeName, createIfMissing, exchangeType, true, false, false, null);
}

/**
* Get or create exchange.
*
Expand All @@ -52,12 +71,17 @@ protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pul
* @param createIfMissing true to create the exchange if not existed, and exchangeType should be not null
* false to get the exchange and return null if not existed
* @param exchangeType type of exchange: direct,fanout,topic and headers
* @param arguments other properties (construction arguments) for the exchange
* @return the completableFuture of get result
*/
public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceName,
String exchangeName,
boolean createIfMissing,
String exchangeType) {
String exchangeType,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) {
CompletableFuture<AmqpExchange> amqpExchangeCompletableFuture = new CompletableFuture<>();
if (StringUtils.isEmpty(exchangeType) && createIfMissing) {
log.error("[{}][{}] ExchangeType should be set when createIfMissing is true.", namespaceName, exchangeName);
Expand All @@ -83,7 +107,23 @@ public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceN
return existingAmqpExchangeFuture;
} else {
String topicName = PersistentExchange.getExchangeTopicName(namespaceName, exchangeName);
CompletableFuture<Topic> topicCompletableFuture = amqpTopicManager.getTopic(topicName, createIfMissing);
Map<String, String> initProperties = new HashMap<>();
if (createIfMissing) {
// if first create the exchange, try to set properties for exchange
try {
initProperties = ExchangeUtil.generateTopicProperties(exchangeName, exchangeType, durable,
autoDelete, internal, arguments, Collections.EMPTY_LIST);
} catch (Exception e) {
log.error("Failed to generate topic properties for exchange {} in vhost {}.",
exchangeName, namespaceName, e);
amqpExchangeCompletableFuture.completeExceptionally(e);
removeExchangeFuture(namespaceName, exchangeName);
return amqpExchangeCompletableFuture;
}
}

CompletableFuture<Topic> topicCompletableFuture = amqpTopicManager.getTopic(
topicName, createIfMissing, initProperties);
topicCompletableFuture.whenComplete((topic, throwable) -> {
if (throwable != null) {
log.error("[{}][{}] Failed to get exchange topic.", namespaceName, exchangeName, throwable);
Expand All @@ -98,17 +138,34 @@ public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceN
// recover metadata if existed
PersistentTopic persistentTopic = (PersistentTopic) topic;
Map<String, String> properties = persistentTopic.getManagedLedger().getProperties();
AmqpExchange.Type amqpExchangeType;
// if properties has type, ignore the exchangeType
if (null != properties && properties.size() > 0
&& null != properties.get(PersistentExchange.TYPE)) {
String type = properties.get(PersistentExchange.TYPE);
amqpExchangeType = AmqpExchange.Type.value(type);
} else {
amqpExchangeType = AmqpExchange.Type.value(exchangeType);
if (createIfMissing && !exchangeDeclareCheck(
amqpExchangeCompletableFuture, namespaceName.getLocalName(),
exchangeName, exchangeType, durable, autoDelete, properties)) {
return;
}

PersistentExchange amqpExchange;
try {
Map<String, Object> currentArguments =
ExchangeUtil.covertStringValueAsObjectMap(properties.get(ARGUMENTS));
String currentType = properties.get(TYPE);
boolean currentDurable = Boolean.parseBoolean(
properties.getOrDefault(DURABLE, "true"));
boolean currentAutoDelete = Boolean.parseBoolean(
properties.getOrDefault(AUTO_DELETE, "false"));
boolean currentInternal = Boolean.parseBoolean(
properties.getOrDefault(INTERNAL, "false"));
amqpExchange = new PersistentExchange(exchangeName,
AmqpExchange.Type.value(currentType),
persistentTopic, currentDurable, currentAutoDelete, currentInternal,
currentArguments);
} catch (Exception e) {
log.error("Failed to init exchange {} in vhost {}.",
exchangeName, namespaceName.getLocalName(), e);
amqpExchangeCompletableFuture.completeExceptionally(e);
removeExchangeFuture(namespaceName, exchangeName);
return;
}
PersistentExchange amqpExchange = new PersistentExchange(exchangeName,
amqpExchangeType, persistentTopic, false);
amqpExchangeCompletableFuture.complete(amqpExchange);
}
}
Expand All @@ -117,6 +174,43 @@ public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceN
return amqpExchangeCompletableFuture;
}

private boolean exchangeDeclareCheck(CompletableFuture<AmqpExchange> exchangeFuture, String vhost,
String exchangeName, String exchangeType, boolean durable, boolean autoDelete,
Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
return true;
}

String replyTextFormat = "PRECONDITION_FAILED - inequivalent arg '%s' for exchange '" + exchangeName + "' in "
+ "vhost '" + vhost + "': received '%s' but current is '%s'";
String currentType = properties.get(TYPE);
if (!StringUtils.equals(properties.get(TYPE), exchangeType)) {
exchangeFuture.completeExceptionally(new AoPException(ErrorCodes.IN_USE,
String.format(replyTextFormat, "type", exchangeType, currentType), true, false));
return false;
}

if (properties.containsKey(DURABLE)) {
boolean currentDurable = Boolean.parseBoolean(properties.get(DURABLE));
if (durable != currentDurable) {
exchangeFuture.completeExceptionally(new AoPException(ErrorCodes.IN_USE,
String.format(replyTextFormat, "durable", durable, currentDurable), true, false));
return false;
}
}

if (properties.containsKey(AUTO_DELETE)) {
boolean currentAutoDelete = Boolean.parseBoolean(properties.get(AUTO_DELETE));
if (autoDelete != currentAutoDelete) {
exchangeFuture.completeExceptionally(new AoPException(ErrorCodes.IN_USE,
String.format(replyTextFormat, "auto_delete", autoDelete, currentAutoDelete), true, false));
return false;
}
}

return true;
}

/**
* Delete the exchange by namespace and exchange name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package io.streamnative.pulsar.handlers.amqp;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.qpid.server.protocol.v0_8.FieldTable;

/**
* Logic of exchange.
Expand All @@ -38,7 +38,7 @@ public interface ExchangeService {
*/
CompletableFuture<AmqpExchange> exchangeDeclare(NamespaceName namespaceName, String exchange, String type,
boolean passive, boolean durable, boolean autoDelete,
boolean internal, FieldTable arguments);
boolean internal, Map<String, Object> arguments);

/**
* Delete a exchange.
Expand Down
Loading