diff --git a/tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/workflow/SingleTableDump.java b/tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/workflow/SingleTableDump.java deleted file mode 100644 index b2439ac12..000000000 --- a/tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/workflow/SingleTableDump.java +++ /dev/null @@ -1,353 +0,0 @@ -///** -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// *

-// * http://www.apache.org/licenses/LICENSE-2.0 -// *

-// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package com.qlangtech.tis.fullbuild.workflow; -// -//import com.google.common.collect.Lists; -//import com.google.common.collect.Maps; -//import com.qlangtech.tis.assemble.ExecResult; -//import com.qlangtech.tis.assemble.FullbuildPhase; -//import com.qlangtech.tis.cloud.ITISCoordinator; -//import com.qlangtech.tis.exec.ExecChainContextUtils; -//import com.qlangtech.tis.exec.IExecChainContext; -//import com.qlangtech.tis.exec.impl.WorkflowDumpAndJoinInterceptor; -//import com.qlangtech.tis.fullbuild.indexbuild.DftTabPartition; -//import com.qlangtech.tis.fullbuild.indexbuild.IRemoteJobTrigger; -//import com.qlangtech.tis.fullbuild.indexbuild.RunningStatus; -//import com.qlangtech.tis.fullbuild.indexbuild.TaskContext; -//import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus; -//import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus.TableDumpStatus; -//import com.qlangtech.tis.fullbuild.taskflow.AdapterTask; -//import com.qlangtech.tis.fullbuild.taskflow.DataflowTask; -//import com.qlangtech.tis.manage.common.Config; -////import com.qlangtech.tis.offline.TableDumpFactory; -//import com.qlangtech.tis.order.center.IParamContext; -//import com.qlangtech.tis.order.dump.task.ITableDumpConstant; -//import com.qlangtech.tis.pubhook.common.FileUtils; -//import com.qlangtech.tis.sql.parser.TabPartitions; -//import com.qlangtech.tis.sql.parser.meta.DependencyNode; -//import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; -//import com.qlangtech.tis.trigger.zk.AbstractWatcher; -//import com.qlangtech.tis.utils.Utils; -//import org.apache.commons.lang.StringUtils; -//import org.apache.zookeeper.Watcher; -//import org.apache.zookeeper.data.Stat; -//import org.json.JSONObject; -//import org.json.JSONTokener; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.File; -//import java.time.format.DateTimeFormatter; -//import java.util.List; -//import java.util.Map; -//import java.util.Optional; -//import java.util.concurrent.CountDownLatch; -//import java.util.concurrent.TimeUnit; -// -///** -// * @author 百岁(baisui@qlangtech.com) -// * @date 2020/04/13 -// */ -//public class SingleTableDump extends DataflowTask { -// -// private static final Logger logger = LoggerFactory.getLogger(SingleTableDump.class); -// -// private final int dataSourceTableId; -// -// private String pt; -// -// private boolean forceReDump; -// -// private final TableDumpProgress tableDumpProgress; -// -// private final EntityName dumpTable; -// -// private boolean hasValidTableDump; -// -// private final IExecChainContext execChainContext; -// -// private final int taskid; -// -// public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); -// -// @Override -// public FullbuildPhase phase() { -// return FullbuildPhase.FullDump; -// } -// -// private final ITISCoordinator zkClient; -// -// private static final String TABLE_DUMP_ZK_PREFIX = "/tis/table_dump/"; -// -// private final TableDumpStatus tableDumpStatus; -// -// private final TableDumpFactory tableDumpFactory; -// -// /** -// * @param dump -// * @param hasValidTableDump 在之前的导入中是否有 -// * @param pt -// * @param zkClient -// * @param execChainContext -// * @param dumpPhaseStatus -// */ -// public SingleTableDump(DependencyNode dump, boolean hasValidTableDump, String pt, ITISCoordinator zkClient, IExecChainContext execChainContext, DumpPhaseStatus dumpPhaseStatus) { -// super(dump.getId()); -// // DumpTable.create(dump.getDbName(), dump.getName()); -// this.dumpTable = dump.parseEntityName(); -// this.tableDumpStatus = dumpPhaseStatus.getTable(String.valueOf(dumpTable)); -// this.dataSourceTableId = Integer.parseInt(dump.getTabid()); -// this.tableDumpFactory = execChainContext.getTableDumpFactory(); -// this.hasValidTableDump = hasValidTableDump; -// this.zkClient = zkClient; -// if (this.zkClient == null) { -// throw new NullPointerException("zkClient can not be null"); -// } -// this.tableDumpProgress = new TableDumpProgress(this.dataSourceTableId); -// this.tableDumpProgress.setState(ExecuteStatus.NEW); -// this.forceReDump = false; -// if (hasValidTableDump) { -// this.pt = pt; -// } else { -// this.pt = execChainContext.getPartitionTimestamp(); -// } -// this.execChainContext = execChainContext; -// this.taskid = this.execChainContext.getTaskId(); -// } -// -// @Override -// public String getIdentityName() { -// return this.dumpTable.getFullName(); -// } -// -// @Override -// public void run() throws Exception { -// DumpTableRunningStatus status = this.call(); -// if (!status.runningStatus.isSuccess()) { -// this.signTaskFaild(); -// throw new IllegalStateException("table dump faild:" + dumpTable.getFullName()); -// } else { -// logger.info(this.dumpTable.getFullName() + " dump success, pt:" + status.pt + ",dumpNodeId:" + id + ",taskid:" + this.taskid); -// this.signTaskSuccess(); -// } -// } -// -// @Override -// protected Map getTaskWorkStatus() { -// return AdapterTask.createTaskWorkStatus(this.execChainContext); -// } -// -// /** -// * description: 提交给线程池 -// */ -// // @Override -// public DumpTableRunningStatus call() { -// final CountDownLatch localTaskCountDownLatch = new CountDownLatch(1); -// if (hasDumpTaskInProgress(localTaskCountDownLatch)) { -// // 先判断是否有一个正在执行的任务 -// // 直接接到那个任务上去 -// logger.info("table dump task:" + dumpTable.getNameWithPath() + " has dump task in progress wait"); -// tableDumpProgress.setState(ExecuteStatus.IN_PROGRESS); -// try { -// // 等待另外的进程将表dump完成 -// if (!localTaskCountDownLatch.await(WorkflowDumpAndJoinInterceptor.TIME_OUT_HOURS, TimeUnit.HOURS)) { -// throw new IllegalStateException(dumpTable.getFullName() + " dump faild"); -// } -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// if (tableDumpProgress.getState() == ExecuteStatus.SUCCESS) { -// return new DumpTableRunningStatus(new RunningStatus(1.0f, true, true), dumpTable, this.pt); -// } else { -// return triggerRemoteDumpJob(); -// } -// } else { -// if (forceReDump) { -// return triggerRemoteDumpJob(); -// } else { -// // 判断是否有一个有效的dump -// if (hasValidTableDump) { -// logger.info("table dump task:" + this.dumpTable.getNameWithPath() + " has a valid dump, so do not need dump again"); -// tableDumpProgress.setState(ExecuteStatus.SUCCESS); -// recordPt(); -// return new DumpTableRunningStatus(new RunningStatus(1f, true, true), dumpTable, this.pt); -// } else { -// return triggerRemoteDumpJob(); -// } -// } -// } -// } -// -// private DumpTableRunningStatus triggerRemoteDumpJob() { -// // = HeteroEnum.DS_DUMP.getPlugin(); -// ; -// Map params = Maps.newHashMap(); -// params.put(IParamContext.KEY_TASK_ID, String.valueOf(taskid)); -// params.put(ITableDumpConstant.DUMP_START_TIME, this.pt); -// -// TaskContext taskContext = TaskContext.create((key) -> params.get(key)); -// taskContext.setCoordinator(this.zkClient); -// -// IRemoteJobTrigger job = tableDumpFactory.createSingleTableDumpJob(dumpTable, taskContext); -// job.submitJob(); -// RunningStatus runningStatus = job.getRunningStatus(); -// while (!runningStatus.isComplete()) { -// try { -// Thread.sleep(3000); -// } catch (InterruptedException e) { -// } -// runningStatus = job.getRunningStatus(); -// } -// if (runningStatus.isSuccess()) { -// this.addTableDumpRecord(ExecResult.SUCCESS, pt); -// recordPt(); -// } else { -// this.addTableDumpRecord(ExecResult.FAILD, pt); -// } -// // } -// return new DumpTableRunningStatus(runningStatus, dumpTable, pt); -// } -// -// /** -// * 添加当前任务的pt -// */ -// private void recordPt() { -// // Map dateParams = ExecChainContextUtils.getDependencyTablesPartitions(execChainContext); -// TabPartitions dateParams = ExecChainContextUtils.getDependencyTablesPartitions(execChainContext); -// dateParams.putPt(this.dumpTable, new DftTabPartition(pt)); -// } -// -// /** -// * 查看是否有正在进行的dump任务 -// * -// * @return the boolean -// */ -// private boolean hasDumpTaskInProgress(final CountDownLatch localTaskCountDownLatch) { -// final String path = TABLE_DUMP_ZK_PREFIX + this.dumpTable.getDbName() + "_" + this.dumpTable.getTableName(); -// try { -// if (!zkClient.exists(path, true)) { -// // 不存在的话直接返回 -// return false; -// } -// byte[] bytes = zkClient.getData(path, new AbstractWatcher() { -// -// @Override -// protected void process(Watcher watcher) throws InterruptedException { -// // if (event.getType() == Event.EventType.NodeDeleted) { -// Thread.sleep(1000); -// if (tableDumpProgress.getState() == ExecuteStatus.IN_PROGRESS) { -// // 去数据库看看 这次任务有没有成功 -// if (isTableDumpSuccess(dumpTable, pt)) { -// tableDumpProgress.setState(ExecuteStatus.SUCCESS); -// recordPt(); -// } else { -// tableDumpProgress.setState(ExecuteStatus.FAILED); -// } -// localTaskCountDownLatch.countDown(); -// } -// // } -// } -// }, new Stat(), true); -// pt = new String(bytes); -// return true; -// } catch (Exception e) { -// // System.out.println("zookeeper path " + path + " not exists"); -// logger.warn("zookeeper path " + path + " not exists", e); -// return false; -// } -//// catch (InterruptedException e) { -//// // System.out.println( + " InterruptedException"); -//// logger.warn("zookeeper path " + path, e); -//// return false; -//// } -// } -// -// private void addTableDumpRecord(ExecResult execResult, String pt) { -// addTableDumpRecord(this.dumpTable, execResult, pt); -// } -// -// /** -// * 向tis-console中发布表dump的結果 -// */ -// private static void addTableDumpRecord(EntityName dumpTable, ExecResult execResult, String pt) { -// // JSONObject j = new JSONObject(); -// // j.put("pt", pt); -// // // String url = WorkflowDumpAndJoinInterceptor.WORKFLOW_CONFIG_URL_POST_FORMAT.format(new Object[]{"fullbuild_workflow_action", "do_add_table_dump_record"}); -// // //List postParams = new LinkedList<>(); -// // //postParams.add(new PostParam("datasource_table_id", Integer.toString(dataSourceTableId))); -// // //postParams.add(new PostParam("hive_table_name", dumpTable.getNameWithPath())); -// // postParams.add(new PostParam("state", String.valueOf(execResult.getValue()))); -// // //postParams.add(new PostParam("info", j.toString())); -// // postParams.add(new PostParam("pt", pt)); -// // 向本地文件系统中写入表执行状态 -// JSONObject json = new JSONObject(); -// json.put("state", execResult.getValue()); -// json.put("pt", pt); -// File tableDumpLog = getTableDumpLog(dumpTable); -// FileUtils.append(tableDumpLog, json.toString()); -// } -// -// private static File getTableDumpLog(EntityName dumpTable) { -// return new File(Config.getDataDir(), "tab_dump_logs/" + dumpTable.getNameWithPath()); -// } -// -// private static boolean isTableDumpSuccess(EntityName dumpTable, String pt) { -// File tableDumpLog = getTableDumpLog(dumpTable); -// if (!tableDumpLog.exists()) { -// return false; -// } -// List status = Lists.newArrayList(); -// Utils.readLastNLine(tableDumpLog, 4, (line) -> { -// JSONTokener t = new JSONTokener(line); -// JSONObject stat = new JSONObject(t); -// status.add(new DumpTableStatus(stat.getString("pt"), ExecResult.parse(stat.getInt("state")))); -// }); -// Optional match = status.stream().filter((r) -> { -// return r.execResult == ExecResult.SUCCESS && StringUtils.equals(pt, r.pt); -// }).findFirst(); -// return match.isPresent(); -// } -// -// private static class DumpTableStatus { -// -// private final String pt; -// -// private final ExecResult execResult; -// -// public DumpTableStatus(String pt, ExecResult execResult) { -// this.pt = pt; -// this.execResult = execResult; -// } -// } -// -// public static class DumpTableRunningStatus { -// -// public final RunningStatus runningStatus; -// -// public final EntityName dumpTable; -// -// public final String pt; -// -// public DumpTableRunningStatus(RunningStatus runningStatus, EntityName dumpTable, String pt) { -// super(); -// this.runningStatus = runningStatus; -// this.dumpTable = dumpTable; -// this.pt = pt; -// } -// } -//} diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxProcessor.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxProcessor.java index 20700ac4c..fc1028200 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxProcessor.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxProcessor.java @@ -104,7 +104,8 @@ public TableAlias() { public TableAlias(String from) { this.from = from; // 如果使用oracle的表,表名中可能出现点,所以要将它去掉 - this.to = StringUtils.substringAfterLast(from, "."); + int indexOfCommon = StringUtils.indexOf(from, "."); + this.to = indexOfCommon > -1 ? StringUtils.substring(from, indexOfCommon + 1) : from; } public String getFrom() { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/extension/Descriptor.java b/tis-plugin/src/main/java/com/qlangtech/tis/extension/Descriptor.java index ce3caa340..c264a1716 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/extension/Descriptor.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/extension/Descriptor.java @@ -909,6 +909,24 @@ private void addFieldRequiredError(IFieldErrorHandler msgHandler, Context contex msgHandler.addFieldError(context, attrKey, ValidatorCommons.MSG_EMPTY_INPUT_ERROR); } + public ParseDescribable newInstance( + String appName, // + FormData formData // + ) { + return newInstance(IPluginContext.namedContext(appName), formData.body, Optional.empty()); + } + + public static class FormData { + final HashMap body = Maps.newHashMap(); + + public JSONObject addProp(String key, String val) { + JSONObject o = new JSONObject(); + o.put(Descriptor.KEY_primaryVal, val); + body.put(key, o); + return o; + } + } + // @Override public ParseDescribable newInstance( IPluginContext pluginContext, // diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java index 4d3765a2a..7dd60d148 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java @@ -34,6 +34,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; @@ -45,6 +46,9 @@ * @create: 2021-06-06 19:48 **/ public abstract class BasicDataSourceFactory extends DataSourceFactory implements JdbcUrlBuilder { + // + public static final ZoneId DEFAULT_SERVER_TIME_ZONE = ZoneId.of("Asia/Shanghai"); + private static final Logger logger = LoggerFactory.getLogger(BasicDataSourceFactory.class); @FormField(identity = true, ordinal = 0, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.identity}) diff --git a/tis-plugin/src/test/java/TestAll.java b/tis-plugin/src/test/java/TestAll.java index e12e50ddb..404a9ec49 100644 --- a/tis-plugin/src/test/java/TestAll.java +++ b/tis-plugin/src/test/java/TestAll.java @@ -1,23 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ import com.qlangtech.tis.TestTIS; import com.qlangtech.tis.datax.impl.TestDataxReader; +import com.qlangtech.tis.datax.impl.TestTableAlias; import com.qlangtech.tis.db.parser.TestDBConfigParser; import com.qlangtech.tis.db.parser.TestScannerPatterns; import com.qlangtech.tis.extension.TestDescriptor; @@ -45,6 +46,7 @@ public class TestAll extends TestCase { public static Test suite() { TestSuite suite = new TestSuite(); + suite.addTestSuite(TestTableAlias.class); suite.addTestSuite(TestScannerPatterns.class); suite.addTestSuite(TestDBConfigParser.class); suite.addTestSuite(TestValidatorCommons.class); diff --git a/tis-plugin/src/test/java/com/qlangtech/tis/datax/impl/TestTableAlias.java b/tis-plugin/src/test/java/com/qlangtech/tis/datax/impl/TestTableAlias.java new file mode 100644 index 000000000..3a0db9509 --- /dev/null +++ b/tis-plugin/src/test/java/com/qlangtech/tis/datax/impl/TestTableAlias.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.datax.impl; + +import com.qlangtech.tis.datax.IDataxProcessor; +import junit.framework.TestCase; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2022-01-13 14:24 + **/ +public class TestTableAlias extends TestCase { + + public void testTableAlias() { + String tableName = "instancedetail"; + String dbName = "order"; + + IDataxProcessor.TableAlias tableAlias = new IDataxProcessor.TableAlias(tableName); + assertEquals(tableName, tableAlias.getFrom()); + assertEquals(tableName, tableAlias.getTo()); + + tableAlias = new IDataxProcessor.TableAlias(dbName + "." + tableName); + + assertEquals(dbName + "." + tableName, tableAlias.getFrom()); + assertEquals(tableName, tableAlias.getTo()); + } +}