larc r9

97 lines ยท 2.1 KB Raw
1 package mount
2
3 import (
4 "fmt"
5 "sync"
6
7 "larc.wejust.rest/larc/internal/protocol"
8 )
9
10 /* BlobFetcher downloads blobs from remote server with caching.
11 * Handles concurrent requests for the same blob efficiently. */
12
13 type BlobFetcher struct {
14 client *protocol.Client
15 repoName string
16 cache *BlobCache
17
18 /* in-flight request deduplication */
19 inflight sync.Map // hash -> *inflightReq
20 }
21
22 type inflightReq struct {
23 done chan struct{}
24 data []byte
25 err error
26 }
27
28 // NewBlobFetcher creates a fetcher for the given repo
29 func NewBlobFetcher(client *protocol.Client, repoName string, cache *BlobCache) *BlobFetcher {
30 return &BlobFetcher{
31 client: client,
32 repoName: repoName,
33 cache: cache,
34 }
35 }
36
37 // Fetch retrieves blob by hash, using cache or downloading from server
38 func (f *BlobFetcher) Fetch(hash string) ([]byte, error) {
39 /* check cache first */
40 if data, ok := f.cache.Get(hash); ok {
41 return data, nil
42 }
43
44 /* check if request is already in flight */
45 req := &inflightReq{done: make(chan struct{})}
46 if existing, loaded := f.inflight.LoadOrStore(hash, req); loaded {
47 /* wait for existing request */
48 existingReq := existing.(*inflightReq)
49 <-existingReq.done
50 return existingReq.data, existingReq.err
51 }
52
53 /* we own this request, fetch from server */
54 defer func() {
55 close(req.done)
56 f.inflight.Delete(hash)
57 }()
58
59 data, err := f.client.GetBlob(f.repoName, hash)
60 if err != nil {
61 req.err = fmt.Errorf("fetch blob %s: %w", hash, err)
62 return nil, req.err
63 }
64
65 /* store in cache */
66 if err := f.cache.Put(hash, data); err != nil {
67 /* cache error is not fatal, just log it */
68 /* TODO(kroot): add proper logging */
69 }
70
71 req.data = data
72 return data, nil
73 }
74
75 // Prefetch downloads blobs in background (optional optimization)
76 func (f *BlobFetcher) Prefetch(hashes []string) {
77 var wg sync.WaitGroup
78 sem := make(chan struct{}, 4) // limit concurrency
79
80 for _, hash := range hashes {
81 if f.cache.Exists(hash) {
82 continue
83 }
84
85 wg.Add(1)
86 go func(h string) {
87 defer wg.Done()
88 sem <- struct{}{}
89 defer func() { <-sem }()
90
91 f.Fetch(h) // ignore errors for prefetch
92 }(hash)
93 }
94
95 wg.Wait()
96 }
97