feat(spark): Support local filesystem in spark library for graphar-info temporarily #561

Closed · wants to merge 2 commits
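Summary (as reconstructed from the diff below): the spark parameter of GraphInfo.loadGraphInfo, VertexInfo.loadVertexInfo, and EdgeInfo.loadEdgeInfo becomes Option[SparkSession]. With Some(spark) the YAML is opened through the Hadoop FileSystem as before; with None it is opened via java.nio.file, so graphar-info metadata can be read from the local filesystem without a Spark session. All call sites are updated to pass Some(spark).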
@@ -27,6 +27,9 @@ import org.yaml.snakeyaml.constructor.Constructor
 import scala.beans.BeanProperty
 import org.yaml.snakeyaml.LoaderOptions
 
+import java.io.InputStream
+import java.nio.file.{Files, Paths}
+
 /** Edge info is a class to store the edge meta information. */
 class EdgeInfo() {
   @BeanProperty var src_label: String = ""
@@ -629,11 +632,20 @@ class EdgeInfo() {
 object EdgeInfo {
 
   /** Load a yaml file from path and construct an EdgeInfo from it. */
-  def loadEdgeInfo(edgeInfoPath: String, spark: SparkSession): EdgeInfo = {
-    val path = new Path(edgeInfoPath)
-    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    val input = fs.open(path)
+  def loadEdgeInfo(
+      edgeInfoPath: String,
+      spark: Option[SparkSession]
+  ): EdgeInfo = {
+    val inputStream: InputStream = spark match {
+      case Some(s) =>
+        val path = new Path(edgeInfoPath)
+        val fs = path.getFileSystem(s.sparkContext.hadoopConfiguration)
+        fs.open(path)
+      case None =>
+        Files.newInputStream(Paths.get(edgeInfoPath))
+    }
     val yaml = new Yaml(new Constructor(classOf[EdgeInfo], new LoaderOptions()))
-    return yaml.load(input).asInstanceOf[EdgeInfo]
+    try yaml.loadAs(inputStream, classOf[EdgeInfo])
+    finally inputStream.close()
   }
 }
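Note: a minimal usage sketch of the new signature (not part of this diff; the local path is illustrative). Passing None routes the read through java.nio.file, so edge metadata can be loaded without a SparkSession:

    import org.apache.graphar.EdgeInfo

    // No SparkSession needed: read the YAML straight from the local filesystem.
    val edgeInfo = EdgeInfo.loadEdgeInfo("/tmp/graphar/person_knows_person.edge.yml", None)
    println(edgeInfo.getConcatKey())

    // On a cluster, keep resolving the path through the Hadoop FileSystem:
    // val edgeInfo = EdgeInfo.loadEdgeInfo(hdfsPath, Some(spark))

Unlike loadGraphInfo and loadVertexInfo below, this overload declares spark without a default value, so callers must pass None explicitly.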
@@ -26,6 +26,9 @@ import org.yaml.snakeyaml.constructor.Constructor
 import scala.beans.BeanProperty
 import org.yaml.snakeyaml.LoaderOptions
 
+import java.io.InputStream
+import java.nio.file.{Files, Paths}
+
 /** Main data type in gar enumeration */
 object GarType extends Enumeration {
   type GarType = Value
@@ -374,14 +377,24 @@ class GraphInfo() {
 object GraphInfo {
 
   /** Load a yaml file from path and construct a GraphInfo from it. */
-  def loadGraphInfo(graphInfoPath: String, spark: SparkSession): GraphInfo = {
-    val path = new Path(graphInfoPath)
-    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    val input = fs.open(path)
+  def loadGraphInfo(
+      graphInfoPath: String,
+      spark: Option[SparkSession] = None
+  ): GraphInfo = {
+    val inputStream: InputStream = spark match {
+      case Some(s) =>
+        val path = new Path(graphInfoPath)
+        val fs = path.getFileSystem(s.sparkContext.hadoopConfiguration)
+        fs.open(path)
+      case None =>
+        Files.newInputStream(Paths.get(graphInfoPath))
+    }
     val yaml = new Yaml(
       new Constructor(classOf[GraphInfo], new LoaderOptions())
     )
-    val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
+    val graph_info =
+      try yaml.loadAs(inputStream, classOf[GraphInfo])
+      finally inputStream.close()
     if (graph_info.getPrefix == "") {
       val pos = graphInfoPath.lastIndexOf('/')
       if (pos != -1) {
@@ -407,6 +420,6 @@ object GraphInfo {
       val edge_info = EdgeInfo.loadEdgeInfo(path, spark)
       graph_info.addEdgeInfo(edge_info)
     }
-    return graph_info
+    graph_info
   }
 }
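Usage sketch for the defaulted parameter (illustrative path, not taken from this diff). Since spark: Option[SparkSession] = None, plain JVM code can load graph metadata with a single argument, and the vertex/edge infos referenced by the graph YAML are loaded through the same None branch:

    import org.apache.graphar.GraphInfo

    // Local, Spark-free load of the whole graph metadata tree.
    val graphInfo = GraphInfo.loadGraphInfo("/tmp/graphar/ldbc_sample.graph.yml")
    // When the YAML's prefix field is empty, it falls back to the
    // directory of the info file, e.g. "/tmp/graphar/".
    println(graphInfo.getPrefix)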
@@ -20,12 +20,16 @@
 package org.apache.graphar
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{SparkSession}
-import org.yaml.snakeyaml.{Yaml, DumperOptions}
+import org.apache.spark.sql.SparkSession
+import org.yaml.snakeyaml.{DumperOptions, Yaml}
 import org.yaml.snakeyaml.constructor.Constructor
 
 import scala.beans.BeanProperty
 import org.yaml.snakeyaml.LoaderOptions
 
+import java.io.InputStream
+import java.nio.file.{Files, Paths}
+
 /** VertexInfo is a class to store the vertex meta information. */
 class VertexInfo() {
   @BeanProperty var label: String = ""
@@ -311,14 +315,20 @@ object VertexInfo {
   /** Load a yaml file from path and construct a VertexInfo from it. */
   def loadVertexInfo(
       vertexInfoPath: String,
-      spark: SparkSession
+      spark: Option[SparkSession] = None
   ): VertexInfo = {
-    val path = new Path(vertexInfoPath)
-    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    val input = fs.open(path)
+    val inputStream: InputStream = spark match {
+      case Some(s) =>
+        val path = new Path(vertexInfoPath)
+        val fs = path.getFileSystem(s.sparkContext.hadoopConfiguration)
+        fs.open(path)
+      case None =>
+        Files.newInputStream(Paths.get(vertexInfoPath))
+    }
     val yaml = new Yaml(
       new Constructor(classOf[VertexInfo], new LoaderOptions())
     )
-    return yaml.load(input).asInstanceOf[VertexInfo]
+    try yaml.loadAs(inputStream, classOf[VertexInfo])
+    finally inputStream.close()
   }
 }
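All three loaders now share the same pattern: build an InputStream from either the Hadoop FileSystem or java.nio.file, then close it in a try/finally even if SnakeYAML throws. A self-contained sketch of the local path end to end (the YAML body below is a hypothetical minimum, not taken from this PR):

    import java.nio.file.{Files, Paths}
    import org.apache.graphar.VertexInfo

    // Write a tiny vertex info file just for the example.
    val yamlPath = Paths.get("/tmp/person.vertex.yml")
    Files.write(yamlPath, "label: person\nprefix: vertex/person/\n".getBytes("UTF-8"))

    // spark defaults to None, so this reads via Files.newInputStream and
    // the stream is closed by the finally block.
    val vertexInfo = VertexInfo.loadVertexInfo(yamlPath.toString)
    assert(vertexInfo.getLabel == "person")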
@@ -54,7 +54,7 @@ object GraphAr2Nebula {
 
     // path to the graph information file
     val graphInfoPath: String = args(0)
-    val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)
+    val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, Some(spark))
 
     // The edge data need to convert src and dst to the vertex id,
     // so we need to read the vertex data with index column.
@@ -47,7 +47,7 @@ object GraphAr2Neo4j {
 
     // path to the graph information file
     val graphInfoPath: String = args(0)
-    val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)
+    val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, Some(spark))
 
     val graphData = GraphReader.read(graphInfoPath, spark)
     val vertexData = graphData._1
@@ -153,7 +153,7 @@ object GraphReader {
       Map[String, DataFrame]
   ]] = {
     // load graph info
-    val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)
+    val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, Some(spark))
 
     // conduct reading
     readWithGraphInfo(graph_info, spark)
@@ -64,7 +64,7 @@ object GraphTransformer {
     while (dest_vertices_it.hasNext()) {
       // load dest vertex info
       val path = dest_prefix + dest_vertices_it.next()
-      val dest_vertex_info = VertexInfo.loadVertexInfo(path, spark)
+      val dest_vertex_info = VertexInfo.loadVertexInfo(path, Some(spark))
       // load source vertex info
       val label = dest_vertex_info.getLabel()
       if (!sourceVertexInfosMap.contains(label)) {
@@ -114,7 +114,7 @@
     while (dest_edges_it.hasNext()) {
       // load dest edge info
       val path = dest_prefix + dest_edges_it.next()
-      val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark)
+      val dest_edge_info = EdgeInfo.loadEdgeInfo(path, Some(spark))
       // load source edge info
       val key = dest_edge_info.getConcatKey()
       if (!sourceEdgeInfosMap.contains(key)) {
@@ -233,10 +233,12 @@ object GraphTransformer {
       spark: SparkSession
   ): Unit = {
     // load source graph info
-    val source_graph_info = GraphInfo.loadGraphInfo(sourceGraphInfoPath, spark)
+    val source_graph_info =
+      GraphInfo.loadGraphInfo(sourceGraphInfoPath, Some(spark))
 
     // load dest graph info
-    val dest_graph_info = GraphInfo.loadGraphInfo(destGraphInfoPath, spark)
+    val dest_graph_info =
+      GraphInfo.loadGraphInfo(destGraphInfoPath, Some(spark))
 
     // conduct transformation
     transform(source_graph_info, dest_graph_info, spark)
@@ -170,7 +170,7 @@ class GraphWriter() {
    */
   def write(graphInfoPath: String, spark: SparkSession): Unit = {
     // load graph info
-    val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)
+    val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, Some(spark))
     write(graph_info, spark)
   }
 
@@ -29,7 +29,7 @@ class ComputeExampleSuite extends BaseTestSuite {
     // read vertex DataFrame
     val prefix = testData + "/ldbc_sample/parquet/"
    val vertex_yaml = prefix + "/person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, Some(spark))
 
     val vertex_reader = new VertexReader(prefix, vertex_info, spark)
     val vertices_num = vertex_reader.readVerticesNumber()
@@ -40,7 +40,7 @@
 
     // read edge DataFrame
     val edge_yaml = prefix + "person_knows_person.edge.yml"
-    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark)
+    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, Some(spark))
     val adj_list_type = AdjListType.ordered_by_source
 
     val edge_reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)
@@ -25,7 +25,7 @@ class GraphInfoSuite extends BaseTestSuite {
     // read graph yaml
     val prefix = testData + "/ldbc_sample/csv/"
     val yaml_path = prefix + "ldbc_sample.graph.yml"
-    val graph_info = GraphInfo.loadGraphInfo(yaml_path, spark)
+    val graph_info = GraphInfo.loadGraphInfo(yaml_path, Some(spark))
 
     val vertex_info = graph_info.getVertexInfo("person")
     assert(vertex_info.getLabel == "person")
@@ -45,7 +45,7 @@
 
   test("load vertex info") {
     val yaml_path = testData + "/ldbc_sample/csv/person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(yaml_path, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(yaml_path, Some(spark))
 
     assert(vertex_info.getLabel == "person")
     assert(vertex_info.isValidated)
@@ -122,7 +122,7 @@
 
   test("load edge info") {
     val yaml_path = testData + "/ldbc_sample/csv/person_knows_person.edge.yml"
-    val edge_info = EdgeInfo.loadEdgeInfo(yaml_path, spark)
+    val edge_info = EdgeInfo.loadEdgeInfo(yaml_path, Some(spark))
 
     assert(edge_info.getSrc_label == "person")
     assert(edge_info.getDst_label == "person")
@@ -47,7 +47,7 @@ class TestGraphReaderSuite extends BaseTestSuite {
   test("read graphs by graph infos") {
     // load graph info
     val path = testData + "/ldbc_sample/parquet/ldbc_sample.graph.yml"
-    val graph_info = GraphInfo.loadGraphInfo(path, spark)
+    val graph_info = GraphInfo.loadGraphInfo(path, Some(spark))
 
     // conduct reading
     val vertex_edge_df_pair = GraphReader.readWithGraphInfo(graph_info, spark)
@@ -31,7 +31,7 @@ class TestGraphTransformerSuite extends BaseTestSuite {
     val dest_path = testData + "/transformer/ldbc_sample.graph.yml"
     GraphTransformer.transform(source_path, dest_path, spark)
 
-    val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark)
+    val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, Some(spark))
     val prefix = dest_graph_info.getPrefix
     val fs = FileSystem.get(
       new Path(prefix).toUri(),
@@ -63,11 +63,11 @@
   test("transform graphs by graph infos") {
     // load source graph info
     val source_path = testData + "/ldbc_sample/parquet/ldbc_sample.graph.yml"
-    val source_graph_info = GraphInfo.loadGraphInfo(source_path, spark)
+    val source_graph_info = GraphInfo.loadGraphInfo(source_path, Some(spark))
 
     // load dest graph info
     val dest_path = testData + "/transformer/ldbc_sample.graph.yml"
-    val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark)
+    val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, Some(spark))
 
     // conduct transformation
     GraphTransformer.transform(source_graph_info, dest_graph_info, spark)
@@ -100,7 +100,7 @@ class ReaderSuite extends BaseTestSuite {
     // construct the vertex information
     val prefix = testData + "/ldbc_sample/parquet/"
     val vertex_yaml = prefix + "person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, Some(spark))
 
     // construct the vertex reader
     val reader = new VertexReader(prefix, vertex_info, spark)
@@ -195,7 +195,7 @@
     // construct the vertex information
     val prefix = testData + "/ldbc_sample/json/"
     val vertex_yaml = prefix + "Person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, Some(spark))
 
     // construct the vertex reader
     val reader = new VertexReader(prefix, vertex_info, spark)
@@ -214,7 +214,7 @@
     // construct the edge information
     val prefix = testData + "/ldbc_sample/csv/"
     val edge_yaml = prefix + "person_knows_person.edge.yml"
-    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark)
+    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, Some(spark))
 
     // construct the edge reader
     val adj_list_type = AdjListType.ordered_by_source
@@ -40,7 +40,7 @@ class WriterSuite extends BaseTestSuite {
 
     // read vertex yaml
     val vertex_yaml_path = testData + "/ldbc_sample/parquet/person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, Some(spark))
 
     // generate vertex index column for vertex DataFrame
     val vertex_df_with_index =
@@ -97,7 +97,7 @@
     // read edge yaml
     val edge_yaml_path =
       testData + "/ldbc_sample/csv/person_knows_person.edge.yml"
-    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, spark)
+    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, Some(spark))
     val adj_list_type = AdjListType.ordered_by_source
 
     // generate vertex index for edge DataFrame
@@ -230,12 +230,12 @@
 
     // read vertex yaml
     val vertex_yaml_path = testData + "/ldbc_sample/csv/person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, Some(spark))
 
     // read edge yaml
     val edge_yaml_path =
       testData + "/ldbc_sample/csv/person_knows_person.edge.yml"
-    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, spark)
+    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, Some(spark))
     val vertex_chunk_size = edge_info.getSrc_chunk_size()
     val vertex_chunk_num =
       (vertex_num + vertex_chunk_size - 1) / vertex_chunk_size
@@ -30,7 +30,7 @@ class TransformExampleSuite extends BaseTestSuite {
     // read from orc files
     val prefix = testData + "/ldbc_sample/orc/"
     val vertex_yaml = prefix + "person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, Some(spark))
 
     val reader = new VertexReader(prefix, vertex_info, spark)
     val vertices_num = reader.readVerticesNumber()
@@ -41,7 +41,7 @@
     val output_prefix: String = "/tmp/example/"
     val output_vertex_yaml = testData + "/ldbc_sample/parquet/person.vertex.yml"
     val output_vertex_info =
-      VertexInfo.loadVertexInfo(output_vertex_yaml, spark)
+      VertexInfo.loadVertexInfo(output_vertex_yaml, Some(spark))
 
     val writer =
       new VertexWriter(output_prefix, output_vertex_info, vertex_df_with_index)
@@ -62,13 +62,13 @@
     val prefix = testData + "/ldbc_sample/parquet/"
     // get vertex num
     val vertex_yaml = prefix + "person.vertex.yml"
-    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, Some(spark))
     // construct the vertex reader
     val vreader = new VertexReader(prefix, vertex_info, spark)
     val vertexNum = vreader.readVerticesNumber()
     // read edges of unordered_by_source type
     val edge_yaml = prefix + "person_knows_person.edge.yml"
-    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark)
+    val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, Some(spark))
 
     val adj_list_type = AdjListType.unordered_by_source
     val reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)