diff --git a/pkg/obiformats/binary_write.go b/pkg/obiformats/binary_write.go deleted file mode 100644 index d61c2d8..0000000 --- a/pkg/obiformats/binary_write.go +++ /dev/null @@ -1,153 +0,0 @@ -package obiformats - -import ( - "io" - "log" - "os" - "time" - - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" - "github.com/renproject/surge" -) - -func WriteBinary(iterator obiseq.IBioSequence, file io.Writer) error { - singleseq := make(obiseq.BioSequenceSlice, 1) - blob := make([]byte, 0, 1024) - for iterator.Next() { - singleseq[0] = iterator.Get() - blobsize := singleseq.SizeHint() - if blobsize > cap(blob) { - blob = make([]byte, 0, blobsize*2+8) - } - _, _, err := surge.MarshalI64(int64(blobsize), blob, 8) - if err != nil { - return err - } - data := blob[8 : 8+blobsize] - _, _, err = singleseq.Marshal(data, blobsize) - if err != nil { - return err - } - - file.Write(blob[0 : 8+blobsize]) - } - - return nil -} - -func WriteBinaryToFile(iterator obiseq.IBioSequence, - filename string) error { - - file, err := os.Create(filename) - - if err != nil { - log.Fatalf("open file error: %v", err) - return err - } - - return WriteBinary(iterator, file) -} - -func WriteBinaryToStdout(iterator obiseq.IBioSequence) error { - return WriteBinary(iterator, os.Stdout) -} - -func WriteBinaryBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { - opt := MakeOptions(options) - - buffsize := iterator.BufferSize() - newIter := obiseq.MakeIBioSequenceBatch(buffsize) - - nwriters := opt.ParallelWorkers() - - chunkchan := make(chan FileChunck) - - newIter.Add(nwriters) - - go func() { - newIter.Wait() - for len(chunkchan) > 0 { - time.Sleep(time.Millisecond) - } - close(chunkchan) - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.Channel()) - }() - - ff := func(iterator obiseq.IBioSequenceBatch) { - blob := make([]byte, 0, 10240) - for iterator.Next() { - batch := iterator.Get() - blobsize := batch.Slice().SizeHint() - if blobsize > cap(blob) { - blob = make([]byte, 0, blobsize*2+8) - } - _, _, err := surge.MarshalI64(int64(blobsize), blob, 8) - if err != nil { - log.Fatalf("error in reading binary file %v\n", err) - } - data := blob[8 : 8+blobsize] - _, _, err = batch.Slice().Marshal(data, blobsize) - if err != nil { - log.Fatalf("error in reading binary file %v\n", err) - } - - chunkchan <- FileChunck{ - data, - batch.Order(), - } - newIter.Channel() <- batch - } - newIter.Done() - } - - log.Println("Start of the binary file writing") - go ff(iterator) - for i := 0; i < nwriters-1; i++ { - go ff(iterator.Split()) - } - - next_to_send := 0 - received := make(map[int]FileChunck, 100) - - go func() { - for chunk := range chunkchan { - if chunk.order == next_to_send { - file.Write(chunk.text) - next_to_send++ - chunk, ok := received[next_to_send] - for ok { - file.Write(chunk.text) - delete(received, next_to_send) - next_to_send++ - chunk, ok = received[next_to_send] - } - } else { - received[chunk.order] = chunk - } - - } - }() - - return newIter, nil -} - -func WriteBinaryBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { - return WriteBinaryBatch(iterator, os.Stdout, options...) -} - -func WriteBinaryBatchToFile(iterator obiseq.IBioSequenceBatch, - filename string, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { - - file, err := os.Create(filename) - - if err != nil { - log.Fatalf("open file error: %v", err) - return obiseq.NilIBioSequenceBatch, err - } - - return WriteBinaryBatch(iterator, file, options...) -} diff --git a/pkg/obiformats/fastseq_json_header.go b/pkg/obiformats/fastseq_json_header.go index 5daba14..adc6c80 100644 --- a/pkg/obiformats/fastseq_json_header.go +++ b/pkg/obiformats/fastseq_json_header.go @@ -1,6 +1,7 @@ package obiformats import ( + "log" "strings" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" @@ -40,7 +41,11 @@ func _parse_json_header_(header string, annotations obiseq.Annotation) string { stop++ - json.Unmarshal([]byte(header)[start:stop], annotations) + err := json.Unmarshal([]byte(header)[start:stop], &annotations) + if err != nil { + log.Fatalf("annotation parsing error on %s : %v\n", header, err) + } + return strings.TrimSpace(header[stop:]) } diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 5cada79..12367ea 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -122,9 +122,10 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IB go _FastseqReader(pointer, newIter, opt.BatchSize()) parser := opt.ParseFastSeqHeader() + if parser != nil { return IParseFastSeqHeaderBatch(newIter, options...), err - } + } return newIter, err } diff --git a/pkg/obiseq/surge.go b/pkg/obiseq/surge.go deleted file mode 100644 index b6d43fc..0000000 --- a/pkg/obiseq/surge.go +++ /dev/null @@ -1,105 +0,0 @@ -package obiseq - -import "github.com/renproject/surge" - -func (sequence BioSequence) SizeHint() int { - return surge.SizeHintString(sequence.sequence.id) + - surge.SizeHintString(sequence.sequence.definition) + - surge.SizeHintBytes(sequence.sequence.sequence) + - surge.SizeHintBytes(sequence.sequence.qualities) + - surge.SizeHintBytes(sequence.sequence.feature) + - surge.SizeHint(sequence.sequence.annotations) -} - -func (sequence BioSequence) Marshal(buf []byte, rem int) ([]byte, int, error) { - var err error - if buf, rem, err = surge.MarshalString(sequence.sequence.id, buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.MarshalString(sequence.sequence.definition, buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.MarshalBytes(sequence.sequence.sequence, buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.MarshalBytes(sequence.sequence.qualities, buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.MarshalBytes(sequence.sequence.feature, buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.Marshal(sequence.sequence.annotations, buf, rem); err != nil { - return buf, rem, err - } - - return buf, rem, err -} - -// Unmarshal is the opposite of Marshal, and requires -// a pointer receiver. -func (sequence *BioSequence) Unmarshal(buf []byte, rem int) ([]byte, int, error) { - var err error - if buf, rem, err = surge.UnmarshalString(&(sequence.sequence.id), buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.UnmarshalString(&(sequence.sequence.definition), buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.UnmarshalBytes(&(sequence.sequence.sequence), buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.UnmarshalBytes(&(sequence.sequence.qualities), buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.UnmarshalBytes(&(sequence.sequence.feature), buf, rem); err != nil { - return buf, rem, err - } - if buf, rem, err = surge.Unmarshal(&(sequence.sequence.annotations), buf, rem); err != nil { - return buf, rem, err - } - return buf, rem, err -} - -func (sequences BioSequenceSlice) SizeHint() int { - size := surge.SizeHintI64 - for _, s := range sequences { - size += s.SizeHint() - } - - return size -} - -func (sequences BioSequenceSlice) Marshal(buf []byte, rem int) ([]byte, int, error) { - var err error - - if buf, rem, err = surge.MarshalI64(int64(len(sequences)), buf, rem); err != nil { - return buf, rem, err - } - - for _, s := range sequences { - if buf, rem, err = s.Marshal(buf, rem); err != nil { - return buf, rem, err - } - } - - return buf, rem, err -} - -func (sequences *BioSequenceSlice) Unmarshal(buf []byte, rem int) ([]byte, int, error) { - var err error - var length int64 - - if buf, rem, err = surge.UnmarshalI64(&length, buf, rem); err != nil { - return buf, rem, err - } - - *sequences = make(BioSequenceSlice, length) - - for i := 0; i < int(length); i++ { - if buf, rem, err = ((*sequences)[i]).Unmarshal(buf, rem); err != nil { - return buf, rem, err - } - } - - return buf, rem, err -}