Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix Bug]Fixed the problem that the weight component still uses the user of the JVM to access HDFS when using the proxy user to log in, resulting in no permission #124

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ class LinkisClientExecutor extends SqlUtils with Logging{
* @param resultSet 结果集文件或结果集
* @return
*/
private def getResultSet(resultSet: String): util.List[util.Map[String, AnyRef]] = {
private def getResultSet(jobInfo: JobInfoResult): util.List[util.Map[String, AnyRef]] = {
val resultSet = VisualisUtils.getResultSetPath(jobInfo)
info(s"$umUser began to get the result of execution :$resultSet")
val rsFactory= ResultSetFactory.getInstance
val res = new util.ArrayList[util.Map[String, AnyRef]]()
Expand Down Expand Up @@ -170,11 +171,12 @@ class LinkisClientExecutor extends SqlUtils with Logging{
res
}else if(rsFactory.isResultSetPath(resultSet)){
val resPath = new FsPath(ResultHelper.getSchemaPath(resultSet))
val resultSetContent = rsFactory.getResultSetByPath(resPath)
val umUser = jobInfo.getRequestPersistTask.getUmUser
val resultSetContent = rsFactory.getResultSetByPath(resPath,umUser)
if(ResultSetFactory.TABLE_TYPE != resultSetContent.resultSetType()){
throw new VGErrorException(60014,"不支持不是表格的结果集")
}
val fs = FSFactory.getFs(resPath)
val fs = FSFactory.getFs(resPath.getFsType,umUser)
fs.init(null)
val reader =ResultSetReader.getResultSetReader(resultSetContent,fs.read(resPath))
val metaData = reader.getMetaData.asInstanceOf[TableMetaData]
Expand Down Expand Up @@ -220,7 +222,7 @@ class LinkisClientExecutor extends SqlUtils with Logging{
override def query4List(sql: String, limit: Int): util.List[util.Map[String, AnyRef]] = {
val jobExecuteResult = querySQLWithJobExecuteResult(sql, limit)
val jobInfo = linkisClient.getJobInfo(jobExecuteResult)
getResultSet(VisualisUtils.getResultSetPath(jobInfo))
getResultSet(jobInfo)
}

override def submit4Exec(sql: String, pageNo: Int, pageSize: Int, totalCount: Int, limit: Int, excludeColumns: util.Set[String]): PaginateWithExecStatus = {
Expand Down Expand Up @@ -278,10 +280,10 @@ class LinkisClientExecutor extends SqlUtils with Logging{
paginateWithQueryColumns.setStatus(jobInfoResult.getRequestPersistTask.getStatus)

val jobInfo = linkisClient.getJobInfo(jobExecuteResult)
val resultList = getResultSet(VisualisUtils.getResultSetPath(jobInfo))
val resultList = getResultSet(jobInfo)
paginateWithQueryColumns.setResultList(resultList)
paginateWithQueryColumns.setTotalCount(resultList.size())
val columns = ResultHelper.getResultType(VisualisUtils.getResultSetPath(jobInfo))
val columns = ResultHelper.getResultType(jobInfo)
paginateWithQueryColumns.setColumns(columns.map(col => new QueryColumn(col.columnName,col.dataType.typeName)).toList)
return paginateWithQueryColumns
}
Expand All @@ -290,11 +292,11 @@ class LinkisClientExecutor extends SqlUtils with Logging{
val paginateWithQueryColumns = new PaginateWithQueryColumns
val jobExecuteResult = querySQLWithJobExecuteResult(sql, limit)
val jobInfo = linkisClient.getJobInfo(jobExecuteResult)
val resultList = getResultSet(VisualisUtils.getResultSetPath(jobInfo))
val resultList = getResultSet(jobInfo)
paginateWithQueryColumns.setResultList(resultList)
paginateWithQueryColumns.setTotalCount(resultList.size())

val columns = ResultHelper.getResultType(VisualisUtils.getResultSetPath(jobInfo))
val columns = ResultHelper.getResultType(jobInfo)
paginateWithQueryColumns.setColumns(columns.map(col => new QueryColumn(col.columnName,col.dataType.typeName)).toList)
return paginateWithQueryColumns
}
Expand All @@ -318,7 +320,7 @@ class LinkisClientExecutor extends SqlUtils with Logging{
override def getColumns(sql: String): util.List[QueryColumn] = {
val jobExecuteResult = querySQLWithJobExecuteResult(sql, 2)
val jobInfo = linkisClient.getJobInfo(jobExecuteResult)
val columns = ResultHelper.getResultType(VisualisUtils.getResultSetPath(jobInfo))
val columns = ResultHelper.getResultType(jobInfo)
columns.map(col => new QueryColumn(col.columnName,col.dataType.typeName)).toList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class SparkEntranceExecutor extends SqlUtils with Logging{
val resultList = getResultSet(resultSets(resultSets.length - 1))
paginateWithQueryColumns.setResultList(resultList)
paginateWithQueryColumns.setTotalCount(resultList.size())
val columns = ResultHelper.getResultType(resultSets(resultSets.length - 1))
val columns = ResultHelper.getResultType(resultSets(resultSets.length - 1),user)
paginateWithQueryColumns.setColumns(columns.map(col => new QueryColumn(col.columnName,col.dataType.typeName)).toList)
}
return paginateWithQueryColumns;
Expand All @@ -361,7 +361,7 @@ class SparkEntranceExecutor extends SqlUtils with Logging{
val resultList = getResultSet(task.getResultLocation + VisualisUtils.RESULT_FILE_NAME.getValue)
paginateWithQueryColumns.setResultList(resultList)
paginateWithQueryColumns.setTotalCount(resultList.size())
val columns = ResultHelper.getResultType(task.getResultLocation + VisualisUtils.RESULT_FILE_NAME.getValue)
val columns = ResultHelper.getResultType(task.getResultLocation + VisualisUtils.RESULT_FILE_NAME.getValue,user)
paginateWithQueryColumns.setColumns(columns.map(col => new QueryColumn(col.columnName,col.dataType.typeName)).toList)
return paginateWithQueryColumns
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import org.apache.linkis.storage.domain._
import org.apache.linkis.storage.resultset.table.TableMetaData
import org.apache.linkis.storage.resultset.{ResultSetFactory, ResultSetReader}
import com.webank.wedatasphere.dss.visualis.exception.VGErrorException
import com.webank.wedatasphere.dss.visualis.utils.VisualisUtils
import org.apache.linkis.adapt.LinkisUtils
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.server.BDPJettyServerHelper
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.ujes.client.response.JobInfoResult
import org.json4s.DefaultFormats

/**
Expand All @@ -35,6 +37,27 @@ object ResultHelper {
}
}

@scala.throws[VGErrorException]
def getResultType(jobInfo: JobInfoResult): Array[Column] = {
getResultType(VisualisUtils.getResultSetPath(jobInfo),jobInfo.getRequestPersistTask.getUmUser)
}

@scala.throws[VGErrorException]
def getResultType(path: String,user:String): Array[Column] = {
val resPath = new FsPath(path)
val rsFactory = ResultSetFactory.getInstance
val resultSet = rsFactory.getResultSetByPath(resPath,user)
if (ResultSetFactory.TABLE_TYPE != resultSet.resultSetType()) {
throw new VGErrorException(70001, "不支持不是表格的结果集")
}
val fs = FSFactory.getFs(resPath.getFsType,user)
fs.init(null)
val reader = ResultSetReader.getResultSetReader(resultSet, fs.read(resPath))
val metaData = reader.getMetaData.asInstanceOf[TableMetaData]
Utils.tryQuietly(reader.close())
Utils.tryQuietly(fs.close())
metaData.columns
}

@scala.throws[VGErrorException]
def getResultType(path:String):Array[Column]={
Expand Down