From 101b6b21d783e3dba18f849c15cc96dccdd9a296 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Tue, 19 Jul 2022 19:14:32 +0200
Subject: [PATCH 01/11] librdkafka version v1.9.2-RC2

---
 .travis.yml                       | 2 +-
 examples/docker/Dockerfile.alpine | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 1b97ec8d8..ffbc3bb3e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
 env:
   global:
-    - LIBRDKAFKA_VERSION=v1.9.0
+    - LIBRDKAFKA_VERSION=v1.9.2-RC2
 
 jobs:
   include:

diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine
index 6e492fa64..5090723eb 100644
--- a/examples/docker/Dockerfile.alpine
+++ b/examples/docker/Dockerfile.alpine
@@ -30,7 +30,7 @@ FROM alpine:3.12
 
 COPY . /usr/src/confluent-kafka-python
 
-ENV LIBRDKAFKA_VERSION v1.9.0
+ENV LIBRDKAFKA_VERSION v1.9.2-RC2
 ENV KAFKACAT_VERSION master
 

From 3b72361097872357f504b9da72efa8db6ec1099b Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Tue, 19 Jul 2022 19:15:30 +0200
Subject: [PATCH 02/11] Version v1.9.2

---
 docs/conf.py                              | 2 +-
 setup.py                                  | 2 +-
 src/confluent_kafka/src/confluent_kafka.h | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/conf.py b/docs/conf.py
index d13226f8a..613fba03a 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -34,7 +34,7 @@
 # built documents.
 #
 # The short X.Y version.
-version = '1.9.0'
+version = '1.9.2'
 # The full version, including alpha/beta/rc tags.
 release = version
 ######################################################################

diff --git a/setup.py b/setup.py
index eadf11646..f77895814 100755
--- a/setup.py
+++ b/setup.py
@@ -75,7 +75,7 @@ def get_install_requirements(path):
 setup(name='confluent-kafka',
       # Make sure to bump CFL_VERSION* in confluent_kafka/src/confluent_kafka.h
       # and version in docs/conf.py.
-      version='1.9.0',
+      version='1.9.2',
       description='Confluent\'s Python client for Apache Kafka',
       author='Confluent Inc',
       author_email='support@confluent.io',

diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h
index 6658bab87..07086d715 100644
--- a/src/confluent_kafka/src/confluent_kafka.h
+++ b/src/confluent_kafka/src/confluent_kafka.h
@@ -42,8 +42,8 @@
  *   0xMMmmRRPP
  *   MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
  */
-#define CFL_VERSION     0x01090000
-#define CFL_VERSION_STR "1.9.0"
+#define CFL_VERSION     0x01090200
+#define CFL_VERSION_STR "1.9.2"
 
 /**
  * Minimum required librdkafka version. This is checked both during

From 3e343bb2a0c4eb1f3dec0e5286e74a4b3872a3de Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Tue, 19 Jul 2022 19:18:03 +0200
Subject: [PATCH 03/11] updated CHANGELOG

---
 CHANGELOG.md | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b31db22c3..a88394199 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,31 @@
 # Confluent's Python client for Apache Kafka
 
+## v1.9.2
+
+v1.9.2 is a maintenance release with the following fixes and enhancements:
+
+ - Wheel for macOS M1/arm64
+ - KIP-140 Admin API ACL fix:
+   When requesting multiple create_acls or delete_acls operations
+   if the provided ACL bindings or ACL binding filters are not
+   unique it throws an exception immediately instead of failing later
+   when the responses are read. (#1370).
+ - KIP-140 Admin API ACL fix:
+   Better documentation of the describe and delete ACLs behavior
+   when using the MATCH resource pattern type in a filter. (#1373).
+ - Avro serialization examples: + added a parameter for using a generic or specific Avro schema. (#1381). + +confluent-kafka-python is based on librdkafka v1.9.2, see the +[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.2) +for a complete list of changes, enhancements, fixes and upgrade considerations. + + +## v1.9.1 + +There was no 1.9.1 release of the Python Client. + + ## v1.9.0 This is a feature release: From eba2d0d7bbf598798fefcecb3d449f6fce36fc86 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 29 Jul 2022 22:36:59 +0200 Subject: [PATCH 04/11] macOS semaphore pipeline for x64 and arm64 --- .semaphore/semaphore.yml | 93 ++++++++++++++++++++++++++++++ .travis.yml | 45 +++------------ examples/docker/Dockerfile.alpine | 2 +- tools/prepare-osx.sh | 31 ---------- tools/wheels/build-wheels.sh | 13 +++-- tools/wheels/install-librdkafka.sh | 9 +-- 6 files changed, 115 insertions(+), 78 deletions(-) create mode 100644 .semaphore/semaphore.yml delete mode 100755 tools/prepare-osx.sh diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml new file mode 100644 index 000000000..3b845788b --- /dev/null +++ b/.semaphore/semaphore.yml @@ -0,0 +1,93 @@ +version: v1.0 +name: Test on PR or create and upload wheels on tag. +agent: + machine: + type: s1-prod-mac-m1 +global_job_config: + secrets: + - name: vault_sem2_approle + env_vars: + - name: LIBRDKAFKA_VERSION + value: v1.9.2-RC3 +blocks: + - name: "Wheels: OSX x64" + run: + when: "tag =~ '.*'" + dependencies: [] + task: + agent: + machine: + type: s1-prod-macos + env_vars: + - name: OS_NAME + value: osx + - name: ARCH + value: x64 + jobs: + - name: Build + commands: + - cd $SEM_WORKSPACE + - export HOME=$SEM_WORKSPACE + - checkout + # needed on the self-hosted agent + - if [ ! -d ./tools ]; then cd $SEM_WORKSPACE/confluent-kafka-python; fi + - PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse + - tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse + - artifact push workflow wheelhouse-macOS-${ARCH}.tgz + - name: "Wheels: OSX arm64" + run: + when: "tag =~ '.*'" + dependencies: [] + task: + env_vars: + - name: OS_NAME + value: osx + - name: CIBW_ARCHS + value: arm64 + - name: ARCH + value: arm64 + jobs: + - name: Build + commands: + - cd $SEM_WORKSPACE + - export HOME=$SEM_WORKSPACE + - checkout + # needed on the self-hosted agent + - if [ ! -d ./tools ]; then cd $SEM_WORKSPACE/confluent-kafka-python; fi + - PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse + - tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse + - artifact push workflow wheelhouse-macOS-${ARCH}.tgz + + - name: Source package verification with Python 3 (OSX x64) +docs + dependencies: [] + task: + agent: + machine: + type: s1-prod-macos + env_vars: + - name: OS_NAME + value: osx + - name: ARCH + value: arm64 + jobs: + - name: Build + commands: + - cd $SEM_WORKSPACE + - export HOME=$SEM_WORKSPACE + - checkout + # needed on the self-hosted agent + - if [ ! 
-d ./tools ]; then cd $SEM_WORKSPACE/confluent-kafka-python; fi + # use a virtualenv + - python3 -m venv _venv && source _venv/bin/activate + - pip install -r docs/requirements.txt + - pip install -U protobuf + # install librdkafka + - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native + - tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest + - export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include" + - export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}" + - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" + - export DYLD_LIBRARY_PATH="$DYLD_LIBRARY_PATH:$PWD/$lib_dir" + # install confluent-kafka + - python setup.py build && python setup.py install + - make docs \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index ffbc3bb3e..7dcf3d668 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,37 +1,17 @@ env: global: - - LIBRDKAFKA_VERSION=v1.9.2-RC2 + - LIBRDKAFKA_VERSION=v1.9.2-RC3 jobs: include: - - name: "Source package verification with Python 2.7 (Linux)" - if: false + - name: "Source package verification with Python 3.8 (Linux)" os: linux language: python dist: xenial - python: "2.7" - env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" - services: docker - - - name: "Source package verification with Python 3.6 (Linux)" - os: linux - language: python - dist: xenial - python: "3.6" + python: "3.8" env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" services: docker - - name: "Source package verification with Python 2.7 (OSX)" - if: false - os: osx - python: "2.7" - env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" INTERPRETER_VERSION="2.7.17" - - - name: "Source package verification with Python 3.6 (OSX) +docs" - os: osx - python: "3.6" - env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" MK_DOCS="y" INTERPRETER_VERSION="3.6.5" - - name: "Wheels: Windows x64" if: tag is present os: windows @@ -70,19 +50,12 @@ jobs: env: BUILD_WHEELS=1 script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse - - name: "Wheels: MacOSX x64" - if: tag is present - os: osx - language: shell - env: BUILD_WHEELS=1 - script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse - - -# Install test dependencies unconditionally -# Travis OSX envs requires some setup; see tools/prepare-osx.sh -# Install cibuildwheel if this is a tagged PR -before_install: - - if [[ $TRAVIS_OS_NAME == "osx" && $BUILD_WHEELS != 1 ]]; then tools/prepare-osx.sh ${INTERPRETER_VERSION} /tmp/venv && source /tmp/venv/bin/activate; fi + # - name: "Wheels: MacOSX x64" + # if: tag is present + # os: osx + # language: shell + # env: BUILD_WHEELS=1 + # script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse install: # Install interceptors diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine index 5090723eb..b54de9116 100644 --- a/examples/docker/Dockerfile.alpine +++ b/examples/docker/Dockerfile.alpine @@ -30,7 +30,7 @@ FROM alpine:3.12 COPY . 
/usr/src/confluent-kafka-python
 
-ENV LIBRDKAFKA_VERSION v1.9.2-RC2
+ENV LIBRDKAFKA_VERSION v1.9.2-RC3
 ENV KAFKACAT_VERSION master
 

diff --git a/tools/prepare-osx.sh b/tools/prepare-osx.sh
deleted file mode 100755
index adb7c5ea0..000000000
--- a/tools/prepare-osx.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/bash
-#
-# This script prepares the Travis OSX env with a particular interpreter
-# https://docs.travis-ci.com/user/languages/python/
-#
-# Default OSX environment
-# https://docs.travis-ci.com/user/reference/osx/#compilers-and-build-toolchain
-#
-PY_INTERPRETER=$1
-VENV_HOME=$2
-
-set -ev
-
-export HOMEBREW_NO_AUTO_UPDATE=1
-export HOMEBREW_NO_INSTALL_CLEANUP=1
-brew upgrade libtool || brew install libtool
-
-if [[ -z ${PY_INTERPRETER} ]] || [[ -z ${VENV_HOME} ]]; then
-    echo "Usage: $0 <python-interpreter-version> <virtualenv-home>"
-    exit 1
-fi
-
-# Update virtualenv and install requested interpreter
-echo "# Updating basic dependencies"
-pip install -U pip
-pip install virtualenv
-pyenv install -f ${PY_INTERPRETER}
-
-# Create virtualenv
-echo "# Constructing virtualenv for interpreter ${PY_INTERPRETER}"
-virtualenv -p ~/.pyenv/versions/${PY_INTERPRETER}/bin/python ${VENV_HOME}

diff --git a/tools/wheels/build-wheels.sh b/tools/wheels/build-wheels.sh
index dd38f53e5..681798e1f 100755
--- a/tools/wheels/build-wheels.sh
+++ b/tools/wheels/build-wheels.sh
@@ -8,7 +8,7 @@ this_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 
 # Skip PyPy, Python2, old Python3 versions, musl, and x86 builds.
-export CIBW_SKIP="pp* cp27-* cp35-* *i686 *musllinux*"
+export CIBW_SKIP="pp* cp27-* cp35-* *i686 *musllinux* $CIBW_SKIP"
 # Run a simple test suite
 export CIBW_TEST_REQUIRES="-r tests/requirements.txt"
 export CIBW_TEST_COMMAND="pytest {project}/tests/test_Producer.py"
@@ -26,19 +26,20 @@ set -ex
 
 [[ -d $wheeldir ]] || mkdir -p "$wheeldir"
 
+ARCH=${ARCH:-x64}
 
 case $OSTYPE in
 linux*)
     os=linux
     # Need to set up env vars (in docker) so that setup.py
     # finds librdkafka.
-    lib_dir=dest/runtimes/linux-x64/native
+    lib_dir=dest/runtimes/linux-$ARCH/native
     export CIBW_ENVIRONMENT="CFLAGS=-I\$PWD/dest/build/native/include LDFLAGS=-L\$PWD/$lib_dir LD_LIBRARY_PATH=\$LD_LIBRARY_PATH:\$PWD/$lib_dir"
     ;;
 darwin*)
     os=macos
     # Need to set up env vars so that setup.py finds librdkafka.
-    lib_dir=dest/runtimes/osx-x64/native
+    lib_dir=dest/runtimes/osx-$ARCH/native
     export CFLAGS="-I${PWD}/dest/build/native/include"
     export LDFLAGS="-L${PWD}/$lib_dir"
     ;;
@@ -51,10 +52,10 @@ esac
 
 $this_dir/install-librdkafka.sh $librdkafka_version dest
 
-install_pkgs=cibuildwheel==2.7.0
+install_pkgs=cibuildwheel==2.8.1
 
-python3 -m pip install $install_pkgs ||
-    pip3 install $install_pkgs
+python3 -m pip install ${PIP_INSTALL_OPTS} $install_pkgs ||
+    pip3 install ${PIP_INSTALL_OPTS} $install_pkgs
 
 if [[ -z $TRAVIS ]]; then
     cibw_args="--platform $os"

diff --git a/tools/wheels/install-librdkafka.sh b/tools/wheels/install-librdkafka.sh
index 1b8751ccc..cee2ac115 100755
--- a/tools/wheels/install-librdkafka.sh
+++ b/tools/wheels/install-librdkafka.sh
@@ -23,22 +23,23 @@
 curl -L -o lrk$VER.zip https://www.nuget.org/api/v2/package/librdkafka.redist/$VER
 
 unzip lrk$VER.zip
 
+ARCH=${ARCH:-x64}
 if [[ $OSTYPE == linux* ]]; then
     # Linux
 
     # Copy the librdkafka build with least dependencies to librdkafka.so.1
-    cp -v runtimes/linux-x64/native/{centos6-librdkafka.so,librdkafka.so.1}
-    ldd runtimes/linux-x64/native/librdkafka.so.1
+    cp -v runtimes/linux-$ARCH/native/{centos6-librdkafka.so,librdkafka.so.1}
+    ldd runtimes/linux-$ARCH/native/librdkafka.so.1
 
 elif [[ $OSTYPE == darwin* ]]; then
     # MacOS X
 
     # Change the library's self-referencing name from
     # /Users/travis/.....somelocation/librdkafka.1.dylib to its local path.
-    install_name_tool -id $PWD/runtimes/osx-x64/native/librdkafka.dylib runtimes/osx-x64/native/librdkafka.dylib
+    install_name_tool -id $PWD/runtimes/osx-$ARCH/native/librdkafka.dylib runtimes/osx-$ARCH/native/librdkafka.dylib
 
-    otool -L runtimes/osx-x64/native/librdkafka.dylib
+    otool -L runtimes/osx-$ARCH/native/librdkafka.dylib
 fi
 
 popd

From fed23909691aa000cc2cdd8dec340c29dc38a592 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Sat, 30 Jul 2022 00:51:08 +0200
Subject: [PATCH 05/11] upgraded wheel tests

---
 tools/smoketest.sh      |  4 ++--
 tools/test-manylinux.sh | 19 +++++--------------
 2 files changed, 7 insertions(+), 16 deletions(-)

diff --git a/tools/smoketest.sh b/tools/smoketest.sh
index ded067198..acfb4ac9a 100755
--- a/tools/smoketest.sh
+++ b/tools/smoketest.sh
@@ -29,8 +29,8 @@ fi
 
 pyvers_tested=
 
-# Run tests with both python2 and python3 (whatever versions the OS provides)
-for py in 2.7 3.8 ; do
+# Run tests with python3
+for py in 3.8 ; do
 
     echo "$0: # Smoketest with Python$py"
 
     if ! python$py -V ; then

diff --git a/tools/test-manylinux.sh b/tools/test-manylinux.sh
index 83e3af79a..26ef95262 100755
--- a/tools/test-manylinux.sh
+++ b/tools/test-manylinux.sh
@@ -30,19 +30,13 @@ fi
 
 echo "$0 running from $(pwd)"
 
-
-function setup_centos {
-    # CentOS container setup
-    yum install -q -y python python3 epel-release curl
-}
-
 function setup_ubuntu {
     # Ubuntu container setup
     apt-get update
-    apt-get install -y python python3 curl
+    apt-get install -y python3.8 curl
     # python3-distutils is required on Ubuntu 18.04 and later but does
     # not exist on 14.04.
-    apt-get install -y python3-distutils || true
+    apt-get install -y python3.8-distutils || true
 }
 
@@ -57,9 +51,7 @@ function run_single_in_docker {
     fi
 
     # Detect OS
-    if grep -qi centos /etc/system-release /etc/redhat-release 2>/dev/null ; then
-        setup_centos
-    elif grep -qiE 'ubuntu|debian' /etc/os-release 2>/dev/null ; then
+    if grep -qiE 'ubuntu|debian' /etc/os-release 2>/dev/null ; then
         setup_ubuntu
     else
         echo "WARNING: Don't know what platform I'm on: $(uname -a)"
@@ -69,7 +61,7 @@
     # in a plethora of possibly outdated Python requirements that
     # might interfere with the newer packages from PyPi, such as six.
     # Instead install it directly from PyPa.
-    curl https://bootstrap.pypa.io/get-pip.py | python
+    curl https://bootstrap.pypa.io/get-pip.py | python3.8
 
     /io/tools/smoketest.sh "$wheelhouse"
 }
@@ -86,8 +78,7 @@ function run_all_with_docker {
 
     [[ ! -z $DOCKER_IMAGES ]] || \
         # LTS and stable release of popular Linux distros.
-        # We require >= Python 2.7 to be available (which rules out Centos 6.6)
-        DOCKER_IMAGES="ubuntu:14.04 ubuntu:16.04 ubuntu:18.04 ubuntu:20.04 centos:7 centos:8"
+        DOCKER_IMAGES="ubuntu:18.04 ubuntu:20.04"
 
     _wheels="$wheelhouse/*manylinux*.whl"

From 19ef95d122483432d39ecd017bb883acf317438c Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Tue, 2 Aug 2022 15:55:39 +0200
Subject: [PATCH 06/11] librdkafka version v1.9.2

---
 .semaphore/semaphore.yml          | 2 +-
 .travis.yml                       | 2 +-
 examples/docker/Dockerfile.alpine | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index 3b845788b..9768bfcc5 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -8,7 +8,7 @@ global_job_config:
     - name: vault_sem2_approle
   env_vars:
     - name: LIBRDKAFKA_VERSION
-      value: v1.9.2-RC3
+      value: v1.9.2
 blocks:
   - name: "Wheels: OSX x64"
     run:

diff --git a/.travis.yml b/.travis.yml
index 7dcf3d668..d78042a5e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
 env:
   global:
-    - LIBRDKAFKA_VERSION=v1.9.2-RC3
+    - LIBRDKAFKA_VERSION=v1.9.2
 
 jobs:
   include:

diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine
index b54de9116..8e42153f9 100644
--- a/examples/docker/Dockerfile.alpine
+++ b/examples/docker/Dockerfile.alpine
@@ -30,7 +30,7 @@ FROM alpine:3.12
 
 COPY .
/usr/src/confluent-kafka-python -ENV LIBRDKAFKA_VERSION v1.9.2-RC3 +ENV LIBRDKAFKA_VERSION v1.9.2 ENV KAFKACAT_VERSION master From ef6319b9daad86a62fc6f025ad8833b3bc9c739f Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 2 Aug 2022 16:03:09 +0200 Subject: [PATCH 07/11] macOS x64 wheels still built with Travis for prudence --- .travis.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index d78042a5e..3a2795cbd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,12 +50,12 @@ jobs: env: BUILD_WHEELS=1 script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse - # - name: "Wheels: MacOSX x64" - # if: tag is present - # os: osx - # language: shell - # env: BUILD_WHEELS=1 - # script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse + - name: "Wheels: MacOSX x64" + if: tag is present + os: osx + language: shell + env: BUILD_WHEELS=1 + script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse install: # Install interceptors From 9ea3aae9bb085d394b37d0775d770520361672f1 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 2 Aug 2022 22:31:06 +0200 Subject: [PATCH 08/11] flake8 rule E275 compliant --- tests/integration/integration_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/integration_test.py b/tests/integration/integration_test.py index d87288905..a109c2299 100755 --- a/tests/integration/integration_test.py +++ b/tests/integration/integration_test.py @@ -1126,10 +1126,10 @@ def verify_avro_explicit_read_schema(): msgcount += 1 # Avro schema projection should return the two fields not present in the writer schema try: - assert(msg.key().get('favorite_number') == 42) - assert(msg.key().get('favorite_color') == "purple") - assert(msg.value().get('favorite_number') == 42) - assert(msg.value().get('favorite_color') == "purple") + assert (msg.key().get('favorite_number') == 42) + assert (msg.key().get('favorite_color') == "purple") + assert (msg.value().get('favorite_number') == 42) + assert (msg.value().get('favorite_color') == "purple") print("success: schema projection worked for explicit reader schema") except KeyError: raise confluent_kafka.avro.SerializerError("Schema projection failed when setting reader schema.") From f8b6468c2388f9edd16806aae63c26547eee2a40 Mon Sep 17 00:00:00 2001 From: Ben Withers Date: Tue, 2 Aug 2022 22:36:06 +0100 Subject: [PATCH 09/11] Support setting principal and SASL extensions in oauth_cb, handle failures (#1402) * Support setting principal and SASL extensions in oauth_cb and handle token failures * removed global variables Co-authored-by: Emanuele Sabellico --- src/confluent_kafka/src/confluent_kafka.c | 103 ++++++++++++++++++++-- tests/test_misc.py | 99 ++++++++++++++------- 2 files changed, 164 insertions(+), 38 deletions(-) diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index a9dd8c17a..47513d843 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1522,6 +1522,62 @@ static void log_cb (const rd_kafka_t *rk, int level, CallState_resume(cs); } +/** + * @brief Translate Python \p key and \p value to C types and set on + * provided \p extensions char* array at the provided index. + * + * @returns 1 on success or 0 if an exception was raised. 
+ */
+static int py_extensions_to_c (char **extensions, Py_ssize_t idx,
+                               PyObject *key, PyObject *value) {
+        PyObject *ks, *ks8, *vo8 = NULL;
+        const char *k;
+        const char *v;
+        Py_ssize_t ksize = 0;
+        Py_ssize_t vsize = 0;
+
+        if (!(ks = cfl_PyObject_Unistr(key))) {
+                PyErr_SetString(PyExc_TypeError,
+                                "expected extension key to be unicode "
+                                "string");
+                return 0;
+        }
+
+        k = cfl_PyUnistr_AsUTF8(ks, &ks8);
+        ksize = (Py_ssize_t)strlen(k);
+
+        if (cfl_PyUnistr(_Check(value))) {
+                /* Unicode string, translate to utf-8. */
+                v = cfl_PyUnistr_AsUTF8(value, &vo8);
+                if (!v) {
+                        Py_DECREF(ks);
+                        Py_XDECREF(ks8);
+                        return 0;
+                }
+                vsize = (Py_ssize_t)strlen(v);
+        } else {
+                PyErr_Format(PyExc_TypeError,
+                             "expected extension value to be "
+                             "unicode string, not %s",
+                             ((PyTypeObject *)PyObject_Type(value))->
+                             tp_name);
+                Py_DECREF(ks);
+                Py_XDECREF(ks8);
+                return 0;
+        }
+
+        /* Allocate strlen + 1 bytes so strcpy's NUL terminator fits. */
+        extensions[idx] = (char*)malloc(ksize + 1);
+        strcpy(extensions[idx], k);
+        extensions[idx + 1] = (char*)malloc(vsize + 1);
+        strcpy(extensions[idx + 1], v);
+
+        Py_DECREF(ks);
+        Py_XDECREF(ks8);
+        Py_XDECREF(vo8);
+
+        return 1;
+}
+
 static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config,
                       void *opaque) {
         Handle *h = opaque;
@@ -1529,6 +1585,10 @@ static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config,
         CallState *cs;
         const char *token;
         double expiry;
+        const char *principal = "";
+        PyObject *extensions = NULL;
+        char **rd_extensions = NULL;
+        Py_ssize_t rd_extensions_size = 0;
         char err_msg[2048];
         rd_kafka_resp_err_t err_code;
 
@@ -1539,26 +1599,57 @@ static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config,
         Py_DECREF(eo);
 
         if (!result) {
-                goto err;
+                goto fail;
         }
 
-        if (!PyArg_ParseTuple(result, "sd", &token, &expiry)) {
+        if (!PyArg_ParseTuple(result, "sd|sO!", &token, &expiry, &principal, &PyDict_Type, &extensions)) {
                 Py_DECREF(result);
-                PyErr_Format(PyExc_TypeError,
+                PyErr_SetString(PyExc_TypeError,
                                 "expect returned value from oauth_cb "
                                 "to be (token_str, expiry_time) tuple");
                 goto err;
         }
+
+        if (extensions) {
+                int len = (int)PyDict_Size(extensions);
+                rd_extensions = (char **)malloc(2 * len * sizeof(char *));
+                Py_ssize_t pos = 0;
+                PyObject *ko, *vo;
+                while (PyDict_Next(extensions, &pos, &ko, &vo)) {
+                        if (!py_extensions_to_c(rd_extensions, rd_extensions_size, ko, vo)) {
+                                Py_DECREF(result);
+                                free(rd_extensions);
+                                goto err;
+                        }
+                        rd_extensions_size = rd_extensions_size + 2;
+                }
+        }
+
         err_code = rd_kafka_oauthbearer_set_token(h->rk, token,
                                                   (int64_t)(expiry * 1000),
-                                                  "", NULL, 0, err_msg,
+                                                  principal, (const char **)rd_extensions, rd_extensions_size, err_msg,
                                                   sizeof(err_msg));
         Py_DECREF(result);
-        if (err_code) {
+
+        if (rd_extensions) {
+                for(int i = 0; i < rd_extensions_size; i++) {
+                        free(rd_extensions[i]);
+                }
+                free(rd_extensions);
+        }
+
+        if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
                 PyErr_Format(PyExc_ValueError, "%s", err_msg);
-                goto err;
+                goto fail;
         }
 
         goto done;
+fail:
+        err_code = rd_kafka_oauthbearer_set_token_failure(h->rk, "OAuth callback raised exception");
+        if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
+                PyErr_SetString(PyExc_ValueError, "Failed to set token failure");
+                goto err;
+        }
+        PyErr_Clear();
+        goto done;
 err:
         CallState_crash(cs);
         rd_kafka_yield(h->rk);

diff --git a/tests/test_misc.py b/tests/test_misc.py
index cdf1147fe..ae016a3a9 100644
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -24,22 +24,18 @@ def test_version():
     assert confluent_kafka.version()[0] == confluent_kafka.__version__
 
 
-# global variable for error_cb call back function
-seen_error_cb
= False - - def test_error_cb(): """ Tests error_cb. """ + seen_error_cb = False def error_cb(error_msg): - global seen_error_cb + nonlocal seen_error_cb seen_error_cb = True acceptable_error_codes = (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN) assert error_msg.code() in acceptable_error_codes conf = {'bootstrap.servers': 'localhost:65531', # Purposely cause connection refused error 'group.id': 'test', - 'socket.timeout.ms': '100', 'session.timeout.ms': 1000, # Avoid close() blocking too long 'error_cb': error_cb } @@ -47,26 +43,22 @@ def error_cb(error_msg): kc = confluent_kafka.Consumer(**conf) kc.subscribe(["test"]) while not seen_error_cb: - kc.poll(timeout=1) + kc.poll(timeout=0.1) kc.close() -# global variable for stats_cb call back function -seen_stats_cb = False - - def test_stats_cb(): """ Tests stats_cb. """ + seen_stats_cb = False def stats_cb(stats_json_str): - global seen_stats_cb + nonlocal seen_stats_cb seen_stats_cb = True stats_json = json.loads(stats_json_str) assert len(stats_json['name']) > 0 conf = {'group.id': 'test', - 'socket.timeout.ms': '100', 'session.timeout.ms': 1000, # Avoid close() blocking too long 'statistics.interval.ms': 200, 'stats_cb': stats_cb @@ -76,22 +68,20 @@ def stats_cb(stats_json_str): kc.subscribe(["test"]) while not seen_stats_cb: - kc.poll(timeout=1) + kc.poll(timeout=0.1) kc.close() -seen_stats_cb_check_no_brokers = False - - def test_conf_none(): """ Issue #133 Test that None can be passed for NULL by setting bootstrap.servers to None. If None would be converted to a string then a broker would show up in statistics. Verify that it doesnt. """ + seen_stats_cb_check_no_brokers = False def stats_cb_check_no_brokers(stats_json_str): """ Make sure no brokers are reported in stats """ - global seen_stats_cb_check_no_brokers + nonlocal seen_stats_cb_check_no_brokers stats = json.loads(stats_json_str) assert len(stats['brokers']) == 0, "expected no brokers in stats: %s" % stats_json_str seen_stats_cb_check_no_brokers = True @@ -101,9 +91,8 @@ def stats_cb_check_no_brokers(stats_json_str): 'stats_cb': stats_cb_check_no_brokers} p = confluent_kafka.Producer(conf) - p.poll(timeout=1) + p.poll(timeout=0.1) - global seen_stats_cb_check_no_brokers assert seen_stats_cb_check_no_brokers @@ -130,15 +119,12 @@ def test_throttle_event_types(): assert str(throttle_event) == "broker/0 throttled for 10000 ms" -# global variable for oauth_cb call back function -seen_oauth_cb = False - - def test_oauth_cb(): """ Tests oauth_cb. """ + seen_oauth_cb = False def oauth_cb(oauth_config): - global seen_oauth_cb + nonlocal seen_oauth_cb seen_oauth_cb = True assert oauth_config == 'oauth_cb' return 'token', time.time() + 300.0 @@ -146,7 +132,6 @@ def oauth_cb(oauth_config): conf = {'group.id': 'test', 'security.protocol': 'sasl_plaintext', 'sasl.mechanisms': 'OAUTHBEARER', - 'socket.timeout.ms': '100', 'session.timeout.ms': 1000, # Avoid close() blocking too long 'sasl.oauthbearer.config': 'oauth_cb', 'oauth_cb': oauth_cb @@ -155,7 +140,59 @@ def oauth_cb(oauth_config): kc = confluent_kafka.Consumer(**conf) while not seen_oauth_cb: - kc.poll(timeout=1) + kc.poll(timeout=0.1) + kc.close() + + +def test_oauth_cb_principal_sasl_extensions(): + """ Tests oauth_cb. 
""" + seen_oauth_cb = False + + def oauth_cb(oauth_config): + nonlocal seen_oauth_cb + seen_oauth_cb = True + assert oauth_config == 'oauth_cb' + return 'token', time.time() + 300.0, oauth_config, {"extone": "extoneval", "exttwo": "exttwoval"} + + conf = {'group.id': 'test', + 'security.protocol': 'sasl_plaintext', + 'sasl.mechanisms': 'OAUTHBEARER', + 'session.timeout.ms': 100, # Avoid close() blocking too long + 'sasl.oauthbearer.config': 'oauth_cb', + 'oauth_cb': oauth_cb + } + + kc = confluent_kafka.Consumer(**conf) + + while not seen_oauth_cb: + kc.poll(timeout=0.1) + kc.close() + + +def test_oauth_cb_failure(): + """ Tests oauth_cb. """ + oauth_cb_count = 0 + + def oauth_cb(oauth_config): + nonlocal oauth_cb_count + oauth_cb_count += 1 + assert oauth_config == 'oauth_cb' + if oauth_cb_count == 2: + return 'token', time.time() + 100.0, oauth_config, {"extthree": "extthreeval"} + raise Exception + + conf = {'group.id': 'test', + 'security.protocol': 'sasl_plaintext', + 'sasl.mechanisms': 'OAUTHBEARER', + 'session.timeout.ms': 1000, # Avoid close() blocking too long + 'sasl.oauthbearer.config': 'oauth_cb', + 'oauth_cb': oauth_cb + } + + kc = confluent_kafka.Consumer(**conf) + + while oauth_cb_count < 2: + kc.poll(timeout=0.1) kc.close() @@ -194,11 +231,9 @@ def test_unordered_dict(init_func): client.poll(0) -# global variable for on_delivery call back function -seen_delivery_cb = False - - def test_topic_config_update(): + seen_delivery_cb = False + # *NOTE* default.topic.config has been deprecated. # This example remains to ensure backward-compatibility until its removal. confs = [{"message.timeout.ms": 600000, "default.topic.config": {"message.timeout.ms": 1000}}, @@ -207,7 +242,7 @@ def test_topic_config_update(): def on_delivery(err, msg): # Since there is no broker, produced messages should time out. - global seen_delivery_cb + nonlocal seen_delivery_cb seen_delivery_cb = True assert err.code() == confluent_kafka.KafkaError._MSG_TIMED_OUT From 5162a1262b3cd1730af98b28a065d40858e26958 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 2 Aug 2022 23:50:15 +0200 Subject: [PATCH 10/11] updated CHANGELOG --- CHANGELOG.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a88394199..d5e2a3b86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,13 @@ v1.9.2 is a maintenance release with the following fixes and enhancements: + - Support for setting principal and SASL extensions in oauth_cb + and handle failures (@Manicben, #1402) - Wheel for macOS M1/arm64 - KIP-140 Admin API ACL fix: - When requesting multiple create_acls or delete_acls operations + When requesting multiple create_acls or delete_acls operations, if the provided ACL bindings or ACL binding filters are not - unique it throws an exception immediately instead of failing later + unique, an exception will be thrown immediately rather than later when the responses are read. (#1370). 
- KIP-140 Admin API ACL fix: Better documentation of the describe and delete ACLs behavior From 1021f55314607c8d3f1bcbce1cbb9a16c8e15bad Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 3 Aug 2022 16:49:48 +0200 Subject: [PATCH 11/11] Updated oauth_cb error message with new return values --- src/confluent_kafka/src/confluent_kafka.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 47513d843..e2329f706 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1605,7 +1605,7 @@ static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config, Py_DECREF(result); PyErr_SetString(PyExc_TypeError, "expect returned value from oauth_cb " - "to be (token_str, expiry_time) tuple"); + "to be (token_str, expiry_time[, principal, extensions]) tuple"); goto err; }
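

Note on the oauth_cb changes (patches 09 and 11): the callback may now return
either the original (token_str, expiry_time) tuple or the extended
(token_str, expiry_time, principal, extensions) tuple, and an exception raised
inside the callback is registered with librdkafka via
rd_kafka_oauthbearer_set_token_failure() so that the callback is retried on a
later poll. Below is a minimal sketch of the extended form; the broker
address, token value, principal and extension names are illustrative
placeholders, not part of the patches.

    import time

    from confluent_kafka import Producer


    def fetch_token(oauthbearer_config):
        # A real callback would contact an OAuth token endpoint here; the
        # token, principal and extensions below are placeholders.
        token = "eyJhbGciOi..."
        expiry = time.time() + 300.0  # absolute expiry, seconds since epoch
        principal = "admin"
        extensions = {"logicalCluster": "lkc-abc123"}
        # Returning just (token, expiry) is still supported; principal and
        # extensions are the optional third and fourth tuple elements.
        return token, expiry, principal, extensions


    producer = Producer({
        "bootstrap.servers": "localhost:9092",  # illustrative address
        "security.protocol": "sasl_plaintext",
        "sasl.mechanisms": "OAUTHBEARER",
        "oauth_cb": fetch_token,
    })

    # poll() serves the callback so a token is in place before producing.
    # If fetch_token raises, the client records a token failure and the
    # callback is retried on a subsequent poll (see test_oauth_cb_failure).
    producer.poll(0.1)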
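
Note on the KIP-140 ACL fix (#1370) in the CHANGELOG: the ACL bindings or
binding filters passed to a single create_acls or delete_acls call must be
unique, and duplicates now raise immediately instead of failing later when
the responses are read. A minimal sketch against the Admin API follows; the
broker address, topic and principal are illustrative.

    from confluent_kafka.admin import (AclBinding, AclOperation,
                                       AclPermissionType, AdminClient,
                                       ResourcePatternType, ResourceType)

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # illustrative

    # The two bindings differ in operation, so the request is valid;
    # repeating the exact same binding twice would raise up front.
    bindings = [
        AclBinding(ResourceType.TOPIC, "test", ResourcePatternType.LITERAL,
                   "User:alice", "*", AclOperation.READ,
                   AclPermissionType.ALLOW),
        AclBinding(ResourceType.TOPIC, "test", ResourcePatternType.LITERAL,
                   "User:alice", "*", AclOperation.WRITE,
                   AclPermissionType.ALLOW),
    ]

    futures = admin.create_acls(bindings)
    for binding, fut in futures.items():
        fut.result()  # None on success; raises KafkaException on error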