Skip to content

Commit

Permalink
apacheGH-41056: [GLib][FlightRPC] Add gaflight_client_do_put() and re…
Browse files Browse the repository at this point in the history
…lated APIs (apache#43813)

### Rationale for this change

DoPut is needed to upload data.

### What changes are included in this PR?

* Add `gaflight_client_do_put()`
* Add `GAFlightStreamWriter`
* Add `GAFlightMetadataReader`
* Add `GAFlightDoPutResult`
* Fix `GAFlightRecordBatchWriter` API

### Are these changes tested?

No. They aren't tested yet. We will add tests when we implement server side DoPut.

### Are there any user-facing changes?

Yes.
* GitHub Issue: apache#41056

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored Aug 27, 2024
1 parent fa5d158 commit c30bb6a
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 92 deletions.
337 changes: 336 additions & 1 deletion c_glib/arrow-flight-glib/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,19 @@ G_BEGIN_DECLS
* #GAFlightStreamReader is a class for reading record batches from a
* server.
*
* #GAFlightStreamWriter is a class for writing record batches to a
* server.
*
* #GAFlightMetadataReader is a class for reading metadata from a
* server.
*
* #GAFlightCallOptions is a class for options of each call.
*
* #GAFlightClientOptions is a class for options of each client.
*
* #GAFlightDoPutResult is a class that has gaflight_client_do_put()
* result.
*
* #GAFlightClient is a class for Apache Arrow Flight client.
*
* Since: 5.0.0
Expand All @@ -56,6 +65,128 @@ gaflight_stream_reader_class_init(GAFlightStreamReaderClass *klass)
{
}

G_DEFINE_TYPE(GAFlightStreamWriter,
gaflight_stream_writer,
GAFLIGHT_TYPE_RECORD_BATCH_WRITER)

static void
gaflight_stream_writer_init(GAFlightStreamWriter *object)
{
}

static void
gaflight_stream_writer_class_init(GAFlightStreamWriterClass *klass)
{
}

/**
* gaflight_stream_writer_done_writing:
* @writer: A #GAFlightStreamWriter.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: %TRUE on success, %FALSE on error.
*
* Since: 18.0.0
*/
gboolean
gaflight_stream_writer_done_writing(GAFlightStreamWriter *writer, GError **error)
{
auto flight_writer = std::static_pointer_cast<arrow::flight::FlightStreamWriter>(
garrow_record_batch_writer_get_raw(GARROW_RECORD_BATCH_WRITER(writer)));
return garrow::check(error,
flight_writer->DoneWriting(),
"[flight-stream-writer][done-writing]");
}

struct GAFlightMetadataReaderPrivate
{
arrow::flight::FlightMetadataReader *reader;
};

enum {
PROP_METADATA_READER_READER = 1,
};

G_DEFINE_TYPE_WITH_PRIVATE(GAFlightMetadataReader,
gaflight_metadata_reader,
G_TYPE_OBJECT)

#define GAFLIGHT_METADATA_READER_GET_PRIVATE(object) \
static_cast<GAFlightMetadataReaderPrivate *>( \
gaflight_metadata_reader_get_instance_private(GAFLIGHT_METADATA_READER(object)))

static void
gaflight_metadata_reader_finalize(GObject *object)
{
auto priv = GAFLIGHT_METADATA_READER_GET_PRIVATE(object);
delete priv->reader;
G_OBJECT_CLASS(gaflight_metadata_reader_parent_class)->finalize(object);
}

static void
gaflight_metadata_reader_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GAFLIGHT_METADATA_READER_GET_PRIVATE(object);

switch (prop_id) {
case PROP_METADATA_READER_READER:
priv->reader =
static_cast<arrow::flight::FlightMetadataReader *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}

static void
gaflight_metadata_reader_init(GAFlightMetadataReader *object)
{
}

static void
gaflight_metadata_reader_class_init(GAFlightMetadataReaderClass *klass)
{
auto gobject_class = G_OBJECT_CLASS(klass);

gobject_class->finalize = gaflight_metadata_reader_finalize;
gobject_class->set_property = gaflight_metadata_reader_set_property;

GParamSpec *spec;
spec = g_param_spec_pointer(
"reader",
nullptr,
nullptr,
static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_METADATA_READER_READER, spec);
}

/**
* gaflight_metadata_reader_read:
* @reader: A #GAFlightMetadataReader.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (transfer full): The metadata on success, %NULL on error.
*
* Since: 18.0.0
*/
GArrowBuffer *
gaflight_metadata_reader_read(GAFlightMetadataReader *reader, GError **error)
{
auto flight_reader = gaflight_metadata_reader_get_raw(reader);
std::shared_ptr<arrow::Buffer> metadata;
if (garrow::check(error,
flight_reader->ReadMetadata(&metadata),
"[flight-metadata-reader][read]")) {
return garrow_buffer_new_raw(&metadata);
} else {
return nullptr;
}
}

typedef struct GAFlightCallOptionsPrivate_
{
arrow::flight::FlightCallOptions options;
Expand Down Expand Up @@ -385,6 +516,137 @@ gaflight_client_options_new(void)
g_object_new(GAFLIGHT_TYPE_CLIENT_OPTIONS, NULL));
}

struct GAFlightDoPutResultPrivate
{
GAFlightStreamWriter *writer;
GAFlightMetadataReader *reader;
};

enum {
PROP_DO_PUT_RESULT_RESULT = 1,
PROP_DO_PUT_RESULT_WRITER,
PROP_DO_PUT_RESULT_READER,
};

G_DEFINE_TYPE_WITH_PRIVATE(GAFlightDoPutResult, gaflight_do_put_result, G_TYPE_OBJECT)

#define GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object) \
static_cast<GAFlightDoPutResultPrivate *>( \
gaflight_do_put_result_get_instance_private(GAFLIGHT_DO_PUT_RESULT(object)))

static void
gaflight_do_put_result_dispose(GObject *object)
{
auto priv = GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object);

if (priv->writer) {
g_object_unref(priv->writer);
priv->writer = nullptr;
}

if (priv->reader) {
g_object_unref(priv->reader);
priv->reader = nullptr;
}

G_OBJECT_CLASS(gaflight_do_put_result_parent_class)->dispose(object);
}

static void
gaflight_do_put_result_init(GAFlightDoPutResult *object)
{
}

static void
gaflight_do_put_result_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object);

switch (prop_id) {
case PROP_DO_PUT_RESULT_RESULT:
{
auto result = static_cast<arrow::flight::FlightClient::DoPutResult *>(
g_value_get_pointer(value));
priv->writer = gaflight_stream_writer_new_raw(result->writer.release());
priv->reader = gaflight_metadata_reader_new_raw(result->reader.release());
break;
}
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}

static void
gaflight_do_put_result_get_property(GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
auto priv = GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object);

switch (prop_id) {
case PROP_DO_PUT_RESULT_WRITER:
g_value_set_object(value, priv->writer);
break;
case PROP_DO_PUT_RESULT_READER:
g_value_set_object(value, priv->reader);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}

static void
gaflight_do_put_result_class_init(GAFlightDoPutResultClass *klass)
{
auto gobject_class = G_OBJECT_CLASS(klass);

gobject_class->dispose = gaflight_do_put_result_dispose;
gobject_class->set_property = gaflight_do_put_result_set_property;
gobject_class->get_property = gaflight_do_put_result_get_property;

GParamSpec *spec;
spec = g_param_spec_pointer(
"result",
nullptr,
nullptr,
static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_DO_PUT_RESULT_RESULT, spec);

/**
* GAFlightDoPutResult:writer:
*
* A writer to write record batches to.
*
* Since: 18.0.0
*/
spec = g_param_spec_object("writer",
nullptr,
nullptr,
GAFLIGHT_TYPE_STREAM_WRITER,
static_cast<GParamFlags>(G_PARAM_READABLE));
g_object_class_install_property(gobject_class, PROP_DO_PUT_RESULT_WRITER, spec);

/**
* GAFlightDoPutResult:reader:
*
* A reader for application metadata from the server.
*
* Since: 18.0.0
*/
spec = g_param_spec_object("reader",
nullptr,
nullptr,
GAFLIGHT_TYPE_METADATA_READER,
static_cast<GParamFlags>(G_PARAM_READABLE));
g_object_class_install_property(gobject_class, PROP_DO_PUT_RESULT_READER, spec);
}

struct GAFlightClientPrivate
{
std::shared_ptr<arrow::flight::FlightClient> client;
Expand Down Expand Up @@ -661,6 +923,51 @@ gaflight_client_do_get(GAFlightClient *client,
return gaflight_stream_reader_new_raw(flight_reader.release(), TRUE);
}

/**
* gaflight_client_do_put:
* @client: A #GAFlightClient.
* @descriptor: A #GAFlightDescriptor.
* @schema: A #GArrowSchema.
* @options: (nullable): A #GAFlightCallOptions.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Upload data to a Flight described by the given descriptor. The
* caller must call garrow_record_batch_writer_close() on the
* returned stream once they are done writing.
*
* The reader and writer are linked; closing the writer will also
* close the reader. Use garrow_flight_stream_writer_done_writing() to
* only close the write side of the channel.
*
* Returns: (nullable) (transfer full):
* The #GAFlighDoPutResult holding a reader and a writer on success,
* %NULL on error.
*
* Since: 18.0.0
*/
GAFlightDoPutResult *
gaflight_client_do_put(GAFlightClient *client,
GAFlightDescriptor *descriptor,
GArrowSchema *schema,
GAFlightCallOptions *options,
GError **error)
{
auto flight_client = gaflight_client_get_raw(client);
auto flight_descriptor = gaflight_descriptor_get_raw(descriptor);
auto arrow_schema = garrow_schema_get_raw(schema);
arrow::flight::FlightCallOptions flight_default_options;
auto flight_options = &flight_default_options;
if (options) {
flight_options = gaflight_call_options_get_raw(options);
}
auto result = flight_client->DoPut(*flight_options, *flight_descriptor, arrow_schema);
if (!garrow::check(error, result, "[flight-client][do-put]")) {
return nullptr;
}
auto flight_result = std::move(*result);
return gaflight_do_put_result_new_raw(&flight_result);
}

G_END_DECLS

GAFlightStreamReader *
Expand All @@ -672,7 +979,28 @@ gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader,
flight_reader,
"is-owner",
is_owner,
NULL));
nullptr));
}

GAFlightStreamWriter *
gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer)
{
return GAFLIGHT_STREAM_WRITER(
g_object_new(GAFLIGHT_TYPE_STREAM_WRITER, "writer", flight_writer, nullptr));
}

GAFlightMetadataReader *
gaflight_metadata_reader_new_raw(arrow::flight::FlightMetadataReader *flight_reader)
{
return GAFLIGHT_METADATA_READER(
g_object_new(GAFLIGHT_TYPE_METADATA_READER, "reader", flight_reader, nullptr));
}

arrow::flight::FlightMetadataReader *
gaflight_metadata_reader_get_raw(GAFlightMetadataReader *reader)
{
auto priv = GAFLIGHT_METADATA_READER_GET_PRIVATE(reader);
return priv->reader;
}

arrow::flight::FlightCallOptions *
Expand All @@ -689,6 +1017,13 @@ gaflight_client_options_get_raw(GAFlightClientOptions *options)
return &(priv->options);
}

GAFlightDoPutResult *
gaflight_do_put_result_new_raw(arrow::flight::FlightClient::DoPutResult *flight_result)
{
return GAFLIGHT_DO_PUT_RESULT(
g_object_new(GAFLIGHT_TYPE_DO_PUT_RESULT, "result", flight_result, nullptr));
}

std::shared_ptr<arrow::flight::FlightClient>
gaflight_client_get_raw(GAFlightClient *client)
{
Expand Down
Loading

0 comments on commit c30bb6a

Please sign in to comment.