aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/md5-simd/md5-server_amd64.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/md5-simd/md5-server_amd64.go')
-rw-r--r--vendor/github.com/minio/md5-simd/md5-server_amd64.go397
1 files changed, 397 insertions, 0 deletions
diff --git a/vendor/github.com/minio/md5-simd/md5-server_amd64.go b/vendor/github.com/minio/md5-simd/md5-server_amd64.go
new file mode 100644
index 0000000..94f741c
--- /dev/null
+++ b/vendor/github.com/minio/md5-simd/md5-server_amd64.go
@@ -0,0 +1,397 @@
1//+build !noasm,!appengine,gc
2
3// Copyright (c) 2020 MinIO Inc. All rights reserved.
4// Use of this source code is governed by a license that can be
5// found in the LICENSE file.
6
7package md5simd
8
9import (
10 "encoding/binary"
11 "fmt"
12 "runtime"
13 "sync"
14
15 "github.com/klauspost/cpuid/v2"
16)
17
// Constants used by the MD5 server.
const (
	// Lanes is the number of hashes that are calculated concurrently.
	Lanes = 16

	// MD5 initialization constants: the values a digest starts from
	// when no interim state is stored for it.
	init0 = 0x67452301
	init1 = 0xefcdab89
	init2 = 0x98badcfe
	init3 = 0x10325476

	// useScalarBelow is the lane count below which the scalar routine
	// is used.
	useScalarBelow = 3
)
31
32// md5ServerUID - Does not start at 0 but next multiple of 16 so as to be able to
33// differentiate with default initialisation value of 0
34const md5ServerUID = Lanes
35
36const buffersPerLane = 3
37
38// Message to send across input channel
39type blockInput struct {
40 uid uint64
41 msg []byte
42 sumCh chan sumResult
43 reset bool
44}
45
46type sumResult struct {
47 digest [Size]byte
48}
49
50type lanesInfo [Lanes]blockInput
51
52// md5Server - Type to implement parallel handling of MD5 invocations
53type md5Server struct {
54 uidCounter uint64
55 cycle chan uint64 // client with uid has update.
56 newInput chan newClient // Add new client.
57 digests map[uint64][Size]byte // Map of uids to (interim) digest results
58 maskRounds16 [16]maskRounds // Pre-allocated static array for max 16 rounds
59 maskRounds8a [8]maskRounds // Pre-allocated static array for max 8 rounds (1st AVX2 core)
60 maskRounds8b [8]maskRounds // Pre-allocated static array for max 8 rounds (2nd AVX2 core)
61 allBufs []byte // Preallocated buffer.
62 buffers chan []byte // Preallocated buffers, sliced from allBufs.
63
64 i8 [2][8][]byte // avx2 temporary vars
65 d8a, d8b digest8
66 wg sync.WaitGroup
67}
68
69// NewServer - Create new object for parallel processing handling
70func NewServer() Server {
71 if !cpuid.CPU.Supports(cpuid.AVX2) {
72 return &fallbackServer{}
73 }
74 md5srv := &md5Server{}
75 md5srv.digests = make(map[uint64][Size]byte)
76 md5srv.newInput = make(chan newClient, Lanes)
77 md5srv.cycle = make(chan uint64, Lanes*10)
78 md5srv.uidCounter = md5ServerUID - 1
79 md5srv.allBufs = make([]byte, 32+buffersPerLane*Lanes*internalBlockSize)
80 md5srv.buffers = make(chan []byte, buffersPerLane*Lanes)
81 // Fill buffers.
82 for i := 0; i < buffersPerLane*Lanes; i++ {
83 s := 32 + i*internalBlockSize
84 md5srv.buffers <- md5srv.allBufs[s : s+internalBlockSize : s+internalBlockSize]
85 }
86
87 // Start a single thread for reading from the input channel
88 go md5srv.process(md5srv.newInput)
89 return md5srv
90}
91
92type newClient struct {
93 uid uint64
94 input chan blockInput
95}
96
97// process - Sole handler for reading from the input channel.
98func (s *md5Server) process(newClients chan newClient) {
99 // To fill up as many lanes as possible:
100 //
101 // 1. Wait for a cycle id.
102 // 2. If not already in a lane, add, otherwise leave on channel
103 // 3. Start timer
104 // 4. Check if lanes is full, if so, goto 10 (process).
105 // 5. If timeout, goto 10.
106 // 6. Wait for new id (goto 2) or timeout (goto 10).
107 // 10. Process.
108 // 11. Check all input if there is already input, if so add to lanes.
109 // 12. Goto 1
110
111 // lanes contains the lanes.
112 var lanes lanesInfo
113 // lanesFilled contains the number of filled lanes for current cycle.
114 var lanesFilled int
115 // clients contains active clients
116 var clients = make(map[uint64]chan blockInput, Lanes)
117
118 addToLane := func(uid uint64) {
119 cl, ok := clients[uid]
120 if !ok {
121 // Unknown client. Maybe it was already removed.
122 return
123 }
124 // Check if we already have it.
125 for _, lane := range lanes[:lanesFilled] {
126 if lane.uid == uid {
127 return
128 }
129 }
130 // Continue until we get a block or there is nothing on channel
131 for {
132 select {
133 case block, ok := <-cl:
134 if !ok {
135 // Client disconnected
136 delete(clients, block.uid)
137 return
138 }
139 if block.uid != uid {
140 panic(fmt.Errorf("uid mismatch, %d (block) != %d (client)", block.uid, uid))
141 }
142 // If reset message, reset and we're done
143 if block.reset {
144 delete(s.digests, uid)
145 continue
146 }
147
148 // If requesting sum, we will need to maintain state.
149 if block.sumCh != nil {
150 var dig digest
151 d, ok := s.digests[uid]
152 if ok {
153 dig.s[0] = binary.LittleEndian.Uint32(d[0:4])
154 dig.s[1] = binary.LittleEndian.Uint32(d[4:8])
155 dig.s[2] = binary.LittleEndian.Uint32(d[8:12])
156 dig.s[3] = binary.LittleEndian.Uint32(d[12:16])
157 } else {
158 dig.s[0], dig.s[1], dig.s[2], dig.s[3] = init0, init1, init2, init3
159 }
160
161 sum := sumResult{}
162 // Add end block to current digest.
163 blockScalar(&dig.s, block.msg)
164
165 binary.LittleEndian.PutUint32(sum.digest[0:], dig.s[0])
166 binary.LittleEndian.PutUint32(sum.digest[4:], dig.s[1])
167 binary.LittleEndian.PutUint32(sum.digest[8:], dig.s[2])
168 binary.LittleEndian.PutUint32(sum.digest[12:], dig.s[3])
169 block.sumCh <- sum
170 if block.msg != nil {
171 s.buffers <- block.msg
172 }
173 continue
174 }
175 if len(block.msg) == 0 {
176 continue
177 }
178 lanes[lanesFilled] = block
179 lanesFilled++
180 return
181 default:
182 return
183 }
184 }
185 }
186 addNewClient := func(cl newClient) {
187 if _, ok := clients[cl.uid]; ok {
188 panic("internal error: duplicate client registration")
189 }
190 clients[cl.uid] = cl.input
191 }
192
193 allLanesFilled := func() bool {
194 return lanesFilled == Lanes || lanesFilled >= len(clients)
195 }
196
197 for {
198 // Step 1.
199 for lanesFilled == 0 {
200 select {
201 case cl, ok := <-newClients:
202 if !ok {
203 return
204 }
205 addNewClient(cl)
206 // Check if it already sent a payload.
207 addToLane(cl.uid)
208 continue
209 case uid := <-s.cycle:
210 addToLane(uid)
211 }
212 }
213
214 fillLanes:
215 for !allLanesFilled() {
216 select {
217 case cl, ok := <-newClients:
218 if !ok {
219 return
220 }
221 addNewClient(cl)
222
223 case uid := <-s.cycle:
224 addToLane(uid)
225 default:
226 // Nothing more queued...
227 break fillLanes
228 }
229 }
230
231 // If we did not fill all lanes, check if there is more waiting
232 if !allLanesFilled() {
233 runtime.Gosched()
234 for uid := range clients {
235 addToLane(uid)
236 if allLanesFilled() {
237 break
238 }
239 }
240 }
241 if false {
242 if !allLanesFilled() {
243 fmt.Println("Not all lanes filled", lanesFilled, "of", len(clients))
244 //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
245 } else if true {
246 fmt.Println("all lanes filled")
247 }
248 }
249 // Process the lanes we could collect
250 s.blocks(lanes[:lanesFilled])
251
252 // Clear lanes...
253 lanesFilled = 0
254 // Add all current queued
255 for uid := range clients {
256 addToLane(uid)
257 if allLanesFilled() {
258 break
259 }
260 }
261 }
262}
263
264func (s *md5Server) Close() {
265 if s.newInput != nil {
266 close(s.newInput)
267 s.newInput = nil
268 }
269}
270
271// Invoke assembly and send results back
272func (s *md5Server) blocks(lanes []blockInput) {
273 if len(lanes) < useScalarBelow {
274 // Use scalar routine when below this many lanes
275 switch len(lanes) {
276 case 0:
277 case 1:
278 lane := lanes[0]
279 var d digest
280 a, ok := s.digests[lane.uid]
281 if ok {
282 d.s[0] = binary.LittleEndian.Uint32(a[0:4])
283 d.s[1] = binary.LittleEndian.Uint32(a[4:8])
284 d.s[2] = binary.LittleEndian.Uint32(a[8:12])
285 d.s[3] = binary.LittleEndian.Uint32(a[12:16])
286 } else {
287 d.s[0] = init0
288 d.s[1] = init1
289 d.s[2] = init2
290 d.s[3] = init3
291 }
292 if len(lane.msg) > 0 {
293 // Update...
294 blockScalar(&d.s, lane.msg)
295 }
296 dig := [Size]byte{}
297 binary.LittleEndian.PutUint32(dig[0:], d.s[0])
298 binary.LittleEndian.PutUint32(dig[4:], d.s[1])
299 binary.LittleEndian.PutUint32(dig[8:], d.s[2])
300 binary.LittleEndian.PutUint32(dig[12:], d.s[3])
301 s.digests[lane.uid] = dig
302
303 if lane.msg != nil {
304 s.buffers <- lane.msg
305 }
306 lanes[0] = blockInput{}
307
308 default:
309 s.wg.Add(len(lanes))
310 var results [useScalarBelow]digest
311 for i := range lanes {
312 lane := lanes[i]
313 go func(i int) {
314 var d digest
315 defer s.wg.Done()
316 a, ok := s.digests[lane.uid]
317 if ok {
318 d.s[0] = binary.LittleEndian.Uint32(a[0:4])
319 d.s[1] = binary.LittleEndian.Uint32(a[4:8])
320 d.s[2] = binary.LittleEndian.Uint32(a[8:12])
321 d.s[3] = binary.LittleEndian.Uint32(a[12:16])
322 } else {
323 d.s[0] = init0
324 d.s[1] = init1
325 d.s[2] = init2
326 d.s[3] = init3
327 }
328 if len(lane.msg) == 0 {
329 results[i] = d
330 return
331 }
332 // Update...
333 blockScalar(&d.s, lane.msg)
334 results[i] = d
335 }(i)
336 }
337 s.wg.Wait()
338 for i, lane := range lanes {
339 dig := [Size]byte{}
340 binary.LittleEndian.PutUint32(dig[0:], results[i].s[0])
341 binary.LittleEndian.PutUint32(dig[4:], results[i].s[1])
342 binary.LittleEndian.PutUint32(dig[8:], results[i].s[2])
343 binary.LittleEndian.PutUint32(dig[12:], results[i].s[3])
344 s.digests[lane.uid] = dig
345
346 if lane.msg != nil {
347 s.buffers <- lane.msg
348 }
349 lanes[i] = blockInput{}
350 }
351 }
352 return
353 }
354
355 inputs := [16][]byte{}
356 for i := range lanes {
357 inputs[i] = lanes[i].msg
358 }
359
360 // Collect active digests...
361 state := s.getDigests(lanes)
362 // Process all lanes...
363 s.blockMd5_x16(&state, inputs, len(lanes) <= 8)
364
365 for i, lane := range lanes {
366 uid := lane.uid
367 dig := [Size]byte{}
368 binary.LittleEndian.PutUint32(dig[0:], state.v0[i])
369 binary.LittleEndian.PutUint32(dig[4:], state.v1[i])
370 binary.LittleEndian.PutUint32(dig[8:], state.v2[i])
371 binary.LittleEndian.PutUint32(dig[12:], state.v3[i])
372
373 s.digests[uid] = dig
374 if lane.msg != nil {
375 s.buffers <- lane.msg
376 }
377 lanes[i] = blockInput{}
378 }
379}
380
381func (s *md5Server) getDigests(lanes []blockInput) (d digest16) {
382 for i, lane := range lanes {
383 a, ok := s.digests[lane.uid]
384 if ok {
385 d.v0[i] = binary.LittleEndian.Uint32(a[0:4])
386 d.v1[i] = binary.LittleEndian.Uint32(a[4:8])
387 d.v2[i] = binary.LittleEndian.Uint32(a[8:12])
388 d.v3[i] = binary.LittleEndian.Uint32(a[12:16])
389 } else {
390 d.v0[i] = init0
391 d.v1[i] = init1
392 d.v2[i] = init2
393 d.v3[i] = init3
394 }
395 }
396 return
397}