Skip to content

Commit a2da860

Browse files
author
Philipp Heckel
committed
Remove chunks from cache if no slices need chunk anymore
1 parent 5ebb6bb commit a2da860

File tree

8 files changed

+90
-27
lines changed

8 files changed

+90
-27
lines changed

cmd/fsdup/main.go

+3-1
Original file line number | Diff line number | Diff line change
@@ -142,6 +142,7 @@ func mapCommand(args []string) {
142142
flags := flag.NewFlagSet("map", flag.ExitOnError)
143143
debugFlag := flags.Bool("debug", fsdup.Debug, "Enable debug mode")
144144
storeFlag := flags.String("store", "index", "Location of the chunk store")
145+
cacheFlag := flags.String("cache", "cache", "Location of the chunk cache")
145146
targetFlag := flags.String("target", "", "Target device or file used for local caching and live migration")
146147

147148
flags.Parse(args)
@@ -161,9 +162,10 @@ func mapCommand(args []string) {
161162
exit(2, "Invalid syntax: " + string(err.Error()))
162163
}
163164

165+
cache := fsdup.NewFileChunkStore(*cacheFlag)
164166
targetFile := *targetFlag
165167

166-
if err := fsdup.Map(manifestFile, store, targetFile); err != nil {
168+
if err := fsdup.Map(manifestFile, store, cache, targetFile); err != nil {
167169
exit(2, "Cannot map drive file: " + string(err.Error()))
168170
}
169171
}

log.go

+2-1
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package fsdup
22

33
import (
44
"fmt"
5+
"log"
56
"strings"
67
)
78

@@ -18,7 +19,7 @@ func debugf(format string, args ...interface{}) {
1819
}
1920

2021
if Debug {
21-
fmt.Printf(format, args...)
22+
log.Printf(format, args...)
2223
}
2324
}
2425

manifest.go

+2-2
Original file line number | Diff line number | Diff line change
@@ -66,8 +66,8 @@ func NewManifestFromFile(file string) (*manifest, error) {
6666
// Offsets returns a sorted list of the disk map's offsets, useful for sequential disk traversal
6767
func (m *manifest) Offsets() []int64 {
6868
offsets := make([]int64, 0, len(m.diskMap))
69-
for breakpoint, _ := range m.diskMap {
70-
offsets = append(offsets, breakpoint)
69+
for offset, _ := range m.diskMap {
70+
offsets = append(offsets, offset)
7171
}
7272

7373
sort.Slice(offsets, func(i, j int) bool {

map.go

+40-15
Original file line number | Diff line number | Diff line change
@@ -23,10 +23,10 @@ type manifestImage struct {
2323
offsets []int64
2424
chunks map[string]*chunk
2525
written map[int64]bool
26-
//chunkCount map[string]int64
26+
sliceCount map[string]int64
2727
}
2828

29-
func Map(manifestFile string, store ChunkStore, targetFile string) error {
29+
func Map(manifestFile string, store ChunkStore, cache ChunkStore, targetFile string) error {
3030
manifest, err := NewManifestFromFile(manifestFile)
3131
if err != nil {
3232
return err
@@ -51,7 +51,7 @@ func Map(manifestFile string, store ChunkStore, targetFile string) error {
5151

5252
debugf("Creating device %s ...\n", deviceName)
5353

54-
image := NewManifestImage(manifest, store, target)
54+
image := NewManifestImage(manifest, store, cache, target)
5555
device, err := buse.CreateDevice(deviceName, uint(manifest.Size()), image)
5656
if err != nil {
5757
return err
@@ -61,7 +61,7 @@ func Map(manifestFile string, store ChunkStore, targetFile string) error {
6161
signal.Notify(sig, os.Interrupt)
6262
go func() {
6363
if err := device.Connect(); err != nil {
64-
log.Printf("Buse device stopped with error: %s", err)
64+
debugf("Buse device stopped with error: %s", err)
6565
} else {
6666
log.Println("Buse device stopped gracefully.")
6767
}
@@ -76,16 +76,31 @@ func Map(manifestFile string, store ChunkStore, targetFile string) error {
7676
return nil
7777
}
7878

79-
func NewManifestImage(manifest *manifest, store ChunkStore, target *os.File) *manifestImage {
79+
func NewManifestImage(manifest *manifest, store ChunkStore, cache ChunkStore, target *os.File) *manifestImage {
80+
sliceCount := make(map[string]int64, 0)
81+
82+
for _, sliceOffset := range manifest.Offsets() {
83+
slice := manifest.Get(sliceOffset)
84+
if slice.checksum != nil {
85+
checksumStr := fmt.Sprintf("%x", slice.checksum)
86+
87+
if _, ok := sliceCount[checksumStr]; ok {
88+
sliceCount[checksumStr]++
89+
} else {
90+
sliceCount[checksumStr] = 1
91+
}
92+
}
93+
}
94+
8095
return &manifestImage{
81-
manifest: manifest,
82-
store: store,
83-
target: target,
84-
cache: NewFileChunkStore("cache"),
85-
offsets: manifest.Offsets(), // cache !
86-
chunks: manifest.Chunks(), // cache !
87-
written: make(map[int64]bool, 0),
88-
// chunkCount: make(map[string]int64, 0),
96+
manifest: manifest,
97+
store: store,
98+
target: target,
99+
cache: cache,
100+
offsets: manifest.Offsets(), // cache !
101+
chunks: manifest.Chunks(), // cache !
102+
written: make(map[int64]bool, 0),
103+
sliceCount: sliceCount,
89104
}
90105
}
91106

@@ -116,7 +131,7 @@ func (d *manifestImage) WriteAt(p []byte, off uint) error {
116131
return err
117132
}
118133

119-
log.Printf("WRITE offset:%d len:%d\n", off, len(p))
134+
debugf("WRITE offset:%d len:%d\n", off, len(p))
120135

121136
written, err := d.target.WriteAt(p, int64(off))
122137
if err != nil {
@@ -183,13 +198,14 @@ func (d *manifestImage) syncSlice(offset int64, slice *chunkSlice) error {
183198
debugf("Syncing diskoff %d - %d (len %d) -> checksum %x, %d to %d\n",
184199
offset, offset + length, length, slice.checksum, slice.from, slice.to)
185200

201+
checksumStr := fmt.Sprintf("%x", slice.checksum)
186202
buffer := make([]byte, chunkSizeMaxBytes) // FIXME: Make this a buffer pool
187203
read, err := d.cache.ReadAt(slice.checksum, buffer[:length], slice.from)
188204
if err != nil {
189205
debugf("Chunk %x not in cache. Retrieving full chunk ...\n", slice.checksum)
190206

191207
// Read entire chunk, store to cache
192-
chunk := d.chunks[fmt.Sprintf("%x", slice.checksum)]
208+
chunk := d.chunks[checksumStr]
193209

194210
// FIXME: This will fill up the local cache with all chunks and never delete them
195211
read, err = d.store.ReadAt(slice.checksum, buffer[:chunk.size], 0)
@@ -213,7 +229,16 @@ func (d *manifestImage) syncSlice(offset int64, slice *chunkSlice) error {
213229
return err
214230
}
215231

232+
// Accounting
216233
d.written[offset] = true
234+
d.sliceCount[checksumStr]--
235+
236+
// Remove from cache if it will never be requested again
237+
if d.sliceCount[checksumStr] == 0 {
238+
if err := d.cache.Remove(slice.checksum); err != nil {
239+
return err
240+
}
241+
}
217242

218243
return nil
219244
}

store.go

+2-1
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
package fsdup
22

33
type ChunkStore interface {
4-
Write(checksum []byte, buffer []byte) error
54
ReadAt(checksum []byte, buffer []byte, offset int64) (int, error)
5+
Write(checksum []byte, buffer []byte) error
6+
Remove(checksum []byte) error
67
}

store_ceph.go

+21-7
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,22 @@ func NewCephStore(configFile string, pool string) *cephChunkStore {
2121
}
2222
}
2323

24+
// ReadAt fills buffer with len(buffer) bytes of the chunk identified by
// checksum, starting at the given offset within that chunk. The chunk is
// addressed by the lowercase hex encoding of checksum. It returns the number
// of bytes read, or 0 and an error if the pool cannot be opened, the read
// fails, or fewer than len(buffer) bytes were read.
func (idx *cephChunkStore) ReadAt(checksum []byte, buffer []byte, offset int64) (int, error) {
25+
if err := idx.openPool(); err != nil {
26+
return 0, err
27+
}
28+
29+
// The hex-encoded checksum is the object name passed to the read call.
checksumStr := fmt.Sprintf("%x", checksum)
30+
read, err := idx.ctx.Read(checksumStr, buffer, uint64(offset))
31+
if err != nil {
32+
return 0, err
33+
} else if read != len(buffer) {
// A short read is treated as a failure; partial data is never returned.
34+
return 0, errors.New("cannot read full section")
35+
}
36+
37+
return read, nil
38+
}
39+
2440
func (idx *cephChunkStore) Write(checksum []byte, buffer []byte) error {
2541
if err := idx.openPool(); err != nil {
2642
return err
@@ -41,20 +57,18 @@ func (idx *cephChunkStore) Write(checksum []byte, buffer []byte) error {
4157
return nil
4258
}
4359

44-
func (idx *cephChunkStore) ReadAt(checksum []byte, buffer []byte, offset int64) (int, error) {
60+
func (idx *cephChunkStore) Remove(checksum []byte) error {
4561
if err := idx.openPool(); err != nil {
46-
return 0, err
62+
return err
4763
}
4864

4965
checksumStr := fmt.Sprintf("%x", checksum)
50-
read, err := idx.ctx.Read(checksumStr, buffer, uint64(offset))
66+
err := idx.ctx.Delete(checksumStr)
5167
if err != nil {
52-
return 0, err
53-
} else if read != len(buffer) {
54-
return 0, errors.New("cannot read full section")
68+
return err
5569
}
5670

57-
return read, nil
71+
return nil
5872
}
5973

6074
func (idx *cephChunkStore) openPool() error {

store_dummy.go

+4
Original file line number | Diff line number | Diff line change
@@ -16,4 +16,8 @@ func (idx *dummyChunkStore) Write(checksum []byte, buffer []byte) error {
1616

1717
func (idx *dummyChunkStore) ReadAt(checksum []byte, buffer []byte, offset int64) (int, error) {
1818
return 0, errors.New("cannot read from a dummy store, dummy!")
19+
}
20+
21+
// Remove is a no-op for the dummy store: it ignores checksum and always
// returns nil.
func (idx *dummyChunkStore) Remove(checksum []byte) error {
22+
return nil
1923
}

store_file.go

+16
Original file line number | Diff line number | Diff line change
@@ -66,3 +66,19 @@ func (idx *fileChunkStore) ReadAt(checksum []byte, buffer []byte, offset int64)
6666

6767
return read, nil
6868
}
69+
70+
// Remove deletes the file holding the chunk identified by checksum. The file
// lives at <root>/<hex[0:3]>/<hex[3:6]>/<hex>, where hex is the lowercase hex
// encoding of checksum. It returns the error from deleting the chunk file, if
// any; afterwards it also tries to remove the two parent directories.
//
// The hex string is sliced up to index 6, so a checksum shorter than three
// bytes makes this function panic.
func (idx *fileChunkStore) Remove(checksum []byte) error {
71+
checksumStr := fmt.Sprintf("%x", checksum)
72+
dir1 := fmt.Sprintf("%s/%s", idx.root, checksumStr[0:3])
73+
dir2 := fmt.Sprintf("%s/%s/%s", idx.root, checksumStr[0:3], checksumStr[3:6])
74+
file := fmt.Sprintf("%s/%s", dir2, checksumStr)
75+
76+
if err := os.Remove(file); err != nil {
77+
return err
78+
}
79+
80+
// Errors from removing the directories are discarded.
// NOTE(review): presumably this is deliberate best-effort cleanup, relying on
// os.Remove failing for directories that still contain other chunks — confirm.
os.Remove(dir2)
81+
os.Remove(dir1)
82+
83+
return nil
84+
}

0 commit comments

Comments
 (0)