JSON-to-table adapter #2975
base: main
Conversation
…to support writing to multiple tables from a single JSON record. also an `instantToTime()` function.
…in consumer threads.
This is a very large changeset. I have not gone over it all; I would like to get some perspective before digging down into the nitty-gritty details.
The ability to take JSON and turn it into a table is a great feature. (I would very much like it not to be a feature gated behind Kafka, as it currently is outside of this PR.) Raffi - are you the driving force behind wanting to get this feature into DHC, or is there a DHC user request?
My first concern is that we have bifurcated paths for parsing JSON - Kafka JSON vs. extensions-json. In an ideal world, Kafka JSON would be able to explicitly use/share this code path.
One of the parts that makes the above challenging is that this PR writes its output to a DynamicTableWriter. Instead, I think writing to a stream table makes a lot of sense; that makes it possible for the end user to consume a stream, a ring, or an append table. (I can't speak to the inner workings of the Kafka JSON parsing, but I do have some experience with stream table creation.)
There is also some language and structural interface around this PR that makes it feel more "enterprise" focused: "database", "ingester", "write to disk". I'd like to make sure that we are exposing an appropriate interface for the DHC side.
I'm also concerned about the builders for creating the translation from JsonNode -> Table columns. Again, I haven't dug into the nitty-gritty, and I know that we want to be able to handle complex nested cases, but I think it's a very important user-facing part that I'll want to take a closer look at.
I'm wondering if there may be a two-tiered approach we might take to simplify this process: a layer that is explicitly centered around JsonNode, with a mapping from JsonPointer to type / WritableChunk, and then a user-friendly layer on top of it.
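To make the suggestion concrete, here is a rough sketch of what that lower tier could look like. The names below (JsonColumnMapping, RowSink) are purely illustrative and not part of this PR; a real implementation would presumably write into WritableChunks rather than a generic sink.

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;

// Stand-in for writing a value into a column's WritableChunk.
interface RowSink {
    void accept(String column, Object value);
}

// Hypothetical lower-tier mapping: one JsonPointer -> one typed column.
final class JsonColumnMapping {
    final String column;
    final JsonPointer pointer;
    final Class<?> type;

    JsonColumnMapping(String column, String pointerExpr, Class<?> type) {
        this.column = column;
        this.pointer = JsonPointer.compile(pointerExpr);
        this.type = type;
    }

    void apply(JsonNode record, RowSink sink) {
        final JsonNode value = record.at(pointer); // returns a MissingNode if the path is absent
        if (value.isMissingNode() || value.isNull()) {
            sink.accept(column, null);
        } else if (type == long.class) {
            sink.accept(column, value.asLong());
        } else if (type == double.class) {
            sink.accept(column, value.asDouble());
        } else {
            sink.accept(column, value.asText());
        }
    }
}

The user-friendly tier on top would then just be a builder that assembles a list of these mappings, e.g. one mapping for "/trade/price" as a double column and one for "/trade/symbol" as a String column.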
// Using io.confluent dependencies requires code in the toplevel build.gradle to add their maven repository.
// Note: the -ccs flavor is provided by confluent as their community edition. It is equivalent to the maven central
// version, but has a different version to make it easier to keep confluent dependencies aligned.
api 'org.apache.kafka:kafka-clients:7.1.1-ccs'
api 'io.confluent:kafka-avro-serializer:7.1.1'
I'm confused why the json module needs a dependency on Kafka. Maybe it will become clear as I continue on in the PR?
I think it's only because of io.deephaven.kafka.ingest.JsonNodeUtil; I meant to move that here and make Kafka depend on this module, but didn't get around to it yet.
ok, moved that class & flipped the dependencies
extensions/json/build.gradle (Outdated)
project(path: ':configs')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'runtimeOnly')
We should not be depending on slf4j-simple (or any other concrete logging implementation) in a way that persists to runtime from a java-library. It's OK to do it for testRuntimeOnly.
fixed
// 'node==null || node.isMissingNode()' is OK here, because missing keys
// are allowed (which implicitly means null values for those keys are allowed).
// only explicit null values are disallowed.
if (!allowNullValues && node != null && node.isNull()) {
This comment doesn't line up with the code? It's theoretically possible that a user/app would want to allow a missing field but disallow an explicit null.
The problem before was that !allowMissingKeys && allowNullValues didn't work: if you have a missing key, the node variable will just be null already, because we are calling this with the result of node.get(key), which generally returns null if the key is missing. But if the key is there with an explicit null value, then you will have a non-null node reference (so node != null), but it will be a com.fasterxml.jackson.databind.node.NullNode (so node.isNull() is true).
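For reference, a small standalone illustration of that Jackson behavior (this is not code from the PR):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MissingVsNullExample {
    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final JsonNode root = mapper.readTree("{\"present\": 1, \"explicitNull\": null}");

        // Missing key: get() returns a Java null, so there is no node to call isNull() on.
        System.out.println(root.get("missing") == null);       // true

        // Explicit null: get() returns a NullNode, so the reference is non-null...
        final JsonNode explicitNull = root.get("explicitNull");
        System.out.println(explicitNull != null);               // true
        // ...but isNull() reports the JSON null value.
        System.out.println(explicitNull.isNull());               // true

        // Present, non-null value: non-null reference and isNull() is false.
        System.out.println(root.get("present").isNull());       // false
    }
}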
* @param ingestTime The time when this message was finished processing by its ingester and was ready to be written
*        to disk.
We aren't writing messages to disk, are we?
Nope, that's just from the original files; fixed this and a few others.
/**
 * The builder configures a factory for StringToTableWriterAdapters that accept JSON strings and writes a table.
 */
Do the interfaces actually require creating Strings? (I thought that the Kafka one operated on bytes or an input stream...)
I think forcing JSON to wash through Strings (if that is indeed the case) is a design limitation. I'd love for this JSON parsing logic to be used by Kafka JSON.
Currently they do; the original code in DHE received messages from another library that only provided String message content, but I don't think it will be too tough to change this to take bytes/streams/pre-parsed JsonNodes.
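A hypothetical sketch of what a non-String entry point could look like, so that Kafka (or anything else) could hand over bytes, a stream, or an already-parsed tree directly. The interface name and methods are illustrative only, not the PR's actual API:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;

public interface JsonRecordConsumer {
    ObjectMapper MAPPER = new ObjectMapper();

    // The real entry point: operate on an already-parsed tree.
    void consume(JsonNode record) throws IOException;

    // Convenience overloads that avoid washing the payload through a String.
    default void consume(byte[] payload) throws IOException {
        consume(MAPPER.readTree(payload));
    }

    default void consume(InputStream payload) throws IOException {
        consume(MAPPER.readTree(payload));
    }

    default void consume(String payload) throws IOException {
        consume(MAPPER.readTree(payload));
    }
}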
try {
    // this will unfortunately take MAX_WAIT_MILLIS to finish
    adapter.waitForProcessing(MAX_WAIT_MILLIS);
    Assert.fail("Expected timeout exception did not occur");
} catch (final TimeoutException ex) {
    // expected
}
IMO, we need a better way to test this.
Agreed... I renamed this to testWaitForProcessingTimeout() since that's all it's really testing.
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public interface DataToTableWriterAdapter {
This interface seems more generic than TableWriter - could probably be renamed?
renamed to AsynchronousDataIngester
/**
 * Shut down the adapter. This <b>must not run {@link #cleanup cleanup}</b>; that is handled by the StreamContext.
 */
What's StreamContext?
StreamContext was in the original code. The point of this was "don't call cleanup() from just anywhere if you're writing to a DHE DIS"; I updated it to be more generic and to hopefully explain the problem.
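Pulling together the pieces mentioned in this thread (waitForProcessing from the test above, plus the shutdown/cleanup split), the contract being described looks roughly like the sketch below; the exact names and signatures in the PR may differ:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public interface AsynchronousDataIngester {
    // Flush records that have already been consumed; intended to be invoked by the
    // owning ingestion/stream context, not by arbitrary callers.
    void cleanup() throws IOException;

    // Block until all records received so far have been processed, or the timeout elapses.
    void waitForProcessing(long timeoutMillis) throws TimeoutException;

    // Stop accepting new data. Must not call cleanup(); that remains the owning
    // context's responsibility.
    void shutdown();
}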
…ema at the same time
…he JSONToStreamPublisherAdapter)
# Conflicts:
#   R/rdeephaven/DESCRIPTION
#   engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
#   engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
#   engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
#   engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java
#   gradle.properties
#   py/server/deephaven/_udf.py
#   web/client-api/types/package.json
…s when constructing adapter instead of in MutableInts when running field processors (which only worked with multithreading because the field processors happened to always run in the same order).
# Conflicts:
#   engine/table/src/main/java/io/deephaven/stream/StreamPublisherBase.java
#   extensions/json/src/main/java/io/deephaven/jsoningester/JsonNodeUtil.java
High-throughput JSON adapter from DHE. Supports parallel processing of JSON records, handling of nested values, and expansion of array nodes. Also supports writing data from nested array nodes to separate tables.
Major differences from DHE:
It could use another API layer to allow the DynamicTableWriter and the JSONToTableWriter adapter to be defined at the same time (this is not a problem in DHE because the table writer is basically defined by the XML schema).
We could also create something that generates an adapter based on a JSON schema.