Skip to content

Commit

Permalink
Add RowStringInterpolator utility (#32367)
Browse files Browse the repository at this point in the history
* RowStringInterpolator

* spotless

* address comments

* address comments
  • Loading branch information
ahmedabu98 authored Sep 25, 2024
1 parent faf884a commit e3c6f47
Show file tree
Hide file tree
Showing 2 changed files with 356 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
* A utility that interpolates values in a pre-determined {@link String} using an input Beam {@link
* Row}.
*
* <p>The {@link RowStringInterpolator} looks for field names specified inside {curly braces}. For
* example, if the interpolator is configured with the String {@code "unified {foo} and streaming"},
* it will look for a field name {@code "foo"} in the input {@link Row} and substitute in that
* value. If a {@link RowStringInterpolator} is configured with a template String that contains no
* placeholders (i.e. no curly braces), it will simply return that String, untouched.
*
* <p>Nested fields can be specified using dot-notation (e.g. {@code "top.middle.nested"}).
*
* <p>Configure a {@link RowStringInterpolator} like so:
*
* <pre>{@code
* String template = "unified {foo} and {bar.baz}!";
* Row inputRow = {foo: "batch", bar: {baz: "streaming"}, ...};
*
* RowStringInterpolator interpolator = new RowStringInterpolator(template, beamSchema);
* String output = interpolator.interpolate(inputRow, window, paneInfo, timestamp);
* // output --> "unified batch and streaming!"
* }</pre>
*
* <p>Additionally, {@link #interpolate(Row, BoundedWindow, PaneInfo, Instant)} can be used in
* streaming scenarios to substitute windowing metadata into the template String. To make use of
* this, use the relevant placeholder:
*
* <ul>
* <li>$WINDOW: the window's string representation
* <li>$PANE_INDEX: the pane's index
* <li>$YYYY: the element timestamp's year
* <li>$MM: the element timestamp's month
* <li>$DD: the element timestamp's day
* </ul>
*
* <p>For example, your String template can look like:
*
* <pre>{@code "unified {foo} and {bar} since {$YYYY}-{$MM}!"}</pre>
*/
public class RowStringInterpolator implements Serializable {
private final String template;
private final Set<String> fieldsToReplace;
// Represents the string representation of the element's window
public static final String WINDOW = "$WINDOW";
public static final String PANE_INDEX = "$PANE_INDEX";
// Represents the element's pane index
public static final String YYYY = "$YYYY";
public static final String MM = "$MM";
public static final String DD = "$DD";
private static final Set<String> WINDOWING_METADATA =
Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD);
private static final Pattern TEMPLATE_PATTERN = Pattern.compile("\\{(.+?)}");

/**
* @param template a String template, potentially with placeholders in the form of curly braces,
* e.g. {@code "my {foo} template"}. During interpolation, these placeholders are replaced
* with values in the Beam Row. For more details and examples, refer to the top-level
* documentation.
* @param rowSchema {@link Row}s used for interpolation are expected to be compatible with this
* {@link Schema}.
*/
public RowStringInterpolator(String template, Schema rowSchema) {
this.template = template;

Matcher m = TEMPLATE_PATTERN.matcher(template);
fieldsToReplace = new HashSet<>();
while (m.find()) {
fieldsToReplace.add(checkStateNotNull(m.group(1)));
}

List<String> rowFields =
fieldsToReplace.stream()
.filter(f -> !WINDOWING_METADATA.contains(f))
.collect(Collectors.toList());

RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation");
}

/**
* Performs string interpolation on the template using values from the input {@link Row} and its
* windowing metadata.
*/
public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
String interpolated = this.template;
for (String field : fieldsToReplace) {
Object val;
switch (field) {
case WINDOW:
val = window.toString();
break;
case PANE_INDEX:
val = paneInfo.getIndex();
break;
case YYYY:
val = timestamp.getChronology().year().get(timestamp.getMillis());
break;
case MM:
val = timestamp.getChronology().monthOfYear().get(timestamp.getMillis());
break;
case DD:
val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis());
break;
default:
val = MoreObjects.firstNonNull(getValue(row, field), "");
break;
}

interpolated = interpolated.replace("{" + field + "}", String.valueOf(val));
}
return interpolated;
}

private @Nullable Object getValue(@Nullable Row row, String fieldPath) {
if (row == null) {
return null;
}
int dotIndex = fieldPath.indexOf('.');
String field = dotIndex == -1 ? fieldPath : fieldPath.substring(0, dotIndex);
Preconditions.checkArgument(
row.getSchema().hasField(field), "Invalid row does not contain field '%s'.", field);

if (dotIndex == -1) {
return row.getValue(field);
}
return getValue(row.getRow(field), fieldPath.substring(dotIndex + 1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/** Test class for {@link RowStringInterpolator}. */
public class RowStringInterpolatorTest {
@Rule public ExpectedException thrown = ExpectedException.none();

private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
Schema.builder()
.addStringField("doubly_nested_str")
.addInt32Field("doubly_nested_int")
.build();

private static final Schema NESTED_ROW_SCHEMA =
Schema.builder()
.addStringField("nested_str")
.addInt32Field("nested_int")
.addFloatField("nested_float")
.addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
.build();
private static final Schema ROW_SCHEMA =
Schema.builder()
.addStringField("str")
.addBooleanField("bool")
.addInt32Field("int")
.addNullableInt32Field("nullable_int")
.addArrayField("arr_int", Schema.FieldType.INT32)
.addRowField("row", NESTED_ROW_SCHEMA)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.build();

@Test
public void testInvalidRowThrowsHelpfulError() {
String template = "foo {str}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build());

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'str'.");

interpolator.interpolate(invalidRow, null, null, null);
}

@Test
public void testInvalidRowThrowsHelpfulErrorForNestedFields() {
String template = "foo {row.nested_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

Schema nestedSchema = Schema.builder().addNullableStringField("xyz").build();
Row invalidRow =
Row.withSchema(Schema.builder().addNullableRowField("row", nestedSchema).build())
.addValue(Row.nullRow(nestedSchema))
.build();

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'nested_int'.");

interpolator.interpolate(invalidRow, null, null, null);
}

@Test
public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() {
String template = "foo {row.nested_row.doubly_nested_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

Schema doublyNestedSchema = Schema.builder().addNullableStringField("xyz").build();
Schema nestedSchema =
Schema.builder().addNullableRowField("nested_row", doublyNestedSchema).build();
Row invalidRow =
Row.withSchema(Schema.builder().addNullableRowField("row", doublyNestedSchema).build())
.addValue(
Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build())
.build();

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid row does not contain field 'doubly_nested_int'.");

interpolator.interpolate(invalidRow, null, null, null);
}

private static final Row ROW =
Row.withSchema(ROW_SCHEMA)
.addValue("str_value")
.addValue(true)
.addValue(123)
.addValue(null)
.addValue(Arrays.asList(1, 2, 3, 4, 5))
.addValue(
Row.withSchema(NESTED_ROW_SCHEMA)
.addValue("nested_str_value")
.addValue(456)
.addValue(1.234f)
.addValue(
Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
.addValue("doubly_nested_str_value")
.addValue(789)
.build())
.build())
.addValue(null)
.build();

@Test
public void testTopLevelInterpolation() {
String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

String output = interpolator.interpolate(ROW, null, null, null);

assertEquals("foo str_value, bar true, baz 123, xyz ", output);
}

@Test
public void testNestedLevelInterpolation() {
String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

String output = interpolator.interpolate(ROW, null, null, null);

assertEquals("foo str_value, bar nested_str_value, baz 1.234", output);
}

@Test
public void testDoublyNestedInterpolation() {
String template =
"foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}";
RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

String output = interpolator.interpolate(ROW, null, null, null);

assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output);
}

@Test
public void testInterpolateWindowingInformation() {
String template =
String.format(
"str: {str}, window: {%s}, pane: {%s}, year: {%s}, month: {%s}, day: {%s}",
RowStringInterpolator.WINDOW,
RowStringInterpolator.PANE_INDEX,
RowStringInterpolator.YYYY,
RowStringInterpolator.MM,
RowStringInterpolator.DD);

RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);

Instant instant = new DateTime(2024, 8, 28, 12, 0).toInstant();

String output =
interpolator.interpolate(
ROW,
GlobalWindow.INSTANCE,
PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0),
instant);
String expected =
String.format(
"str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28",
GlobalWindow.INSTANCE);

assertEquals(expected, output);
}
}

0 comments on commit e3c6f47

Please sign in to comment.