# _*_ coding: utf-8 _*_
import typing
from abc import ABC
from copy import copy
from warnings import warn
from zope.interface import implementer
from fhirpath.enums import EngineQueryType
from fhirpath.exceptions import ConstraintNotSatisfied, ValidationError
from fhirpath.model import Model
from fhirpath.thirdparty import Proxy
from fhirpath.utils import FHIR_VERSION, builder
from .constraints import required_finalized, required_not_finalized
from .exceptions import MultipleResultsFound
from .fql.expressions import and_, fql, sort_
from .fql.types import (
ElementClause,
ElementPath,
FromClause,
LimitClause,
SelectClause,
SortClause,
WhereClause,
)
from .interfaces import (
ICloneable,
IElementPath,
IGroupTerm,
IQuery,
IQueryBuilder,
IQueryResult,
ISortTerm,
ITerm,
)
if typing.TYPE_CHECKING:
from fhirpath.engine.base import Engine
__author__ = "Md Nazrul Islam<email2nazrul@gmail.com>"
[docs]@implementer(IQuery, ICloneable)
class Query(ABC):
""" """
def __init__(
self,
fhir_release: FHIR_VERSION,
from_: FromClause,
select: SelectClause,
element: ElementClause,
where: WhereClause,
sort: SortClause,
limit: LimitClause,
):
""" """
self.fhir_release: FHIR_VERSION = FHIR_VERSION.normalize(fhir_release)
self._from: FromClause = from_
self._select: SelectClause = select
self._element: ElementClause = element
self._where: WhereClause = where
self._sort: SortClause = sort
self._limit: LimitClause = limit
@classmethod
def _builder(cls, engine: typing.Optional["Engine"] = None) -> "QueryBuilder":
return QueryBuilder(engine)
[docs] @classmethod
def from_builder(cls, builder: "QueryBuilder") -> "Query":
"""Create Query object from QueryBuilder.
Kind of reverse process"""
if not IQueryBuilder(builder)._finalized:
raise ConstraintNotSatisfied(
"QueryBuilder object must be in finalized state"
)
query = cls(
builder._engine.fhir_release, # type: ignore
builder._from, # type: ignore
builder._select, # type: ignore
builder._element, # type: ignore
builder._where, # type: ignore
builder._sort, # type: ignore
builder._limit, # type: ignore
)
return query
[docs] def get_where(self) -> WhereClause:
""" """
return self._where
[docs] def get_from(self) -> FromClause:
""" """
return self._from
[docs] def get_select(self) -> SelectClause:
""" """
return self._select
[docs] def get_element(self) -> ElementClause:
""" """
return self._element
[docs] def get_sort(self) -> SortClause:
""" """
return self._sort
[docs] def get_limit(self) -> LimitClause:
""" """
return self._limit
[docs] def clone(self) -> "Query":
""" """
return self.__copy__()
def __copy__(self) -> "Query":
""" """
newone = type(self).__new__(type(self))
newone.__dict__.update(self.__dict__)
newone.fhir_release = self.fhir_release
newone._from = copy(self._from)
newone._select = copy(self._select)
newone._where = copy(self._where)
newone._sort = copy(self._sort)
newone._limit = copy(self._limit)
return newone
def __proxy__(self):
""" """
proxied = Proxy().initialize(self)
return proxied
[docs]@implementer(IQueryBuilder)
class QueryBuilder(ABC):
""" """
def __init__(self, engine: typing.Optional["Engine"] = None):
""" """
self._engine: typing.Optional["Engine"] = engine
self._finalized: bool = False
self._from: FromClause = FromClause()
self._select: SelectClause = SelectClause()
self._element: ElementClause = ElementClause()
self._where: WhereClause = WhereClause()
self._sort: SortClause = SortClause()
self._limit: LimitClause = LimitClause()
[docs] def bind(self, engine: "Engine"):
""" """
# might be clone
self._engine = engine
[docs] def clone(self) -> "QueryBuilder":
""" """
return self.__copy__()
[docs] def finalize(self, engine: typing.Optional["Engine"] = None):
""" """
self._pre_check()
if engine:
self.bind(engine)
if self._engine is None:
raise ConstraintNotSatisfied(
f"Object from '{self.__class__.__name__}' must be bound with engine"
)
# xxx: do any validation?
if len(self._element) == 0:
el_path = ElementPath("*")
self._element.append(el_path)
# Finalize path elements
[se.finalize(self._engine) for se in self._select]
# Finalize where terms on demand
[wr.finalize(self._engine) for wr in self._where]
# Finalize sorts ondemand
[sr.finalize(self._engine) for sr in self._sort]
self._validate()
self._finalized = True
def __copy__(self) -> "QueryBuilder":
""" """
newone = type(self).__new__(type(self))
newone.__dict__.update(self.__dict__)
newone._finalized = self._finalized
newone._engine = self._engine
newone._limit = copy(self._limit)
newone._from = copy(self._from)
newone._select = copy(self._select)
newone._element = copy(self._element)
newone._where = copy(self._where)
newone._sort = copy(self._sort)
return newone
@builder
def from_(self, resource_type: typing.Union[str, typing.List[str]]):
""" """
required_not_finalized(self)
assert self._engine
if isinstance(resource_type, str):
model = Model.create(resource_type, fhir_release=self._engine.fhir_release)
self._from.append((resource_type, model))
else:
for r_type in resource_type:
model = Model.create(r_type, fhir_release=self._engine.fhir_release)
self._from.append((r_type, model))
@builder
def select(self, *args):
""" """
self._pre_check()
for el_path in args:
if not IElementPath.providedBy(el_path):
el_path = ElementPath(el_path)
# Make sure correct root path
if not (el_path.star or el_path.non_fhir):
self._validate_root_path(str(el_path))
self._select.append(el_path)
@builder
def element(self, *args):
""" """
self._pre_check()
for el_path in args:
if not IElementPath.providedBy(el_path):
el_path = ElementPath(el_path)
# Make sure correct root path
if not (el_path.star or el_path.non_fhir):
self._validate_root_path(str(el_path))
self._element.append(el_path)
@builder
def where(self, *args, **kwargs):
""" """
self._pre_check()
if len(kwargs) > 0:
for path, value in kwargs.items():
term = and_(path, value)
self._validate_term_path(term)
self._where.append(term)
for term in args:
assert ITerm.providedBy(term) is True
self._validate_term_path(term)
self._where.append(term)
@builder
def limit(self, limit: int, offset: int = 0):
""" """
self._pre_check()
self._limit.limit = limit
self._limit.offset = offset
@builder
def sort(self, *args):
""" """
self._pre_check()
for sort_path in args:
if not ISortTerm.providedBy(sort_path):
if isinstance(sort_path, (tuple, list)):
sort_path = sort_(*sort_path)
else:
sort_path = sort_(sort_path)
self._sort.append(sort_path)
[docs] def get_query(self) -> "Query":
""" """
required_finalized(self)
return Query.from_builder(self)
def __fql__(self):
""" """
required_finalized(self)
return fql(self.context.dialect.bind(self))
def __str__(self):
""" """
required_finalized(self)
return self.__fql__()
def __call__(
self,
unrestricted: bool = False,
engine: typing.Optional["Engine"] = None,
async_result: bool = None,
) -> typing.Union["QueryResult", "AsyncQueryResult"]:
""" """
if async_result is not None:
warn(
"'async_result' is no longer used, as Engine has that info already. "
"this parameter will be removed in future release.",
category=DeprecationWarning,
)
if not self._finalized and (engine or self._engine):
self.finalize(engine)
query = self.get_query()
if typing.TYPE_CHECKING:
assert self._engine
if typing.TYPE_CHECKING:
result_factory: typing.Union[
typing.Type[AsyncQueryResult], typing.Type[QueryResult]
]
if self._engine.__class__.is_async() is True:
result_factory = AsyncQueryResult
else:
result_factory = QueryResult
result = result_factory(
query=query, engine=self._engine, unrestricted=unrestricted
)
return result
def _pre_check(self):
""" """
# TODO can we modify this check somehow?
# required_from_resource(self)
required_not_finalized(self)
def _validate(self):
""" """
# validate select elements
if any([el.star for el in self._select]) and len(self._select) > 1:
raise ValidationError("select(*) cannot co-exists other select element!")
def _validate_root_path(self, path_string: str):
""" """
root_path = path_string.split(".")[0]
if self._from:
match = any(alias == root_path for alias, _ in self._from)
else:
# FIXME: find a better way to validate that we're searching on all resources
match = root_path == "Resource"
if not match:
raise ValidationError(
f"Root path '{root_path}' must be matched with from models"
)
def _validate_term_path(self, term):
""" """
if IGroupTerm.providedBy(term):
for trm in term.terms:
self._validate_term_path(trm)
else:
self._validate_root_path(str(term.path))
[docs]@implementer(IQueryResult)
class QueryResult(ABC):
""" """
def __init__(self, query: Query, engine: "Engine", unrestricted: bool = False):
""" """
self._query: Query = query
self._engine: "Engine" = engine
self._unrestricted: bool = unrestricted
[docs] def fetchall(self):
""" """
return self._engine.execute(self._query, self._unrestricted)
[docs] def single(self):
"""Will return the single item in the input if there is just one item.
If the input collection is empty ({ }), the result is empty.
If there are multiple items, an error is signaled to the evaluation environment.
This operation is useful for ensuring that an error is returned
if an assumption about cardinality is violated at run-time."""
result = self.fetchall()
if result.header.total == 0:
return None
if result.header.total > 1:
raise MultipleResultsFound
return result.body[0]
[docs] def first(self):
"""Returns a collection containing only the first item in the input collection.
This function is equivalent to item(0), so it will return an empty collection
if the input collection has no items."""
query = self._query.clone()
query._limit.limit = 1
result = self._engine.execute(query, self._unrestricted)
if result.header.total > 0:
return result.body[0]
return None
[docs] def last(self):
"""Returns a collection containing only the last item in the input collection.
Will return an empty collection if the input collection has no items."""
[docs] def tail(self):
"""Returns a collection containing all but the first item in the input
collection. Will return an empty collection
if the input collection has no items, or only one item."""
[docs] def skip(self, num: int):
"""Returns a collection containing all but the first num items
in the input collection. Will return an empty collection
if there are no items remaining after the indicated number of items have
been skipped, or if the input collection is empty.
If num is less than or equal to zero, the input collection
is simply returned."""
[docs] def take(self, num: int):
"""Returns a collection containing the first num items in the input collection,
or less if there are less than num items. If num is less than or equal to 0, or
if the input collection is empty ({ }), take returns an empty collection."""
[docs] def count_raw(self):
"""Returns EngineResult"""
return self._engine.execute(
self._query, self._unrestricted, EngineQueryType.COUNT
)
[docs] def count(self) -> int:
"""Returns the integer count of the number of items in the input collection.
Returns 0 when the input collection is empty."""
return self.count_raw().header.total
[docs] def empty(self) -> bool:
"""Returns true if the input collection is empty ({ }) and false otherwise."""
return self.count() == 0
def __len__(self) -> int:
""" Returns the number of resources matching the query"""
return self.count()
[docs] def OFF__getitem__(self, key):
"""
Lazy loading es results with negative index support.
We store the results in buckets of what the bulk size is.
This is so you can skip around in the indexes without needing
to load all the data.
Example(all zero based indexing here remember)::
(525 results with bulk size 50)
- self[0]: 0 bucket, 0 item
- self[10]: 0 bucket, 10 item
- self[50]: 50 bucket: 0 item
- self[55]: 50 bucket: 5 item
- self[352]: 350 bucket: 2 item
- self[-1]: 500 bucket: 24 item
- self[-2]: 500 bucket: 23 item
- self[-55]: 450 bucket: 19 item
"""
# if isinstance(key, slice):
# return [self[i] for i in range(key.start, key.end)]
# else:
# if key + 1 > self.count:
# raise IndexError
# elif key < 0 and abs(key) > self.count:
# raise IndexError
# if key >= 0:
# result_key = (key / self.bulk_size) * self.bulk_size
# start = result_key
# result_index = key % self.bulk_size
# elif key < 0:
# last_key = (
# int(math.floor(float(self.count) / float(self.bulk_size)))
# * self.bulk_size
# )
# start = result_key = last_key - (
# (abs(key) / self.bulk_size) * self.bulk_size
# )
# if last_key == result_key:
# result_index = key
# else:
# result_index = (key % self.bulk_size) - (
# self.bulk_size - (self.count % last_key)
# )
# if result_key not in self.results:
# self.results[result_key] = self.es._search(
# self.query, sort=self.sort, start=start, **self.query_params
# )["hits"]["hits"]
# return self.results[result_key][result_index]
def __iter__(self):
""" """
result = self._engine.execute(self._query, self._unrestricted)
model_class = self._query.get_from()[0][1]
for row in result.body:
if self._query.get_element()[0].star:
yield model_class(**row[0])
else:
yield row
[docs]class AsyncQueryResult(QueryResult):
""" """
[docs] async def fetchall(self):
""" """
result = await self._engine.execute(self._query, self._unrestricted)
return result
async def __aiter__(self):
""" """
result = await self._engine.execute(self._query, self._unrestricted)
model_class = self._query.get_from()[0][1]
for row in result.body:
if self._query.get_element()[0].star:
yield model_class(**row[0])
else:
yield row
[docs] async def single(self):
""" """
result = await self.fetchall()
if result.header.total == 0:
return None
if result.header.total > 1:
raise MultipleResultsFound
return result.body[0]
[docs] async def first(self):
""" """
query = self._query.clone()
query._limit.limit = 1
result = await self._engine.execute(query, self._unrestricted)
if result.header.total > 0:
return result.body[0]
return None
[docs] async def count(self):
"""Returns the integer count of the number of items in the input collection.
Returns 0 when the input collection is empty."""
result = await self.count_raw()
return result.header.total
[docs] async def empty(self):
"""Returns true if the input collection is empty ({ }) and false otherwise."""
count = await self.count()
return count == 0
[docs]def Q_(
resource: typing.Optional[typing.Union[str, typing.List[str]]] = None, engine=None
):
""" """
builder = Query._builder(engine)
if resource:
builder = builder.from_(resource)
return builder