# Source code for elastic_connect.base_model

import elastic_connect
import elastic_connect.data_types as data_types
import elastic_connect.data_types.base
import logging

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)


class IntegrityError(Exception):
    """Raised when saving a model would break its integrity (e.g. a changed computed id)."""
class Model(object):
    """
    Base class for Models stored in elasticsearch.

    Handles creation, loading and saving of the models. Also handles simple
    SQL-like joins, in fact lazy loading dependent child/parent models.
    """

    __slots__ = ('id', )

    _mapping = {  # type: dict[str, data_types.base.BaseDataType]
        'id': data_types.Keyword(name='id'),
    }
    """
    Dictionary describing the model.

    property_name: elasticsearch data type or 'ref' for reference to other
    model, defined by a join.
    Keys starting with _ are not saved in elasticsearch.
    """

    _meta = {
        '_doc_type': 'model',
    }

    # Namespace providing the index prefix (see get_index) and the
    # elasticsearch client (see refresh) for this model.
    _es_namespace = elastic_connect._namespaces['_default']
    # DocTypeConnection cached per class by get_es_connection().
    _es_connection = None

    def __init__(self, **kw):
        r"""
        Creates an instance of the model using \*\*kw parameters for setting
        values of attributes. The values get converted by their respective
        data_type.from_python method.
        """
        # Give every mapped attribute its default first (presumably so that
        # on_update hooks in the second pass see a fully populated instance).
        for name, data_type in self._mapping.items():
            self.__update(name, data_type.get_default_value())
        for name, data_type in self._mapping.items():
            value = kw.get(name, data_type.get_default_value())
            # NOTE(review): __update() applies on_update() itself, so
            # on_update runs twice for each value here - confirm intended.
            self.__update(name, data_type.on_update(
                data_type.from_python(value), self))

    @classmethod
    def get_index(cls):
        """
        Returns the name of the index this model is stored in, including any
        prefixes defined globally or in namespace.

        In ES >= 6 each model type needs its own index. ES < 6 supports
        multiple doc_types in a single index.
        """
        return cls._es_namespace.index_prefix + cls._meta['_doc_type']

    @classmethod
    def get_doctype(cls):
        """
        :deprecated:

        Returns the doc_type of this model, i.e. ``_meta['_doc_type']``
        without any index prefix.

        In ES >= 6 each model type needs its own index. ES < 6 supports
        multiple doc_types in a single index.
        """
        return cls._meta['_doc_type']

    def _compute_id(self):
        """
        Count or return stored id for this model instance.

        :return: None for unsaved models which should receive a unique id
            generated by Elasticsearch.

        Should be overridden and return some computed value for models which
        handle their unique id by themselves - mainly to keep a model
        parameter unique.
        """
        return self.id

    @classmethod
    def get_es_connection(cls):
        """
        Initializes or returns an existing DocTypeConnection to elasticsearch
        for this model.

        The connection is cached on the class itself; subclasses never reuse
        a connection cached by a parent class, because the connection is
        bound to the class's own index, doc_type and model.

        :return: DocTypeConnection
        """
        # Look only in this class's own namespace: plain cls._es_connection
        # would follow the MRO and return a parent's connection.
        connection = cls.__dict__.get('_es_connection')
        if not connection:
            logger.debug("%s connecting to %s", cls.__name__,
                         cls._es_namespace.__dict__)
            connection = elastic_connect.DocTypeConnection(
                model=cls, es_namespace=cls._es_namespace,
                index=cls.get_index(), doc_type=cls.get_doctype())
            cls._es_connection = connection
            logger.debug("connection index name %s", connection.index_name)
        else:
            logger.debug("%s connection already established %s",
                         cls.__name__, connection.__dict__)
        return connection

    @classmethod
    def from_dict(cls, **kw):
        """
        Create and return an unsaved model instance based on dict.

        :param kw: keyword arguments describing the model's attributes
        :return: instance of the model
        """
        return cls(**kw)

    @classmethod
    def from_es(cls, hit):
        """
        Create and return an unsaved model instance based on elasticsearch
        query result.

        :param hit: a ``hit`` from an elasticsearch query; its ``_source``
            is passed to each data type's from_es and its ``_id`` becomes
            the model id
        :return: instance of the model
        """
        kwargs = {
            name: data_type.from_es(hit['_source'])
            for name, data_type in cls._mapping.items()
        }
        kwargs['id'] = hit['_id']
        return cls(**kwargs)

    @classmethod
    def create(cls, **kw) -> 'Model':
        """
        Create, save and return a model instance based on dictionary.

        Property id gets set by elasticsearch or computed depending on
        cls._compute_id()

        :param kw: keyword arguments describing the model's attributes
        :return: instance of the model with the ``id`` set
        """
        model = cls.from_dict(**kw)
        model.id = model._compute_id()
        return cls._create(model)

    @classmethod
    def _create(cls, model):
        """
        Handles the creation of the model in elasticsearch.

        Models without an id are indexed, thus receiving id from
        elasticsearch. Models with id are created. This prevents the
        creation of duplicates.

        :param model: the model to be created
        :return: the model with the ``id`` set
        """
        serialized_flat = model.serialize(exclude=['id'], flat=True)
        if model.id:
            response = cls.get_es_connection().create(id=model.id,
                                                      body=serialized_flat)
            # TODO: probably needs to call cls.refresh() to properly prevent
            # creation of duplicates
        else:
            # Serializing only for a log line is wasted work when debug
            # logging is off.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("serialize in _create %s",
                             model.serialize(exclude=['id']))
            response = cls.get_es_connection().index(body=serialized_flat)
        model.id = response['_id']
        logger.debug("model.id from _create %s", model.id)
        model.post_save()
        return model

    def save(self):
        """
        Save a model that has an id, index a model without an id into
        elasticsearch. Saves unsaved joins recursively. Joined models, which
        already have an id (and thus are already present in the database)
        are not re-saved automatically. You must save them yourself if they
        changed.

        :raises IntegrityError: if the model has an id and _compute_id()
            now returns a different, non-empty value
        :return: self with dependencies updated
        """
        es_connection = self.get_es_connection()
        if self.id:
            computed_id = self._compute_id()
            if computed_id and computed_id != self.id:
                raise IntegrityError("Can't save model with a changed "
                                     "computed id, create a new model")
            serialized = self.serialize(exclude=['id'])
            es_connection.update(id=self.id, body={'doc': serialized})
        else:
            self.id = self._compute_id()
            serialized_flat = self.serialize(exclude=['id'], flat=True)
            if self.id:
                response = es_connection.create(id=self.id,
                                                body=serialized_flat)
            else:
                response = es_connection.index(body=serialized_flat)
            self.id = response['_id']
            logger.debug("model.id from save %s", self.id)
        return self.post_save()

    def post_save(self):
        """
        Runs the on_save hook of every mapped data type after the model was
        written to elasticsearch.

        If any hook returns a value other than None, the model is saved
        again.

        :return: self
        """
        logger.debug("post_save %s %s", self.__class__.__name__, self.id)
        ret = [data_type.on_save(model=self)
               for data_type in self._mapping.values()]
        logger.debug("post_save ret %s %s", self.id, ret)
        if any(r is not None for r in ret):
            # resave, because some child models were updated
            # NOTE(review): save() calls post_save() again; this only
            # terminates if the hooks return None on a later pass - confirm.
            self.save()
        return self

    def delete(self):
        """
        Delete a model from elasticsearch.

        :return: None
        """
        self.get_es_connection().delete(id=self.id)

    def _lazy_load(self):
        """
        Lazy loads model's joins - child / parent models.

        Each mapped attribute is replaced by its data type's lazy_load
        result.

        :return: self
        """
        for name, data_type in self._mapping.items():
            logger.debug("pre _lazy_load %s %s", name, getattr(self, name))
            self.__update(name, data_type.lazy_load(self))
        logger.debug("_lazy_load %s", self)
        return self

    @classmethod
    def get(cls, id):
        """
        Get a model by id from elasticsearch.

        :param id: id of the model to get (a ``str``), or a collection of
            ids to fetch in one multi-get request
        :return: returns an instance of elastic_connect.connect.Result, or an
            empty list when an empty collection of ids is given
        """
        if isinstance(id, str):
            logger.debug("getting single document %s", id)
            return cls.get_es_connection().get(id=id)
        # Lazy %-args: eager "..." % id breaks when id is a tuple.
        logger.debug("getting multiple documents %s", id)
        if not id:
            return []
        return cls.get_es_connection().mget(body={'ids': id})

    @classmethod
    def all(cls, size=100, sort=None):
        """
        Get all models from Elasticsearch.

        :param size: max number of hits to return. Default = 100.
        :param sort: sorting of the result as provided by prepare_sort(sort)
        :return: returns an instance of elastic_connect.connect.Result
        """
        sort = cls.prepare_sort(sort, stringify=True)
        return cls.get_es_connection().search(sort=sort, size=size)

    @classmethod
    def get_default_sort(cls):
        """
        Returns the default sort order, which is used by find_by() and all()
        if no other sorting is explicitly provided in their call.

        Sorts by ``order`` ascending when the class has an ``order``
        attribute, then by ``_uid`` ascending.
        """
        sort = []
        if hasattr(cls, 'order'):
            sort.append({'order': 'asc'})
        sort.append({'_uid': 'asc'})
        return sort

    @classmethod
    def prepare_sort(cls, sort=None, stringify=False):
        """
        Prepares sorting for model.

        Defaults to get_default_sort, {"_uid": "asc"} is also appended as
        last resort to all sorts that don't use _uid. Sorting by _id is not
        supported by elasticsearch, use _uid (_doc_type + '#' + _id)
        instead.

        Important: _uid is not incremental in elasticsearch, it's here just
        to get consistent results on the same dataset.

        The caller's ``sort`` list is never modified.

        :param sort: array of {property: "asc|desc"} values
        :param stringify: default False: if the result should be stringified
            for kw parameter, or left in the json format for body of
            Elasticsearch query.
        :return: returns the input sort with appended {"_uid": "asc"}
        """
        def prepare_sort_array(sort):
            if not sort:
                return cls.get_default_sort()
            if any('_uid' in s for s in sort):
                return sort
            # Build a new list rather than appending in place, so the
            # caller's argument is left untouched.
            return list(sort) + [{'_uid': 'asc'}]

        sort = prepare_sort_array(sort)
        if not stringify:
            return sort

        return ["%s:%s" % (key, value)
                for pair in sort
                for key, value in pair.items()]

    @classmethod
    def find_by(cls, size=100, sort=None, search_after=None, query=None,
                **kw):
        """
        Search for models in Elasticsearch by attribute values.

        :example:

        .. code-block:: python

            # return model with email="test@test.cz"
            model.find_by(email="test@test.cz")

            # return model with both email="test@test.cz" and parent=10
            model.find_by(email="test@test.cz", parent=10)

            # return models with parent 10 sorted by email ascending
            model.find_by(parent=10, sort=[{"email":"asc"}])

            # return models with email >= "foo@bar.cz" (and _uid > '' as
            # per default sort order, every _uid is greater than '')
            model.find_by(parent=10, sort=[{"email":"asc"}],
                          search_after=["foo@bar.cz", ''])

            # return models with parent 10 and email _anything_@bar.cz
            model.find_by(query="parent: 10 AND email: *@bar.cz")

        :param size: max number of hits to return. Default = 100.
        :param kw: attributes of the model by which to search
        :param sort: sorting of the result as provided by prepare_sort(sort)
        :param search_after: searches for results 'after' the value(s)
            supplied, preferably used with
            elastic_connect.connect.Result.search_after_values
        :param query: instead of specifying kw search arguments, you may
            enter here a wildcard query string, or a dict of
            {attribute: value} term conditions
        :return: returns an instance of elastic_connect.connect.Result
        """
        if not query:
            query = kw

        sort = cls.prepare_sort(sort)

        if isinstance(query, str):
            _query = {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": query,
                                "analyze_wildcard": True
                            }
                        }
                    ]
                }
            }
        elif len(query) == 1:
            _query = {"term": query}
        else:
            # Terms come from `query` (which may be an explicit dict), not
            # from kw - reading kw here raised KeyError for query={...}.
            _query = {
                "bool": {
                    "must": [{"term": {key: value}}
                             for key, value in query.items()]
                }
            }

        body = {
            "size": size,
            "query": _query,
            "sort": sort
        }

        if search_after:
            body['search_after'] = search_after

        logger.debug("find_by body %s", body)

        return cls.get_es_connection().search(body=body)

    def serialize(self, exclude=("password",), depth=0, to_str=False,
                  flat=False):
        """
        Serializes the model for storing to elasticsearch.

        Joins are flattened from join: model format to join: model.id
        format. Other attributes are serialized by their respective
        type.serialize method.

        :param exclude: default=("password",), names of attributes left out
            of the result
        :param depth: default=0, the depth up to which models are serialized
            as jsons, deeper than that models are reduced to their id
        :param to_str: default=False, pass True if the serialization is for
            console output purposes
        :param flat: default=False, unsaved joined models are returned as
            Models if False, as None if True
        :return: json representation of the model
        """
        return {
            name: data_type.serialize(getattr(self, name), depth=depth,
                                      to_str=to_str, flat=flat)
            for name, data_type in self._mapping.items()
            if name not in exclude
        }

    def __repr__(self):
        if self.id:
            return object.__repr__(self) + str(self)
        return object.__repr__(self)

    def __str__(self):
        return str(self.serialize(depth=0, to_str=True))

    @classmethod
    def refresh(cls):
        """
        Refresh the index where this model is stored to make all changes
        immediately visible to others.
        """
        cls._es_namespace.get_es().indices.refresh(index=cls.get_index())

    def __setattr__(self, name, value):
        # Mapped attributes go through their data type's on_update hook.
        if name in self._mapping:
            return self.__update(name, value)
        return super().__setattr__(name, value)

    def __update(self, name, value):
        """Store ``value`` for mapped attribute ``name`` after passing it
        through the data type's on_update hook; returns self."""
        super().__setattr__(name, self._mapping[name].on_update(value, self))
        return self

    @classmethod
    def get_es_mapping(cls):
        """
        Returns a dict representing the elastic search mapping for this
        model.

        The ``id`` attribute and data types without an es type are left out.

        :return: dict
        """
        mapping = {}
        for name, data_type in cls._mapping.items():
            es_type = data_type.get_es_type()
            if name != 'id' and es_type:
                mapping[name] = es_type
        logger.debug("mapping %s", mapping)
        return mapping