larc r9

159 lines · 3.3 KB Raw
1 package storage
2
3 import (
4 "bytes"
5 "fmt"
6 "io"
7 "sync"
8
9 "github.com/klauspost/compress/zstd"
10 )
11
12 /* zstd compression wrapper for larc blob storage.
13 * Uses encoder/decoder pools for efficiency.
14 * Default compression level balances speed and ratio. */
15
// DefaultCompressionLevel is the zstd level applied when callers do not pick
// one themselves. Level 3 is a fast setting that still yields a good ratio.
const DefaultCompressionLevel = 3
20
// Shared pool state for the compression helpers. All three values start out
// as their zero values; initPools fills in the pools on first use.
var (
	// initOnce guards the one-time pool setup performed by initPools.
	initOnce sync.Once

	// encoderPool hands out reusable *zstd.Encoder values.
	encoderPool sync.Pool

	// decoderPool hands out reusable *zstd.Decoder values.
	decoderPool sync.Pool
)
26
27 func initPools() {
28 initOnce.Do(func() {
29 encoderPool = sync.Pool{
30 New: func() any {
31 enc, err := zstd.NewWriter(nil,
32 zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(DefaultCompressionLevel)),
33 zstd.WithEncoderConcurrency(1),
34 )
35 if err != nil {
36 panic(fmt.Sprintf("failed to create zstd encoder: %v", err))
37 }
38 return enc
39 },
40 }
41
42 decoderPool = sync.Pool{
43 New: func() any {
44 dec, err := zstd.NewReader(nil,
45 zstd.WithDecoderConcurrency(1),
46 )
47 if err != nil {
48 panic(fmt.Sprintf("failed to create zstd decoder: %v", err))
49 }
50 return dec
51 },
52 }
53 })
54 }
55
56 // Compress compresses data using zstd
57 func Compress(data []byte) ([]byte, error) {
58 initPools()
59
60 enc := encoderPool.Get().(*zstd.Encoder)
61 defer encoderPool.Put(enc)
62
63 var buf bytes.Buffer
64 enc.Reset(&buf)
65
66 if _, err := enc.Write(data); err != nil {
67 return nil, fmt.Errorf("zstd compress write: %w", err)
68 }
69
70 if err := enc.Close(); err != nil {
71 return nil, fmt.Errorf("zstd compress close: %w", err)
72 }
73
74 return buf.Bytes(), nil
75 }
76
77 // Decompress decompresses zstd data
78 func Decompress(data []byte) ([]byte, error) {
79 initPools()
80
81 dec := decoderPool.Get().(*zstd.Decoder)
82 defer decoderPool.Put(dec)
83
84 if err := dec.Reset(bytes.NewReader(data)); err != nil {
85 return nil, fmt.Errorf("zstd decompress reset: %w", err)
86 }
87
88 result, err := io.ReadAll(dec)
89 if err != nil {
90 return nil, fmt.Errorf("zstd decompress read: %w", err)
91 }
92
93 return result, nil
94 }
95
96 // CompressReader compresses from reader to writer
97 func CompressReader(r io.Reader, w io.Writer) (int64, error) {
98 initPools()
99
100 enc := encoderPool.Get().(*zstd.Encoder)
101 defer encoderPool.Put(enc)
102
103 enc.Reset(w)
104
105 n, err := io.Copy(enc, r)
106 if err != nil {
107 return n, fmt.Errorf("zstd compress copy: %w", err)
108 }
109
110 if err := enc.Close(); err != nil {
111 return n, fmt.Errorf("zstd compress close: %w", err)
112 }
113
114 return n, nil
115 }
116
117 // DecompressReader decompresses from reader to writer
118 func DecompressReader(r io.Reader, w io.Writer) (int64, error) {
119 initPools()
120
121 dec := decoderPool.Get().(*zstd.Decoder)
122 defer decoderPool.Put(dec)
123
124 if err := dec.Reset(r); err != nil {
125 return 0, fmt.Errorf("zstd decompress reset: %w", err)
126 }
127
128 n, err := io.Copy(w, dec)
129 if err != nil {
130 return n, fmt.Errorf("zstd decompress copy: %w", err)
131 }
132
133 return n, nil
134 }
135
136 // CompressLevel compresses with specific compression level (1-19)
137 func CompressLevel(data []byte, level int) ([]byte, error) {
138 enc, err := zstd.NewWriter(nil,
139 zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(level)),
140 )
141 if err != nil {
142 return nil, fmt.Errorf("zstd new encoder: %w", err)
143 }
144 defer enc.Close()
145
146 var buf bytes.Buffer
147 enc.Reset(&buf)
148
149 if _, err := enc.Write(data); err != nil {
150 return nil, fmt.Errorf("zstd compress write: %w", err)
151 }
152
153 if err := enc.Close(); err != nil {
154 return nil, fmt.Errorf("zstd compress close: %w", err)
155 }
156
157 return buf.Bytes(), nil
158 }
159