Skip to content

Commit

Permalink
Merge pull request #23 from filedrive-team/0.4.1
Browse files Browse the repository at this point in the history
0.4.1
  • Loading branch information
beeleelee authored Jun 8, 2021
2 parents d78531e + 38eb8c8 commit b5343d7
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 26 deletions.
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Go-graphsplit
==================
[![](https://img.shields.io/github/go-mod/go-version/filedrive-team/go-graphsplit)]()
[![](https://goreportcard.com/badge/github.com/filedrive-team/go-graphsplit)](https://goreportcard.com/report/github.com/filedrive-team/go-graphsplit)
[![](https://img.shields.io/github/license/filedrive-team/go-graphsplit)](https://github.com/filedrive-team/go-graphsplit/blob/main/LICENSE)

> A tool for splitting a large dataset into graph slices suitable for making deals on the Filecoin network.

Expand All @@ -15,10 +19,10 @@ git clone https://github.com/filedrive-team/go-graphsplit.git

cd go-graphsplit

// get submodules
# get submodules
git submodule update --init --recursive

// build filecoin-ffi
# build filecoin-ffi
make ffi

make
Expand All @@ -45,17 +49,17 @@ Splitting dataset:
--parent-path=/path/to/dataset \
/path/to/dataset
```
Note: A manifest.csv file will be created to record the mapping between the graph slice name and the payload CID, as follows:
Note: A manifest.csv file will be created to record the mapping between the graph slice name, the payload CID, and the slice's inner structure, as follows:
```sh
cat /path/to/car-dir/manifest.csv
payload_cid,filename
Qm...,graph-slice-name.car
payload_cid,filename,detail
Qm...,graph-slice-name.car,inner-structure-json
```
If --calc-commp=true is set, two additional fields are added to manifest.csv:
```sh
cat /path/to/car-dir/manifest.csv
payload_cid,filename,piece_cid,piece_size
Qm...,graph-slice-name.car,baga...,16646144
payload_cid,filename,piece_cid,piece_size,detail
Qm...,graph-slice-name.car,baga...,16646144,inner-structure-json
```

Import car file to IPFS:
Expand Down
16 changes: 8 additions & 8 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
var log = logging.Logger("graphsplit")

type GraphBuildCallback interface {
OnSuccess(node ipld.Node, graphName string)
OnSuccess(node ipld.Node, graphName, fsDetail string)
OnError(error)
}

type commPCallback struct {
carDir string
}

func (cc *commPCallback) OnSuccess(node ipld.Node, graphName string) {
func (cc *commPCallback) OnSuccess(node ipld.Node, graphName, fsDetail string) {
commpStartTime := time.Now()
carfilepath := path.Join(cc.carDir, node.Cid().String()+".car")
cpRes, err := CalcCommP(context.TODO(), carfilepath)
Expand All @@ -47,11 +47,11 @@ func (cc *commPCallback) OnSuccess(node ipld.Node, graphName string) {
}
defer f.Close()
if isCreateAction {
if _, err := f.Write([]byte("playload_cid,filename,piece_cid,piece_size\n")); err != nil {
if _, err := f.Write([]byte("playload_cid,filename,piece_cid,piece_size,detail\n")); err != nil {
log.Fatal(err)
}
}
if _, err := f.Write([]byte(fmt.Sprintf("%s,%s,%s,%d\n", node.Cid(), graphName, cpRes.Root.String(), cpRes.Size))); err != nil {
if _, err := f.Write([]byte(fmt.Sprintf("%s,%s,%s,%d,%s\n", node.Cid(), graphName, cpRes.Root.String(), cpRes.Size, fsDetail))); err != nil {
log.Fatal(err)
}
}
Expand All @@ -64,7 +64,7 @@ type csvCallback struct {
carDir string
}

func (cc *csvCallback) OnSuccess(node ipld.Node, graphName string) {
func (cc *csvCallback) OnSuccess(node ipld.Node, graphName, fsDetail string) {
// Add node inof to manifest.csv
manifestPath := path.Join(cc.carDir, "manifest.csv")
_, err := os.Stat(manifestPath)
Expand All @@ -81,11 +81,11 @@ func (cc *csvCallback) OnSuccess(node ipld.Node, graphName string) {
}
defer f.Close()
if isCreateAction {
if _, err := f.Write([]byte("playload_cid,filename\n")); err != nil {
if _, err := f.Write([]byte("playload_cid,filename,detail\n")); err != nil {
log.Fatal(err)
}
}
if _, err := f.Write([]byte(fmt.Sprintf("%s,%s\n", node.Cid(), graphName))); err != nil {
if _, err := f.Write([]byte(fmt.Sprintf("%s,%s,%s\n", node.Cid(), graphName, fsDetail))); err != nil {
log.Fatal(err)
}
}
Expand All @@ -96,7 +96,7 @@ func (cc *csvCallback) OnError(err error) {

type errCallback struct{}

func (cc *errCallback) OnSuccess(ipld.Node, string) {}
func (cc *errCallback) OnSuccess(ipld.Node, string, string) {}
func (cc *errCallback) OnError(err error) {
log.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/graphsplit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ var chunkCmd = &cli.Command{
sliceSize := c.Uint64("slice-size")
parentPath := c.String("parent-path")
carDir := c.String("car-dir")
if !graphsplit.ExistDir(carDir) {
return xerrors.Errorf("Unexpected! The path of car-dir does not exist")
}
graphName := c.String("graph-name")
if sliceSize == 0 {
return xerrors.Errorf("Unexpected! Slice size has been set as 0")
Expand Down
2 changes: 1 addition & 1 deletion restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NodeWriteTo(nd files.Node, fpath string) error {
case files.Directory:
if !ExistDir(fpath) {
err := os.Mkdir(fpath, 0777)
if err != nil {
if err != nil && os.IsNotExist(err) {
return err
}
}
Expand Down
112 changes: 102 additions & 10 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphsplit

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -19,6 +20,7 @@ import (
bstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
dag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs"
Expand All @@ -45,23 +47,101 @@ type Finfo struct {
SeekEnd int64
}

// fsNode is one entry of the file system tree that gets JSON-encoded into the
// manifest's detail column.
type fsNode struct {
// Name is the link name of the entry; Build leaves it empty on the root node.
Name string
// Hash is the string form of the node's CID.
Hash string
// Size is the unixfs file size for the root, or the link's size for children.
Size uint64
// Link holds the child entries; it is only populated for unixfs directories.
Link []fsNode
}

// FSBuilder produces an fsNode tree describing the unixfs structure found
// beneath a root node.
type FSBuilder struct {
// root is the node the tree is built from.
root *dag.ProtoNode
// ds is used to fetch the nodes referenced by links while walking the tree.
ds ipld.DAGService
}

// NewFSBuilder returns an FSBuilder that describes the tree rooted at root,
// resolving linked nodes through ds.
func NewFSBuilder(root *dag.ProtoNode, ds ipld.DAGService) *FSBuilder {
	builder := new(FSBuilder)
	builder.root = root
	builder.ds = ds
	return builder
}

// Build walks the DAG starting at the builder's root and returns the
// corresponding fsNode tree.
//
// The root must carry unixfs data; otherwise an error wrapping the decode
// failure is returned. A non-directory root yields a single node whose Link
// slice is empty but non-nil, so it marshals to "[]" rather than "null". For a
// directory root every link is resolved recursively through getNodeByLink, and
// the first failure aborts the whole build.
func (b *FSBuilder) Build() (*fsNode, error) {
	fsn, err := unixfs.FSNodeFromBytes(b.root.Data())
	if err != nil {
		// %w keeps the underlying error reachable for errors.Is / errors.As.
		return nil, xerrors.Errorf("input dag is not a unixfs node: %w", err)
	}

	rootn := &fsNode{
		Hash: b.root.Cid().String(),
		Size: fsn.FileSize(),
		Link: []fsNode{},
	}
	if !fsn.IsDir() {
		return rootn, nil
	}

	// The number of children is known up front, so size the slice once.
	links := b.root.Links()
	rootn.Link = make([]fsNode, 0, len(links))
	for _, ln := range links {
		fn, err := b.getNodeByLink(ln)
		if err != nil {
			return nil, err
		}
		rootn.Link = append(rootn.Link, fn)
	}

	return rootn, nil
}

// getNodeByLink resolves ln through the builder's DAG service and returns the
// fsNode describing it, descending recursively into unixfs directories.
//
// Name, Hash and Size are taken from the link itself. Children are collected
// only when the linked node is a unixfs directory; for any other unixfs node
// Link stays nil. On failure the zero fsNode is returned together with an
// error that identifies the offending link. Nothing is logged here, so the
// error is reported once by whoever handles it.
func (b *FSBuilder) getNodeByLink(ln *format.Link) (fn fsNode, err error) {
	nd, err := b.ds.Get(context.Background(), ln.Cid)
	if err != nil {
		return fsNode{}, xerrors.Errorf("get node %s (%q): %w", ln.Cid, ln.Name, err)
	}

	nnd, ok := nd.(*dag.ProtoNode)
	if !ok {
		return fsNode{}, xerrors.Errorf("node %s (%q) is not a dag.ProtoNode: got %T", ln.Cid, ln.Name, nd)
	}

	fsn, err := unixfs.FSNodeFromBytes(nnd.Data())
	if err != nil {
		return fsNode{}, xerrors.Errorf("node %s (%q) is not a unixfs node: %w", ln.Cid, ln.Name, err)
	}

	fn = fsNode{
		Name: ln.Name,
		Hash: ln.Cid.String(),
		Size: ln.Size,
	}
	if !fsn.IsDir() {
		return fn, nil
	}

	// Plain append keeps Link nil for an empty directory, matching the JSON
	// ("null") that leaf nodes produce.
	for _, child := range nnd.Links() {
		sub, err := b.getNodeByLink(child)
		if err != nil {
			return fsNode{}, err
		}
		fn.Link = append(fn.Link, sub)
	}
	return fn, nil
}

func BuildIpldGraph(ctx context.Context, fileList []Finfo, graphName, parentPath, carDir string, parallel int, cb GraphBuildCallback) {
node, err := buildIpldGraph(ctx, fileList, parentPath, carDir, parallel)
node, fsDetail, err := buildIpldGraph(ctx, fileList, parentPath, carDir, parallel)
if err != nil {
//log.Fatal(err)
cb.OnError(err)
return
}
cb.OnSuccess(node, graphName)
cb.OnSuccess(node, graphName, fsDetail)
}

func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir string, parallel int) (ipld.Node, error) {
func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir string, parallel int) (ipld.Node, string, error) {
bs2 := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
dagServ := merkledag.NewDAGService(blockservice.New(bs2, offline.Exchange(bs2)))

cidBuilder, err := merkledag.PrefixForCidVersion(0)
if err != nil {
return nil, err
return nil, "", err
}
fileNodeMap := make(map[string]*dag.ProtoNode)
dirNodeMap := make(map[string]*dag.ProtoNode)
Expand Down Expand Up @@ -115,8 +195,11 @@ func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir st
// log.Info(item.Path)
// log.Infof("file name: %s, file size: %d, item size: %d, seek-start:%d, seek-end:%d", item.Name, item.Info.Size(), item.SeekEnd-item.SeekStart, item.SeekStart, item.SeekEnd)
dirStr := path.Dir(item.Path)

if parentPath != "" && strings.HasPrefix(dirStr, parentPath) {
parentPath = path.Clean(parentPath)
// when parent path equal target path, and the parent path is also a file path
if parentPath == path.Clean(item.Path) {
dirStr = ""
} else if parentPath != "" && strings.HasPrefix(dirStr, parentPath) {
dirStr = dirStr[len(parentPath):]
}

Expand Down Expand Up @@ -175,7 +258,7 @@ func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir st
if isLinked(parentNode, dir) {
parentNode, err = parentNode.UpdateNodeLink(dir, dirNode)
if err != nil {
return nil, err
return nil, "", err
}
dirNodeMap[parentKey] = parentNode
} else {
Expand All @@ -197,7 +280,7 @@ func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir st
//car
carF, err := os.Create(path.Join(carDir, rootNode.Cid().String()+".car"))
if err != nil {
return nil, err
return nil, "", err
}
defer carF.Close()
selector := allSelector()
Expand All @@ -206,13 +289,22 @@ func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir st
// cario := cario.NewCarIO()
// err = cario.WriteCar(context.Background(), bs2, rootNode.Cid(), selector, carF)
if err != nil {
return nil, err
return nil, "", err
}
log.Infof("generate car file completed, time elapsed: %s", time.Now().Sub(genCarStartTime))

fsBuilder := NewFSBuilder(rootNode, dagServ)
fsNode, err := fsBuilder.Build()
if err != nil {
return nil, "", err
}
fsNodeBytes, err := json.Marshal(fsNode)
if err != nil {
return nil, "", err
}
//log.Info(dirNodeMap)
fmt.Println("++++++++++++ finished to build ipld +++++++++++++")
return rootNode, nil
return rootNode, fmt.Sprintf("%s", fsNodeBytes), nil
}

func allSelector() ipldprime.Node {
Expand Down

0 comments on commit b5343d7

Please sign in to comment.