Skip to content

Commit

Permalink
[HWORKS-588] Remove inode foreign key from external feature groups (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored Jun 13, 2023
1 parent 52b9f86 commit d7126e6
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class FeatureStoreTagsEEController implements FeatureStoreTagControllerIf
@Override
public Map<String, String> getAll(Project accessProject, Users user, Featurestore featureStore,
Featuregroup featureGroup)
throws DatasetException, MetadataException {
throws DatasetException, MetadataException, FeaturestoreException {
String path = featuregroupController.getFeatureGroupLocation(featureGroup);
return tagsController.getAll(accessProject, user, path);
}
Expand Down Expand Up @@ -122,7 +122,7 @@ public Map<String, String> getAll(Project accessProject, Users user, Featurestor
@Override
public String get(Project accessProject, Users user, Featurestore featureStore, Featuregroup featureGroup,
String name)
throws DatasetException, MetadataException, SchematizedTagException {
throws DatasetException, MetadataException, SchematizedTagException, FeaturestoreException {

String path = featuregroupController.getFeatureGroupLocation(featureGroup);
return tagsController.get(accessProject, user, path, name);
Expand Down Expand Up @@ -188,7 +188,7 @@ public String get(Project accessProject, Users user, Featurestore featureStore,
@Override
public AttachTagResult upsert(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup,
String name, String value)
throws MetadataException, SchematizedTagException {
throws MetadataException, SchematizedTagException, FeaturestoreException {

String path = featuregroupController.getFeatureGroupLocation(featureGroup);
return tagsController.upsert(project, user, path, name, value);
Expand Down Expand Up @@ -254,7 +254,7 @@ public AttachTagResult upsert(Project project, Users user, Featurestore featureS
@Override
public AttachTagResult upsert(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup,
Map<String, String> newTags)
throws MetadataException, SchematizedTagException {
throws MetadataException, SchematizedTagException, FeaturestoreException {

String path = featuregroupController.getFeatureGroupLocation(featureGroup);
return tagsController.upsert(project, user, path, newTags);
Expand Down Expand Up @@ -315,7 +315,7 @@ public AttachTagResult upsert(Project project, Users user, Featurestore featureS
*/
@Override
public void deleteAll(Project accessProject, Users user, Featurestore featureStore, Featuregroup featureGroup)
throws MetadataException, DatasetException {
throws MetadataException, DatasetException, FeaturestoreException {

String path = featuregroupController.getFeatureGroupLocation(featureGroup);
tagsController.deleteAll(accessProject, user, path);
Expand Down Expand Up @@ -374,7 +374,7 @@ public void deleteAll(Project accessProject, Users user, Featurestore featureSto
@Override
public void delete(Project accessProject, Users user, Featurestore featureStore, Featuregroup featureGroup,
String name)
throws MetadataException, DatasetException {
throws MetadataException, DatasetException, FeaturestoreException {

String path = featuregroupController.getFeatureGroupLocation(featureGroup);
tagsController.delete(accessProject, user, path, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.hops.hopsworks.common.dataset.util.DatasetPath;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.dataset.DatasetType;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;

Expand Down Expand Up @@ -54,7 +55,7 @@ public void setFeatureGroup(Featuregroup featureGroup) {
}

@Override
protected DatasetPath getDatasetPath() throws DatasetException {
protected DatasetPath getDatasetPath() throws DatasetException, FeaturestoreException {
return datasetHelper.getDatasetPath(project, featuregroupController.getFeatureGroupLocation(featureGroup),
DatasetType.FEATURESTORE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public Response putTag(@Context SecurityContext sc,
@ApiParam(value = "Name of the tag", required = true) @PathParam("name") String name,
@ApiParam(value = "Value to set for the tag") String value)
throws MetadataException, SchematizedTagException, DatasetException, FeaturestoreException {

Users user = jwtHelper.getUserPrincipal(sc);
AttachTagResult result = tagController.upsert(user, getDatasetPath(), name, value);
FeatureStoreTagUri tagUri = new FeatureStoreTagUri(uriInfo, featureStore.getId(), getItemType(), getItemId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ public List<String> getFeatureNames(Featuregroup featuregroup, Project project,
}


public String getFeatureGroupLocation(Featuregroup featureGroup) {
public String getFeatureGroupLocation(Featuregroup featureGroup) throws FeaturestoreException {
// Cached feature groups also have a `location` field.
// the issue is that the host is slightly different due to a configuration of Hive
so here we resolve only the path based on the inode
Expand All @@ -856,7 +856,7 @@ public String getFeatureGroupLocation(Featuregroup featureGroup) {
} else if (featureGroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP) {
return inodeController.getPath(featureGroup.getStreamFeatureGroup().getHiveTbls().getSdId().getInode());
} else {
return inodeController.getPath(featureGroup.getOnDemandFeaturegroup().getInode());
return onDemandFeaturegroupController.getFeatureGroupLocation(featureGroup);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.HopsSecurityException;
import io.hops.hopsworks.exceptions.KafkaException;
Expand All @@ -45,7 +43,6 @@
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandOption;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
Expand Down Expand Up @@ -82,10 +79,6 @@ public class OnDemandFeaturegroupController {
@EJB
private DistributedFsService distributedFsService;
@EJB
private HdfsUsersController hdfsUsersController;
@EJB
private InodeController inodeController;
@EJB
private OnlineFeaturegroupController onlineFeatureGroupController;
@EJB
private FeatureGroupInputValidation featureGroupInputValidation;
Expand Down Expand Up @@ -132,13 +125,14 @@ public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore
"Data format required when specifying " + connector.getConnectorType() + " storage connectors");
}

createFile(project, user, featurestore, onDemandFeaturegroupDTO);

//Persist on-demand featuregroup
OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup();
onDemandFeaturegroup.setDescription(onDemandFeaturegroupDTO.getDescription());
onDemandFeaturegroup.setFeaturestoreConnector(connector);
onDemandFeaturegroup.setQuery(onDemandFeaturegroupDTO.getQuery());
onDemandFeaturegroup.setFeatures(convertOnDemandFeatures(onDemandFeaturegroupDTO, onDemandFeaturegroup));
onDemandFeaturegroup.setInode(createFile(project, user, featurestore, onDemandFeaturegroupDTO));
onDemandFeaturegroup.setDataFormat(onDemandFeaturegroupDTO.getDataFormat());
onDemandFeaturegroup.setPath(onDemandFeaturegroupDTO.getPath());

Expand All @@ -154,13 +148,15 @@ public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore
}

public OnDemandFeaturegroup createSpineGroup(Featurestore featurestore,
OnDemandFeaturegroupDTO onDemandFeaturegroupDTO, Project project, Users user) throws FeaturestoreException {
OnDemandFeaturegroupDTO onDemandFeaturegroupDTO,
Project project, Users user) throws FeaturestoreException {
createFile(project, user, featurestore, onDemandFeaturegroupDTO);

OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup();
onDemandFeaturegroup.setDescription(onDemandFeaturegroupDTO.getDescription());
onDemandFeaturegroup.setFeatures(convertOnDemandFeatures(onDemandFeaturegroupDTO, onDemandFeaturegroup));
onDemandFeaturegroup.setSpine(onDemandFeaturegroupDTO.getSpine());
onDemandFeaturegroup.setInode(createFile(project, user, featurestore, onDemandFeaturegroupDTO));


onDemandFeaturegroupFacade.persist(onDemandFeaturegroup);
return onDemandFeaturegroup;
}
Expand Down Expand Up @@ -224,8 +220,6 @@ public void updateOnDemandFeaturegroupMetadata(Project project, Users user, Feat
}
// finally merge in database
onDemandFeaturegroupFacade.updateMetadata(onDemandFeaturegroup);


}

private void updateOnDemandFeatures(OnDemandFeaturegroup onDemandFeaturegroup,
Expand Down Expand Up @@ -276,14 +270,11 @@ public void verifySchemaUnchangedAndValid(Collection<OnDemandFeature> previousSc
*/
public void removeOnDemandFeaturegroup(Featurestore featurestore, Featuregroup featuregroup,
Project project, Users user) throws FeaturestoreException {
String username = hdfsUsersController.getHdfsUserName(project, user);
DistributedFileSystemOps udfso = null;

// this is here for old feature groups that don't have a file
onDemandFeaturegroupFacade.remove(featuregroup.getOnDemandFeaturegroup());

DistributedFileSystemOps udfso = null;
try {
udfso = distributedFsService.getDfsOps(username);
udfso = distributedFsService.getDfsOps(project, user);
udfso.rm(getFilePath(featurestore, featuregroup.getName(), featuregroup.getVersion()), false);
} catch (IOException | URISyntaxException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_DELETE_ON_DEMAND_FEATUREGROUP,
Expand All @@ -307,25 +298,29 @@ private List<OnDemandFeature> convertOnDemandFeatures(OnDemandFeaturegroupDTO on
return features;
}

private Inode createFile(Project project, Users user, Featurestore featurestore,
private void createFile(Project project, Users user, Featurestore featurestore,
OnDemandFeaturegroupDTO onDemandFeaturegroupDTO) throws FeaturestoreException {
String username = hdfsUsersController.getHdfsUserName(project, user);

Path path = null;
DistributedFileSystemOps udfso = null;
try {
path = getFilePath(featurestore, onDemandFeaturegroupDTO.getName(), onDemandFeaturegroupDTO.getVersion());
Path path = getFilePath(featurestore, onDemandFeaturegroupDTO.getName(), onDemandFeaturegroupDTO.getVersion());

udfso = distributedFsService.getDfsOps(username);
udfso = distributedFsService.getDfsOps(project, user);
udfso.touchz(path);
} catch (IOException | URISyntaxException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP,
Level.SEVERE, "Error creating the placeholder file", e.getMessage(), e);
} finally {
distributedFsService.closeDfsClient(udfso);
}
}

return inodeController.getInodeAtPath(path.toString());
/**
 * Resolves the HDFS location of an external (on-demand) feature group.
 * <p>
 * After the removal of the inode foreign key, the location is no longer read from a
 * stored {@code Inode} reference; instead it is derived deterministically from the
 * feature store, feature group name and version via {@code getFilePath}.
 *
 * @param featuregroup the on-demand feature group whose placeholder-file path is requested
 * @return the string form of the derived path
 * @throws FeaturestoreException if the path cannot be built
 *         ({@code COULD_NOT_GET_FEATURE_GROUP_METADATA}), wrapping the underlying
 *         {@code URISyntaxException} as the cause
 */
public String getFeatureGroupLocation(Featuregroup featuregroup) throws FeaturestoreException {
try {
// Derive the path from (featurestore, name, version) — no inode lookup needed.
return getFilePath(featuregroup.getFeaturestore(), featuregroup.getName(), featuregroup.getVersion()).toString();
} catch (URISyntaxException e) {
// Preserve the original exception as the cause so the root failure is not lost.
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_GET_FEATURE_GROUP_METADATA,
Level.SEVERE, "", e.getMessage(), e);
}
}

private Path getFilePath(Featurestore featurestore, String name, Integer version) throws URISyntaxException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public interface FeatureStoreTagControllerIface {

Map<String, String> getAll(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup)
throws DatasetException, MetadataException;
throws DatasetException, MetadataException, FeaturestoreException;

Map<String, String> getAll(Project project, Users user, Featurestore featureStore, TrainingDataset trainingDataset)
throws DatasetException, MetadataException, SchematizedTagException;
Expand All @@ -42,7 +42,7 @@ Map<String, String> getAll(Project project, Users user, Featurestore featureStor
throws DatasetException, MetadataException, SchematizedTagException, FeaturestoreException;

String get(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup, String name)
throws DatasetException, MetadataException, SchematizedTagException;
throws DatasetException, MetadataException, SchematizedTagException, FeaturestoreException;

String get(Project project, Users user, Featurestore featureStore, TrainingDataset trainingDataset, String name)
throws DatasetException, MetadataException, SchematizedTagException;
Expand All @@ -52,7 +52,7 @@ String get(Project project, Users user, Featurestore featureStore, FeatureView f

AttachTagResult upsert(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup,
String name, String value)
throws MetadataException, SchematizedTagException;
throws MetadataException, SchematizedTagException, FeaturestoreException;

AttachTagResult upsert(Project project, Users user, Featurestore featureStore, TrainingDataset trainingDataset,
String name, String value)
Expand All @@ -64,7 +64,7 @@ AttachTagResult upsert(Project project, Users user, Featurestore featureStore, F

AttachTagResult upsert(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup,
Map<String, String> tags)
throws MetadataException, SchematizedTagException;
throws MetadataException, SchematizedTagException, FeaturestoreException;

AttachTagResult upsert(Project project, Users user, Featurestore featureStore, TrainingDataset trainingDataset,
Map<String, String> tags)
Expand All @@ -75,7 +75,7 @@ AttachTagResult upsert(Project project, Users user, Featurestore featureStore, F
throws MetadataException, SchematizedTagException, FeaturestoreException;

void deleteAll(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup)
throws DatasetException, MetadataException;
throws DatasetException, MetadataException, FeaturestoreException;

void deleteAll(Project project, Users user, Featurestore featureStore, TrainingDataset trainingDataset)
throws DatasetException, MetadataException, SchematizedTagException;
Expand All @@ -84,7 +84,7 @@ void deleteAll(Project project, Users user, Featurestore featureStore, FeatureVi
throws DatasetException, MetadataException, SchematizedTagException, FeaturestoreException;

void delete(Project project, Users user, Featurestore featureStore, Featuregroup featureGroup, String name)
throws DatasetException, MetadataException;
throws DatasetException, MetadataException, FeaturestoreException;

void delete(Project project, Users user, Featurestore featureStore, TrainingDataset trainingDataset, String name)
throws DatasetException, MetadataException, SchematizedTagException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand;

import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;

import javax.persistence.Basic;
import javax.persistence.CascadeType;
Expand All @@ -29,11 +28,9 @@
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinColumns;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
Expand Down Expand Up @@ -78,13 +75,7 @@ public class OnDemandFeaturegroup implements Serializable {

@Column(name = "spine")
private boolean spine;
@JoinColumns({
@JoinColumn(name = "inode_pid", referencedColumnName = "parent_id"),
@JoinColumn(name = "inode_name", referencedColumnName = "name"),
@JoinColumn(name = "partition_id", referencedColumnName = "partition_id")})
@OneToOne(optional = false)
private Inode inode;


public static long getSerialVersionUID() {
return serialVersionUID;
}
Expand Down Expand Up @@ -125,14 +116,6 @@ public Integer getId() {
return id;
}

public Inode getInode() {
return inode;
}

public void setInode(Inode inode) {
this.inode = inode;
}

public OnDemandDataFormat getDataFormat() {
return dataFormat;
}
Expand Down

0 comments on commit d7126e6

Please sign in to comment.