JSON-to-table adapter #2975

Draft: wants to merge 105 commits into main
Conversation

@rbasralian (Contributor) commented Oct 7, 2022

High-throughput JSON adapter from DHE. Supports parallel processing of JSON records, handling of nested values, and expansion of array nodes. Also supports writing data from nested array nodes to separate tables.

Major differences from DHE:

  • Support subtables (parsing an array field of one node into a separate table)
  • Ability to process messages synchronously (helpful in tests; required for subtables)

It could use another API layer that allows the DynamicTableWriter and the JSONToTableWriter adapter to be defined at the same time (this is not a problem in DHE because the table writer is essentially defined by the XML schema).

We could also create something that generates an adapter based on a JSON schema.
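As a rough illustration of that schema-driven idea (the class name and the type mapping below are assumptions for this sketch, not Deephaven's actual rules), a generator might start by deriving column types from a JSON-schema-like "properties" map:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: derive table column types from a JSON-schema-style
// property->type map, as a starting point for schema-driven adapter generation.
public class SchemaToColumns {
    public static Map<String, Class<?>> columnsFor(Map<String, String> properties) {
        final Map<String, Class<?>> columns = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            switch (e.getValue()) {
                case "integer": columns.put(e.getKey(), long.class); break;
                case "number":  columns.put(e.getKey(), double.class); break;
                case "boolean": columns.put(e.getKey(), boolean.class); break;
                default:        columns.put(e.getKey(), String.class); break;
            }
        }
        return columns;
    }
}
```

A real generator would also need to recurse into `object` and `array` properties to produce nested-field and subtable definitions.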

@rcaudy rcaudy added the json label Oct 10, 2022
@rcaudy rcaudy added this to the Oct 2022 milestone Oct 10, 2022
@devinrsmith (Member) left a comment:
This is a very large changeset; I have not gone over all of it, and I would like to get some perspective before digging into the nitty-gritty details.

The ability to take JSON and turn it into a table is a great feature. (I would very much like it not to be gated behind Kafka, as it currently is outside of this PR.) Raffi - are you the driving force behind wanting to get this feature into DHC, or is there a DHC user request?

My first concern is that we have bifurcated paths for parsing JSON: Kafka JSON vs. extensions-json. In an ideal world, Kafka JSON would be able to explicitly use/share this code path.

One of the parts that makes the above challenging is that this PR exhausts to a DynamicTableWriter. Instead, I think exhausting to a stream table makes a lot of sense; that makes it possible for the end user to consume it as a stream, a ring, or an append table. (I can't speak to the inner workings of the Kafka JSON parsing, but I do have some experience w/ stream table creation.)

There are also some language and interface choices in this PR that make it more "enterprise" focused: "Database", "ingester", "write to disk". I'd like to make sure that we are exposing an appropriate interface for the DHC side.

I'm also concerned about the builders for creating the translation from JsonNode -> Table columns. Again, I haven't dug into the nitty-gritty, and I know that we need to handle complex nested cases, but this is a very important user-facing part that I'll want to take a closer look at.

I'm wondering if there may be a two-tiered approach we might take to simplify this process. A layer that is explicitly centered around JsonNode, with mapping from JsonPointer to type / WritableChunk. And then a user-friendly layer on top of it.
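To make the two-tier idea concrete, here is a hedged sketch (all class and method names are hypothetical, and a plain Map stands in for a parsed JsonNode and for the WritableChunk destinations):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of two tiers: a low-level registry keyed by
// JSON-pointer-style paths (standing in for JsonPointer -> WritableChunk
// writers), and a friendlier layer that registers top-level fields by name.
public class TwoTier {
    private final Map<String, Function<Map<String, Object>, Object>> extractors = new HashMap<>();

    // Low-level tier: register an extractor for one pointer path.
    public void register(String pointer, Function<Map<String, Object>, Object> fn) {
        extractors.put(pointer, fn);
    }

    // User-friendly tier: a top-level field is just the pointer "/name".
    public void addField(String name) {
        register("/" + name, record -> record.get(name));
    }

    public Object extract(String pointer, Map<String, Object> record) {
        return extractors.get(pointer).apply(record);
    }
}
```

The appeal of this split is that nested and array handling only has to exist in the lower tier, while the user-facing builder stays small.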

engine/time/src/main/java/io/deephaven/time/DateTime.java (outdated, resolved)
extensions/json/build.gradle (outdated, resolved)
Comment on lines +21 to +25
// Using io.confluent dependencies requires code in the toplevel build.gradle to add their maven repository.
// Note: the -ccs flavor is provided by confluent as their community edition. It is equivalent to the maven central
// version, but has a different version to make it easier to keep confluent dependencies aligned.
api 'org.apache.kafka:kafka-clients:7.1.1-ccs'
api 'io.confluent:kafka-avro-serializer:7.1.1'
Member:
I'm confused about why json needs a dependency on kafka. Maybe it will become clear as I continue through the PR?

Contributor (Author):
I think it's only because of io.deephaven.kafka.ingest.JsonNodeUtil; I meant to move that here and make Kafka depend on this module, but haven't gotten around to it yet.

Contributor (Author):
OK, moved that class and flipped the dependencies.

project(path: ':configs')


Classpaths.inheritSlf4j(project, 'slf4j-simple', 'runtimeOnly')
Member:
We should not depend on slf4j-simple (or any other concrete logging implementation) at runtime from a java-library. It's OK to do it for testRuntimeOnly.

Contributor (Author):
fixed

Comment on lines 55 to 58
// 'node==null || node.isMissingNode()' is OK here, because missing keys
// are allowed (which implicitly means null values for those keys are allowed).
// only explicit null values are disallowed.
if (!allowNullValues && node != null && node.isNull()) {
Member:
This comment doesn't line up w/ the code?

It's theoretically possible that a user/app would want to allow a missing field but disallow an explicit null.

Contributor (Author):
The problem before was that !allowMissingKeys && allowNullValues didn't work: if you have a missing key, the node variable will just be null already (because we are calling this with the result of node.get(key), which generally returns null if the key is missing).

But if the key is there with an explicit null value, then you will have a non-null node reference (so node != null), but it will be a com.fasterxml.jackson.databind.node.NullNode (so node.isNull()).
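The distinction can be sketched without a Jackson dependency using a plain Map; NULL_NODE below is a hypothetical stand-in for com.fasterxml.jackson.databind.node.NullNode (where node.get(key) returns null for a missing key but a NullNode for an explicit JSON null):

```java
import java.util.Map;

// Stand-in sketch of the missing-key vs explicit-null distinction: a null
// lookup result means the key is missing, while NULL_NODE means the key was
// present with an explicit JSON null.
public class NullVsMissing {
    public static final Object NULL_NODE = new Object(); // plays the role of Jackson's NullNode

    // Mirrors the guard under review: only an explicit null trips the check;
    // a missing key passes (which implicitly allows null for that key).
    public static boolean violates(Map<String, Object> obj, String key, boolean allowNullValues) {
        final Object node = obj.get(key); // null here means the key is missing
        return !allowNullValues && node != null && node == NULL_NODE;
    }
}
```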

Comment on lines 25 to 26
* @param ingestTime The time when this message was finished processing by its ingester and was ready to be written
* to disk.
Member:
We aren't writing messages to disk, are we?

Contributor (Author):
Nope, that's just from the original files; fixed this and a few others.

Comment on lines 19 to 21
/**
* The builder configures a factory for StringToTableWriterAdapters that accept JSON strings and writes a table.
*/
Member:
Do the interfaces actually require creating strings? (I thought that the kafka one operated on bytes or input stream...)

I think forcing json to wash through strings (if that indeed is the case) is a design limitation. I'd love for this json parsing logic to be used by kafka json.

Contributor (Author):
Currently they do; the original code in DHE received messages from another library that only provided string message content, but I don't think it will be too tough to change this to take bytes/streams/pre-parsed JsonNodes.

Comment on lines 1217 to 1223
try {
// this will unfortunately take MAX_WAIT_MILLIS to finish
adapter.waitForProcessing(MAX_WAIT_MILLIS);
Assert.fail("Expected timeout exception did not occur");
} catch (final TimeoutException ex) {
// expected;
}
Member:
IMO, we need a better way to test this.

Contributor (Author):
agreed...

I renamed this to testWaitForProcessingTimeout() since that's all it's really testing.
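One hedged sketch of a cheaper way to exercise the timeout path: make the wait duration a parameter, so a test can pass a few milliseconds instead of the production MAX_WAIT_MILLIS. The Waiter class and its members below are illustrative stand-ins, not the PR's actual adapter:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative stand-in for the adapter's "wait for outstanding work" logic.
// The latch represents pending processing; waitForProcessing times out only
// while work is still outstanding.
public class Waiter {
    private final CountDownLatch pending = new CountDownLatch(1);

    public void waitForProcessing(long timeoutMillis) throws TimeoutException, InterruptedException {
        if (!pending.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("work still outstanding after " + timeoutMillis + "ms");
        }
    }

    public void markDone() {
        pending.countDown();
    }
}
```

With the timeout injected, the test above could pass a tiny value and still assert the TimeoutException without stalling for the full production wait.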

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public interface DataToTableWriterAdapter {
Member:
This interface seems more generic than TableWriter - could probably be renamed?

Contributor (Author):
renamed to AsynchronousDataIngester

Comment on lines 26 to 28
/**
* Shut down the adapter. This <b>must not run {@link #cleanup cleanup}</b>; that is handled by the StreamContext.
*/
Member:
What's StreamContext?

Contributor (Author):
StreamContext was in the original code. The point of this was "don't call cleanup() from just anywhere if you're writing to a DHE DIS". I updated it to be more generic and, hopefully, to explain the problem.

devinrsmith and others added 27 commits February 12, 2024 14:47
…s when constructing adapter instead of in MutableInts when running field processors (which only worked with multithreading because the field processors happened to always run in the same order).