Cython API: added API to use apat (pattern search) C library

This commit is contained in:
Celine Mercier
2019-02-06 18:12:49 +01:00
parent 6ca6d27ecb
commit 04a3682307
3 changed files with 239 additions and 0 deletions

View File

@ -0,0 +1,61 @@
#cython: language_level=3
from obitools3.dms.capi.obiview cimport Obiview_p
from obitools3.dms.capi.obidmscolumn cimport OBIDMS_column_p
from obitools3.dms.capi.obitypes cimport index_t
from libc.stdint cimport int32_t, uint32_t, uint8_t
cdef extern from "libecoPCR/libapat/libstki.h" nogil:
struct Stacki :
int32_t size
int32_t top
int32_t cursor
int32_t* val
ctypedef Stacki* StackiPtr
cdef extern from "libecoPCR/libapat/apat.h" nogil:
extern int MAX_PATTERN
struct Seq :
char* name
int32_t seqlen
int32_t seqsiz
int32_t datsiz
int32_t circular
uint8_t* data
char* cseq
StackiPtr* hitpos
StackiPtr* hiterr
ctypedef Seq* SeqPtr
struct Pattern :
int patlen
int maxerr
char* cpat
int32_t* patcode
uint32_t* smat
uint32_t omask
bint hasIndel
bint ok
ctypedef Pattern* PatternPtr
int32_t ManberAll(Seq *pseq, Pattern *ppat, int patnum, int begin, int length)
cdef extern from "libecoPCR/ecoPCR.h" nogil:
SeqPtr ecoseq2apatseq(char* sequence, SeqPtr out, int32_t circular)
int32_t delete_apatseq(SeqPtr pseq)
PatternPtr buildPattern(const char *pat, int32_t error_max)
PatternPtr complementPattern(PatternPtr pat)

View File

@ -0,0 +1,28 @@
#cython: language_level=3
from obitools3.dms.capi.apat cimport SeqPtr
cdef class One_primer_search_result:
cdef SeqPtr _pointer
cdef int pattern_ref
cdef int hit_count
cdef inline SeqPtr pointer(self)
@staticmethod
cdef new(SeqPtr apat_seq_p, int pattern_ref, int hit_count)
cpdef first_encountered(self)
cdef class Primer_search:
cdef SeqPtr apat_seq_p
cdef list direct_primers
cdef list revcomp_primers
cpdef One_primer_search_result search_one_primer(self, bytes sequence,
int primer_pair_index, int primer_index,
bint reverse_comp=*,
bint same_sequence=*,
int pattern_ref=*,
int begin=*)
cpdef free(self)

View File

@ -0,0 +1,150 @@
#cython: language_level=3
from libc.stdint cimport uintptr_t
from obitools3.dms.capi.apat cimport SeqPtr, \
PatternPtr, \
ecoseq2apatseq, \
ManberAll, \
delete_apatseq, \
buildPattern, \
complementPattern
cdef class One_primer_search_result:
cdef inline SeqPtr pointer(self) :
return <SeqPtr>(self._pointer)
@staticmethod
cdef new(SeqPtr apat_seq_p, int pattern_ref, int hit_count) : # not __init__ method because need to pass pointer
result = One_primer_search_result()
result._pointer = apat_seq_p
result.pattern_ref = pattern_ref
result.hit_count = hit_count
return result
cpdef first_encountered(self):
cdef int i
cdef int first
cdef int first_index
cdef int hit_pos
cdef int error_count
cdef SeqPtr pointer
pointer = self.pointer()
first = -1
for i in range(self.hit_count):
hit_pos = pointer.hitpos[self.pattern_ref].val[i]
if first == -1 or hit_pos < first:
first = hit_pos
first_index = i
error_count = pointer.hiterr[self.pattern_ref].val[first_index]
return error_count, first
cdef class Primer_search:
def __init__(self, list primers, int error_max) :
cdef PatternPtr p1
cdef PatternPtr p2
cdef PatternPtr p3
cdef PatternPtr p4
cdef PatternPtr test
cdef uintptr_t test_i
cdef bytes test_b
self.apat_seq_p = NULL
self.direct_primers = []
self.revcomp_primers = []
for i in range(len(primers)):
p1 = buildPattern(primers[i][0], error_max)
p2 = buildPattern(primers[i][1], error_max)
p3 = complementPattern(p1)
p4 = complementPattern(p2)
self.direct_primers.append((<uintptr_t>p1, <uintptr_t>p2))
self.revcomp_primers.append((<uintptr_t>p3, <uintptr_t>p4))
cpdef One_primer_search_result search_one_primer(self, bytes sequence,
int primer_pair_index, int primer_index,
bint reverse_comp=False,
bint same_sequence=False,
int pattern_ref=0,
int begin=0) :
'''
begin = start of direct pattern if it was already searched + its length
'''
cdef One_primer_search_result result = None
cdef PatternPtr primer_p
cdef SeqPtr apat_seq_p
cdef int hit_count
if not same_sequence:
self.apat_seq_p = <SeqPtr>(ecoseq2apatseq(sequence, self.apat_seq_p, 0))
apat_seq_p = <SeqPtr>(self.apat_seq_p)
if reverse_comp:
primer_p = <PatternPtr>(<uintptr_t>(self.revcomp_primers[primer_pair_index][primer_index]))
else:
primer_p = <PatternPtr>(<uintptr_t>(self.direct_primers[primer_pair_index][primer_index]))
begin = begin
seqlen = (apat_seq_p.seqlen) - begin
hit_count = ManberAll(apat_seq_p, primer_p, pattern_ref, begin, seqlen)
if hit_count:
result = One_primer_search_result.new(apat_seq_p, pattern_ref, hit_count)
else:
result = None
return result
cpdef free(self):
delete_apatseq(self.apat_seq_p)
# min_error_count = -1
# best_hit = -1
# for i in range(hit_count):
# error_count = apat_seq_p.hiterr[pattern_ref].val[i]
# hit_pos = apat_seq_p.hitpos[pattern_ref].val[i]
# if min_error_count < 0 or error_count < min_error_count:
# self.min_error_count = error_count
# self.best_hit = i
# def __call__(self, bytes sequence):
# # TODO declare things
# # Search ALL primers in the direct direction
# p1 = search_one_primer(self, sequence, True, False, same_sequence=False)
# p2 = search_one_primer(self, sequence, False, False, same_sequence=True)
#
# # Search each primer in both directions
# # 1st direction
# p1 = search_one_primer(self, sequence, True, False, same_sequence=False)
# p2_revcomp = search_one_primer(self, sequence, False, True, same_sequence=True)
# # 2nd direction
# p2 = search_one_primer(self, sequence, False, False, same_sequence=True)
# p1_revcomp = search_one_primer(self, sequence, True, True, same_sequence=True)
# # Choose best hit (best score for direction and longest amplicon if multiple hits in one direction)
# if p1.min_error_count + p2_revcomp.min_error_count < p2.min_error_count + p1_revcomp.min_error_count:
# direct_hit = True
# if p1.hit_count > 1:
# pos1 = min((pos, idx) for (idx, pos) in enumerate(p1.hit_pos))[0]
# if p2_revcomp.hit_count > 1:
# pos2 = max((pos, idx) for (idx, pos) in enumerate(p2_revcomp.hit_pos))[0]
# else:
# direct_hit = False
# if p2.hit_count > 1:
# pos1 = min((pos, idx) for (idx, pos) in enumerate(p2.hit_pos))[0]
# if p2_revcomp.hit_count > 1:
# pos2 = max((pos, idx) for (idx, pos) in enumerate(p2_revcomp.hit_pos))[0]