Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

produce opaque #21

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions configuration.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,29 +158,30 @@ static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, voi
zval_ptr_dtor(&args[2]);
}

static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval args[2];

if (!opaque) {
return;
}
zval args[3];

if (!cbs->dr_msg) {
if (!opaque || !cbs->dr_msg) {
return;
}

ZVAL_NULL(&args[0]);
ZVAL_NULL(&args[1]);
ZVAL_NULL(&args[2]);

ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
kafka_message_new(&args[1], msg);
if (NULL != msg->_private) {
ZVAL_ZVAL(&args[2], msg->_private, 1, 0);
}

kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args);
kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 3, args);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
zval_ptr_dtor(&args[2]);
}

static int kafka_conf_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
Expand Down
8 changes: 7 additions & 1 deletion php_simple_kafka_client_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta

#else // PHP 7

#define IS_MIXED 16

#define Z_KAFKA_OBJ zval

#define Z_KAFKA_PROP_OBJ(object) object

#define kafka_get_debug_object(type, object) get_object(object)

#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) ZEND_ARG_INFO(pass_by_ref, name)
#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) \
ZEND_ARG_INFO(pass_by_ref, name)

#define Z_PARAM_ARRAY_HT_OR_NULL(dest) \
Z_PARAM_ARRAY_HT_EX(dest, 1, 0)
Expand All @@ -112,6 +115,9 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta
#define Z_PARAM_STRING_OR_NULL(dest, dest_len) \
Z_PARAM_STRING_EX(dest, dest_len, 1, 0)

#define Z_PARAM_ZVAL_OR_NULL(dest) \
Z_PARAM_ZVAL_EX(dest, 1, 0)

#endif

#ifdef PHP_WIN32
Expand Down
61 changes: 61 additions & 0 deletions tests/produce_with_opaque.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
--TEST--
Produce, consume
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

$conf = new SimpleKafkaClient\Configuration();
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));

$conf->setDrMsgCb(function (SimpleKafkaClient\Producer $kafka, SimpleKafkaClient\Message $message, $opaque) {
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
$errorStr = rd_kafka_err2str($message->err);

echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL;
} else {
if (false === is_string($opaque)) {
$opaque = 'opaque was already freed';
}

echo sprintf('Message opaque: %s', $opaque) . PHP_EOL;
}
});

$producer = new SimpleKafkaClient\Producer($conf);
$topic = $producer->getTopicHandle('pure-php-test-topic');
$amountTestMessages = 10;

for ($i = 0; $i < $amountTestMessages; ++$i) {
$topic->producev(
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full
sprintf('test message-%d',$i),
sprintf('test-key-%d', $i),
[
'some' => sprintf('header value %d', $i)
],
null,
"opaque $i"
);

$producer->poll(0);
}

$result = $producer->flush(20000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
echo 'Was not able to shutdown within 20s. Messages might be lost!' . PHP_EOL;
}
--EXPECT--
Message key test-key-0 and opaque: opaque 0
Message key test-key-1 and opaque: opaque 1
Message key test-key-2 and opaque: opaque 2
Message key test-key-3 and opaque: opaque 3
Message key test-key-4 and opaque: opaque 4
Message key test-key-5 and opaque: opaque 5
Message key test-key-6 and opaque: opaque 6
Message key test-key-7 and opaque: opaque 7
Message key test-key-8 and opaque: opaque 8
Message key test-key-9 and opaque: opaque 9
20 changes: 16 additions & 4 deletions topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,15 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, produce)
int ret;
rd_kafka_resp_err_t err;
kafka_topic_object *intern;
zval *opaque = NULL;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 4)
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 5)
Z_PARAM_LONG(partition)
Z_PARAM_LONG(msgflags)
Z_PARAM_OPTIONAL
Z_PARAM_STRING_OR_NULL(payload, payload_len)
Z_PARAM_STRING_OR_NULL(key, key_len)
Z_PARAM_ZVAL_OR_NULL(opaque)
ZEND_PARSE_PARAMETERS_END();

if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
Expand All @@ -132,9 +134,13 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, produce)
return;
}

if (NULL != opaque) {
Z_ADDREF_P(opaque);
}

intern = get_kafka_topic_object(getThis());

ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, NULL);
ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, opaque);

if (ret == -1) {
err = rd_kafka_last_error();
Expand All @@ -160,19 +166,20 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev)
HashTable *headersParam = NULL;
HashPosition headersParamPos;
char *header_key;
zval *header_value;
zval *header_value, *opaque = NULL;
rd_kafka_headers_t *headers;
zend_long timestamp_ms = 0;
zend_bool timestamp_ms_is_null = 0;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 6)
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 7)
Z_PARAM_LONG(partition)
Z_PARAM_LONG(msgflags)
Z_PARAM_OPTIONAL
Z_PARAM_STRING_OR_NULL(payload, payload_len)
Z_PARAM_STRING_OR_NULL(key, key_len)
Z_PARAM_ARRAY_HT_OR_NULL(headersParam)
Z_PARAM_LONG_OR_NULL(timestamp_ms, timestamp_ms_is_null)
Z_PARAM_ZVAL_OR_NULL(opaque)
ZEND_PARSE_PARAMETERS_END();

if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
Expand All @@ -185,6 +192,10 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev)
return;
}

if (NULL != opaque) {
Z_ADDREF_P(opaque);
}

if (timestamp_ms_is_null == 1) {
timestamp_ms = 0;
}
Expand Down Expand Up @@ -224,6 +235,7 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev)
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp_ms),
RD_KAFKA_V_HEADERS(headers),
RD_KAFKA_V_OPAQUE(opaque),
RD_KAFKA_V_END
);

Expand Down
4 changes: 2 additions & 2 deletions topic.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ProducerTopic extends Topic
{
private function __construct() {}

public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null): void {}
public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, mixed $opaque = null): void {}

public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null): void {}
public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null, mixed $opaque = null): void {}
}
4 changes: 3 additions & 1 deletion topic_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 679e8a50ee764eb12f6a43ecaa2bf396c74235dc */
* Stub hash: 88d2ac53ad8266413f1ab448883a9b2b439120ef */

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Topic_getName, 0, 0, IS_STRING, 0)
ZEND_END_ARG_INFO()
Expand All @@ -14,6 +14,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer
ZEND_ARG_TYPE_INFO(0, msgFlags, IS_LONG, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, payload, IS_STRING, 1, "null")
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null")
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_ProducerTopic_producev, 0, 2, IS_VOID, 0)
Expand All @@ -23,6 +24,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null")
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, headers, IS_ARRAY, 1, "null")
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timestampMs, IS_LONG, 1, "null")
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null")
ZEND_END_ARG_INFO()


Expand Down