[Bug]: grpc error when reading Kafka with flink runner #32628

Open
1 of 17 tasks
paulhtremblay opened this issue Oct 2, 2024 · 1 comment

paulhtremblay commented Oct 2, 2024

What happened?

I am attempting to read from a Kafka topic, but I get a gRPC error with apache_beam 2.59. I do not get the error with apache_beam 2.55.

import datetime
import time

import apache_beam as beam
from apache_beam.io.fileio import WriteToFiles
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# _get_args and convert_kafka_record_to_dictionary are helpers defined
# elsewhere in the script (not shown here).


def run(bootstrap_servers):
    known_args, pipeline_args = _get_args()
    pipeline_options = PipelineOptions(
        region='us-central1',
        project=known_args.project,
        runner='FlinkRunner',
        flink_master='localhost:8081',
        environment_type='LOOPBACK',
        streaming=True,
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        ride_col = (
            pipeline
            # Cross-language transform: expanded through a Java expansion service.
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': bootstrap_servers,
                    'group.id': 'my-group',
                    'isolation.level': 'read_uncommitted',
                },
                topics=[known_args.topic],
                max_num_records=2,
                commit_offset_in_finalize=True,
                # Start reading from 2024-10-01 (local time).
                start_read_time=int(time.mktime(datetime.datetime(2024, 10, 1).timetuple())),
                with_metadata=True)
            | beam.Map(lambda record: convert_kafka_record_to_dictionary(record))
            | WriteToFiles("output/kafka_flink1")
        )
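
A note on `start_read_time` in the snippet above: `time.mktime` interprets the time tuple as local time, so the resulting epoch value depends on the timezone of the machine that launches the pipeline. Below is a minimal sketch of a timezone-independent alternative. The helper name `utc_epoch` is hypothetical and not part of the original script, and whether `ReadFromKafka` expects seconds or milliseconds should be checked against the Beam documentation for the installed version.

```python
import datetime


def utc_epoch(year, month, day, milliseconds=False):
    """Return the epoch timestamp for midnight UTC on the given date.

    Hypothetical helper for illustration; not part of the original script.
    """
    moment = datetime.datetime(year, month, day, tzinfo=datetime.timezone.utc)
    seconds = int(moment.timestamp())
    return seconds * 1000 if milliseconds else seconds


# The same start date as in the report, pinned to UTC.
start_seconds = utc_epoch(2024, 10, 1)
start_millis = utc_epoch(2024, 10, 1, milliseconds=True)
```

Pinning the date to UTC makes the start offset reproducible across machines, which helps when comparing runs between Beam versions.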

Here is the traceback:

Traceback (most recent call last):
  File "/home/paulhtremblay/data-engineering/dataflow_/read_kafka_with_flink1.py", line 79, in <module>
    run(
  File "/home/paulhtremblay/data-engineering/dataflow_/read_kafka_with_flink1.py", line 62, in run
    pipeline
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 623, in __ror__
    result = p.apply(self, pvalueish, label)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/apache_beam/pipeline.py", line 748, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 752, in expand
    response = service.Expand(request)
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/grpc/_channel.py", line 1181, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/paulhtremblay/Envs/beam311/lib/python3.11/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Application error processing RPC"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Application error processing RPC", grpc_status:2, created_time:"2024-10-02T15:27:43.737327994+00:00"}"
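
The traceback shows the failure coming from the `service.Expand(request)` call in `external.py`, and the client only receives a generic `UNKNOWN` status (gRPC status code 2), so the underlying exception is most likely recorded on the peer side rather than here. When collecting such errors from logs, the fields of the debug string can be pulled out with a small parser. This is a sketch only: `parse_debug_error_string` is a hypothetical helper, not a gRPC API, and it assumes the debug string keeps the layout shown above.

```python
import re

# Debug string copied from the traceback above.
DEBUG_ERROR_STRING = (
    'UNKNOWN:Error received from peer  {grpc_message:"Application error '
    'processing RPC", grpc_status:2, '
    'created_time:"2024-10-02T15:27:43.737327994+00:00"}'
)


def parse_debug_error_string(text):
    """Extract grpc_message, grpc_status and created_time from a debug string.

    Hypothetical helper for reading logs; fields that are absent are skipped.
    """
    fields = {}
    message = re.search(r'grpc_message:"([^"]*)"', text)
    status = re.search(r'grpc_status:(\d+)', text)
    created = re.search(r'created_time:"([^"]*)"', text)
    if message:
        fields["grpc_message"] = message.group(1)
    if status:
        fields["grpc_status"] = int(status.group(1))
    if created:
        fields["created_time"] = created.group(1)
    return fields


parsed = parse_debug_error_string(DEBUG_ERROR_STRING)
```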

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

paulhtremblay commented (Author)

Sorry, my report above is incorrect. I just ran the pipeline with DirectRunner and got the same error, so it does not appear to be specific to the Flink runner. It seems to be caused by the apache_beam version being too recent.
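
Since the behavior differs between apache_beam 2.55 and 2.59, it can help to record the exact package versions in each environment when reproducing. A minimal sketch, assuming the standard PyPI distribution names `apache-beam` and `grpcio`; the helper name `installed_version` is hypothetical.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version of a package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Print the versions relevant to this report in the active environment.
for name in ("apache-beam", "grpcio"):
    print(name, installed_version(name))
```

Running this in both the working and the failing virtual environment gives a direct comparison of the Beam and gRPC versions involved.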

However, I am still getting an error with the FlinkRunner, but a different one. Will update...
