Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Lens-317 #18

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;

import lombok.NonNull;

/**
* The Class LensDatabaseCommands.
*/
Expand Down Expand Up @@ -119,4 +121,18 @@ public APIResult doUpdate(String name, String path) {
protected APIResult doDelete(String name, boolean cascade) {
return getClient().dropDatabase(name, cascade);
}

/**
* Adds the jar.
*
* @param path the path
* @return the string
*/
@CliCommand(value = "add dbjar", help = "Add jar resource to the db")
public String addDBJar(
@CliOption(key = {"", "path"}, mandatory = true, help = "<path-to-jar-on-HDFS> / <path-to-jar-on-local>")
@NonNull String path) {
APIResult result = getClient().addDBJarResource(path);
return result.getMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,4 +612,8 @@ public List<String> listResources(String type) {
public Response getLogs(String logFile) {
return this.connection.getLogs(logFile);
}

public APIResult addDBJarResource(String path) {
return this.connection.addResourceToDB("jar", path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.lens.client;

import java.io.File;
import java.net.ConnectException;
import java.util.Iterator;
import java.util.List;
Expand All @@ -37,10 +38,8 @@
import org.apache.lens.api.util.MoxyJsonConfigurationContextResolver;
import org.apache.lens.client.exceptions.LensClientServerConnectionException;

import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.media.multipart.*;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
import org.glassfish.jersey.moxy.json.MoxyJsonFeature;

import lombok.Getter;
Expand Down Expand Up @@ -327,4 +326,40 @@ public String toString() {
sb.append('}');
return sb.toString();
}

/**
* Adds the resource to current DB.
*
* @param type the type
* @param resourcePath the resource path
* @return the API result
*/
public APIResult addResourceToDB(String type, String resourcePath) {
WebTarget target = getMetastoreWebTarget();
FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("type").build(), type));

File file = new File(resourcePath);
log.debug("uploading file path : " + file.getAbsolutePath() + "|size = " + file.length());
final FormDataContentDisposition dispo = FormDataContentDisposition//
.name("file")//
.fileName("db_0.jar")// temp dummy name
.size(file.length())//
.build();

FileDataBodyPart filePart = new FileDataBodyPart("file", file);
filePart.setContentDisposition(dispo);
mp.bodyPart(filePart);

MultiPart multiPart = new MultiPart();
multiPart.setMediaType(MediaType.MULTIPART_FORM_DATA_TYPE);


APIResult result = target.path("databases/jar").queryParam("sessionid", this.sessionHandle).request()
.post(Entity.entity(mp, multiPart.getMediaType()), APIResult.class);

log.debug(result.getStatus() + " - " + result);

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.lens.server.api.metastore;

import java.io.InputStream;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -570,4 +571,6 @@ void updatePartition(LensSessionHandle sessionid, String tblName, String storage

void updatePartitions(LensSessionHandle sessionid, String tblName, String storageName,
XPartitionList partitions) throws LensException;

void addDBJar(LensSessionHandle sessionid, String type, InputStream is) throws LensException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.lens.server.metastore.JAXBUtils.*;

import java.io.*;
import java.util.*;

import javax.ws.rs.BadRequestException;
Expand All @@ -37,14 +38,17 @@
import org.apache.lens.server.api.metastore.CubeMetastoreService;
import org.apache.lens.server.session.LensSessionImpl;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.cli.CLIService;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -823,6 +827,109 @@ public void updatePartitions(LensSessionHandle sessionid, String tblName, String
}
}

@Override
public void addDBJar(LensSessionHandle sessionid, String type, InputStream is) throws LensException {
// Read list of databases in
FileSystem serverFs = null;
FileSystem jarOrderFs = null;
FSDataOutputStream fos = null;
try {
acquire(sessionid);
String currentDB = SessionState.get().getCurrentDatabase();

String baseDir =
getHiveConf().get(LensConfConstants.DATABASE_RESOURCE_DIR, LensConfConstants.DEFAULT_DATABASE_RESOURCE_DIR);

String dbDir = baseDir + File.separator + currentDB;
log.info("Database specific resources at {}", dbDir);


Path resTopDirPath = new Path(dbDir);
serverFs = FileSystem.newInstance(resTopDirPath.toUri(), getHiveConf());
if (!serverFs.exists(resTopDirPath)) {
log.warn("Database resource location does not exist. Database jar can't be uploaded", dbDir);
throw new LensException("Database resource location does not exist. Database jar can't be uploaded");
}

Path resJarOrderPath = new Path(dbDir, "jar_order");
jarOrderFs = FileSystem.newInstance(resJarOrderPath.toUri(), getHiveConf());
if (jarOrderFs.exists(resJarOrderPath)) {
log.warn("Database jar_order file exist - {}. Database jar can't be uploaded", resJarOrderPath);
throw new LensException("Database jar_order file exist. Database jar can't be uploaded");
}

String tempFileName = currentDB + "_uploading.jar";

Path uploadingPath = new Path(dbDir, tempFileName);
FileSystem uploadingFs = FileSystem.newInstance(uploadingPath.toUri(), getHiveConf());
if (uploadingFs.exists(uploadingPath)) {
log.warn("Already uploading a file - {}. This Database jar can't be uploaded. Try later!", uploadingPath);
throw new LensException("Database jar file upload in progress . Database jar can't be uploaded. Try later!");
}

int lastIndex = 0;

Path dbFolderPath = new Path(baseDir, currentDB);
FileStatus[] existingFiles = serverFs.listStatus(dbFolderPath);
for (FileStatus fs : existingFiles) {
String fPath = fs.getPath().getName();
String[] tokens = fPath.split("_");

if (tokens.length > 1) {
int fIndex = Integer.parseInt(tokens[tokens.length - 1].substring(0, 1));
if (fIndex > lastIndex) {
lastIndex = fIndex;
}
}
}

int newIndex = lastIndex + 1;


Path resJarPath = new Path(baseDir, currentDB + File.separator + tempFileName);
log.info("new jar name : " + resJarPath.getName());
fos = serverFs.create(resJarPath);
IOUtils.copy(is, fos);
fos.flush();

Path renamePath = new Path(baseDir, currentDB + File.separator + currentDB + "_" + newIndex + ".jar");
serverFs.rename(resJarPath, renamePath);


} catch (FileNotFoundException e) {
log.error("FileNotFoundException", e);
throw new LensException(e);
} catch (IOException e) {
log.error("IOException", e);
throw new LensException(e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
log.error("Error closing file system instance fos", e);
}
}

if (serverFs != null) {
try {
serverFs.close();
} catch (IOException e) {
log.error("Error closing file system instance serverFs", e);
}
}

if (jarOrderFs != null) {
try {
jarOrderFs.close();
} catch (IOException e) {
log.error("Error closing file system instance jarOrderFs", e);
}
}
release(sessionid);
}
}

@Override
public int addPartitionsToDimStorage(LensSessionHandle sessionid,
String dimTblName, String storageName, XPartitionList partitions) throws LensException {
Expand Down Expand Up @@ -1467,4 +1574,5 @@ public HealthStatus getHealthStatus() {
: new HealthStatus(isHealthy, details.toString());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.lens.api.APIResult.*;

import java.io.*;
import java.util.List;

import javax.ws.rs.*;
Expand All @@ -40,6 +41,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.glassfish.jersey.media.multipart.FormDataParam;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -1240,7 +1243,7 @@ public StringList getAllDims(@QueryParam("sessionid") LensSessionHandle sessioni
@POST
@Path("/dimtables")
public APIResult createDimensionTable(@QueryParam("sessionid") LensSessionHandle sessionid,
XDimensionTable dimensionTable) {
XDimensionTable dimensionTable) {
checkSessionId(sessionid);
try {
getSvc().createDimensionTable(sessionid, dimensionTable);
Expand Down Expand Up @@ -1540,7 +1543,7 @@ public APIResult addPartitionToDimStorage(@QueryParam("sessionid") LensSessionHa
@Path("/dimtables/{dimTableName}/storages/{storage}/partition")
public APIResult updatePartitionOfDimStorage(@QueryParam("sessionid") LensSessionHandle sessionid,
@PathParam("dimTableName") String dimTableName,
@PathParam("storage") String storage,
@PathParam("storage") String storage,
XPartition partition) {
checkSessionId(sessionid);
checkNonNullArgs("Partition is null", partition);
Expand Down Expand Up @@ -1700,4 +1703,36 @@ public StringList getPartitionTimelines(@QueryParam("sessionid") LensSessionHand
throw exc;
}
}

/**
* Add a resource to the current DB
* <p></p>
* <p>
* The returned @{link APIResult} will have status SUCCEEDED <em>only if</em> the add operation was successful for all
* services running in this Lens server.
* </p>
*
* @param sessionid session handle object
* @param type The type of resource. Valid types are 'jar'
* @param fileInputStream stream of the resource. Local or HDFS path
* @return {@link APIResult} with state {@link Status#SUCCEEDED}, if add was successful. {@link APIResult} with state
* {@link Status#PARTIAL}, if add succeeded only for some services. {@link APIResult} with state
* {@link Status#FAILED}, if add has failed
*/
@POST
@Path("databases/jar")
@Consumes({MediaType.MULTIPART_FORM_DATA})
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.TEXT_PLAIN})
public APIResult addDBResource(@QueryParam("sessionid") LensSessionHandle sessionid,
@FormDataParam("type") String type, @FormDataParam("file") InputStream fileInputStream) {

try {
getSvc().addDBJar(sessionid, type, fileInputStream);
} catch (LensException e) {
log.error("Error in adding resource to db", e);
return new APIResult(Status.FAILED, e.getMessage());
}
return new APIResult(Status.SUCCEEDED, "Add resource succeeded");
}

}
Loading