Skip to content

Commit

Permalink
feat: mark untagged manifests as garbage during GC and delete (#680)
Browse files Browse the repository at this point in the history
Related to #664

Signed-off-by: Xiaoxuan Wang <[email protected]>
  • Loading branch information
wangxiaoxuan273 authored Jan 23, 2024
1 parent 82cc505 commit 64fedf4
Show file tree
Hide file tree
Showing 8 changed files with 543 additions and 41 deletions.
97 changes: 76 additions & 21 deletions content/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"oras.land/oras-go/v2/internal/container/set"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/graph"
"oras.land/oras-go/v2/internal/manifestutil"
"oras.land/oras-go/v2/internal/resolver"
"oras.land/oras-go/v2/registry"
)
Expand All @@ -57,8 +58,8 @@ type Store struct {

// AutoGC controls if the OCI store will automatically clean newly produced
// dangling (unreferenced) blobs during Delete() operation. For example the
// blobs whose manifests have been deleted. Manifests in index.json will not
// be deleted.
// blobs whose manifests have been deleted. Tagged manifests will not be
// deleted.
// - Default value: true.
AutoGC bool

Expand All @@ -76,8 +77,9 @@ type Store struct {
graph *graph.Memory

// sync ensures that most operations can be done concurrently, while Delete
// has the exclusive access to Store if a delete operation is underway. Operations
// such as Fetch, Push use sync.RLock(), while Delete uses sync.Lock().
// has the exclusive access to Store if a delete operation is underway.
// Operations such as Fetch, Push use sync.RLock(), while Delete uses
// sync.Lock().
sync sync.RWMutex
// indexLock ensures that only one go-routine is writing to the index.
indexLock sync.Mutex
Expand Down Expand Up @@ -190,9 +192,8 @@ func (s *Store) Delete(ctx context.Context, target ocispec.Descriptor) error {
}
if s.AutoGC {
for _, d := range danglings {
// do not delete existing manifests in tagResolver
_, err = s.tagResolver.Resolve(ctx, string(d.Digest))
if errors.Is(err, errdef.ErrNotFound) {
// do not delete existing tagged manifests
if !s.isTagged(d) {
deleteQueue = append(deleteQueue, d)
}
}
Expand Down Expand Up @@ -455,19 +456,6 @@ func (s *Store) writeIndexFile() error {
return os.WriteFile(s.indexPath, indexJSON, 0666)
}

// reloadIndex builds a fresh Store from s.root via NewWithContext and adopts
// its index, storage, tag resolver and graph, replacing the current ones.
// If the fresh Store cannot be created, the error is returned and s is left
// untouched.
func (s *Store) reloadIndex(ctx context.Context) error {
	reloaded, err := NewWithContext(ctx, s.root)
	if err != nil {
		return err
	}

	// adopt the freshly loaded state in one go
	s.index, s.storage = reloaded.index, reloaded.storage
	s.tagResolver, s.graph = reloaded.tagResolver, reloaded.graph
	return nil
}

// GC removes garbage from Store. Unsaved index will be lost. To prevent unexpected
// loss, call SaveIndex() before GC or set AutoSaveIndex to true.
// The garbage to be cleaned are:
Expand All @@ -478,7 +466,7 @@ func (s *Store) GC(ctx context.Context) error {
defer s.sync.Unlock()

// get reachable nodes by reloading the index
err := s.reloadIndex(ctx)
err := s.gcIndex(ctx)
if err != nil {
return fmt.Errorf("unable to reload index: %w", err)
}
Expand Down Expand Up @@ -526,6 +514,73 @@ func (s *Store) GC(ctx context.Context) error {
return nil
}

// gcIndex rebuilds the tag resolver and the graph of the Store from the
// references currently held by s.tagResolver, keeping only:
//   - manifests that have at least one tag other than their own digest, and
//   - digest-only (untagged) manifests whose subject chain reaches a node
//     that is already indexed in the rebuilt graph (i.e. referrers).
//
// Every other untagged entry is dropped from the metadata. On success
// s.tagResolver and s.graph are replaced; on error the Store is left
// unchanged and the error is returned.
func (s *Store) gcIndex(ctx context.Context) error {
	tagResolver := resolver.NewMemory()
	graph := graph.NewMemory()
	tagged := set.New[digest.Digest]()

	// index tagged manifests
	refMap := s.tagResolver.Map()
	for ref, desc := range refMap {
		// entries keyed by their own digest are not tags; they are handled
		// by the referrer pass below
		if ref == desc.Digest.String() {
			continue
		}
		// register the manifest under its digest
		// NOTE(review): deleteAnnotationRefName presumably strips the
		// ref-name annotation from the descriptor; confirm against its
		// definition.
		if err := tagResolver.Tag(ctx, deleteAnnotationRefName(desc), desc.Digest.String()); err != nil {
			return err
		}
		// register the manifest under its tag
		if err := tagResolver.Tag(ctx, desc, ref); err != nil {
			return err
		}
		plain := descriptor.Plain(desc)
		if err := graph.IndexAll(ctx, s.storage, plain); err != nil {
			return err
		}
		tagged.Add(desc.Digest)
	}

	// index referrer manifests
	for ref, desc := range refMap {
		if ref != desc.Digest.String() || tagged.Contains(desc.Digest) {
			continue
		}
		// check if the referrers manifest can traverse to the existing graph
		// by walking up its chain of subjects
		current := desc
		for {
			// Do not redeclare the cursor with := here: a shadowed variable
			// would never advance, so the walk would stay on the first
			// subject and spin forever when it is not in the graph.
			subject, err := manifestutil.Subject(ctx, s.storage, current)
			if err != nil {
				return err
			}
			if subject == nil {
				// reached the end of the chain without hitting the graph
				break
			}
			if graph.Exists(*subject) {
				if err := tagResolver.Tag(ctx, deleteAnnotationRefName(desc), desc.Digest.String()); err != nil {
					return err
				}
				plain := descriptor.Plain(desc)
				if err := graph.IndexAll(ctx, s.storage, plain); err != nil {
					return err
				}
				break
			}
			// step one level up the subject chain
			current = *subject
		}
	}
	s.tagResolver = tagResolver
	s.graph = graph
	return nil
}

// isTagged reports whether the blob given by the descriptor has at least one
// tag besides its own digest. A reference equal to the digest string itself
// is not counted as a tag.
func (s *Store) isTagged(desc ocispec.Descriptor) bool {
	refs := s.tagResolver.TagSet(desc)
	tagCount := len(refs)
	if refs.Contains(string(desc.Digest)) {
		// discount the digest self-reference
		tagCount--
	}
	return tagCount > 0
}

// unsafeStore is used to bypass lock restrictions in Delete.
type unsafeStore struct {
*Store
Expand Down
124 changes: 117 additions & 7 deletions content/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2900,7 +2900,7 @@ func TestStore_GC(t *testing.T) {
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) // Blob 1
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer")) // Blob 2, dangling layer
generateManifest(descs[0], nil, descs[1]) // Blob 3, valid manifest
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer
generateArtifactManifest(descs[4]) // blob 6, dangling artifact
generateManifest(descs[0], &descs[5], descs[1]) // Blob 7, referrer of a dangling manifest
Expand All @@ -2913,6 +2913,8 @@ func TestStore_GC(t *testing.T) {
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2
generateManifest(descs[6], nil, descs[7]) // Blob 15, garbage manifest 2
generateManifest(descs[0], &descs[13], descs[1]) // Blob 16, referrer of a garbage manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("another layer")) // Blob 17, untagged manifest
generateManifest(descs[0], nil, descs[17]) // Blob 18, valid untagged manifest

// push blobs 0 - blobs 10 into s
for i := 0; i <= 10; i++ {
Expand All @@ -2922,23 +2924,31 @@ func TestStore_GC(t *testing.T) {
}
}

// remove blobs 4 - blobs 10 from index.json
for i := 4; i <= 10; i++ {
// push blobs 17 - blobs 18 into s
for i := 17; i <= 18; i++ {
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// remove blobs 5 - blobs 10 from index.json
for i := 5; i <= 10; i++ {
s.tagResolver.Untag(string(descs[i].Digest))
}
s.SaveIndex()

// push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata
// doesn't exist in s
for i := 11; i < len(blobs); i++ {
for i := 11; i < 17; i++ {
err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// confirm that all the blobs are in the storage
for i := 11; i < len(blobs); i++ {
for i := 0; i < len(blobs); i++ {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
Expand All @@ -2948,13 +2958,113 @@ func TestStore_GC(t *testing.T) {
}
}

// perform GC
// tag manifest blob 3
s.Tag(ctx, descs[3], "latest")

// perform double GC
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}

// verify existence
wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false}
wantExistence := []bool{true, true, false, true, true, false, false, false,
false, false, false, false, false, false, false, false, false, false, false}
for i, wantValue := range wantExistence {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if exists != wantValue {
t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists)
}
}
}

func TestStore_GCAndDeleteOnIndex(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
manifest := ocispec.Manifest{
Config: config,
Subject: subject,
Layers: layers,
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
}
generateImageIndex := func(manifests ...ocispec.Descriptor) {
index := ocispec.Index{
Manifests: manifests,
}
indexJSON, err := json.Marshal(index)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
}

appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob1")) // Blob 1
generateManifest(descs[0], nil, descs[1]) // Blob 2, manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob2")) // Blob 3
generateManifest(descs[0], nil, descs[3]) // Blob 4, manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob3")) // Blob 5
generateManifest(descs[0], nil, descs[5]) // Blob 6, manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob4")) // Blob 7
generateManifest(descs[0], nil, descs[7]) // Blob 8, manifest
generateImageIndex(descs[2], descs[4], descs[6], descs[8]) // blob 9, image index

// push all blobs into the store
for i := 0; i < len(blobs); i++ {
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// confirm that all the blobs are in the storage
for i := 0; i < len(blobs); i++ {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("descs[%d] should exist", i)
}
}

// tag manifest blob 8
s.Tag(ctx, descs[8], "latest")

// delete the image index
if err := s.Delete(ctx, descs[9]); err != nil {
t.Fatal(err)
}

// verify existence
wantExistence := []bool{true, false, false, false, false, false, false, true, true, false}
for i, wantValue := range wantExistence {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {

// DigestSet returns the set of node digest in memory.
func (m *Memory) DigestSet() set.Set[digest.Digest] {
m.lock.RLock()
defer m.lock.RUnlock()

s := set.New[digest.Digest]()
for desc := range m.nodes {
s.Add(desc.Digest)
Expand Down Expand Up @@ -186,3 +189,13 @@ func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispe
}
return successors, nil
}

// Exists reports whether the given node is present in the graph's node set.
// The lookup is performed while holding the read lock.
func (m *Memory) Exists(node ocispec.Descriptor) bool {
	m.lock.RLock()
	defer m.lock.RUnlock()

	_, found := m.nodes[descriptor.FromOCI(node)]
	return found
}
Loading

0 comments on commit 64fedf4

Please sign in to comment.