diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-select.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/v7/api-select.go | 757 |
1 files changed, 757 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-select.go b/vendor/github.com/minio/minio-go/v7/api-select.go new file mode 100644 index 0000000..628d967 --- /dev/null +++ b/vendor/github.com/minio/minio-go/v7/api-select.go | |||
@@ -0,0 +1,757 @@ | |||
1 | /* | ||
2 | * MinIO Go Library for Amazon S3 Compatible Cloud Storage | ||
3 | * (C) 2018-2020 MinIO, Inc. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | */ | ||
17 | |||
18 | package minio | ||
19 | |||
20 | import ( | ||
21 | "bytes" | ||
22 | "context" | ||
23 | "encoding/binary" | ||
24 | "encoding/xml" | ||
25 | "errors" | ||
26 | "fmt" | ||
27 | "hash" | ||
28 | "hash/crc32" | ||
29 | "io" | ||
30 | "net/http" | ||
31 | "net/url" | ||
32 | "strings" | ||
33 | |||
34 | "github.com/minio/minio-go/v7/pkg/encrypt" | ||
35 | "github.com/minio/minio-go/v7/pkg/s3utils" | ||
36 | ) | ||
37 | |||
38 | // CSVFileHeaderInfo - is the parameter for whether to utilize headers. | ||
39 | type CSVFileHeaderInfo string | ||
40 | |||
41 | // Constants for file header info. | ||
42 | const ( | ||
43 | CSVFileHeaderInfoNone CSVFileHeaderInfo = "NONE" | ||
44 | CSVFileHeaderInfoIgnore CSVFileHeaderInfo = "IGNORE" | ||
45 | CSVFileHeaderInfoUse CSVFileHeaderInfo = "USE" | ||
46 | ) | ||
47 | |||
48 | // SelectCompressionType - is the parameter for what type of compression is | ||
49 | // present | ||
50 | type SelectCompressionType string | ||
51 | |||
52 | // Constants for compression types under select API. | ||
53 | const ( | ||
54 | SelectCompressionNONE SelectCompressionType = "NONE" | ||
55 | SelectCompressionGZIP SelectCompressionType = "GZIP" | ||
56 | SelectCompressionBZIP SelectCompressionType = "BZIP2" | ||
57 | |||
58 | // Non-standard compression schemes, supported by MinIO hosts: | ||
59 | |||
60 | SelectCompressionZSTD SelectCompressionType = "ZSTD" // Zstandard compression. | ||
61 | SelectCompressionLZ4 SelectCompressionType = "LZ4" // LZ4 Stream | ||
62 | SelectCompressionS2 SelectCompressionType = "S2" // S2 Stream | ||
63 | SelectCompressionSNAPPY SelectCompressionType = "SNAPPY" // Snappy stream | ||
64 | ) | ||
65 | |||
66 | // CSVQuoteFields - is the parameter for how CSV fields are quoted. | ||
67 | type CSVQuoteFields string | ||
68 | |||
69 | // Constants for csv quote styles. | ||
70 | const ( | ||
71 | CSVQuoteFieldsAlways CSVQuoteFields = "Always" | ||
72 | CSVQuoteFieldsAsNeeded CSVQuoteFields = "AsNeeded" | ||
73 | ) | ||
74 | |||
75 | // QueryExpressionType - is of what syntax the expression is, this should only | ||
76 | // be SQL | ||
77 | type QueryExpressionType string | ||
78 | |||
79 | // Constants for expression type. | ||
80 | const ( | ||
81 | QueryExpressionTypeSQL QueryExpressionType = "SQL" | ||
82 | ) | ||
83 | |||
84 | // JSONType determines json input serialization type. | ||
85 | type JSONType string | ||
86 | |||
87 | // Constants for JSONTypes. | ||
88 | const ( | ||
89 | JSONDocumentType JSONType = "DOCUMENT" | ||
90 | JSONLinesType JSONType = "LINES" | ||
91 | ) | ||
92 | |||
93 | // ParquetInputOptions parquet input specific options | ||
94 | type ParquetInputOptions struct{} | ||
95 | |||
96 | // CSVInputOptions csv input specific options | ||
97 | type CSVInputOptions struct { | ||
98 | FileHeaderInfo CSVFileHeaderInfo | ||
99 | fileHeaderInfoSet bool | ||
100 | |||
101 | RecordDelimiter string | ||
102 | recordDelimiterSet bool | ||
103 | |||
104 | FieldDelimiter string | ||
105 | fieldDelimiterSet bool | ||
106 | |||
107 | QuoteCharacter string | ||
108 | quoteCharacterSet bool | ||
109 | |||
110 | QuoteEscapeCharacter string | ||
111 | quoteEscapeCharacterSet bool | ||
112 | |||
113 | Comments string | ||
114 | commentsSet bool | ||
115 | } | ||
116 | |||
117 | // SetFileHeaderInfo sets the file header info in the CSV input options | ||
118 | func (c *CSVInputOptions) SetFileHeaderInfo(val CSVFileHeaderInfo) { | ||
119 | c.FileHeaderInfo = val | ||
120 | c.fileHeaderInfoSet = true | ||
121 | } | ||
122 | |||
123 | // SetRecordDelimiter sets the record delimiter in the CSV input options | ||
124 | func (c *CSVInputOptions) SetRecordDelimiter(val string) { | ||
125 | c.RecordDelimiter = val | ||
126 | c.recordDelimiterSet = true | ||
127 | } | ||
128 | |||
129 | // SetFieldDelimiter sets the field delimiter in the CSV input options | ||
130 | func (c *CSVInputOptions) SetFieldDelimiter(val string) { | ||
131 | c.FieldDelimiter = val | ||
132 | c.fieldDelimiterSet = true | ||
133 | } | ||
134 | |||
135 | // SetQuoteCharacter sets the quote character in the CSV input options | ||
136 | func (c *CSVInputOptions) SetQuoteCharacter(val string) { | ||
137 | c.QuoteCharacter = val | ||
138 | c.quoteCharacterSet = true | ||
139 | } | ||
140 | |||
141 | // SetQuoteEscapeCharacter sets the quote escape character in the CSV input options | ||
142 | func (c *CSVInputOptions) SetQuoteEscapeCharacter(val string) { | ||
143 | c.QuoteEscapeCharacter = val | ||
144 | c.quoteEscapeCharacterSet = true | ||
145 | } | ||
146 | |||
147 | // SetComments sets the comments character in the CSV input options | ||
148 | func (c *CSVInputOptions) SetComments(val string) { | ||
149 | c.Comments = val | ||
150 | c.commentsSet = true | ||
151 | } | ||
152 | |||
153 | // MarshalXML - produces the xml representation of the CSV input options struct | ||
154 | func (c CSVInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { | ||
155 | if err := e.EncodeToken(start); err != nil { | ||
156 | return err | ||
157 | } | ||
158 | if c.FileHeaderInfo != "" || c.fileHeaderInfoSet { | ||
159 | if err := e.EncodeElement(c.FileHeaderInfo, xml.StartElement{Name: xml.Name{Local: "FileHeaderInfo"}}); err != nil { | ||
160 | return err | ||
161 | } | ||
162 | } | ||
163 | |||
164 | if c.RecordDelimiter != "" || c.recordDelimiterSet { | ||
165 | if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil { | ||
166 | return err | ||
167 | } | ||
168 | } | ||
169 | |||
170 | if c.FieldDelimiter != "" || c.fieldDelimiterSet { | ||
171 | if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil { | ||
172 | return err | ||
173 | } | ||
174 | } | ||
175 | |||
176 | if c.QuoteCharacter != "" || c.quoteCharacterSet { | ||
177 | if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil { | ||
178 | return err | ||
179 | } | ||
180 | } | ||
181 | |||
182 | if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet { | ||
183 | if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil { | ||
184 | return err | ||
185 | } | ||
186 | } | ||
187 | |||
188 | if c.Comments != "" || c.commentsSet { | ||
189 | if err := e.EncodeElement(c.Comments, xml.StartElement{Name: xml.Name{Local: "Comments"}}); err != nil { | ||
190 | return err | ||
191 | } | ||
192 | } | ||
193 | |||
194 | return e.EncodeToken(xml.EndElement{Name: start.Name}) | ||
195 | } | ||
196 | |||
197 | // CSVOutputOptions csv output specific options | ||
198 | type CSVOutputOptions struct { | ||
199 | QuoteFields CSVQuoteFields | ||
200 | quoteFieldsSet bool | ||
201 | |||
202 | RecordDelimiter string | ||
203 | recordDelimiterSet bool | ||
204 | |||
205 | FieldDelimiter string | ||
206 | fieldDelimiterSet bool | ||
207 | |||
208 | QuoteCharacter string | ||
209 | quoteCharacterSet bool | ||
210 | |||
211 | QuoteEscapeCharacter string | ||
212 | quoteEscapeCharacterSet bool | ||
213 | } | ||
214 | |||
215 | // SetQuoteFields sets the quote field parameter in the CSV output options | ||
216 | func (c *CSVOutputOptions) SetQuoteFields(val CSVQuoteFields) { | ||
217 | c.QuoteFields = val | ||
218 | c.quoteFieldsSet = true | ||
219 | } | ||
220 | |||
221 | // SetRecordDelimiter sets the record delimiter character in the CSV output options | ||
222 | func (c *CSVOutputOptions) SetRecordDelimiter(val string) { | ||
223 | c.RecordDelimiter = val | ||
224 | c.recordDelimiterSet = true | ||
225 | } | ||
226 | |||
227 | // SetFieldDelimiter sets the field delimiter character in the CSV output options | ||
228 | func (c *CSVOutputOptions) SetFieldDelimiter(val string) { | ||
229 | c.FieldDelimiter = val | ||
230 | c.fieldDelimiterSet = true | ||
231 | } | ||
232 | |||
233 | // SetQuoteCharacter sets the quote character in the CSV output options | ||
234 | func (c *CSVOutputOptions) SetQuoteCharacter(val string) { | ||
235 | c.QuoteCharacter = val | ||
236 | c.quoteCharacterSet = true | ||
237 | } | ||
238 | |||
239 | // SetQuoteEscapeCharacter sets the quote escape character in the CSV output options | ||
240 | func (c *CSVOutputOptions) SetQuoteEscapeCharacter(val string) { | ||
241 | c.QuoteEscapeCharacter = val | ||
242 | c.quoteEscapeCharacterSet = true | ||
243 | } | ||
244 | |||
245 | // MarshalXML - produces the xml representation of the CSVOutputOptions struct | ||
246 | func (c CSVOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { | ||
247 | if err := e.EncodeToken(start); err != nil { | ||
248 | return err | ||
249 | } | ||
250 | |||
251 | if c.QuoteFields != "" || c.quoteFieldsSet { | ||
252 | if err := e.EncodeElement(c.QuoteFields, xml.StartElement{Name: xml.Name{Local: "QuoteFields"}}); err != nil { | ||
253 | return err | ||
254 | } | ||
255 | } | ||
256 | |||
257 | if c.RecordDelimiter != "" || c.recordDelimiterSet { | ||
258 | if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil { | ||
259 | return err | ||
260 | } | ||
261 | } | ||
262 | |||
263 | if c.FieldDelimiter != "" || c.fieldDelimiterSet { | ||
264 | if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil { | ||
265 | return err | ||
266 | } | ||
267 | } | ||
268 | |||
269 | if c.QuoteCharacter != "" || c.quoteCharacterSet { | ||
270 | if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil { | ||
271 | return err | ||
272 | } | ||
273 | } | ||
274 | |||
275 | if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet { | ||
276 | if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil { | ||
277 | return err | ||
278 | } | ||
279 | } | ||
280 | |||
281 | return e.EncodeToken(xml.EndElement{Name: start.Name}) | ||
282 | } | ||
283 | |||
284 | // JSONInputOptions json input specific options | ||
285 | type JSONInputOptions struct { | ||
286 | Type JSONType | ||
287 | typeSet bool | ||
288 | } | ||
289 | |||
290 | // SetType sets the JSON type in the JSON input options | ||
291 | func (j *JSONInputOptions) SetType(typ JSONType) { | ||
292 | j.Type = typ | ||
293 | j.typeSet = true | ||
294 | } | ||
295 | |||
296 | // MarshalXML - produces the xml representation of the JSONInputOptions struct | ||
297 | func (j JSONInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { | ||
298 | if err := e.EncodeToken(start); err != nil { | ||
299 | return err | ||
300 | } | ||
301 | |||
302 | if j.Type != "" || j.typeSet { | ||
303 | if err := e.EncodeElement(j.Type, xml.StartElement{Name: xml.Name{Local: "Type"}}); err != nil { | ||
304 | return err | ||
305 | } | ||
306 | } | ||
307 | |||
308 | return e.EncodeToken(xml.EndElement{Name: start.Name}) | ||
309 | } | ||
310 | |||
311 | // JSONOutputOptions - json output specific options | ||
312 | type JSONOutputOptions struct { | ||
313 | RecordDelimiter string | ||
314 | recordDelimiterSet bool | ||
315 | } | ||
316 | |||
317 | // SetRecordDelimiter sets the record delimiter in the JSON output options | ||
318 | func (j *JSONOutputOptions) SetRecordDelimiter(val string) { | ||
319 | j.RecordDelimiter = val | ||
320 | j.recordDelimiterSet = true | ||
321 | } | ||
322 | |||
323 | // MarshalXML - produces the xml representation of the JSONOutputOptions struct | ||
324 | func (j JSONOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error { | ||
325 | if err := e.EncodeToken(start); err != nil { | ||
326 | return err | ||
327 | } | ||
328 | |||
329 | if j.RecordDelimiter != "" || j.recordDelimiterSet { | ||
330 | if err := e.EncodeElement(j.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil { | ||
331 | return err | ||
332 | } | ||
333 | } | ||
334 | |||
335 | return e.EncodeToken(xml.EndElement{Name: start.Name}) | ||
336 | } | ||
337 | |||
338 | // SelectObjectInputSerialization - input serialization parameters | ||
339 | type SelectObjectInputSerialization struct { | ||
340 | CompressionType SelectCompressionType `xml:"CompressionType,omitempty"` | ||
341 | Parquet *ParquetInputOptions `xml:"Parquet,omitempty"` | ||
342 | CSV *CSVInputOptions `xml:"CSV,omitempty"` | ||
343 | JSON *JSONInputOptions `xml:"JSON,omitempty"` | ||
344 | } | ||
345 | |||
346 | // SelectObjectOutputSerialization - output serialization parameters. | ||
347 | type SelectObjectOutputSerialization struct { | ||
348 | CSV *CSVOutputOptions `xml:"CSV,omitempty"` | ||
349 | JSON *JSONOutputOptions `xml:"JSON,omitempty"` | ||
350 | } | ||
351 | |||
352 | // SelectObjectOptions - represents the input select body | ||
353 | type SelectObjectOptions struct { | ||
354 | XMLName xml.Name `xml:"SelectObjectContentRequest" json:"-"` | ||
355 | ServerSideEncryption encrypt.ServerSide `xml:"-"` | ||
356 | Expression string | ||
357 | ExpressionType QueryExpressionType | ||
358 | InputSerialization SelectObjectInputSerialization | ||
359 | OutputSerialization SelectObjectOutputSerialization | ||
360 | RequestProgress struct { | ||
361 | Enabled bool | ||
362 | } | ||
363 | } | ||
364 | |||
365 | // Header returns the http.Header representation of the SelectObject options. | ||
366 | func (o SelectObjectOptions) Header() http.Header { | ||
367 | headers := make(http.Header) | ||
368 | if o.ServerSideEncryption != nil && o.ServerSideEncryption.Type() == encrypt.SSEC { | ||
369 | o.ServerSideEncryption.Marshal(headers) | ||
370 | } | ||
371 | return headers | ||
372 | } | ||
373 | |||
374 | // SelectObjectType - is the parameter which defines what type of object the | ||
375 | // operation is being performed on. | ||
376 | type SelectObjectType string | ||
377 | |||
378 | // Constants for input data types. | ||
379 | const ( | ||
380 | SelectObjectTypeCSV SelectObjectType = "CSV" | ||
381 | SelectObjectTypeJSON SelectObjectType = "JSON" | ||
382 | SelectObjectTypeParquet SelectObjectType = "Parquet" | ||
383 | ) | ||
384 | |||
385 | // preludeInfo is used for keeping track of necessary information from the | ||
386 | // prelude. | ||
387 | type preludeInfo struct { | ||
388 | totalLen uint32 | ||
389 | headerLen uint32 | ||
390 | } | ||
391 | |||
392 | // SelectResults is used for the streaming responses from the server. | ||
393 | type SelectResults struct { | ||
394 | pipeReader *io.PipeReader | ||
395 | resp *http.Response | ||
396 | stats *StatsMessage | ||
397 | progress *ProgressMessage | ||
398 | } | ||
399 | |||
400 | // ProgressMessage is a struct for progress xml message. | ||
401 | type ProgressMessage struct { | ||
402 | XMLName xml.Name `xml:"Progress" json:"-"` | ||
403 | StatsMessage | ||
404 | } | ||
405 | |||
406 | // StatsMessage is a struct for stat xml message. | ||
407 | type StatsMessage struct { | ||
408 | XMLName xml.Name `xml:"Stats" json:"-"` | ||
409 | BytesScanned int64 | ||
410 | BytesProcessed int64 | ||
411 | BytesReturned int64 | ||
412 | } | ||
413 | |||
414 | // messageType represents the type of message. | ||
415 | type messageType string | ||
416 | |||
417 | const ( | ||
418 | errorMsg messageType = "error" | ||
419 | commonMsg messageType = "event" | ||
420 | ) | ||
421 | |||
422 | // eventType represents the type of event. | ||
423 | type eventType string | ||
424 | |||
425 | // list of event-types returned by Select API. | ||
426 | const ( | ||
427 | endEvent eventType = "End" | ||
428 | recordsEvent eventType = "Records" | ||
429 | progressEvent eventType = "Progress" | ||
430 | statsEvent eventType = "Stats" | ||
431 | ) | ||
432 | |||
433 | // contentType represents content type of event. | ||
434 | type contentType string | ||
435 | |||
436 | const ( | ||
437 | xmlContent contentType = "text/xml" | ||
438 | ) | ||
439 | |||
440 | // SelectObjectContent is a implementation of http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html AWS S3 API. | ||
441 | func (c *Client) SelectObjectContent(ctx context.Context, bucketName, objectName string, opts SelectObjectOptions) (*SelectResults, error) { | ||
442 | // Input validation. | ||
443 | if err := s3utils.CheckValidBucketName(bucketName); err != nil { | ||
444 | return nil, err | ||
445 | } | ||
446 | if err := s3utils.CheckValidObjectName(objectName); err != nil { | ||
447 | return nil, err | ||
448 | } | ||
449 | |||
450 | selectReqBytes, err := xml.Marshal(opts) | ||
451 | if err != nil { | ||
452 | return nil, err | ||
453 | } | ||
454 | |||
455 | urlValues := make(url.Values) | ||
456 | urlValues.Set("select", "") | ||
457 | urlValues.Set("select-type", "2") | ||
458 | |||
459 | // Execute POST on bucket/object. | ||
460 | resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{ | ||
461 | bucketName: bucketName, | ||
462 | objectName: objectName, | ||
463 | queryValues: urlValues, | ||
464 | customHeader: opts.Header(), | ||
465 | contentMD5Base64: sumMD5Base64(selectReqBytes), | ||
466 | contentSHA256Hex: sum256Hex(selectReqBytes), | ||
467 | contentBody: bytes.NewReader(selectReqBytes), | ||
468 | contentLength: int64(len(selectReqBytes)), | ||
469 | }) | ||
470 | if err != nil { | ||
471 | return nil, err | ||
472 | } | ||
473 | |||
474 | return NewSelectResults(resp, bucketName) | ||
475 | } | ||
476 | |||
477 | // NewSelectResults creates a Select Result parser that parses the response | ||
478 | // and returns a Reader that will return parsed and assembled select output. | ||
479 | func NewSelectResults(resp *http.Response, bucketName string) (*SelectResults, error) { | ||
480 | if resp.StatusCode != http.StatusOK { | ||
481 | return nil, httpRespToErrorResponse(resp, bucketName, "") | ||
482 | } | ||
483 | |||
484 | pipeReader, pipeWriter := io.Pipe() | ||
485 | streamer := &SelectResults{ | ||
486 | resp: resp, | ||
487 | stats: &StatsMessage{}, | ||
488 | progress: &ProgressMessage{}, | ||
489 | pipeReader: pipeReader, | ||
490 | } | ||
491 | streamer.start(pipeWriter) | ||
492 | return streamer, nil | ||
493 | } | ||
494 | |||
495 | // Close - closes the underlying response body and the stream reader. | ||
496 | func (s *SelectResults) Close() error { | ||
497 | defer closeResponse(s.resp) | ||
498 | return s.pipeReader.Close() | ||
499 | } | ||
500 | |||
501 | // Read - is a reader compatible implementation for SelectObjectContent records. | ||
502 | func (s *SelectResults) Read(b []byte) (n int, err error) { | ||
503 | return s.pipeReader.Read(b) | ||
504 | } | ||
505 | |||
506 | // Stats - information about a request's stats when processing is complete. | ||
507 | func (s *SelectResults) Stats() *StatsMessage { | ||
508 | return s.stats | ||
509 | } | ||
510 | |||
511 | // Progress - information about the progress of a request. | ||
512 | func (s *SelectResults) Progress() *ProgressMessage { | ||
513 | return s.progress | ||
514 | } | ||
515 | |||
516 | // start is the main function that decodes the large byte array into | ||
517 | // several events that are sent through the eventstream. | ||
518 | func (s *SelectResults) start(pipeWriter *io.PipeWriter) { | ||
519 | go func() { | ||
520 | for { | ||
521 | var prelude preludeInfo | ||
522 | headers := make(http.Header) | ||
523 | var err error | ||
524 | |||
525 | // Create CRC code | ||
526 | crc := crc32.New(crc32.IEEETable) | ||
527 | crcReader := io.TeeReader(s.resp.Body, crc) | ||
528 | |||
529 | // Extract the prelude(12 bytes) into a struct to extract relevant information. | ||
530 | prelude, err = processPrelude(crcReader, crc) | ||
531 | if err != nil { | ||
532 | pipeWriter.CloseWithError(err) | ||
533 | closeResponse(s.resp) | ||
534 | return | ||
535 | } | ||
536 | |||
537 | // Extract the headers(variable bytes) into a struct to extract relevant information | ||
538 | if prelude.headerLen > 0 { | ||
539 | if err = extractHeader(io.LimitReader(crcReader, int64(prelude.headerLen)), headers); err != nil { | ||
540 | pipeWriter.CloseWithError(err) | ||
541 | closeResponse(s.resp) | ||
542 | return | ||
543 | } | ||
544 | } | ||
545 | |||
546 | // Get the actual payload length so that the appropriate amount of | ||
547 | // bytes can be read or parsed. | ||
548 | payloadLen := prelude.PayloadLen() | ||
549 | |||
550 | m := messageType(headers.Get("message-type")) | ||
551 | |||
552 | switch m { | ||
553 | case errorMsg: | ||
554 | pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\"")) | ||
555 | closeResponse(s.resp) | ||
556 | return | ||
557 | case commonMsg: | ||
558 | // Get content-type of the payload. | ||
559 | c := contentType(headers.Get("content-type")) | ||
560 | |||
561 | // Get event type of the payload. | ||
562 | e := eventType(headers.Get("event-type")) | ||
563 | |||
564 | // Handle all supported events. | ||
565 | switch e { | ||
566 | case endEvent: | ||
567 | pipeWriter.Close() | ||
568 | closeResponse(s.resp) | ||
569 | return | ||
570 | case recordsEvent: | ||
571 | if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { | ||
572 | pipeWriter.CloseWithError(err) | ||
573 | closeResponse(s.resp) | ||
574 | return | ||
575 | } | ||
576 | case progressEvent: | ||
577 | switch c { | ||
578 | case xmlContent: | ||
579 | if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil { | ||
580 | pipeWriter.CloseWithError(err) | ||
581 | closeResponse(s.resp) | ||
582 | return | ||
583 | } | ||
584 | default: | ||
585 | pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent)) | ||
586 | closeResponse(s.resp) | ||
587 | return | ||
588 | } | ||
589 | case statsEvent: | ||
590 | switch c { | ||
591 | case xmlContent: | ||
592 | if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil { | ||
593 | pipeWriter.CloseWithError(err) | ||
594 | closeResponse(s.resp) | ||
595 | return | ||
596 | } | ||
597 | default: | ||
598 | pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent)) | ||
599 | closeResponse(s.resp) | ||
600 | return | ||
601 | } | ||
602 | } | ||
603 | } | ||
604 | |||
605 | // Ensures that the full message's CRC is correct and | ||
606 | // that the message is not corrupted | ||
607 | if err := checkCRC(s.resp.Body, crc.Sum32()); err != nil { | ||
608 | pipeWriter.CloseWithError(err) | ||
609 | closeResponse(s.resp) | ||
610 | return | ||
611 | } | ||
612 | |||
613 | } | ||
614 | }() | ||
615 | } | ||
616 | |||
617 | // PayloadLen is a function that calculates the length of the payload. | ||
618 | func (p preludeInfo) PayloadLen() int64 { | ||
619 | return int64(p.totalLen - p.headerLen - 16) | ||
620 | } | ||
621 | |||
622 | // processPrelude is the function that reads the 12 bytes of the prelude and | ||
623 | // ensures the CRC is correct while also extracting relevant information into | ||
624 | // the struct, | ||
625 | func processPrelude(prelude io.Reader, crc hash.Hash32) (preludeInfo, error) { | ||
626 | var err error | ||
627 | pInfo := preludeInfo{} | ||
628 | |||
629 | // reads total length of the message (first 4 bytes) | ||
630 | pInfo.totalLen, err = extractUint32(prelude) | ||
631 | if err != nil { | ||
632 | return pInfo, err | ||
633 | } | ||
634 | |||
635 | // reads total header length of the message (2nd 4 bytes) | ||
636 | pInfo.headerLen, err = extractUint32(prelude) | ||
637 | if err != nil { | ||
638 | return pInfo, err | ||
639 | } | ||
640 | |||
641 | // checks that the CRC is correct (3rd 4 bytes) | ||
642 | preCRC := crc.Sum32() | ||
643 | if err := checkCRC(prelude, preCRC); err != nil { | ||
644 | return pInfo, err | ||
645 | } | ||
646 | |||
647 | return pInfo, nil | ||
648 | } | ||
649 | |||
650 | // extracts the relevant information from the Headers. | ||
651 | func extractHeader(body io.Reader, myHeaders http.Header) error { | ||
652 | for { | ||
653 | // extracts the first part of the header, | ||
654 | headerTypeName, err := extractHeaderType(body) | ||
655 | if err != nil { | ||
656 | // Since end of file, we have read all of our headers | ||
657 | if err == io.EOF { | ||
658 | break | ||
659 | } | ||
660 | return err | ||
661 | } | ||
662 | |||
663 | // reads the 7 present in the header and ignores it. | ||
664 | extractUint8(body) | ||
665 | |||
666 | headerValueName, err := extractHeaderValue(body) | ||
667 | if err != nil { | ||
668 | return err | ||
669 | } | ||
670 | |||
671 | myHeaders.Set(headerTypeName, headerValueName) | ||
672 | |||
673 | } | ||
674 | return nil | ||
675 | } | ||
676 | |||
677 | // extractHeaderType extracts the first half of the header message, the header type. | ||
678 | func extractHeaderType(body io.Reader) (string, error) { | ||
679 | // extracts 2 bit integer | ||
680 | headerNameLen, err := extractUint8(body) | ||
681 | if err != nil { | ||
682 | return "", err | ||
683 | } | ||
684 | // extracts the string with the appropriate number of bytes | ||
685 | headerName, err := extractString(body, int(headerNameLen)) | ||
686 | if err != nil { | ||
687 | return "", err | ||
688 | } | ||
689 | return strings.TrimPrefix(headerName, ":"), nil | ||
690 | } | ||
691 | |||
692 | // extractsHeaderValue extracts the second half of the header message, the | ||
693 | // header value | ||
694 | func extractHeaderValue(body io.Reader) (string, error) { | ||
695 | bodyLen, err := extractUint16(body) | ||
696 | if err != nil { | ||
697 | return "", err | ||
698 | } | ||
699 | bodyName, err := extractString(body, int(bodyLen)) | ||
700 | if err != nil { | ||
701 | return "", err | ||
702 | } | ||
703 | return bodyName, nil | ||
704 | } | ||
705 | |||
706 | // extracts a string from byte array of a particular number of bytes. | ||
707 | func extractString(source io.Reader, lenBytes int) (string, error) { | ||
708 | myVal := make([]byte, lenBytes) | ||
709 | _, err := source.Read(myVal) | ||
710 | if err != nil { | ||
711 | return "", err | ||
712 | } | ||
713 | return string(myVal), nil | ||
714 | } | ||
715 | |||
716 | // extractUint32 extracts a 4 byte integer from the byte array. | ||
717 | func extractUint32(r io.Reader) (uint32, error) { | ||
718 | buf := make([]byte, 4) | ||
719 | _, err := readFull(r, buf) | ||
720 | if err != nil { | ||
721 | return 0, err | ||
722 | } | ||
723 | return binary.BigEndian.Uint32(buf), nil | ||
724 | } | ||
725 | |||
726 | // extractUint16 extracts a 2 byte integer from the byte array. | ||
727 | func extractUint16(r io.Reader) (uint16, error) { | ||
728 | buf := make([]byte, 2) | ||
729 | _, err := readFull(r, buf) | ||
730 | if err != nil { | ||
731 | return 0, err | ||
732 | } | ||
733 | return binary.BigEndian.Uint16(buf), nil | ||
734 | } | ||
735 | |||
736 | // extractUint8 extracts a 1 byte integer from the byte array. | ||
737 | func extractUint8(r io.Reader) (uint8, error) { | ||
738 | buf := make([]byte, 1) | ||
739 | _, err := readFull(r, buf) | ||
740 | if err != nil { | ||
741 | return 0, err | ||
742 | } | ||
743 | return buf[0], nil | ||
744 | } | ||
745 | |||
746 | // checkCRC ensures that the CRC matches with the one from the reader. | ||
747 | func checkCRC(r io.Reader, expect uint32) error { | ||
748 | msgCRC, err := extractUint32(r) | ||
749 | if err != nil { | ||
750 | return err | ||
751 | } | ||
752 | |||
753 | if msgCRC != expect { | ||
754 | return fmt.Errorf("Checksum Mismatch, MessageCRC of 0x%X does not equal expected CRC of 0x%X", msgCRC, expect) | ||
755 | } | ||
756 | return nil | ||
757 | } | ||