obi grep: now able to convert str to bytes in predicate expressions
This commit is contained in:
@ -13,6 +13,7 @@ from functools import reduce
|
||||
import time
|
||||
import re
|
||||
import sys
|
||||
import ast
|
||||
from cpython.exc cimport PyErr_CheckSignals
|
||||
|
||||
|
||||
@ -33,7 +34,7 @@ def addOptions(parser):
|
||||
group.add_argument("--predicate", "-p",
|
||||
action="append", dest="grep:grep_predicates",
|
||||
metavar="<PREDICATE>",
|
||||
default=None,
|
||||
default=[],
|
||||
type=str,
|
||||
help="Warning: use bytes for character strings (b'text' instead of 'text'). "
|
||||
"Python boolean expression to be evaluated in the "
|
||||
@ -140,15 +141,36 @@ def addOptions(parser):
|
||||
"the sequences having at least one of them are ignored.")
|
||||
|
||||
|
||||
def obi_compile_eval(str expr):
|
||||
|
||||
class MyVisitor(ast.NodeTransformer):
|
||||
def visit_Str(self, node: ast.Str):
|
||||
result = ast.Bytes(s = node.s.encode('utf-8'))
|
||||
return ast.copy_location(result, node)
|
||||
|
||||
expr = "obi_eval_result="+expr
|
||||
tree = ast.parse(expr)
|
||||
optimizer = MyVisitor()
|
||||
tree = optimizer.visit(tree)
|
||||
return compile(tree, filename="<ast>", mode="exec")
|
||||
|
||||
|
||||
def obi_eval(compiled_expr, loc_env, line):
|
||||
|
||||
exec(compiled_expr, {}, loc_env)
|
||||
obi_eval_result = loc_env["obi_eval_result"]
|
||||
return obi_eval_result
|
||||
|
||||
|
||||
def Filter_generator(options, tax_filter):
|
||||
#taxfilter = taxonomyFilterGenerator(options)
|
||||
|
||||
# Initialize conditions
|
||||
predicates = None
|
||||
if "grep_predicates" in options:
|
||||
predicates = options["grep_predicates"]
|
||||
predicates = [obi_compile_eval(p) for p in options["grep_predicates"]]
|
||||
attributes = None
|
||||
if "attributes" in options:
|
||||
if "attributes" in options and len(options["attributes"]) > 0:
|
||||
attributes = options["attributes"]
|
||||
lmax = None
|
||||
if "lmax" in options:
|
||||
@ -172,7 +194,7 @@ def Filter_generator(options, tax_filter):
|
||||
if "def_pattern" in options:
|
||||
def_pattern = re.compile(tobytes(options["def_pattern"]))
|
||||
attribute_patterns={}
|
||||
if "attribute_patterns" in options:
|
||||
if "attribute_patterns" in options and len(options["attribute_patterns"]) > 0:
|
||||
for p in options["attribute_patterns"]:
|
||||
attribute, pattern = p.split(":", 1)
|
||||
attribute_patterns[tobytes(attribute)] = re.compile(tobytes(pattern))
|
||||
@ -199,7 +221,7 @@ def Filter_generator(options, tax_filter):
|
||||
|
||||
if good and attribute_patterns:
|
||||
good = (reduce(lambda bint x, bint y : x and y,
|
||||
(line[attribute] is not None for attribute in attributes),
|
||||
(line[attribute] is not None for attribute in attribute_patterns),
|
||||
True)
|
||||
and
|
||||
reduce(lambda bint x, bint y: x and y,
|
||||
@ -209,12 +231,9 @@ def Filter_generator(options, tax_filter):
|
||||
)
|
||||
|
||||
if good and predicates:
|
||||
try:
|
||||
good = (reduce(lambda bint x, bint y: x and y,
|
||||
(bool(eval(p, loc_env, line))
|
||||
for p in predicates), True))
|
||||
except TypeError:
|
||||
raise Exception("Try replacing strings in python expression with bytes (by putting 'b' in front of strings, e.g. b'example')")
|
||||
good = (reduce(lambda bint x, bint y: x and y,
|
||||
(bool(obi_eval(p, loc_env, line))
|
||||
for p in predicates), True))
|
||||
|
||||
if good and lmin:
|
||||
good = len(line) >= lmin
|
||||
@ -313,7 +332,7 @@ def run(config):
|
||||
pb(i)
|
||||
line = i_view[i]
|
||||
|
||||
loc_env = {"sequence": line, "line": line, "taxonomy": taxo}
|
||||
loc_env = {"sequence": line, "line": line, "taxonomy": taxo, "obi_eval_result": False}
|
||||
|
||||
good = filter(line, loc_env)
|
||||
|
||||
|
Reference in New Issue
Block a user