Skip to content

Commit

Permalink
[Feature][MetaData] Add metadata fetch execute page
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 committed Nov 18, 2023
1 parent 030e715 commit 92db8d3
Show file tree
Hide file tree
Showing 16 changed files with 391 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ public ConnectorResponse getDatabases(GetDatabasesRequestParam param) throws SQL

private JdbcExecutorClient getJdbcExecutorClient(String dataSourceParam) {
JdbcConnectionInfo jdbcConnectionInfo = JSONUtils.parseObject(dataSourceParam, JdbcConnectionInfo.class);

return jdbcExecutorClientManager.getExecutorClient(JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));
return jdbcExecutorClientManager.getExecutorClient(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));
}

private JdbcExecutorClient getJdbcExecutorClient(String dataSourceParam, JdbcConnectionInfo jdbcConnectionInfo) {
return jdbcExecutorClientManager.getExecutorClient(JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));
return jdbcExecutorClientManager.getExecutorClient(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));
}

@Override
Expand Down Expand Up @@ -124,7 +125,7 @@ public ConnectorResponse getTables(GetTablesRequestParam param) throws SQLExcept
}

} catch (Exception e) {
logger.error("get table list error: {0}", e);
logger.error("get table list error: ", e);
} finally {
JdbcDataSourceUtils.releaseConnection(connection);
}
Expand Down Expand Up @@ -155,8 +156,7 @@ public ConnectorResponse getColumns(GetColumnsRequestParam param) throws SQLExce
tableColumnInfo = new TableColumnInfo(tableName, primaryKeys, columns);
}
} catch (SQLException e) {
logger.error(e.toString(), e);
throw new SQLException(e.getMessage() + ", " + dataSourceParam);
logger.error("get column list error , param is {} : ", param, e);
} finally {
JdbcDataSourceUtils.releaseConnection(connection);
}
Expand All @@ -183,7 +183,7 @@ public ConnectorResponse testConnect(TestConnectionRequestParam param) {

return ConnectorResponse.builder().status(ConnectorResponse.Status.SUCCESS).result(result).build();
} catch (SQLException e) {
logger.error(e.toString(), e);
logger.error("test connect error, param is {} :", JSONUtils.toJsonString(param), e);
}

return ConnectorResponse.builder().status(ConnectorResponse.Status.SUCCESS).result(false).build();
Expand All @@ -199,11 +199,12 @@ private List<String> getPrimaryKeys(String catalog, String schema, String tableN
if (rs == null) {
return primaryKeys;
}

while (rs.next()) {
primaryKeys.add(rs.getString("COLUMN_NAME"));
}
} catch (Exception e) {
logger.error(e.toString(), e);
logger.error("get primary key error, param is {} :", schema + "." + tableName, e);
} finally {
JdbcDataSourceUtils.closeResult(rs);
}
Expand All @@ -225,7 +226,7 @@ public List<ColumnInfo> getColumns(String catalog, String schema, String tableNa
columnList.add(new ColumnInfo(name, rawType, comment,false));
}
} catch (Exception e) {
logger.error(e.toString(), e);
logger.error("get column error, param is {} :", schema + "." + tableName, e);
} finally {
JdbcDataSourceUtils.closeResult(rs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.api.controller;

import io.datavines.core.aop.RefreshToken;
import io.datavines.core.constant.DataVinesConstants;
import io.datavines.server.repository.service.CatalogMetaDataFetchTaskService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

@Api(value = "catalog", tags = "catalog", produces = MediaType.APPLICATION_JSON_VALUE)
@RestController
@RequestMapping(value = DataVinesConstants.BASE_API_PATH + "/catalog/metadata/task", produces = MediaType.APPLICATION_JSON_VALUE)
@RefreshToken
@Validated
public class CatalogMetaDataController {

@Autowired
private CatalogMetaDataFetchTaskService catalogMetaDataFetchTaskService;

@ApiOperation(value = "get job page")
@GetMapping(value = "/page")
public Object page(@RequestParam("datasourceId") Long datasourceId,
@RequestParam("pageNumber") Integer pageNumber,
@RequestParam("pageSize") Integer pageSize) {
return catalogMetaDataFetchTaskService.getFetchTaskPage(datasourceId, pageNumber, pageSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.server.api.dto.vo.catalog;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.datavines.common.enums.ExecutionStatus;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

@Data
public class CatalogMetaDataFetchTaskVO implements Serializable {

private static final long serialVersionUID = -1L;

private Long id;

private String type;

private String databaseName;

private String tableName;

private String executeHost;

private ExecutionStatus status;

@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private LocalDateTime submitTime;

@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private LocalDateTime scheduleTime;

@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private LocalDateTime startTime;

@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private LocalDateTime endTime;

public String getStatus() {
return status.getDescription();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
@Slf4j
public class CatalogMetaDataFetchTaskScheduler extends Thread {

private final String CATALOG_METADATA_TASK_LOCK_KEY =
CommonPropertyUtils.getString(CommonPropertyUtils.CATALOG_METADATA_TASK_LOCK_KEY, CommonPropertyUtils.CATALOG_METADATA_TASK_LOCK_KEY_DEFAULT);

private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10};

private final JobExternalService jobExternalService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ private void executeFetchDataSource() throws SQLException {
GetDatabasesRequestParam param = new GetDatabasesRequestParam();
param.setType(dataSource.getType());
param.setDataSourceParam(dataSource.getParam());
ConnectorResponse connectorResponse =
connectorFactory.getConnector().getDatabases(param);
ConnectorResponse connectorResponse = connectorFactory.getConnector().getDatabases(param);

if (connectorResponse == null || connectorResponse.getResult() == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.datavines.server.api.dto.vo.catalog.CatalogMetaDataFetchTaskVO;
import io.datavines.server.repository.entity.catalog.CatalogMetaDataFetchTask;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface CatalogMetaDataFetchTaskMapper extends BaseMapper<CatalogMetaDataFetchTask> {

IPage<CatalogMetaDataFetchTaskVO> getJobExecutionPage(Page<CatalogMetaDataFetchTaskVO> page,
@Param("datasourceId") Long datasourceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package io.datavines.server.repository.service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import io.datavines.server.api.dto.bo.catalog.CatalogRefresh;
import io.datavines.server.api.dto.vo.catalog.CatalogMetaDataFetchTaskVO;
import io.datavines.server.repository.entity.catalog.CatalogMetaDataFetchTask;

import java.time.LocalDateTime;
Expand All @@ -42,4 +44,6 @@ public interface CatalogMetaDataFetchTaskService extends IService<CatalogMetaDat
boolean deleteByDataSourceId(long dataSourceId);

LocalDateTime getRefreshTime(long dataSourceId, String databaseName, String tableName);

IPage<CatalogMetaDataFetchTaskVO> getFetchTaskPage(Long datasourceId,Integer pageNumber, Integer pageSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ private CatalogMetaDataFetchTaskSchedule update(CatalogMetaDataFetchTaskSchedule
public boolean deleteById(long id) {
CatalogMetaDataFetchTaskSchedule catalogMetaDataFetchTaskSchedule = getById(id);
if (catalogMetaDataFetchTaskSchedule != null) {
Boolean deleteJob = quartzExecutor.deleteJob(getScheduleJobInfo(catalogMetaDataFetchTaskSchedule));
if (!deleteJob ) {
boolean deleteJob = quartzExecutor.deleteJob(getScheduleJobInfo(catalogMetaDataFetchTaskSchedule));
if (!deleteJob) {
return false;
}
return removeById(id);
Expand All @@ -175,7 +175,7 @@ public boolean deleteByDataSourceId(long dataSourceId) {
return false;
}

Boolean deleteJob = quartzExecutor.deleteJob(getScheduleJobInfo(catalogMetaDataFetchTaskSchedule));
boolean deleteJob = quartzExecutor.deleteJob(getScheduleJobInfo(catalogMetaDataFetchTaskSchedule));
if (!deleteJob ) {
return false;
}
Expand Down Expand Up @@ -228,7 +228,7 @@ private void updateCatalogTaskScheduleParam(CatalogMetaDataFetchTaskSchedule cat
throw new DataVinesServerException(Status.SCHEDULE_PARAMETER_IS_NULL_ERROR);
}

Boolean isValid = quartzExecutor.isValid(param.getCrontab());
boolean isValid = quartzExecutor.isValid(param.getCrontab());
if (!isValid) {
throw new DataVinesServerException(Status.SCHEDULE_CRON_IS_INVALID_ERROR, param.getCrontab());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.datavines.server.repository.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.datavines.common.enums.ExecutionStatus;
import io.datavines.common.utils.CommonPropertyUtils;
Expand All @@ -26,6 +28,7 @@
import io.datavines.core.enums.Status;
import io.datavines.core.exception.DataVinesServerException;
import io.datavines.server.api.dto.bo.catalog.CatalogRefresh;
import io.datavines.server.api.dto.vo.catalog.CatalogMetaDataFetchTaskVO;
import io.datavines.server.enums.FetchType;
import io.datavines.server.registry.RegistryHolder;
import io.datavines.server.repository.entity.catalog.CatalogMetaDataFetchCommand;
Expand Down Expand Up @@ -68,8 +71,7 @@ public long refreshCatalog(CatalogRefresh catalogRefresh) {
queryWrapper.lambda().eq(CatalogMetaDataFetchTask::getStatus,0)
.eq(CatalogMetaDataFetchTask::getDataSourceId, catalogRefresh.getDatasourceId())
.eq(CatalogMetaDataFetchTask::getParameter, JSONUtils.toJsonString(catalogRefresh));
List<CatalogMetaDataFetchTask> oldTaskList =
baseMapper.selectList(queryWrapper);
List<CatalogMetaDataFetchTask> oldTaskList = baseMapper.selectList(queryWrapper);

if (CollectionUtils.isNotEmpty(oldTaskList)) {
registryHolder.release("1028");
Expand Down Expand Up @@ -164,6 +166,7 @@ public String getTaskExecuteHost(Long catalogTaskId) {
}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean deleteByDataSourceId(long dataSourceId) {
remove(new QueryWrapper<CatalogMetaDataFetchTask>().eq("datasource_id", dataSourceId));
catalogMetaDataFetchTaskScheduleService.deleteByDataSourceId(dataSourceId);
Expand Down Expand Up @@ -215,4 +218,10 @@ public LocalDateTime getRefreshTime(long dataSourceId, String databaseName, Stri

return refreshTime;
}

@Override
public IPage<CatalogMetaDataFetchTaskVO> getFetchTaskPage(Long datasourceId,Integer pageNumber, Integer pageSize) {
Page<CatalogMetaDataFetchTaskVO> page = new Page<>(pageNumber, pageSize);
return baseMapper.getJobExecutionPage(page, datasourceId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="io.datavines.server.repository.mapper.CatalogMetaDataFetchTaskMapper">

<sql id="basic_sql">
select * from dv_catalog_metadata_fetch_task
<where>
<if test="datasourceId != null">
datasource_id = #{datasourceId}
</if>
</where>
</sql>

<select id="getJobExecutionPage" resultType="io.datavines.server.api.dto.vo.catalog.CatalogMetaDataFetchTaskVO">
select p.id, p.type, p.execute_host, p.database_name, p.table_name, p.status, p.submit_time, p.start_time, p.end_time, p.schedule_time
from (<include refid="basic_sql"/>) p
order by p.update_time desc
</select>
</mapper>
8 changes: 6 additions & 2 deletions datavines-ui/src/locale/en_US.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ export default {

home_search_placeholder: 'Search to DataSource',
home_create_datasource: 'Create DataSource',
home_metadata_fetch: 'Metadata Fetcher',
home_metadata_fetch_schedule: 'Metadata Fetcher Schedule',
home_metadata_fetch: 'Metadata Manager',
home_metadata_fetch_now: 'Fetch',
home_metadata_fetch_schedule: 'Metadata Schedule Manager',

// jobs
jobs_list: 'Jobs',
Expand All @@ -102,6 +103,9 @@ export default {
jobs_task_table_name: 'Table',
jobs_task_column_name: 'Column',
jobs_task_metric_type: 'Metric Type',
jobs_task_execute_host: 'Execute Host',
jobs_task_schedule_time: 'Schedule Time',
jobs_task_submit_time: 'Submit Time',
jobs_task_start_time: 'Start Time',
jobs_task_end_time: 'End Time',
jobs_task_stop_btn: 'Stop',
Expand Down
8 changes: 6 additions & 2 deletions datavines-ui/src/locale/zh_CN.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ export default {

home_search_placeholder: '搜索数据源',
home_create_datasource: '创建数据源',
home_metadata_fetch: '元数据抓取',
home_metadata_fetch_schedule: '元数据抓取调度',
home_metadata_fetch: '元数据管理',
home_metadata_fetch_now: '立即抓取',
home_metadata_fetch_schedule: '元数据调度管理',

// jobs
jobs_list: '规则作业列表',
Expand All @@ -102,6 +103,9 @@ export default {
jobs_task_table_name: '表名',
jobs_task_column_name: '列名',
jobs_task_metric_type: '规则类型',
jobs_task_execute_host: '执行服务器地址',
jobs_task_schedule_time: '调度时间',
jobs_task_submit_time: '提交时间',
jobs_task_start_time: '开始时间',
jobs_task_end_time: '结束时间',
jobs_task_stop_btn: '停止',
Expand Down
Loading

0 comments on commit 92db8d3

Please sign in to comment.