package mount

import (
	"fmt"
	"sync"

	"github.com/lain/larc/internal/protocol"
)

// BlobFetcher retrieves blobs by hash, consulting a local cache before
// asking the remote server. Concurrent Fetch calls for the same hash are
// collapsed into a single server request.
type BlobFetcher struct {
	client   *protocol.Client
	repoName string
	cache    *BlobCache

	// inflight deduplicates downloads that are currently running.
	// Keys are blob hashes, values are *inflightReq.
	inflight sync.Map
}

// inflightReq is the shared record of one running download. The goroutine
// that owns the download fills in data or err and then closes done; other
// callers block on done and read the result afterwards.
type inflightReq struct {
	done chan struct{}
	data []byte
	err  error
}

// NewBlobFetcher creates a fetcher that downloads blobs of repoName through
// client and stores them in cache.
func NewBlobFetcher(client *protocol.Client, repoName string, cache *BlobCache) *BlobFetcher {
	return &BlobFetcher{
		client:   client,
		repoName: repoName,
		cache:    cache,
	}
}

// Fetch returns the blob identified by hash. It returns the cached copy when
// the cache has one; otherwise it downloads the blob from the server and
// stores it in the cache. If another goroutine is already downloading the
// same hash, Fetch waits for that download and returns its result (data or
// error) instead of issuing a second request.
//
// On a download failure the returned error wraps the client's error.
func (f *BlobFetcher) Fetch(hash string) ([]byte, error) {
	// Fast path: the blob is already cached.
	if data, ok := f.cache.Get(hash); ok {
		return data, nil
	}

	// Try to register as the owner of the download for this hash. If an
	// entry already exists, somebody else owns it and we just wait.
	req := &inflightReq{done: make(chan struct{})}
	if existing, loaded := f.inflight.LoadOrStore(hash, req); loaded {
		owner := existing.(*inflightReq)
		<-owner.done
		return owner.data, owner.err
	}

	// We own the download. Whatever happens, wake the waiters and drop the
	// in-flight entry. req.data / req.err are written before this deferred
	// function runs, so waiters observe them after done is closed.
	defer func() {
		close(req.done)
		f.inflight.Delete(hash)
	}()

	data, err := f.client.GetBlob(f.repoName, hash)
	if err != nil {
		req.err = fmt.Errorf("fetch blob %s: %w", hash, err)
		return nil, req.err
	}

	// A failure to cache is not fatal: the caller still gets the blob and a
	// later Fetch simply downloads it again, so the error is deliberately
	// discarded here.
	// TODO(kroot): add proper logging
	_ = f.cache.Put(hash, data)

	req.data = data
	return data, nil
}

// Prefetch warms the cache for the given hashes (optional optimization).
// Hashes the cache already reports as present are skipped; the rest are
// fetched with at most four downloads running at once. Fetch errors are
// ignored. Prefetch blocks until every started fetch has finished, so
// callers that want it to run in the background should invoke it from
// their own goroutine.
func (f *BlobFetcher) Prefetch(hashes []string) {
	const maxParallel = 4 // limit concurrency

	var wg sync.WaitGroup
	sem := make(chan struct{}, maxParallel)

	for _, hash := range hashes {
		if f.cache.Exists(hash) {
			continue
		}

		// Take a slot before spawning, so that no more than maxParallel
		// goroutines exist at any time regardless of len(hashes).
		sem <- struct{}{}
		wg.Add(1)
		go func(h string) {
			defer wg.Done()
			defer func() { <-sem }()
			_, _ = f.Fetch(h) // ignore errors for prefetch
		}(hash)
	}

	wg.Wait()
}