Source code for elastic_connect.base_model

import elastic_connect
import elastic_connect.data_types as data_types
import elastic_connect.data_types.base
import logging

logger = logging.getLogger(__name__)


class IntegrityError(Exception):
    pass
class Model(object):
    """
    Base class for Models stored in elasticsearch.

    Handles creation, loading and saving of the models. Also handles
    simple SQL-like joins, in fact lazy loading of dependent
    child/parent models.
    """

    __slots__ = ('id', )

    _mapping = {  # type: dict[str, elastic_connect.data_types.base.BaseDataType]
        'id': data_types.Keyword(name='id'),
    }
    """
    Dictionary describing the model.

    property_name: elasticsearch data type, or 'ref' for a reference to
    another model defined by a join. Keys starting with _ are not saved
    in elasticsearch.
    """

    _meta = {
        '_doc_type': 'model',
    }

    _es_namespace = elastic_connect._namespaces['_default']
    _es_connection = None

    def __init__(self, **kw):
        """
        Creates an instance of the model using the \*\*kw parameters to
        set the values of its attributes. The values get converted by
        their respective data_type.from_python method.
        """

        for property, type in self._mapping.items():
            self.__update(property, type.get_default_value())

        for property, type in self._mapping.items():
            self.__update(property, type.on_update(
                type.from_python(kw.get(property, type.get_default_value())), self))
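    # Illustrative sketch (not part of this module): a concrete model is
    # typically declared by subclassing Model and overriding _mapping and
    # _meta. The ``User`` class, its ``email`` attribute and the ``user``
    # doc_type below are hypothetical examples, assuming a Keyword mapping.
    #
    #     class User(Model):
    #         __slots__ = ('email', )
    #
    #         _mapping = {
    #             'id': data_types.Keyword(name='id'),
    #             'email': data_types.Keyword(name='email'),
    #         }
    #         _meta = {
    #             '_doc_type': 'user',
    #         }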
    @classmethod
    def get_index(cls):
        """
        :deprecated:

        Returns the name of the index this model is stored in,
        including any prefixes defined globally or in the namespace.

        In ES >= 6 each model type needs its own index. ES < 6 supports
        multiple doc_types in a single index.
        """

        return cls._es_namespace.index_prefix + cls._meta['_doc_type']
    def _compute_id(self):
        """
        Compute or return the stored id for this model instance.

        :return: None for unsaved models, which should receive a unique
            id generated by Elasticsearch. Should be overridden to
            return a computed value for models which handle their
            unique id themselves - mainly to keep a model parameter
            unique.
        """

        return self.id
    @classmethod
    def get_es_connection(cls):
        """
        Initializes or returns an existing DocTypeConnection to
        elasticsearch for this model.

        :return: DocTypeConnection
        """

        if not cls._es_connection:
            logger.debug(cls.__name__ + " connecting to " + str(cls._es_namespace.__dict__))
            cls._es_connection = elastic_connect.DocTypeConnection(
                model=cls, es_namespace=cls._es_namespace,
                index=cls.get_index(), doc_type=cls._meta['_doc_type'])
            logger.debug("connection index name " + cls._es_connection.index_name)
        else:
            logger.debug(cls.__name__ + " connection already established " + str(cls._es_connection.__dict__))
        return cls._es_connection
    @classmethod
    def from_dict(cls, **kw):
        """
        Create and return an unsaved model instance based on a dict.

        :param kw: keyword arguments describing the model's attributes
        :return: instance of the model
        """

        model = cls(**kw)
        return model
    @classmethod
    def from_es(cls, hit):
        """
        Create and return an unsaved model instance based on an
        elasticsearch query result.

        :param hit: a ``hit`` from an elasticsearch query
        :return: instance of the model
        """

        kwargs = {}
        for property, type in cls._mapping.items():
            kwargs.update({property: type.from_es(hit['_source'])})
        kwargs['id'] = hit['_id']
        model = cls(**kwargs)
        return model
    @classmethod
    def create(cls, **kw) -> 'Model':
        """
        Create, save and return a model instance based on a dict.

        The ``id`` property either gets set by elasticsearch or is
        computed, depending on cls._compute_id().

        :param kw: keyword arguments describing the model's attributes
        :return: instance of the model with the ``id`` set
        """

        model = cls.from_dict(**kw)
        model.id = model._compute_id()
        ret = cls._create(model)
        return ret
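    # Hedged usage sketch, assuming the hypothetical ``User`` subclass from
    # the example above:
    #
    #     user = User.create(email='test@test.cz')   # saved; id assigned by elasticsearch
    #     draft = User.from_dict(email='x@y.cz')     # built only; draft.id stays None until save()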
    @classmethod
    def _create(cls, model):
        """
        Handles the creation of the model in elasticsearch.

        Models without an id are indexed, thus receiving their id from
        elasticsearch. Models with an id are created. This prevents the
        creation of duplicates.

        :param model: the model to be created
        :return: the model with the ``id`` set
        """

        if model.id:
            response = cls.get_es_connection().create(id=model.id, body=model.serialize(exclude=['id'], flat=True))
            # TODO: probably needs to call cls.refresh() to properly prevent creation of duplicates
        else:
            logger.debug("serialize in _create %s", model.serialize(exclude=['id']))
            response = cls.get_es_connection().index(body=model.serialize(exclude=['id'], flat=True))
        model.id = response['_id']
        logger.debug("model.id from _create %s", model.id)
        model.post_save()
        return model
    def save(self):
        """
        Save a model that has an id, index a model without an id into
        elasticsearch.

        Saves unsaved joins recursively. Joined models which already
        have an id (and thus are already present in the database) are
        not re-saved automatically. You must save them yourself if they
        changed.

        :return: self with dependencies updated
        """

        if self.id:
            cmp = self._compute_id()
            if cmp and cmp != self.id:
                raise IntegrityError("Can't save model with a changed computed id, create a new model")
            self.get_es_connection().update(id=self.id, body={'doc': self.serialize(exclude=['id'])})
        else:
            self.id = self._compute_id()
            if self.id:
                response = self.get_es_connection().create(id=self.id, body=self.serialize(exclude=['id'], flat=True))
            else:
                response = self.get_es_connection().index(body=self.serialize(exclude=['id'], flat=True))
            self.id = response['_id']

        logger.debug("model.id from save %s", self.id)
        return self.post_save()
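    # Hedged sketch of the save flow (``user`` is the hypothetical instance
    # from the examples above):
    #
    #     user.email = 'new@test.cz'
    #     user.save()                              # has an id -> update() is issued
    #     User.from_dict(email='a@b.cz').save()    # no id -> index() is issued, id gets set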
    def post_save(self):
        logger.debug("post_save %s %s", self.__class__.__name__, self.id)
        # Let each data type react to the save, e.g. persist dependent joined models.
        ret = []
        for property, type in self._mapping.items():
            ret.append(type.on_save(model=self))
        logger.debug("post_save ret %s %s", self.id, ret)
        ret = [r for r in ret if r is not None]
        if len(ret):
            # re-save, because some child models were updated
            self.save()
        return self
    def delete(self):
        """
        Delete a model from elasticsearch.

        :return: None
        """

        self.get_es_connection().delete(id=self.id)
    def _lazy_load(self):
        """
        Lazy loads the model's joins - child / parent models.
        """

        for property, type in self._mapping.items():
            logger.debug("pre _lazy_load %s %s", property, self.__getattribute__(property))
            self.__update(property, type.lazy_load(self))
        logger.debug("_lazy_load %s", self)
        return self
    @classmethod
    def get(cls, id):
        """
        Get a model by id from elasticsearch.

        :param id: id of the model to get
        :return: an instance of elastic_connect.connect.Result
        """

        ret = cls.get_es_connection().get(id=id)
        return ret
    @classmethod
    def all(cls, size=100):
        """
        Get all models from elasticsearch.

        :param size: max number of hits to return. Default = 100.
        :return: an instance of elastic_connect.connect.Result
        """

        return cls.get_es_connection().search(size=size)
    @classmethod
    def find_by(cls, size=100, sort=None, search_after=None, query=None, **kw):
        """
        Search for models in elasticsearch by attribute values.

        :example:

        .. code-block:: python

            # return the model with email="test@test.cz"
            model.find_by(email="test@test.cz")

            # return the model with both email="test@test.cz" and parent=10
            model.find_by(email="test@test.cz", parent=10)

            # return models with parent=10 sorted by email ascending
            model.find_by(parent=10, sort=[{"email": "asc"}])

            # return models with email >= "foo@bar.cz" (and _uid > '' as per the
            # default sort order - every _uid is greater than '')
            model.find_by(parent=10, sort=[{"email": "asc"}], search_after=["foo@bar.cz", ''])

            # return models with parent=10 and email _anything_@bar.cz
            model.find_by(query="parent: 10 AND email: *@bar.cz")

        :param size: max number of hits to return. Default = 100.
        :param kw: attributes of the model by which to search
        :param sort: sorting of the result, default is [{"_uid": "asc"}]. The
            default is also appended as a last resort to all sorts that don't
            use _uid. Sorting by _id is not supported by default, use _uid
            (_doc_type + '#' + _id) instead.
        :param search_after: searches for results 'after' the value(s) supplied,
            preferably used with elastic_connect.connect.Result.search_after_values
        :param query: instead of specifying kw search arguments, you may pass a
            wildcard query string here
        :return: an instance of elastic_connect.connect.Result
        """

        def prepare_sort(sort):
            if not sort:
                sort = [{"_uid": "asc"}]
                return sort
            for s in sort:
                if '_uid' in s:
                    return sort
            sort.append({"_uid": "asc"})
            return sort

        if not query:
            query = kw
        sort = prepare_sort(sort)

        if isinstance(query, str):
            _query = {
                "bool": {
                    "must": [
                        {"query_string": {
                            "query": query,
                            "analyze_wildcard": True
                        }}
                    ]
                }
            }
        else:
            if len(query.keys()) == 1:
                _query = {"term": query}
            else:
                _query = {
                    "bool": {
                        "must": [{"term": {key: value}} for key, value in query.items()]
                    }
                }

        body = {
            "size": size,
            "query": _query,
            "sort": sort
        }
        if search_after:
            body['search_after'] = search_after

        logger.debug("find_by body %s", body)
        ret = cls.get_es_connection().search(body=body)
        return ret
    def serialize(self, exclude=["password"], depth=0, to_str=False, flat=False):
        """
        Serializes the model for storing to elasticsearch.

        Joins are flattened from the join: model format to the
        join: model.id format. Other attributes are serialized by their
        respective type.serialize method.

        :param exclude: default=["password"]
        :param depth: default=0, the depth up to which joined models are
            serialized as json; models deeper than that are reduced to
            their id
        :param to_str: default=False, pass True if the serialization is
            for console output purposes
        :param flat: default=False, unsaved joined models are returned
            as Models if False, as None if True
        :return: json representation of the model
        """

        ret = {}
        for property, type in self._mapping.items():
            if property not in exclude:
                ret.update({property: type.serialize(self.__getattribute__(property), depth=depth, to_str=to_str, flat=flat)})
        return ret
    def __repr__(self):
        if self.id:
            return object.__repr__(self) + str(self)
        return object.__repr__(self)

    def __str__(self):
        return str(self.serialize(depth=0, to_str=True))
    @classmethod
    def refresh(cls):
        """
        Refresh the index where this model is stored to make all
        changes immediately visible to others.
        """

        cls._es_namespace.get_es().indices.refresh(index=cls.get_index())
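    # A hedged sketch with the hypothetical ``User`` model: refresh the index
    # so that freshly created documents become visible to a subsequent search.
    #
    #     User.create(email='test@test.cz')
    #     User.refresh()                               # make the new document searchable
    #     found = User.find_by(email='test@test.cz')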
    def __setattr__(self, name, value):
        if name in self._mapping:
            return self.__update(name, value)
        return super().__setattr__(name, value)

    def __update(self, name, value):
        # Run mapped attributes through their data type's on_update hook
        # before storing them.
        super().__setattr__(name, self._mapping[name].on_update(value, self))
        return self
    @classmethod
    def get_es_mapping(cls):
        """
        Returns a dict representing the elasticsearch mapping for this
        model.

        :return: dict
        """

        mapping = {}
        for name, type in cls._mapping.items():
            es_type = type.get_es_type()
            if name != 'id' and es_type:
                mapping[name] = {"type": es_type}
        logger.debug("mapping %s", mapping)
        return mapping
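# Hedged illustration of get_es_mapping() output for the hypothetical ``User``
# model above: the 'id' field is skipped, other mapped fields are reported with
# their elasticsearch type (assuming Keyword maps to the 'keyword' es_type).
#
#     User.get_es_mapping()
#     # -> {'email': {'type': 'keyword'}}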