| 1 |
package mount |
| 2 |
|
| 3 |
import ( |
| 4 |
"fmt" |
| 5 |
"sync" |
| 6 |
|
| 7 |
"larc.wejust.rest/larc/internal/protocol" |
| 8 |
) |
| 9 |
|
| 10 |
/* BlobFetcher downloads blobs from remote server with caching. |
| 11 |
* Handles concurrent requests for the same blob efficiently. */ |
| 12 |
|
| 13 |
type BlobFetcher struct { |
| 14 |
client *protocol.Client |
| 15 |
repoName string |
| 16 |
cache *BlobCache |
| 17 |
|
| 18 |
/* in-flight request deduplication */ |
| 19 |
inflight sync.Map // hash -> *inflightReq |
| 20 |
} |
| 21 |
|
| 22 |
// inflightReq is the shared record of one in-progress download. The
// goroutine performing the download fills in data or err and then closes
// done; other callers wait on done before reading either field.
type inflightReq struct {
	done chan struct{} // closed once data and err are final
	data []byte        // blob contents on success
	err  error         // download error, nil on success
}
| 27 |
|
| 28 |
// NewBlobFetcher creates a fetcher for the given repo |
| 29 |
func NewBlobFetcher(client *protocol.Client, repoName string, cache *BlobCache) *BlobFetcher { |
| 30 |
return &BlobFetcher{ |
| 31 |
client: client, |
| 32 |
repoName: repoName, |
| 33 |
cache: cache, |
| 34 |
} |
| 35 |
} |
| 36 |
|
| 37 |
// Fetch retrieves blob by hash, using cache or downloading from server |
| 38 |
func (f *BlobFetcher) Fetch(hash string) ([]byte, error) { |
| 39 |
/* check cache first */ |
| 40 |
if data, ok := f.cache.Get(hash); ok { |
| 41 |
return data, nil |
| 42 |
} |
| 43 |
|
| 44 |
/* check if request is already in flight */ |
| 45 |
req := &inflightReq{done: make(chan struct{})} |
| 46 |
if existing, loaded := f.inflight.LoadOrStore(hash, req); loaded { |
| 47 |
/* wait for existing request */ |
| 48 |
existingReq := existing.(*inflightReq) |
| 49 |
<-existingReq.done |
| 50 |
return existingReq.data, existingReq.err |
| 51 |
} |
| 52 |
|
| 53 |
/* we own this request, fetch from server */ |
| 54 |
defer func() { |
| 55 |
close(req.done) |
| 56 |
f.inflight.Delete(hash) |
| 57 |
}() |
| 58 |
|
| 59 |
data, err := f.client.GetBlob(f.repoName, hash) |
| 60 |
if err != nil { |
| 61 |
req.err = fmt.Errorf("fetch blob %s: %w", hash, err) |
| 62 |
return nil, req.err |
| 63 |
} |
| 64 |
|
| 65 |
/* store in cache */ |
| 66 |
if err := f.cache.Put(hash, data); err != nil { |
| 67 |
/* cache error is not fatal, just log it */ |
| 68 |
/* TODO(kroot): add proper logging */ |
| 69 |
} |
| 70 |
|
| 71 |
req.data = data |
| 72 |
return data, nil |
| 73 |
} |
| 74 |
|
| 75 |
// Prefetch downloads blobs in background (optional optimization) |
| 76 |
func (f *BlobFetcher) Prefetch(hashes []string) { |
| 77 |
var wg sync.WaitGroup |
| 78 |
sem := make(chan struct{}, 4) // limit concurrency |
| 79 |
|
| 80 |
for _, hash := range hashes { |
| 81 |
if f.cache.Exists(hash) { |
| 82 |
continue |
| 83 |
} |
| 84 |
|
| 85 |
wg.Add(1) |
| 86 |
go func(h string) { |
| 87 |
defer wg.Done() |
| 88 |
sem <- struct{}{} |
| 89 |
defer func() { <-sem }() |
| 90 |
|
| 91 |
f.Fetch(h) // ignore errors for prefetch |
| 92 |
}(hash) |
| 93 |
} |
| 94 |
|
| 95 |
wg.Wait() |
| 96 |
} |
| 97 |
|