Support for json_spec configurability #4821

Closed
devinrsmith opened this issue Nov 13, 2023 · 1 comment

@devinrsmith
Member

A user was unable to parse a JSON stream because it contained the non-standard token "NaN":

com.fasterxml.jackson.core.JsonParseException: Non-standard token 'NaN': enable `JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS` to allow
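To make the failure mode concrete, here is a small sketch that uses only Python's standard `json` module, not Jackson. Python's parser accepts `NaN` by default, whereas Jackson rejects it unless `ALLOW_NON_NUMERIC_NUMBERS` is enabled; the `parse_constant` hook below imitates the strict behavior. The payload, the `price` field, and the `reject_constant` helper are hypothetical names chosen for illustration.

```python
import json
import math

# Hypothetical payload containing the non-standard NaN token.
payload = '{"price": NaN}'

# Python's json module is lenient by default and yields float("nan").
lenient = json.loads(payload)
assert math.isnan(lenient["price"])


def reject_constant(name):
    # Called by json.loads for the tokens NaN, Infinity and -Infinity.
    raise ValueError(f"Non-standard token {name!r}")


# Emulate a strict parser by refusing the non-standard constants.
try:
    json.loads(payload, parse_constant=reject_constant)
    strict_ok = True
except ValueError:
    strict_ok = False
```

In the strict case the parse fails, which is analogous to the `JsonParseException` above; this is only an analogy and says nothing about Jackson's internals.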

While this isn't technically valid JSON, Jackson explicitly supports it (along with a variety of other non-standard options). We expose jsonSpec with an explicit ObjectMapper in Java, but there are no easy wrappers around it for Python. We might consider adding more options to json_spec to cover common use cases. In the meantime, this workaround was proposed for the user:

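from typing import Dict, List, Tuple, Union

import jpy

from deephaven import DHError
from deephaven.column import Column
from deephaven.dtypes import DType
from deephaven.jcompat import j_hashmap
from deephaven.stream.kafka.consumer import KeyValueSpec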


def custom_object_mapper():
    _DeserializationFeature = jpy.get_type(
        "com.fasterxml.jackson.databind.DeserializationFeature"
    )
    _JsonNodeFactory = jpy.get_type(
        "com.fasterxml.jackson.databind.node.JsonNodeFactory"
    )
    _JsonMapper = jpy.get_type("com.fasterxml.jackson.databind.json.JsonMapper")
    _JsonReadFeature = jpy.get_type("com.fasterxml.jackson.core.json.JsonReadFeature")
    return (
        _JsonMapper.builder()
        .enable(_JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS)
        .nodeFactory(_JsonNodeFactory.withExactBigDecimals(True))
        .configure(_DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, True)
        .build()
    )


def custom_json_spec(
    col_defs: Union[Dict[str, DType], List[Tuple[str, DType]]], mapping: Dict = None
) -> KeyValueSpec:
    _JKafkaTools_Consume = jpy.get_type("io.deephaven.kafka.KafkaTools$Consume")

    try:
        if isinstance(col_defs, dict):
            col_defs = [Column(k, v).j_column_definition for k, v in col_defs.items()]
        else:
            col_defs = [Column(*t).j_column_definition for t in col_defs]

        mapper = custom_object_mapper()

        if mapping is None:
            return KeyValueSpec(
                j_spec=_JKafkaTools_Consume.jsonSpec(col_defs, None, mapper)
            )
        mapping = j_hashmap(mapping)
        return KeyValueSpec(
            j_spec=_JKafkaTools_Consume.jsonSpec(col_defs, mapping, mapper)
        )
    except Exception as e:
        raise DHError(e, "failed to create a Kafka key/value spec") from e

This specific workaround is based on https://github.com/deephaven/deephaven-core/blob/v0.30.0/py/server/deephaven/stream/kafka/consumer.py#L430-L461 and https://github.com/deephaven/deephaven-core/blob/v0.30.0/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/JsonNodeUtil.java#L24-L26.

(Note: jpy-consortium/jpy#117 filed.)
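For completeness, a rough sketch of how the custom spec might be plugged into a consumer. This is an assumption-laden illustration, not a tested recipe: the helper name `consume_nan_tolerant`, the column names `symbol` and `price`, and the choice of an append table are all hypothetical, and the function takes the `custom_json_spec` function from the workaround as its first argument. The Deephaven imports are deferred into the function body so that defining it does not require a running server.

```python
def consume_nan_tolerant(json_spec_factory, bootstrap_servers, topic):
    """Consume a Kafka topic using a JSON value spec built by json_spec_factory.

    json_spec_factory is expected to be the custom_json_spec function from
    the workaround; the column names below are hypothetical.
    """
    # Deferred imports: these only resolve inside a Deephaven Python session.
    from deephaven import dtypes as dht
    from deephaven.stream.kafka import consumer as kc

    # Build the value spec with the NaN-tolerant ObjectMapper.
    value_spec = json_spec_factory({"symbol": dht.string, "price": dht.double})

    return kc.consume(
        {"bootstrap.servers": bootstrap_servers},
        topic,
        key_spec=kc.KeyValueSpec.IGNORE,  # ignore the Kafka record key
        value_spec=value_spec,
        table_type=kc.TableType.append(),
    )
```

A call might look like `consume_nan_tolerant(custom_json_spec, "localhost:9092", "quotes")`, where the broker address and topic name are placeholders.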

@devinrsmith devinrsmith added this to the Backlog milestone Nov 13, 2023
@devinrsmith devinrsmith self-assigned this Nov 13, 2023
@devinrsmith
Member Author

Dupe of #4178
