# Source code for elastic_connect.namespace

from elasticsearch import Elasticsearch
import elasticsearch.exceptions
import time
import requests
import logging

#: Global index prefix. Used for example to distinguish between index names
#: used in production and in tests. It is prepended to every namespace's
#: own index prefix by ``Namespace.index_prefix``.
_global_prefix = ''

# Module logger, named after this module.
logger = logging.getLogger(__name__)


class NamespaceConnectionError(Exception):
    """Raised when an elasticsearch node does not answer over http in time."""
class NamespaceAlreadyExistsError(Exception):
    """Raised when registering a namespace whose name is already taken."""
class Namespace(object):
    """
    Object describing a namespace of an elasticsearch cluster or a connection
    to a different elasticsearch cluster.

    Each namespace may have a different es_conf, thus connecting to a
    different elasticsearch cluster, and/or a different index_prefix, thus
    using a different set of indices on the same cluster.

    For example you may use two different namespaces to run two instances of
    the same application against a single elasticsearch cluster. Due to
    using different index_prefixes on the ``_default`` namespace, each
    application will preserve its own data, i.e. one, with
    ``index_prefix="our"`` using indices ``our_users`` and ``our_data``, the
    other with ``index_prefix="their"`` using indices ``their_users`` and
    ``their_data``.

    It is also possible to use multiple namespaces in a single application.
    """

    # Error types treated as "index already exists" by create_mappings.
    # NOTE(review): 'resource_already_exists_exception' is the name newer
    # elasticsearch releases report (renamed from the first one in 6.0) —
    # confirm against the target cluster version.
    _ALREADY_EXISTS_ERRORS = (
        'index_already_exists_exception',
        'resource_already_exists_exception',
    )

    # Lower bound (seconds) for a single http probe in
    # wait_for_http_connection, so a probe is never given a zero timeout.
    _MIN_REQUEST_TIMEOUT = 1.0

    def __init__(self, name, es_conf, index_prefix=None):
        """
        :param name: name of the namespace, must be unique
        :param es_conf: the configuration of the namespace, i.e. a list of
            host dicts such as ``[{'host': ..., 'port': ...}]`` (a single
            host dict is accepted too). It is passed to the underlying
            ``elasticsearch.Elasticsearch`` class. ``None`` lets the client
            use its own defaults.
        :param index_prefix: prefix of the namespace, defaults to
            ``name + '_'``. It should probably be unique on the same cluster
            for sanity reasons, but no check is enforced.
        """
        self.name = name
        self.es_conf = es_conf
        if index_prefix is None:
            index_prefix = name + '_'
        self._index_prefix = index_prefix
        # Elasticsearch client, created lazily by get_es().
        self.es = None

    def register_model_class(self, model_class):
        """
        Registers a model class in this namespace.

        By default all model classes are registered in the ``_default``
        namespace. By registering a model in a namespace it is possible to
        reuse it to connect to a different Elasticsearch cluster.

        :param model_class: The model class to be registered
        :return: For the ``_default`` namespace, ``model_class`` itself with
            its ``_es_namespace`` set to this namespace. Otherwise a new
            subclass of ``model_class`` whose name is prefixed with
            ``Namespace.name + '_'`` and whose ``_es_namespace`` points to
            this namespace.
        """
        if self.name == '_default':
            model_class._es_namespace = self
            return model_class

        class NewModelClass(model_class):
            _es_namespace = None
            _es_connection = None
            __slots__ = model_class.__slots__

        NewModelClass.__name__ = self.name + '_' + model_class.__name__
        NewModelClass._es_namespace = self
        return NewModelClass

    def _es_hosts(self):
        """
        Returns ``es_conf`` normalized for the Elasticsearch client: a single
        host dict is wrapped in a list; any other value is returned as-is.
        """
        if isinstance(self.es_conf, dict):
            return [self.es_conf]
        return self.es_conf

    def get_es(self):
        """
        Returns the Elasticsearch client of this namespace, creating it on
        first use and reusing it afterwards.
        """
        if self.es is None:
            self.es = Elasticsearch(self._es_hosts())
        return self.es

    def wait_for_yellow(self):
        """
        Blocks until the cluster health is at least yellow.

        :return: the cluster health info returned by elasticsearch
        """
        return self.get_es().cluster.health(wait_for_status="yellow")

    def get_es_url(self, https=False):
        """
        Returns the base url of the first configured elasticsearch host.

        Missing ``host``/``port`` keys, or a missing/empty ``es_conf``, fall
        back to ``localhost`` and ``9200``.

        :param https: whether to use http or https protocol
        :return: url such as ``"http://localhost:9200"``
        """
        # TODO: handle possible multiple urls
        protocol = "https" if https else "http"
        first_host = (self._es_hosts() or [{}])[0]
        host = first_host.get('host', 'localhost')
        port = first_host.get('port', 9200)
        return "%s://%s:%s" % (protocol, host, port)

    def _http_ok(self, url, request_timeout):
        """
        Returns True if a GET on ``url`` answers with HTTP 200, False on any
        other status or on any error raised while making the request.
        """
        try:
            response = requests.get(url, timeout=request_timeout)
        except Exception:
            # Deliberately best-effort: the node may be down or still
            # starting; the caller simply retries.
            return False
        return response.status_code == 200

    def wait_for_http_connection(self, initial_wait=10.0, step=0.1,
                                 timeout=30.0, https=False):
        """
        Waits for http(s) connection to elasticsearch to be ready.

        :param initial_wait: seconds to sleep before the first attempt
        :param step: seconds to sleep between consecutive attempts
        :param timeout: raise NamespaceConnectionError once timeout seconds
            have elapsed since the call started. This includes the initial
            wait. At least one attempt is always made, even with
            ``timeout=0``.
        :param https: whether to use http or https protocol
        :return: True
        :raises NamespaceConnectionError: on connection timeout
        """
        # Measure real elapsed time so slow requests count against timeout.
        deadline = time.monotonic() + timeout
        time.sleep(initial_wait)
        url = self.get_es_url(https)
        while True:
            remaining = deadline - time.monotonic()
            if self._http_ok(url, max(remaining, self._MIN_REQUEST_TIMEOUT)):
                return True
            if time.monotonic() >= deadline:
                break
            time.sleep(step)
        raise NamespaceConnectionError(
            "Elasticsearch @ %s connection timeout" % url)

    def wait_for_ready(self, initial_attempt=True, initial_wait=2.0, step=0.1,
                       timeout=30.0, https=False):
        """
        Waits for elasticsearch to get ready.

        First waits for the node to respond over http, then waits for the
        cluster to turn at least yellow.

        :param initial_attempt: If True, attempts a http connection right
            away, even before starting the initial_wait
        :param initial_wait: initially wait in seconds
        :param step: try each step seconds after initial wait
        :param timeout: raise NamespaceConnectionError after timeout seconds
            of trying. This includes the initial wait.
        :param https: whether to use http or https protocol
        :return: returns cluster health info
        :raises NamespaceConnectionError: if the node does not answer over
            http in time
        """
        if initial_attempt:
            try:
                # Single immediate probe: no initial wait, no retries.
                self.wait_for_http_connection(initial_wait=0, step=0,
                                              timeout=0, https=https)
            except NamespaceConnectionError:
                self.wait_for_http_connection(initial_wait, step, timeout,
                                              https)
        else:
            self.wait_for_http_connection(initial_wait, step, timeout, https)
        return self.wait_for_yellow()

    @property
    def index_prefix(self):
        """
        Returns the calculated index prefix, taking into account any global
        prefixes as well.
        """
        return _global_prefix + self._index_prefix

    def create_mappings(self, model_classes):
        """
        Creates index mapping in elasticsearch for each model passed in.
        Doesn't update existing mappings.

        :param model_classes: a list of classes for which indices are created
        :return: the index names of all passed model classes, in order,
            including indices that already existed and were left untouched
        """
        def safe_create(index, body):
            try:
                self.get_es().indices.create(index=index, body=body)
            except elasticsearch.exceptions.RequestError as e:
                if e.error not in self._ALREADY_EXISTS_ERRORS:
                    raise
                logger.info("Index %s already exists!", index)
            else:
                logger.info("Index %s created", index)
                logger.debug("with params %s", body)

        created = []
        for model_class in model_classes:
            index_name = model_class.get_index()
            doctype_name = model_class.get_doctype()
            mapping = {"properties": model_class.get_es_mapping()}
            safe_create(index=index_name,
                        body={"mappings": {doctype_name: mapping}})
            created.append(index_name)
        return created

    def delete_index(self, index, timeout=2.0):
        """
        Deletes an index from elasticsearch and blocks until it is deleted.

        Unlike the create_mappings and other operations, index deletes in
        elastic_connect *don't* perform any index_name prefix magic. All
        index deletions in elastic_connect are attempted with the name
        provided 'as is'.

        :param index: name of index to be deleted
        :param timeout: if the index is not deleted after the number of
            seconds, Exception is raised. If timeout = 0 doesn't block and
            returns immediately
        :return: None
        :raises Exception: if the index still exists after ``timeout``
        """
        es = self.get_es()
        es.indices.delete(index=index)
        if not timeout:
            return
        # Poll every 0.1 s, i.e. 10 checks per second of timeout.
        rep = int(10 * timeout)
        while rep and es.indices.exists(index=index):
            rep -= 1
            time.sleep(0.1)
        if not rep and es.indices.exists(index=index):
            raise Exception(
                "Timeout. Index %s still exists after %s seconds."
                % (index, timeout))
        logger.info("Index %s deleted", index)

    def delete_indices(self, indices):
        """
        Deletes multiple indices, blocks until they are deleted.

        Unlike the create_mappings and other operations, index deletes in
        elastic_connect *don't* perform any index_name prefix magic. All
        index deletions in elastic_connect are attempted with the name
        provided 'as is'.

        :param indices: names of indices to be deleted
        :return: None
        """
        # TODO: delete the indices in parallel
        for index in indices:
            self.delete_index(index)
#: A singleton dict containing all registered namespaces indexed by their
#: names. It starts out holding only the ``_default`` namespace.
_namespaces = {
    '_default': Namespace(name='_default', es_conf=None, index_prefix=''),
}
def register_namespace(namespace: Namespace):
    """
    Register a new namespace.

    Changing a Namespace's parameters after it was registered may do crazy
    things, don't do it.

    :param namespace: Namespace instance to be registered
    :return: None
    :raises NamespaceAlreadyExistsError: if a Namespace with the same name
        already exists
    """
    if namespace.name in _namespaces:
        raise NamespaceAlreadyExistsError(
            "Namespace " + namespace.name + " already exists!")
    _namespaces[namespace.name] = namespace