Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mark untagged manifests as garbage during GC and delete #680

Merged
merged 16 commits into from
Jan 23, 2024
97 changes: 76 additions & 21 deletions content/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"oras.land/oras-go/v2/internal/container/set"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/graph"
"oras.land/oras-go/v2/internal/manifestutil"
"oras.land/oras-go/v2/internal/resolver"
"oras.land/oras-go/v2/registry"
)
Expand All @@ -57,8 +58,8 @@

// AutoGC controls if the OCI store will automatically clean newly produced
// dangling (unreferenced) blobs during Delete() operation. For example the
// blobs whose manifests have been deleted. Manifests in index.json will not
// be deleted.
// blobs whose manifests have been deleted. Tagged manifests will not be
Wwwsylvia marked this conversation as resolved.
Show resolved Hide resolved
// deleted.
// - Default value: true.
AutoGC bool

Expand All @@ -76,8 +77,9 @@
graph *graph.Memory

// sync ensures that most operations can be done concurrently, while Delete
// has the exclusive access to Store if a delete operation is underway. Operations
// such as Fetch, Push use sync.RLock(), while Delete uses sync.Lock().
// has the exclusive access to Store if a delete operation is underway.
// Operations such as Fetch, Push use sync.RLock(), while Delete uses
// sync.Lock().
sync sync.RWMutex
// indexLock ensures that only one go-routine is writing to the index.
indexLock sync.Mutex
Expand Down Expand Up @@ -190,9 +192,8 @@
}
if s.AutoGC {
for _, d := range danglings {
// do not delete existing manifests in tagResolver
_, err = s.tagResolver.Resolve(ctx, string(d.Digest))
if errors.Is(err, errdef.ErrNotFound) {
// do not delete existing tagged manifests
if !s.isTagged(d) {
Wwwsylvia marked this conversation as resolved.
Show resolved Hide resolved
deleteQueue = append(deleteQueue, d)
}
}
Expand Down Expand Up @@ -455,19 +456,6 @@
return os.WriteFile(s.indexPath, indexJSON, 0666)
}

// reloadIndex refreshes the in-memory state of the store by opening a brand
// new store on the same root and adopting its index, storage, tag resolver
// and graph. The receiver is left untouched if the new store cannot be
// created.
func (s *Store) reloadIndex(ctx context.Context) error {
	fresh, err := NewWithContext(ctx, s.root)
	if err != nil {
		return err
	}

	// adopt everything that was rebuilt from disk
	s.index, s.storage = fresh.index, fresh.storage
	s.tagResolver, s.graph = fresh.tagResolver, fresh.graph
	return nil
}

// GC removes garbage from Store. Unsaved index will be lost. To prevent unexpected
// loss, call SaveIndex() before GC or set AutoSaveIndex to true.
// The garbage to be cleaned are:
Expand All @@ -478,7 +466,7 @@
defer s.sync.Unlock()

// get reachable nodes by reloading the index
err := s.reloadIndex(ctx)
err := s.gcIndex(ctx)
if err != nil {
return fmt.Errorf("unable to reload index: %w", err)
}
Expand Down Expand Up @@ -526,6 +514,73 @@
return nil
}

// gcIndex reloads the index and updates metadata. Information of untagged blobs
// are cleaned and only tagged blobs remain.
func (s *Store) gcIndex(ctx context.Context) error {
tagResolver := resolver.NewMemory()
graph := graph.NewMemory()
tagged := set.New[digest.Digest]()

// index tagged manifests
refMap := s.tagResolver.Map()
for ref, desc := range refMap {
if ref == desc.Digest.String() {
Wwwsylvia marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if err := tagResolver.Tag(ctx, deleteAnnotationRefName(desc), desc.Digest.String()); err != nil {
return err
}

Check warning on line 532 in content/oci/oci.go

View check run for this annotation

Codecov / codecov/patch

content/oci/oci.go#L531-L532

Added lines #L531 - L532 were not covered by tests
if err := tagResolver.Tag(ctx, desc, ref); err != nil {
return err
}

Check warning on line 535 in content/oci/oci.go

View check run for this annotation

Codecov / codecov/patch

content/oci/oci.go#L534-L535

Added lines #L534 - L535 were not covered by tests
plain := descriptor.Plain(desc)
if err := graph.IndexAll(ctx, s.storage, plain); err != nil {
return err
}

Check warning on line 539 in content/oci/oci.go

View check run for this annotation

Codecov / codecov/patch

content/oci/oci.go#L538-L539

Added lines #L538 - L539 were not covered by tests
tagged.Add(desc.Digest)
}

// index referrer manifests
for ref, desc := range refMap {
if ref != desc.Digest.String() || tagged.Contains(desc.Digest) {
continue
}
// check if the referrers manifest can traverse to the existing graph
subject := &desc
for {
subject, err := manifestutil.Subject(ctx, s.storage, *subject)
if err != nil {
return err
}

Check warning on line 554 in content/oci/oci.go

View check run for this annotation

Codecov / codecov/patch

content/oci/oci.go#L553-L554

Added lines #L553 - L554 were not covered by tests
if subject == nil {
break
}
shizhMSFT marked this conversation as resolved.
Show resolved Hide resolved
if graph.Exists(*subject) {
if err := tagResolver.Tag(ctx, deleteAnnotationRefName(desc), desc.Digest.String()); err != nil {
return err
}

Check warning on line 561 in content/oci/oci.go

View check run for this annotation

Codecov / codecov/patch

content/oci/oci.go#L560-L561

Added lines #L560 - L561 were not covered by tests
plain := descriptor.Plain(desc)
if err := graph.IndexAll(ctx, s.storage, plain); err != nil {
return err
}

Check warning on line 565 in content/oci/oci.go

View check run for this annotation

Codecov / codecov/patch

content/oci/oci.go#L564-L565

Added lines #L564 - L565 were not covered by tests
break
}
}
}
s.tagResolver = tagResolver
s.graph = graph
return nil
}

// isTagged reports whether the blob described by desc has at least one
// reference in the tag resolver other than its own digest string.
func (s *Store) isTagged(desc ocispec.Descriptor) bool {
	refs := s.tagResolver.TagSet(desc)

	// the digest reference itself does not count as a tag
	tagCount := len(refs)
	if refs.Contains(string(desc.Digest)) {
		tagCount--
	}
	return tagCount > 0
}

// unsafeStore is used to bypass lock restrictions in Delete.
type unsafeStore struct {
*Store
Expand Down
124 changes: 117 additions & 7 deletions content/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2900,7 +2900,7 @@ func TestStore_GC(t *testing.T) {
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) // Blob 1
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer")) // Blob 2, dangling layer
generateManifest(descs[0], nil, descs[1]) // Blob 3, valid manifest
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer
generateArtifactManifest(descs[4]) // blob 6, dangling artifact
generateManifest(descs[0], &descs[5], descs[1]) // Blob 7, referrer of a dangling manifest
Expand All @@ -2913,6 +2913,8 @@ func TestStore_GC(t *testing.T) {
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2
generateManifest(descs[6], nil, descs[7]) // Blob 15, garbage manifest 2
generateManifest(descs[0], &descs[13], descs[1]) // Blob 16, referrer of a garbage manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("another layer")) // Blob 17, untagged manifest
generateManifest(descs[0], nil, descs[17]) // Blob 18, valid untagged manifest

// push blobs 0 - blobs 10 into s
for i := 0; i <= 10; i++ {
Expand All @@ -2922,23 +2924,31 @@ func TestStore_GC(t *testing.T) {
}
}

// remove blobs 4 - blobs 10 from index.json
for i := 4; i <= 10; i++ {
// push blobs 17 - blobs 18 into s
for i := 17; i <= 18; i++ {
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// remove blobs 5 - blobs 10 from index.json
for i := 5; i <= 10; i++ {
s.tagResolver.Untag(string(descs[i].Digest))
}
s.SaveIndex()

// push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata
// doesn't exist in s
for i := 11; i < len(blobs); i++ {
for i := 11; i < 17; i++ {
err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// confirm that all the blobs are in the storage
for i := 11; i < len(blobs); i++ {
for i := 0; i < len(blobs); i++ {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
Expand All @@ -2948,13 +2958,113 @@ func TestStore_GC(t *testing.T) {
}
}

// perform GC
// tag manifest blob 3
s.Tag(ctx, descs[3], "latest")

// perform double GC
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}

// verify existence
wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false}
wantExistence := []bool{true, true, false, true, true, false, false, false,
false, false, false, false, false, false, false, false, false, false, false}
for i, wantValue := range wantExistence {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if exists != wantValue {
t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists)
}
}
}

func TestStore_GCAndDeleteOnIndex(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
manifest := ocispec.Manifest{
Config: config,
Subject: subject,
Layers: layers,
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
}
generateImageIndex := func(manifests ...ocispec.Descriptor) {
index := ocispec.Index{
Manifests: manifests,
}
indexJSON, err := json.Marshal(index)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
}

appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob1")) // Blob 1
generateManifest(descs[0], nil, descs[1]) // Blob 2, manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob2")) // Blob 3
generateManifest(descs[0], nil, descs[3]) // Blob 4, manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob3")) // Blob 5
generateManifest(descs[0], nil, descs[5]) // Blob 6, manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob4")) // Blob 7
generateManifest(descs[0], nil, descs[7]) // Blob 8, manifest
generateImageIndex(descs[2], descs[4], descs[6], descs[8]) // blob 9, image index

// push all blobs into the store
for i := 0; i < len(blobs); i++ {
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// confirm that all the blobs are in the storage
for i := 0; i < len(blobs); i++ {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("descs[%d] should exist", i)
}
}

// tag manifest blob 8
s.Tag(ctx, descs[8], "latest")

// delete the image index
if err := s.Delete(ctx, descs[9]); err != nil {
t.Fatal(err)
}

// verify existence
wantExistence := []bool{true, false, false, false, false, false, false, true, true, false}
for i, wantValue := range wantExistence {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {

// DigestSet returns the set of node digest in memory.
func (m *Memory) DigestSet() set.Set[digest.Digest] {
m.lock.RLock()
defer m.lock.RUnlock()

s := set.New[digest.Digest]()
for desc := range m.nodes {
s.Add(desc.Digest)
Expand Down Expand Up @@ -186,3 +189,13 @@ func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispe
}
return successors, nil
}

// Exists checks if the node exists in the graph
func (m *Memory) Exists(node ocispec.Descriptor) bool {
m.lock.RLock()
defer m.lock.RUnlock()

nodeKey := descriptor.FromOCI(node)
_, exists := m.nodes[nodeKey]
Wwwsylvia marked this conversation as resolved.
Show resolved Hide resolved
return exists
}
Loading
Loading