Skip to content

Commit

Permalink
[feature](write)add metrics for flink load data (apache#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Jan 5, 2024
1 parent f92a0f0 commit dc1f0ec
Show file tree
Hide file tree
Showing 5 changed files with 475 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class RespContent {

@JsonProperty(value = "TxnId")
private long txnId;
private Long txnId;

@JsonProperty(value = "Label")
private String label;
Expand All @@ -44,42 +44,42 @@ public class RespContent {
private String message;

@JsonProperty(value = "NumberTotalRows")
private long numberTotalRows;
private Long numberTotalRows;

@JsonProperty(value = "NumberLoadedRows")
private long numberLoadedRows;
private Long numberLoadedRows;

@JsonProperty(value = "NumberFilteredRows")
private int numberFilteredRows;
private Integer numberFilteredRows;

@JsonProperty(value = "NumberUnselectedRows")
private int numberUnselectedRows;
private Integer numberUnselectedRows;

@JsonProperty(value = "LoadBytes")
private long loadBytes;
private Long loadBytes;

@JsonProperty(value = "LoadTimeMs")
private int loadTimeMs;
private Integer loadTimeMs;

@JsonProperty(value = "BeginTxnTimeMs")
private int beginTxnTimeMs;
private Integer beginTxnTimeMs;

@JsonProperty(value = "StreamLoadPutTimeMs")
private int streamLoadPutTimeMs;
private Integer streamLoadPutTimeMs;

@JsonProperty(value = "ReadDataTimeMs")
private int readDataTimeMs;
private Integer readDataTimeMs;

@JsonProperty(value = "WriteDataTimeMs")
private int writeDataTimeMs;
private Integer writeDataTimeMs;

@JsonProperty(value = "CommitAndPublishTimeMs")
private int commitAndPublishTimeMs;
private Integer commitAndPublishTimeMs;

@JsonProperty(value = "ErrorURL")
private String errorURL;

public long getTxnId() {
public Long getTxnId() {
return txnId;
}

Expand All @@ -99,6 +99,50 @@ public String getExistingJobStatus() {
return existingJobStatus;
}

public Long getNumberTotalRows() {
return numberTotalRows;
}

public Long getNumberLoadedRows() {
return numberLoadedRows;
}

public Integer getNumberFilteredRows() {
return numberFilteredRows;
}

public Integer getNumberUnselectedRows() {
return numberUnselectedRows;
}

public Long getLoadBytes() {
return loadBytes;
}

public Integer getLoadTimeMs() {
return loadTimeMs;
}

public Integer getBeginTxnTimeMs() {
return beginTxnTimeMs;
}

public Integer getStreamLoadPutTimeMs() {
return streamLoadPutTimeMs;
}

public Integer getReadDataTimeMs() {
return readDataTimeMs;
}

public Integer getWriteDataTimeMs() {
return writeDataTimeMs;
}

public Integer getCommitAndPublishTimeMs() {
return commitAndPublishTimeMs;
}

@Override
public String toString() {
ObjectMapper mapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ public Future<CloseableHttpResponse> getPendingLoadFuture() {
/**
* try to discard pending transactions with labels beginning with labelSuffix.
*
* @param labelSuffix
* @param chkID
* @param labelSuffix the suffix of the stream load.
* @param chkID checkpoint id of task.
* @throws Exception
*/
public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
Expand Down Expand Up @@ -260,7 +260,7 @@ public RespContent stopLoad(String label) throws IOException {
/**
* start write data for new checkpoint.
*
* @param label
* @param label the label of Stream Load.
* @throws IOException
*/
public void startLoad(String label, boolean isResume) throws IOException {
Expand Down
Loading

0 comments on commit dc1f0ec

Please sign in to comment.