-
Notifications
You must be signed in to change notification settings - Fork 366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Azure Servicebus Source Connector #1222
Conversation
.../java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceTask.java
Outdated
Show resolved
Hide resolved
0b67920
to
478f867
Compare
f239c00
to
b1e20b2
Compare
The CVE is already an issue on Dep Checker side, but they don't seem keen to fixing it anytime soon |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good test coverage, but a few things to think about structure wise. Some of the things I've highlighted are stylistic - we tend to favour a more functional style.
Please reach out if I can help you with anything or if you would like to discuss in more detail.
...va/io/lenses/streamreactor/connect/azure/servicebus/mapping/AzureServiceBusSourceRecord.java
Outdated
Show resolved
Hide resolved
...io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapper.java
Outdated
Show resolved
Hide resolved
...ava/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java
Outdated
Show resolved
Hide resolved
...ava/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java
Outdated
Show resolved
Hide resolved
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Outdated
Show resolved
Hide resolved
...t/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java
Outdated
Show resolved
Hide resolved
...t/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java
Outdated
Show resolved
Hide resolved
...t/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java
Outdated
Show resolved
Hide resolved
...test/java/io/lenses/streamreactor/connect/azure/servicebus/util/KcqlConfigBusMapperTest.java
Outdated
Show resolved
Hide resolved
...test/java/io/lenses/streamreactor/connect/azure/servicebus/util/KcqlConfigBusMapperTest.java
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Show resolved
Hide resolved
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Outdated
Show resolved
Hide resolved
...ava/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java
Outdated
Show resolved
Hide resolved
...ava/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java
Outdated
Show resolved
Hide resolved
...ava/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java
Outdated
Show resolved
Hide resolved
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Outdated
Show resolved
Hide resolved
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
.../lenses/streamreactor/connect/azure/servicebus/source/ServiceBusPartitionOffsetProvider.java
Outdated
Show resolved
Hide resolved
offer = recordsQueue.offer(serviceBusMessageHolder, FIVE_SECONDS_TIMEOUT, TimeUnit.SECONDS); | ||
} | ||
} catch (InterruptedException e) { | ||
log.info("{} has been interrupted on offering", receiverId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider additional handling for the InterruptedException beyond logging? For example, should we retry the operation, perform specific cleanup actions, or propagate the exception to maintain the application's interrupted state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question! For now:
- If record won't reach recordsQueue it won't be sent to Kafka
- Therefore it won't be committed in Service Bus (the logic to commit it won't work), which will mean we'll have to process it again anyway.
However, I still consider this a valid point. Log probably should be more visible, so I will definitely bump it's level to WARN (we can think if ERROR too) and maybe we can also consider timeout to be configurable?
).define(AzureServiceBusConfigConstants.CONNECTION_STRING, | ||
Type.STRING, | ||
Importance.HIGH, | ||
AzureServiceBusConfigConstants.CONNECTION_STRING_DOC, | ||
CONNECTION_GROUP, | ||
2, | ||
ConfigDef.Width.LONG, | ||
AzureServiceBusConfigConstants.CONNECTION_STRING | ||
).define(AzureServiceBusConfigConstants.KCQL_CONFIG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to point you to this
Customers won't always be able to use connection string for connecting to azure, especially if they are running on an Azure instance in which they will have default chain from the instance profile. I believe the original PR was more complete in this respect?
But if we can we should identify code we can reuse here and move it into the Java repo so we are not inventing the wheel and duplicating throughout the codebase. I did similar recently for GCP cloud connectors, so can help with this if you would like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two ways endorsed in official Service Bus docs: one being Passwordless, second being Connection String. Since the first one will work using specifically-installed Azure commandline and our connectors are being ran inside docker container, it would require us to include Azure commandline and ways to work with it within Lenses Box (which will no doubt be huge amounts of work).
Connection String already proved to work for our clients in other Azure connectors, so that's why I've chosen to implement this path.
My take on it is: unless we get a specific requirement to support another type of authentication, let's just use what we have (as this is another feature that we don't know if ever will be used yet)
fea619e
to
634953a
Compare
...lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnectorTest.java
Outdated
Show resolved
Hide resolved
...lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnectorTest.java
Outdated
Show resolved
Hide resolved
...lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnectorTest.java
Outdated
Show resolved
Hide resolved
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Outdated
Show resolved
Hide resolved
...es/streamreactor/connect/azure/servicebus/mapping/ServiceBusPartitionOffsetProviderTest.java
Outdated
Show resolved
Hide resolved
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Outdated
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
public static List<Kcql> mapKcqlsFromConfig(String kcqlString) { | ||
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString); | ||
Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size()); | ||
|
||
for (Kcql kcql : kcqls) { | ||
String inputTopic = kcql.getSource(); | ||
String outputTopic = kcql.getTarget(); | ||
|
||
if (!azureNameMatchesAgainstRegex(inputTopic, MAX_BUS_NAME_LENGTH)) { | ||
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic)); | ||
} | ||
if (!azureNameMatchesAgainstRegex(outputTopic, MAX_BUS_NAME_LENGTH)) { | ||
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic)); | ||
} | ||
if (inputToOutputTopics.containsKey(inputTopic)) { | ||
throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic)); | ||
} | ||
if (inputToOutputTopics.containsValue(outputTopic)) { | ||
throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic)); | ||
} | ||
|
||
List<ServiceBusKcqlProperties> notSatisfiedProperties = checkForNecessaryKcqlProperties(kcql); | ||
if (!notSatisfiedProperties.isEmpty()) { | ||
String missingPropertiesError = | ||
notSatisfiedProperties.stream() | ||
.map(ServiceBusKcqlProperties::getPropertyName) | ||
.collect(Collectors.joining(",")); | ||
throw new ConfigException( | ||
String.format("Following non-optional properties missing in KCQL: %s", missingPropertiesError)); | ||
} | ||
|
||
checkForValidPropertyValues(kcql.getProperties()); | ||
|
||
inputToOutputTopics.put(inputTopic, outputTopic); | ||
} | ||
|
||
return kcqls; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we split the validation logic and mapping logic into separate concerns?
Something like (this isn't a complete implementation):
public static List<Kcql> mapKcqlsFromConfig(String kcqlString) { | |
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString); | |
Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size()); | |
for (Kcql kcql : kcqls) { | |
String inputTopic = kcql.getSource(); | |
String outputTopic = kcql.getTarget(); | |
if (!azureNameMatchesAgainstRegex(inputTopic, MAX_BUS_NAME_LENGTH)) { | |
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic)); | |
} | |
if (!azureNameMatchesAgainstRegex(outputTopic, MAX_BUS_NAME_LENGTH)) { | |
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic)); | |
} | |
if (inputToOutputTopics.containsKey(inputTopic)) { | |
throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic)); | |
} | |
if (inputToOutputTopics.containsValue(outputTopic)) { | |
throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic)); | |
} | |
List<ServiceBusKcqlProperties> notSatisfiedProperties = checkForNecessaryKcqlProperties(kcql); | |
if (!notSatisfiedProperties.isEmpty()) { | |
String missingPropertiesError = | |
notSatisfiedProperties.stream() | |
.map(ServiceBusKcqlProperties::getPropertyName) | |
.collect(Collectors.joining(",")); | |
throw new ConfigException( | |
String.format("Following non-optional properties missing in KCQL: %s", missingPropertiesError)); | |
} | |
checkForValidPropertyValues(kcql.getProperties()); | |
inputToOutputTopics.put(inputTopic, outputTopic); | |
} | |
return kcqls; | |
} | |
public static List<Kcql> mapKcqlsFromConfig(String kcqlString) { | |
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString); | |
Map<String, String> inputToOutputTopics = kcqls.stream() | |
.collect(Collectors.toMap(Kcql::getSource, Kcql::getTarget)); | |
validateMappings(inputToOutputTopics); | |
return kcqls; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I splitted the mapping and validation logic to make it more clear (thanks for all advices here!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem, and it's definitely an improvement, but can you see a way to remove the mutable ArrayList and instead use the streaming API to produce an immutable one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done (i)
.../io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java
Show resolved
Hide resolved
.../main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java
Outdated
Show resolved
Hide resolved
5295a92
to
2c54bf2
Compare
Merged on dev repo. |
Description
Initial version of Azure ServiceBus Connector. This is based on a PR which is 3 years old but translated to java and cleaned up. It has some notable differences (e.g. it's no longer needed to construct
connectionString
as Azure provides it whole, we also don't create Administration client as it required highest privileges which could be a security issue). It also bumps up Lombok versionand cleans up some areas (e.g. val is now final var).Feedback is very welcome.
Testing