From 5b9b8fae79cfbd8643ac83a15a2c63d25540aca3 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Wed, 6 Nov 2024 11:16:01 +0800 Subject: [PATCH] remove hudi component,and add processing for fill dataType for hive col https://github.com/datavane/tis/issues/391 --- tis-datax/pom.xml | 4 ++-- .../tis/plugin/datax/AbstractDFSReader.java | 2 +- .../plugin/datax/DataXDFSReaderWithMeta.java | 4 ++-- .../tis/hive/reader/DataXHiveReader.java | 24 ++++++++++++++++++- .../tis/hive/reader/HiveDFSLinker.java | 6 +++++ .../tis/hive/DefaultHiveMetaStore.java | 4 ++++ tis-incr/pom.xml | 2 +- 7 files changed, 39 insertions(+), 7 deletions(-) diff --git a/tis-datax/pom.xml b/tis-datax/pom.xml index fb4c92df0..cde904064 100644 --- a/tis-datax/pom.xml +++ b/tis-datax/pom.xml @@ -70,12 +70,12 @@ tis-datax-mariadb-plugin tis-datax-local-executor - + tis-datax-hdfs-plugin tis-datax-hdfs-reader-writer-plugin diff --git a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/AbstractDFSReader.java b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/AbstractDFSReader.java index 523e89844..4a9e88dac 100644 --- a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/AbstractDFSReader.java +++ b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/AbstractDFSReader.java @@ -74,7 +74,7 @@ public void startScanDependency() { * ================================================================================ */ @SubForm(desClazz = SelectedTab.class, idListGetScript = "return com.qlangtech.tis.plugin.datax.DataXDFSReaderWithMeta.getDFSFiles(filter);", atLeastOne = true) - public transient List selectedTabs; + public transient List selectedTabs; public abstract List getSelectedEntities(); diff --git a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSReaderWithMeta.java b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSReaderWithMeta.java index fa165f367..7b216be83 100644 --- a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSReaderWithMeta.java +++ b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSReaderWithMeta.java @@ -137,7 +137,7 @@ public List getTableMetadata(boolean inSink, EntityName table) t @Override public IGroupChildTaskIterator getSubTasks(Predicate filter) { - final List tabs = selectedTabs; + final List tabs = selectedTabs; final int tabsLength = tabs.size(); AtomicInteger selectedTabIndex = new AtomicInteger(0); ConcurrentHashMap> groupedInfo = new ConcurrentHashMap(); @@ -156,7 +156,7 @@ public boolean hasNext() { @Override public IDataxReaderContext next() { - SelectedTab tab = tabs.get(currentIndex); + ISelectedTab tab = tabs.get(currentIndex); ColumnMetaData.fillSelectedTabMeta(tab, (t) -> { List colsMeta = getFTPFileMetaData(EntityName.parse(t.getName()), dfs); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/DataXHiveReader.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/DataXHiveReader.java index 966d3c5bb..166cac88f 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/DataXHiveReader.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/DataXHiveReader.java @@ -35,6 +35,8 @@ import com.qlangtech.tis.plugin.datax.AbstractDFSReader; import com.qlangtech.tis.plugin.datax.DataXDFSReaderWithMeta; import com.qlangtech.tis.plugin.datax.SelectedTab; +import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsReader; +import com.qlangtech.tis.plugin.datax.common.TableColsMeta; import com.qlangtech.tis.plugin.datax.format.FileFormat; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.ISelectedTab; @@ -69,9 +71,29 @@ public static String getDftTemplate() { return IOUtils.loadResourceFromClasspath(DataXHiveReader.class, "DataXHiveReader-tpl.json"); } + private transient int preSelectedTabsHash; + @Override public List getSelectedTabs() { - return Objects.requireNonNull(this.selectedTabs, "selectedTabs can not be null").stream().collect(Collectors.toList()); + //BasicDataXRdbmsReader + Objects.requireNonNull(this.selectedTabs, "selectedTabs can not be null"); + if (this.preSelectedTabsHash == selectedTabs.hashCode()) { + return selectedTabs; + } + + try (TableColsMeta colMeta = this.getDfsLinker().getTabsMeta()) { + selectedTabs = this.selectedTabs.stream().map((tab) -> { + ColumnMetaData.fillSelectedTabMeta(tab, (t) -> { + return colMeta.get(t.getName()); + }); + return tab; + }).collect(Collectors.toList()); + this.preSelectedTabsHash = selectedTabs.hashCode(); + return selectedTabs; + } catch (Exception e) { + throw new RuntimeException(e); + } + } @Override diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/HiveDFSLinker.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/HiveDFSLinker.java index 2adabcade..f225deddd 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/HiveDFSLinker.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/hive/reader/HiveDFSLinker.java @@ -35,6 +35,7 @@ import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; +import com.qlangtech.tis.plugin.datax.common.TableColsMeta; import com.qlangtech.tis.plugin.datax.format.FileFormat; import com.qlangtech.tis.plugin.datax.format.TextFormat; import com.qlangtech.tis.plugin.tdfs.IExclusiveTDFSType; @@ -86,6 +87,11 @@ public FileSystemFactory getFs() { return fileSystem; } + public TableColsMeta getTabsMeta() { + Hiveserver2DataSourceFactory dsFactory = getDataSourceFactory(); + return new TableColsMeta(dsFactory, dsFactory.dbName); + } + public Hiveserver2DataSourceFactory getDataSourceFactory() { if (StringUtils.isBlank(this.linker)) { diff --git a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveMetaStore.java b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveMetaStore.java index e40e1603c..e5999c220 100644 --- a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveMetaStore.java +++ b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveMetaStore.java @@ -87,6 +87,10 @@ public HiveTable getTable(String database, String tableName) { StorageDescriptor storageDesc = table.getSd(); +// for (FieldSchema hiveCol : storageDesc.getCols()) { +// +// } + return new HiveTable(table.getTableName()) { @Override public List getPartitionKeys() { diff --git a/tis-incr/pom.xml b/tis-incr/pom.xml index 55e244878..a1bbeb66d 100644 --- a/tis-incr/pom.xml +++ b/tis-incr/pom.xml @@ -67,7 +67,7 @@ tis-flink-cdc-mongdb-plugin - tis-sink-hudi-plugin + tis-flink-chunjun-mysql-plugin