Skip to content

Commit

Permalink
[FLINK-35242][runtime] Add schema operator metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jun 12, 2024
1 parent 9299089 commit ffcd530
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
Expand Down Expand Up @@ -106,6 +107,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private final long rpcTimeOutInMillis;
private final SchemaChangeBehavior schemaChangeBehavior;

private transient SchemaOperatorMetrics schemaOperatorMetrics;

@VisibleForTesting
public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
this.routingRules = routingRules;
Expand All @@ -132,6 +135,14 @@ public SchemaOperator(
this.schemaChangeBehavior = schemaChangeBehavior;
}

@Override
public void open() throws Exception {
super.open();
schemaOperatorMetrics =
new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
}

@Override
public void setup(
StreamTask<?, ?> containingTask,
Expand Down Expand Up @@ -375,6 +386,8 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh

output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());

// The request will block until flushing finished in each sink writer
ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
Expand Down Expand Up @@ -421,6 +434,12 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
finishedSchemaChangeEvents.size(),
failedSchemaChangeEvents.size(),
ignoredSchemaChangeEvents.size());

schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
finishedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
ignoredSchemaChangeEvents.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.runtime.operators.schema.metrics;

import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

import java.util.HashMap;
import java.util.Map;

/** A collection class for handling metrics in {@link SchemaOperator}. */
public class SchemaOperatorMetrics {

/** Current schema change behavior. */
public static final String SCHEMA_CHANGE_BEHAVIOR = "schemaChangeBehavior";

public static final Map<SchemaChangeBehavior, Integer> SCHEMA_CHANGE_BEHAVIOR_INTEGER_MAP =
new HashMap<SchemaChangeBehavior, Integer>() {
{
put(SchemaChangeBehavior.IGNORE, 0);
put(SchemaChangeBehavior.TRY_EVOLVE, 1);
put(SchemaChangeBehavior.EVOLVE, 2);
put(SchemaChangeBehavior.EXCEPTION, 3);
}
};

/** Total count of schema change events received. */
public static final String NUM_SCHEMA_CHANGE_EVENTS = "numSchemaChangeEvents";

/** Number of successfully applied schema change events. */
public static final String NUM_FINISHED_SCHEMA_CHANGE_EVENTS = "numFinishedSchemaChangeEvents";

/** Number of schema change events that failed to apply. */
public static final String NUM_FAILED_SCHEMA_CHANGE_EVENTS = "numFailedSchemaChangeEvents";

/** Number of schema change events ignored. */
public static final String NUM_IGNORED_SCHEMA_CHANGE_EVENTS = "numIgnoredSchemaChangeEvents";

private final Counter numSchemaChangeEventsCounter;
private final Counter numFinishedSchemaChangeEventsCounter;
private final Counter numFailedSchemaChangeEventsCounter;
private final Counter numIgnoredSchemaChangeEventsCounter;

public SchemaOperatorMetrics(MetricGroup metricGroup, SchemaChangeBehavior behavior) {
numSchemaChangeEventsCounter = metricGroup.counter(NUM_SCHEMA_CHANGE_EVENTS);
numFinishedSchemaChangeEventsCounter =
metricGroup.counter(NUM_FINISHED_SCHEMA_CHANGE_EVENTS);
numFailedSchemaChangeEventsCounter = metricGroup.counter(NUM_FAILED_SCHEMA_CHANGE_EVENTS);
numIgnoredSchemaChangeEventsCounter = metricGroup.counter(NUM_IGNORED_SCHEMA_CHANGE_EVENTS);
metricGroup.gauge(
SCHEMA_CHANGE_BEHAVIOR, () -> SCHEMA_CHANGE_BEHAVIOR_INTEGER_MAP.get(behavior));
}

public void increaseSchemaChangeEvents(long count) {
numSchemaChangeEventsCounter.inc(count);
}

public void increaseFinishedSchemaChangeEvents(long count) {
numFinishedSchemaChangeEventsCounter.inc(count);
}

public void increaseFailedSchemaChangeEvents(long count) {
numFailedSchemaChangeEventsCounter.inc(count);
}

public void increaseIgnoredSchemaChangeEvents(long count) {
numIgnoredSchemaChangeEventsCounter.inc(count);
}
}

This file was deleted.

0 comments on commit ffcd530

Please sign in to comment.