Skip to content

Commit

Permalink
[flink] Bridge Flink's metrics system with Paimon's metrics (#2177)
Browse files Browse the repository at this point in the history
This closes #2177.
  • Loading branch information
tsreaper authored Oct 27, 2023
1 parent ed79cfe commit a8d2bdd
Show file tree
Hide file tree
Showing 17 changed files with 573 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterMetrics;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
Expand Down Expand Up @@ -158,8 +157,7 @@ protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> crea
// commit new files list even if they're empty.
// Otherwise we can't tell if the commit is successful after
// a restart.
return (user, metricGroup) ->
new StoreMultiCommitter(catalogLoader, user, new CommitterMetrics(metricGroup));
return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader, user, metricGroup);
}

protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.metrics;

import org.apache.paimon.metrics.Counter;

/** {@link Counter} which wraps a Flink's {@link org.apache.flink.metrics.Counter}. */
public class FlinkCounter implements Counter {

private final org.apache.flink.metrics.Counter wrapped;

public FlinkCounter(org.apache.flink.metrics.Counter wrapped) {
this.wrapped = wrapped;
}

@Override
public void inc() {
wrapped.inc();
}

@Override
public void inc(long n) {
wrapped.inc(n);
}

@Override
public void dec() {
wrapped.dec();
}

@Override
public void dec(long n) {
wrapped.dec(n);
}

@Override
public long getCount() {
return wrapped.getCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.metrics;

import org.apache.paimon.metrics.Gauge;

/** {@link Gauge} which wraps a Flink's {@link org.apache.flink.metrics.Gauge}. */
public class FlinkGauge<T> implements Gauge<T> {

private final org.apache.flink.metrics.Gauge<T> wrapped;

public FlinkGauge(org.apache.flink.metrics.Gauge<T> wrapped) {
this.wrapped = wrapped;
}

@Override
public T getValue() {
return wrapped.getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.metrics;

import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.HistogramStatistics;

/** {@link Histogram} which wraps a Flink's {@link org.apache.flink.metrics.Histogram}. */
public class FlinkHistogram implements Histogram {

private final org.apache.flink.metrics.Histogram wrapped;

public FlinkHistogram(org.apache.flink.metrics.Histogram wrapped) {
this.wrapped = wrapped;
}

@Override
public void update(long value) {
wrapped.update(value);
}

@Override
public long getCount() {
return wrapped.getCount();
}

@Override
public HistogramStatistics getStatistics() {
org.apache.flink.metrics.HistogramStatistics stats = wrapped.getStatistics();

return new HistogramStatistics() {

@Override
public double getQuantile(double quantile) {
return stats.getQuantile(quantile);
}

@Override
public long[] getValues() {
return stats.getValues();
}

@Override
public int size() {
return stats.size();
}

@Override
public double getMean() {
return stats.getMean();
}

@Override
public double getStdDev() {
return stats.getStdDev();
}

@Override
public long getMax() {
return stats.getMax();
}

@Override
public long getMin() {
return stats.getMin();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.metrics;

import org.apache.paimon.metrics.Counter;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricGroup;

import java.util.Collections;
import java.util.Map;

/**
* {@link MetricGroup} which wraps a Flink's {@link org.apache.flink.metrics.MetricGroup} and
* register all metrics into Flink's metric system.
*/
public class FlinkMetricGroup implements MetricGroup {

private static final String PAIMON_GROUP_NAME = "paimon";

private final org.apache.flink.metrics.MetricGroup wrapped;
private final String groupName;
private final Map<String, String> variables;

public FlinkMetricGroup(
org.apache.flink.metrics.MetricGroup wrapped,
String groupName,
Map<String, String> variables) {
wrapped = wrapped.addGroup(PAIMON_GROUP_NAME);
for (Map.Entry<String, String> entry : variables.entrySet()) {
wrapped = wrapped.addGroup(entry.getKey(), entry.getValue());
}
wrapped = wrapped.addGroup(groupName);

this.wrapped = wrapped;
this.groupName = groupName;
this.variables = variables;
}

@Override
public Counter counter(String name) {
return new FlinkCounter(wrapped.counter(name));
}

@Override
public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
return new FlinkGauge<>(wrapped.gauge(name, gauge::getValue));
}

@Override
public Histogram histogram(String name, int windowSize) {
return new FlinkHistogram(
wrapped.histogram(
name,
new org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram(
windowSize)));
}

@Override
public Map<String, String> getAllVariables() {
return Collections.unmodifiableMap(variables);
}

@Override
public String getGroupName() {
return groupName;
}

@Override
public Map<String, Metric> getMetrics() {
throw new UnsupportedOperationException(
"FlinkMetricGroup does not support fetching all metrics. "
+ "Please read the metrics through Flink's metric system.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.metrics;

import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

import java.util.Map;

/** {@link MetricRegistry} to create {@link FlinkMetricGroup}. */
public class FlinkMetricRegistry extends MetricRegistry {

private final org.apache.flink.metrics.MetricGroup group;

public FlinkMetricRegistry(org.apache.flink.metrics.MetricGroup group) {
this.group = group;
}

@Override
protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
return new FlinkMetricGroup(group, groupName, variables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -54,6 +54,6 @@ GlobalCommitT combine(long checkpointId, long watermark, List<CommitT> committab
interface Factory<CommitT, GlobalCommitT> extends Serializable {

Committer<CommitT, GlobalCommitT> create(
String commitUser, OperatorIOMetricGroup metricGroup);
String commitUser, OperatorMetricGroup metricGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void initializeState(StateInitializationContext context) throws Exception
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will never be null
committer = committerFactory.create(commitUser, getMetricGroup().getIOMetricGroup());
committer = committerFactory.create(commitUser, getMetricGroup());

committableStateManager.initializeState(context, committer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
@Override
protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory(
boolean streamingCheckpointEnabled) {
return (user, metricGroup) ->
new StoreCommitter(table.newCommit(user), new CommitterMetrics(metricGroup));
return (user, metricGroup) -> new StoreCommitter(table.newCommit(user), metricGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected Committer.Factory<Committable, ManifestCommittable> createCommitterFac
table.newCommit(user)
.withOverwrite(overwritePartition)
.ignoreEmptyCommit(!streamingCheckpointEnabled),
new CommitterMetrics(metricGroup));
metricGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ protected OneInputStreamOperator<RowData, MultiTableCommittable> createWriteOper
protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable>
createCommitterFactory() {
return (user, metricGroup) ->
new StoreMultiCommitter(
catalogLoader, user, new CommitterMetrics(metricGroup), true);
new StoreMultiCommitter(catalogLoader, user, metricGroup, true);
}

protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
Expand Down
Loading

0 comments on commit a8d2bdd

Please sign in to comment.