Skip to content

Commit

Permalink
Merge pull request #1400 from confluentinc/v1.9.2rc
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Aug 3, 2022
2 parents c2c8b6b + 1021f55 commit cdc5f3b
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 135 deletions.
93 changes: 93 additions & 0 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
version: v1.0
name: Test on PR or create and upload wheels on tag.
agent:
machine:
type: s1-prod-mac-m1
global_job_config:
secrets:
- name: vault_sem2_approle
env_vars:
- name: LIBRDKAFKA_VERSION
value: v1.9.2
blocks:
- name: "Wheels: OSX x64"
run:
when: "tag =~ '.*'"
dependencies: []
task:
agent:
machine:
type: s1-prod-macos
env_vars:
- name: OS_NAME
value: osx
- name: ARCH
value: x64
jobs:
- name: Build
commands:
- cd $SEM_WORKSPACE
- export HOME=$SEM_WORKSPACE
- checkout
# needed on the self-hosted agent
- if [ ! -d ./tools ]; then cd $SEM_WORKSPACE/confluent-kafka-python; fi
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz
- name: "Wheels: OSX arm64"
run:
when: "tag =~ '.*'"
dependencies: []
task:
env_vars:
- name: OS_NAME
value: osx
- name: CIBW_ARCHS
value: arm64
- name: ARCH
value: arm64
jobs:
- name: Build
commands:
- cd $SEM_WORKSPACE
- export HOME=$SEM_WORKSPACE
- checkout
# needed on the self-hosted agent
- if [ ! -d ./tools ]; then cd $SEM_WORKSPACE/confluent-kafka-python; fi
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz

- name: Source package verification with Python 3 (OSX x64) +docs
dependencies: []
task:
agent:
machine:
type: s1-prod-macos
env_vars:
- name: OS_NAME
value: osx
- name: ARCH
value: arm64
jobs:
- name: Build
commands:
- cd $SEM_WORKSPACE
- export HOME=$SEM_WORKSPACE
- checkout
# needed on the self-hosted agent
- if [ ! -d ./tools ]; then cd $SEM_WORKSPACE/confluent-kafka-python; fi
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- pip install -r docs/requirements.txt
- pip install -U protobuf
# install librdkafka
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"
- export DYLD_LIBRARY_PATH="$DYLD_LIBRARY_PATH:$PWD/$lib_dir"
# install confluent-kafka
- python setup.py build && python setup.py install
- make docs
33 changes: 3 additions & 30 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@
env:
global:
- LIBRDKAFKA_VERSION=v1.9.0
- LIBRDKAFKA_VERSION=v1.9.2

jobs:
include:
- name: "Source package verification with Python 2.7 (Linux)"
if: false
- name: "Source package verification with Python 3.8 (Linux)"
os: linux
language: python
dist: xenial
python: "2.7"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib"
services: docker

- name: "Source package verification with Python 3.6 (Linux)"
os: linux
language: python
dist: xenial
python: "3.6"
python: "3.8"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib"
services: docker

- name: "Source package verification with Python 2.7 (OSX)"
if: false
os: osx
python: "2.7"
env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" INTERPRETER_VERSION="2.7.17"

- name: "Source package verification with Python 3.6 (OSX) +docs"
os: osx
python: "3.6"
env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" MK_DOCS="y" INTERPRETER_VERSION="3.6.5"

- name: "Wheels: Windows x64"
if: tag is present
os: windows
Expand Down Expand Up @@ -77,13 +57,6 @@ jobs:
env: BUILD_WHEELS=1
script: tools/wheels/build-wheels.sh ${LIBRDKAFKA_VERSION#v} wheelhouse


# Install test dependencies unconditionally
# Travis OSX envs requires some setup; see tools/prepare-osx.sh
# Install cibuildwheel if this is a tagged PR
before_install:
- if [[ $TRAVIS_OS_NAME == "osx" && $BUILD_WHEELS != 1 ]]; then tools/prepare-osx.sh ${INTERPRETER_VERSION} /tmp/venv && source /tmp/venv/bin/activate; fi

install:
# Install interceptors
- tools/install-interceptors.sh
Expand Down
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
# Confluent's Python client for Apache Kafka

## v1.9.2

v1.9.2 is a maintenance release with the following fixes and enhancements:

- Support for setting principal and SASL extensions in oauth_cb
and handle failures (@Manicben, #1402)
- Wheel for macOS M1/arm64
- KIP-140 Admin API ACL fix:
When requesting multiple create_acls or delete_acls operations,
if the provided ACL bindings or ACL binding filters are not
unique, an exception will be thrown immediately rather than later
when the responses are read. (#1370).
- KIP-140 Admin API ACL fix:
Better documentation of the describe and delete ACLs behavior
when using the MATCH resource patter type in a filter. (#1373).
- Avro serialization examples:
added a parameter for using a generic or specific Avro schema. (#1381).

confluent-kafka-python is based on librdkafka v1.9.2, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.2)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v1.9.1

There was no 1.9.1 release of the Python Client.


## v1.9.0

This is a feature release:
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
# built documents.
#
# The short X.Y version.
version = '1.9.0'
version = '1.9.2'
# The full version, including alpha/beta/rc tags.
release = version
######################################################################
Expand Down
2 changes: 1 addition & 1 deletion examples/docker/Dockerfile.alpine
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ FROM alpine:3.12

COPY . /usr/src/confluent-kafka-python

ENV LIBRDKAFKA_VERSION v1.9.0
ENV LIBRDKAFKA_VERSION v1.9.2
ENV KAFKACAT_VERSION master


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_install_requirements(path):
setup(name='confluent-kafka',
# Make sure to bump CFL_VERSION* in confluent_kafka/src/confluent_kafka.h
# and version in docs/conf.py.
version='1.9.0',
version='1.9.2',
description='Confluent\'s Python client for Apache Kafka',
author='Confluent Inc',
author_email='[email protected]',
Expand Down
105 changes: 98 additions & 7 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1522,13 +1522,73 @@ static void log_cb (const rd_kafka_t *rk, int level,
CallState_resume(cs);
}

/**
* @brief Translate Python \p key and \p value to C types and set on
* provided \p extensions char* array at the provided index.
*
* @returns 1 on success or 0 if an exception was raised.
*/
static int py_extensions_to_c (char **extensions, Py_ssize_t idx,
PyObject *key, PyObject *value) {
PyObject *ks, *ks8, *vo8 = NULL;
const char *k;
const char *v;
Py_ssize_t ksize = 0;
Py_ssize_t vsize = 0;

if (!(ks = cfl_PyObject_Unistr(key))) {
PyErr_SetString(PyExc_TypeError,
"expected extension key to be unicode "
"string");
return 0;
}

k = cfl_PyUnistr_AsUTF8(ks, &ks8);
ksize = (Py_ssize_t)strlen(k);

if (cfl_PyUnistr(_Check(value))) {
/* Unicode string, translate to utf-8. */
v = cfl_PyUnistr_AsUTF8(value, &vo8);
if (!v) {
Py_DECREF(ks);
Py_XDECREF(ks8);
return 0;
}
vsize = (Py_ssize_t)strlen(v);
} else {
PyErr_Format(PyExc_TypeError,
"expected extension value to be "
"unicode string, not %s",
((PyTypeObject *)PyObject_Type(value))->
tp_name);
Py_DECREF(ks);
Py_XDECREF(ks8);
return 0;
}

extensions[idx] = (char*)malloc(ksize);
strcpy(extensions[idx], k);
extensions[idx + 1] = (char*)malloc(vsize);
strcpy(extensions[idx + 1], v);

Py_DECREF(ks);
Py_XDECREF(ks8);
Py_XDECREF(vo8);

return 1;
}

static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config,
void *opaque) {
Handle *h = opaque;
PyObject *eo, *result;
CallState *cs;
const char *token;
double expiry;
const char *principal = "";
PyObject *extensions = NULL;
char **rd_extensions = NULL;
Py_ssize_t rd_extensions_size = 0;
char err_msg[2048];
rd_kafka_resp_err_t err_code;

Expand All @@ -1539,26 +1599,57 @@ static void oauth_cb (rd_kafka_t *rk, const char *oauthbearer_config,
Py_DECREF(eo);

if (!result) {
goto err;
goto fail;
}
if (!PyArg_ParseTuple(result, "sd", &token, &expiry)) {
if (!PyArg_ParseTuple(result, "sd|sO!", &token, &expiry, &principal, &PyDict_Type, &extensions)) {
Py_DECREF(result);
PyErr_Format(PyExc_TypeError,
PyErr_SetString(PyExc_TypeError,
"expect returned value from oauth_cb "
"to be (token_str, expiry_time) tuple");
"to be (token_str, expiry_time[, principal, extensions]) tuple");
goto err;
}

if (extensions) {
int len = (int)PyDict_Size(extensions);
rd_extensions = (char **)malloc(2 * len * sizeof(char *));
Py_ssize_t pos = 0;
PyObject *ko, *vo;
while (PyDict_Next(extensions, &pos, &ko, &vo)) {
if (!py_extensions_to_c(rd_extensions, rd_extensions_size, ko, vo)) {
Py_DECREF(result);
free(rd_extensions);
goto err;
}
rd_extensions_size = rd_extensions_size + 2;
}
}

err_code = rd_kafka_oauthbearer_set_token(h->rk, token,
(int64_t)(expiry * 1000),
"", NULL, 0, err_msg,
principal, (const char **)rd_extensions, rd_extensions_size, err_msg,
sizeof(err_msg));
Py_DECREF(result);
if (err_code) {
if (rd_extensions) {
for(int i = 0; i < rd_extensions_size; i++) {
free(rd_extensions[i]);
}
free(rd_extensions);
}

if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
PyErr_Format(PyExc_ValueError, "%s", err_msg);
goto err;
goto fail;
}
goto done;

fail:
err_code = rd_kafka_oauthbearer_set_token_failure(h->rk, "OAuth callback raised exception");
if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
PyErr_SetString(PyExc_ValueError, "Failed to set token failure");
goto err;
}
PyErr_Clear();
goto done;
err:
CallState_crash(cs);
rd_kafka_yield(h->rk);
Expand Down
4 changes: 2 additions & 2 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
* 0xMMmmRRPP
* MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
*/
#define CFL_VERSION 0x01090000
#define CFL_VERSION_STR "1.9.0"
#define CFL_VERSION 0x01090200
#define CFL_VERSION_STR "1.9.2"

/**
* Minimum required librdkafka version. This is checked both during
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,10 +1126,10 @@ def verify_avro_explicit_read_schema():
msgcount += 1
# Avro schema projection should return the two fields not present in the writer schema
try:
assert(msg.key().get('favorite_number') == 42)
assert(msg.key().get('favorite_color') == "purple")
assert(msg.value().get('favorite_number') == 42)
assert(msg.value().get('favorite_color') == "purple")
assert (msg.key().get('favorite_number') == 42)
assert (msg.key().get('favorite_color') == "purple")
assert (msg.value().get('favorite_number') == 42)
assert (msg.value().get('favorite_color') == "purple")
print("success: schema projection worked for explicit reader schema")
except KeyError:
raise confluent_kafka.avro.SerializerError("Schema projection failed when setting reader schema.")
Expand Down
Loading

0 comments on commit cdc5f3b

Please sign in to comment.