Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
vdesabou committed Sep 2, 2023
1 parent fbab868 commit de12038
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 161 deletions.
252 changes: 128 additions & 124 deletions scripts/cli/completions.bash

Large diffs are not rendered by default.

86 changes: 72 additions & 14 deletions scripts/cli/playground
Original file line number Diff line number Diff line change
Expand Up @@ -2713,7 +2713,13 @@ playground_topic_produce_usage() {

# :flag.usage
printf " %s\n" "$(magenta "--validate")"
printf " Validate schema\n"
printf " ☑️ Validate schema according to connect converter used\n"
echo

# :flag.usage
printf " %s\n" "$(magenta "--validate-config VALIDATE-CONFIG (repeatable)")"
printf " 🔩 Converter configuration parameters to use\n \n 🎓 Tip: you can pass multiple parameters by specifying --validate-config\n multiple times\n"
printf " Allowed: scrub.invalid.names=true, enhanced.avro.schema.support=true, connect.meta.data=false, object.additional.properties=false, use.optional.for.nonrequired=true, ignore.default.for.nullables=true, generalized.sum.type.support=true\n"
echo

# :command.usage_fixed_flags
Expand Down Expand Up @@ -11577,6 +11583,8 @@ playground_topic_produce_command() {
compatibility="${args[--compatibility]}"
value_subject_name_strategy="${args[--value-subject-name-strategy]}"
validate="${args[--validate]}"
# Convert the space delimited string to an array
eval "validate_config=(${args[--validate-config]})"

tmp_dir=$(mktemp -d -t ci-XXXXXXXXXX)
trap 'rm -rf $tmp_dir' EXIT
Expand Down Expand Up @@ -11756,11 +11764,6 @@ playground_topic_produce_command() {
SECONDS=0
case "${schema_type}" in
json|sql)
if [[ -n "$validate" ]]
then
logerror "❌ --validate is set and only supports avro, protobuf or json-schema"
exit 1
fi
# https://github.com/MaterializeInc/datagen
set +e
docker run --rm -i -v $schema_file:/app/schema.$schema_type materialize/datagen -s schema.$schema_type -n $nb_messages_to_generate --dry-run > $tmp_dir/result.log
Expand Down Expand Up @@ -11788,11 +11791,6 @@ playground_topic_produce_command() {
docker run --rm -v $tmp_dir:/tmp/ -v $schema_file:/app/schema.proto -e NB_MESSAGES=$nb_messages_to_generate vdesabou/protobuf-faker > $tmp_dir/out.json
;;
raw)
if [[ -n "$validate" ]]
then
logerror "❌ --validate is set and only supports avro, protobuf or json-schema"
exit 1
fi
if jq -e . >/dev/null 2>&1 <<< "$(cat "$schema_file")"
then
log "💫 payload is single json, it will be sent as one record"
Expand Down Expand Up @@ -11877,6 +11875,7 @@ playground_topic_produce_command() {

if [[ -n "$validate" ]]
then
log "✔️ --validate is set, validating schema now..."
set +e
log "🏗 Building jar for schema-validator"
docker run -i --rm -e TAG=$TAG_BASE -v "${root_folder}/scripts/cli/src/schema-validator":/usr/src/mymaven -v "$HOME/.m2":/root/.m2 -v "$root_folder/scripts/settings.xml:/tmp/settings.xml" -v "${root_folder}/scripts/cli/src/schema-validator/target:/usr/src/mymaven/target" -w /usr/src/mymaven maven:3.6.1-jdk-11 mvn -s /tmp/settings.xml -Dkafka.tag=$TAG package > /tmp/result.log 2>&1
Expand All @@ -11891,9 +11890,36 @@ playground_topic_produce_command() {
docker cp ${root_folder}/scripts/cli/src/schema-validator/target/schema-validator-1.0.0-jar-with-dependencies.jar connect:/tmp/schema-validator-1.0.0-jar-with-dependencies.jar > /dev/null 2>&1
docker cp $schema_file connect:/tmp/schema.json > /dev/null 2>&1
docker cp $tmp_dir/out.json connect:/tmp/message.json > /dev/null 2>&1
# Translate each --validate-config converter parameter into the matching
# KAFKA_* environment variable passed to the connect container, where the
# schema-validator jar reads them as converter configuration.
env_list=""
for conf in "${validate_config[@]}"
do
    case "${conf}" in
        "scrub.invalid.names=true")
            env_list="$env_list -e KAFKA_SCRUB_INVALID_NAMES=true"
        ;;
        "enhanced.avro.schema.support=true")
            env_list="$env_list -e KAFKA_ENHANCED_AVRO_SCHEMA_SUPPORT=true"
        ;;
        "connect.meta.data=false")
            env_list="$env_list -e KAFKA_CONNECT_META_DATA=false"
        ;;
        "object.additional.properties=false")
            # was missing: value is in the --validate-config allowed list
            env_list="$env_list -e KAFKA_OBJECT_ADDITIONAL_PROPERTIES=false"
        ;;
        "use.optional.for.nonrequired=true")
            env_list="$env_list -e KAFKA_USE_OPTIONAL_FOR_NONREQUIRED=true"
        ;;
        "ignore.default.for.nullables=true")
            env_list="$env_list -e KAFKA_IGNORE_DEFAULT_FOR_NULLABLES=true"
        ;;
        "generalized.sum.type.support=true")
            env_list="$env_list -e KAFKA_GENERALIZED_SUM_TYPE_SUPPORT=true"
        ;;
        *)
            logerror "❌ --validate-config parameter <$conf> is not supported"
        ;;
    esac
done

log "✔️ Validating schema now..."
docker exec -e SCHEMA_TYPE=$schema_type connect bash -c "java -jar /tmp/schema-validator-1.0.0-jar-with-dependencies.jar" > $tmp_dir/result.log
docker exec $env_list -e SCHEMA_TYPE=$schema_type connect bash -c "java -jar /tmp/schema-validator-1.0.0-jar-with-dependencies.jar" > $tmp_dir/result.log
set +e
nb=$(grep -c "ERROR" $tmp_dir/result.log)
if [ $nb -ne 0 ]
then
Expand All @@ -11903,6 +11929,7 @@ playground_topic_produce_command() {
else
log "👌 schema is valid according to $schema_type converter"
fi
set -e
fi

playground topic get-number-records --topic $topic > $tmp_dir/result.log 2>$tmp_dir/result.log
Expand Down Expand Up @@ -11953,9 +11980,14 @@ playground_topic_produce_command() {

;;
*)
if [[ -n "$validate" ]]
then
logerror "❌ --validate is set but $schema_type is used. This is only valid for avro|json-schema|protobuf"
exit 1
fi
if [[ -n "$value_subject_name_strategy" ]]
then
logerror "--value-subject-name-strategy is set but $schema_type is used. This is only valid for avro|json-schema|protobuf"
logerror "--value-subject-name-strategy is set but $schema_type is used. This is only valid for avro|json-schema|protobuf"
exit 1

fi
Expand Down Expand Up @@ -18792,6 +18824,25 @@ playground_topic_produce_parse_requirements() {
shift
;;

# :flag.case
--validate-config)

# :flag.case_arg
if [[ -n ${2+x} ]]; then

if [[ -z ${args['--validate-config']+x} ]]; then
args['--validate-config']="\"$2\""
else
args['--validate-config']="${args[--validate-config]} \"$2\""
fi
shift
shift
else
printf "%s\n" "--validate-config requires an argument: --validate-config VALIDATE-CONFIG" >&2
exit 1
fi
;;

-?*)
printf "invalid option: %s\n" "$key" >&2
exit 1
Expand Down Expand Up @@ -18828,6 +18879,13 @@ playground_topic_produce_parse_requirements() {
printf "%s\n" "--value-subject-name-strategy must be one of: TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy" >&2
exit 1
fi
eval "input_array=(${args[--validate-config]})"
for i in "${input_array[@]}"; do
if [[ ! $i =~ ^(scrub.invalid.names=true|enhanced.avro.schema.support=true|connect.meta.data=false|object.additional.properties=false|use.optional.for.nonrequired=true|ignore.default.for.nullables=true|generalized.sum.type.support=true)$ ]]; then
printf "%s\n" "--validate-config must be one of: scrub.invalid.names=true, enhanced.avro.schema.support=true, connect.meta.data=false, object.additional.properties=false, use.optional.for.nonrequired=true, ignore.default.for.nullables=true, generalized.sum.type.support=true" >&2
exit 1
fi
done

# :command.user_filter
filter_error=$(filter_schema_registry_running)
Expand Down
20 changes: 19 additions & 1 deletion scripts/cli/src/bashly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,25 @@ commands:
- long: --validate
required: false
help: |-
Validate schema according to connect sink converter used
☑️ Validate schema according to connect converter used
- long: --validate-config
arg: validate-config
help: |-
🔩 Converter configuration parameters to use
🎓 Tip: you can pass multiple parameters by specifying --validate-config multiple times
required: false
repeatable: true
allowed:
- scrub.invalid.names=true
- enhanced.avro.schema.support=true
- connect.meta.data=false
- object.additional.properties=false
- use.optional.for.nonrequired=true
- ignore.default.for.nullables=true
- generalized.sum.type.support=true

examples: |
playground topic produce --tombstone --topic a-topic --key mykey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ public SchemaValidator() throws ExecutionException, InterruptedException {
}

private void start() throws InterruptedException {
logger.info("creating schema validator with props: {}", properties);
logger.info("Creating schema validator with properties: {}", properties);
Faker faker = new Faker();
String randomName = faker.name().firstName();
SchemaAndValue converted1 = null;
try {
if (schemaType.equals("json-schema")) {
JsonNode rawSchemaJson = readJsonNode("/tmp/schema.json");
ObjectMapper mapper = new ObjectMapper();

File from = new File("/tmp/message.json");
JsonNode masterJSON = mapper.readTree(from);
CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-registry:8081",1000);
Expand All @@ -81,10 +80,7 @@ private void start() throws InterruptedException {
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
DatumReader<GenericRecord> reader = new SpecificDatumReader<>(schema);
GenericRecord record = reader.read(null, decoder);

CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-registry:8081",1000);


schemaRegistryClient.register(randomName+"-value", new AvroSchema(schema));
KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
AvroConverter converter = new AvroConverter();
Expand All @@ -95,7 +91,7 @@ private void start() throws InterruptedException {
}
if(converted1 != null)
{
logger.info("Connect Schema is: {}", converted1.toString());
logger.info("Connect schema is: {}", converted1.toString());
}
} catch(Exception e) {
logger.error("Exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
log4j.rootLogger=ERROR, stdout
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
Expand Down
56 changes: 41 additions & 15 deletions scripts/cli/src/topic_produce_command.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ tombstone="${args[--tombstone]}"
compatibility="${args[--compatibility]}"
value_subject_name_strategy="${args[--value-subject-name-strategy]}"
validate="${args[--validate]}"
# Convert the space delimited string to an array
eval "validate_config=(${args[--validate-config]})"

tmp_dir=$(mktemp -d -t ci-XXXXXXXXXX)
trap 'rm -rf $tmp_dir' EXIT
Expand Down Expand Up @@ -189,11 +191,6 @@ else
SECONDS=0
case "${schema_type}" in
json|sql)
if [[ -n "$validate" ]]
then
logerror "❌ --validate is set and only supports avro, protobuf or json-schema"
exit 1
fi
# https://github.com/MaterializeInc/datagen
set +e
docker run --rm -i -v $schema_file:/app/schema.$schema_type materialize/datagen -s schema.$schema_type -n $nb_messages_to_generate --dry-run > $tmp_dir/result.log
Expand Down Expand Up @@ -221,11 +218,6 @@ else
docker run --rm -v $tmp_dir:/tmp/ -v $schema_file:/app/schema.proto -e NB_MESSAGES=$nb_messages_to_generate vdesabou/protobuf-faker > $tmp_dir/out.json
;;
raw)
if [[ -n "$validate" ]]
then
logerror "❌ --validate is set and only supports avro, protobuf or json-schema"
exit 1
fi
if jq -e . >/dev/null 2>&1 <<< "$(cat "$schema_file")"
then
log "💫 payload is single json, it will be sent as one record"
Expand Down Expand Up @@ -325,17 +317,46 @@ then
docker cp ${root_folder}/scripts/cli/src/schema-validator/target/schema-validator-1.0.0-jar-with-dependencies.jar connect:/tmp/schema-validator-1.0.0-jar-with-dependencies.jar > /dev/null 2>&1
docker cp $schema_file connect:/tmp/schema.json > /dev/null 2>&1
docker cp $tmp_dir/out.json connect:/tmp/message.json > /dev/null 2>&1

docker exec -e SCHEMA_TYPE=$schema_type connect bash -c "java -jar /tmp/schema-validator-1.0.0-jar-with-dependencies.jar" > $tmp_dir/result.log
# Map each --validate-config converter parameter to the KAFKA_* environment
# variable understood by the schema-validator running inside the connect
# container.
env_list=""
for conf in "${validate_config[@]}"
do
    case "${conf}" in
        "scrub.invalid.names=true")
            env_list="$env_list -e KAFKA_SCRUB_INVALID_NAMES=true"
        ;;
        "enhanced.avro.schema.support=true")
            env_list="$env_list -e KAFKA_ENHANCED_AVRO_SCHEMA_SUPPORT=true"
        ;;
        "connect.meta.data=false")
            env_list="$env_list -e KAFKA_CONNECT_META_DATA=false"
        ;;
        "object.additional.properties=false")
            # was missing: value is in the --validate-config allowed list
            env_list="$env_list -e KAFKA_OBJECT_ADDITIONAL_PROPERTIES=false"
        ;;
        "use.optional.for.nonrequired=true")
            env_list="$env_list -e KAFKA_USE_OPTIONAL_FOR_NONREQUIRED=true"
        ;;
        "ignore.default.for.nullables=true")
            env_list="$env_list -e KAFKA_IGNORE_DEFAULT_FOR_NULLABLES=true"
        ;;
        "generalized.sum.type.support=true")
            env_list="$env_list -e KAFKA_GENERALIZED_SUM_TYPE_SUPPORT=true"
        ;;
        *)
            logerror "❌ --validate-config parameter <$conf> is not supported"
        ;;
    esac
done

docker exec $env_list -e SCHEMA_TYPE=$schema_type connect bash -c "java -jar /tmp/schema-validator-1.0.0-jar-with-dependencies.jar" > $tmp_dir/result.log
set +e
nb=$(grep -c "ERROR" $tmp_dir/result.log)
if [ $nb -ne 0 ]
then
logerror "❌ schema is not valid according to $schema_type sink converter"
logerror "❌ schema is not valid according to $schema_type converter"
cat $tmp_dir/result.log
exit 1
else
log "👌 schema is valid according to $schema_type sink converter"
log "👌 schema is valid according to $schema_type converter"
fi
set -e
fi

playground topic get-number-records --topic $topic > $tmp_dir/result.log 2>$tmp_dir/result.log
Expand Down Expand Up @@ -386,9 +407,14 @@ case "${schema_type}" in

;;
*)
if [[ -n "$validate" ]]
then
logerror "❌ --validate is set but $schema_type is used. This is only valid for avro|json-schema|protobuf"
exit 1
fi
if [[ -n "$value_subject_name_strategy" ]]
then
logerror "--value-subject-name-strategy is set but $schema_type is used. This is only valid for avro|json-schema|protobuf"
logerror "--value-subject-name-strategy is set but $schema_type is used. This is only valid for avro|json-schema|protobuf"
exit 1
fi
;;
Expand Down

0 comments on commit de12038

Please sign in to comment.