Skip to content

Commit

Permalink
Merge pull request #32342 Fix writing raw messages to pubsub.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Oct 1, 2024
2 parents 00445ad + 79b534c commit 0ca3f19
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,20 @@ def _create_formatter(
field_names = [field.name for field in beam_schema.fields]
if len(field_names) != 1:
raise ValueError(f'Expecting exactly one field, found {field_names}')
return lambda row: getattr(row, field_names[0])

def convert_to_bytes(row):
output = getattr(row, field_names[0])
if isinstance(output, bytes):
return output
elif isinstance(output, str):
return output.encode('utf-8')
else:
raise ValueError(
f"Cannot encode payload for WriteToPubSub. "
f"Expected valid string or bytes object, "
f"got {repr(output)} of type {type(output)}.")

return convert_to_bytes
elif format == 'JSON':
return json_utils.json_formater(beam_schema)
elif format == 'AVRO':
Expand Down

0 comments on commit 0ca3f19

Please sign in to comment.