from elasticsearch import Elasticsearch
import elasticsearch.exceptions
import time
import requests
import logging
# Prepended to each namespace's own prefix; Namespace.index_prefix reads
# it on every access, so changing it affects already-created namespaces.
_global_prefix = ''
"""
Global index prefix. Used for example to distinguish between index names
used in production and in tests.
"""
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class NamespaceConnectionError(Exception):
    """
    Raised when elasticsearch does not answer over http(s) before the
    connection timeout expires.
    """
class NamespaceAlreadyExistsError(Exception):
    """
    Raised when a namespace is registered under a name that is already
    taken.
    """
class Namespace(object):
    """
    Object describing a namespace of an elasticsearch cluster or a
    connection to a different elasticsearch cluster. Each namespace may
    have a different es_conf, thus connecting to a different
    elasticsearch cluster and/or a different index_prefix thus using a
    different set of indices on the same cluster.

    For example you may use two different namespaces to run two
    instances of the same application against a single elasticsearch
    cluster. Due to using different index_prefixes on the ``_default``
    namespace, each application will preserve its own data, i.e. one,
    with ``index_prefix="our"`` using indices ``our_users`` and
    ``our_data``, the other with ``index_prefix="their"`` using indices
    ``their_users`` and ``their_data``.

    It is also possible to use multiple namespaces in a single
    application.
    """

    def __init__(self, name, es_conf, index_prefix=None):
        """
        :param name: name of the namespace, must be unique
        :param es_conf: the configuration of the namespace. It is passed
            unchanged to the underlying elasticsearch.Elasticsearch
            class. ``get_es_url`` reads ``es_conf[0]['host']`` and
            ``es_conf[0]['port']``, so a sequence of host dicts such as
            ``[{'host':..., 'port':...}]`` is expected; ``None`` is
            also accepted.
        :param index_prefix: prefix of the namespace, it should probably
            be unique on the same cluster for sanity reasons, but no
            check is enforced. Defaults to ``name + '_'``.
        """
        self.name = name
        self.es_conf = es_conf
        if index_prefix is None:
            index_prefix = name + '_'
        self._index_prefix = index_prefix
        # The client is created lazily by get_es().
        self.es = None

    def register_model_class(self, model_class):
        """
        Registers a model class in this namespace. By default all model
        classes are registered in the ``_default`` namespace. By
        registering a model in a namespace it is possible to reuse it to
        connect to a different Elasticsearch cluster.

        :param model_class: The model class to be registered. It must
            define ``__slots__``.
        :return: For the ``_default`` namespace the very same class,
            with its ``_es_namespace`` pointed at this namespace.
            Otherwise a new subclass with name prefixed with
            Namespace.name and properly set _es_namespace reference.
        """
        if self.name == '_default':
            model_class._es_namespace = self
            return model_class

        class NewModelClass(model_class):
            _es_namespace = None
            _es_connection = None
            __slots__ = model_class.__slots__

        NewModelClass.__name__ = self.name + '_' + model_class.__name__
        NewModelClass._es_namespace = self
        return NewModelClass

    def get_es(self):
        """
        Returns the elasticsearch client of this namespace, creating it
        from ``es_conf`` on first use.

        :return: the cached elasticsearch.Elasticsearch instance
        """
        if self.es is None:
            self.es = Elasticsearch(self.es_conf)
        return self.es

    def wait_for_yellow(self):
        """
        Blocks until the cluster health is at least yellow.

        :return: the response of the cluster health call
        """
        return self.get_es().cluster.health(wait_for_status="yellow")

    def get_es_url(self, https=False):
        """
        Builds the base url of the first configured host.

        :param https: whether to use http or https protocol
        :return: url in the form ``protocol://host:port``; host and port
            fall back to ``localhost`` and ``9200`` when they are not
            configured or when ``es_conf`` is empty/None
        """
        # TODO: handle possible multiple urls
        protocol = "https" if https else "http"
        # The _default namespace is built with es_conf=None; treat a
        # missing configuration like an empty host entry.
        first_host = self.es_conf[0] if self.es_conf else {}
        host = first_host.get('host', 'localhost')
        port = first_host.get('port', 9200)
        return "%s://%s:%s" % (protocol, host, port)

    def wait_for_http_connection(self,
                                 initial_wait=10.0,
                                 step=0.1,
                                 timeout=30.0,
                                 https=False):
        """
        Waits for http(s) connection to elasticsearch to be ready.

        At least one request is always attempted after the initial wait,
        even when ``timeout`` is 0 or smaller than ``initial_wait``.

        :param initial_wait: initially wait in seconds
        :param step: pause in seconds between two attempts
        :param timeout: raise NamespaceConnectionError after timeout
            seconds of trying. This includes the initial wait.
        :param https: whether to use http or https protocol
        :return: True
        :raises: NamespaceConnectionError on connection timeout
        """
        url = self.get_es_url(https)
        # Measure against a wall-clock deadline so that time spent
        # inside the requests themselves counts towards the timeout.
        deadline = time.monotonic() + timeout
        time.sleep(initial_wait)
        while True:
            remaining = deadline - time.monotonic()
            try:
                # Bound every request so a hung socket cannot block far
                # beyond the deadline; requests rejects timeouts <= 0,
                # hence the one second floor.
                response = requests.get(url, timeout=max(remaining, 1.0))
            except requests.exceptions.RequestException:
                response = None
            if response is not None and response.status_code == 200:
                return True
            if time.monotonic() + step >= deadline:
                break
            time.sleep(step)
        raise NamespaceConnectionError(
            "Elasticsearch @ %s connection timeout" % url)

    def wait_for_ready(self,
                       initial_attempt=True,
                       initial_wait=2.0,
                       step=0.1,
                       timeout=30.0,
                       https=False):
        """
        Waits for elasticsearch to get ready. First waits for the node
        to respond over http, then waits for the cluster to turn at
        least yellow.

        :param initial_attempt: If True, attempts a http connection
            right away, even before starting the initial_wait
        :param initial_wait: initially wait in seconds
        :param step: try each step seconds after initial wait
        :param timeout: raise NamespaceConnectionError after timeout
            seconds of trying. This includes the initial wait.
        :param https: whether to use http or https protocol
        :return: returns cluster health info
        :raises: NamespaceConnectionError on connection timeout
        """
        connected = False
        if initial_attempt:
            try:
                # Zero waits/timeout: a single immediate probe.
                self.wait_for_http_connection(initial_wait=0, step=0,
                                              timeout=0, https=https)
                connected = True
            except NamespaceConnectionError:
                pass
        if not connected:
            # Also reached when initial_attempt is False, so the http
            # wait is never skipped.
            self.wait_for_http_connection(initial_wait, step,
                                          timeout, https)
        return self.wait_for_yellow()

    @property
    def index_prefix(self):
        """
        Returns the calculated index prefix, taking into account any
        global prefixes as well.
        """
        return _global_prefix + self._index_prefix

    def create_mappings(self, model_classes):
        """
        Creates index mapping in elasticsearch for each model passed in.
        Doesn't update existing mappings.

        :param model_classes: a list of classes for which indices are
            created
        :return: returns the names of indices which were actually
            created
        :raises: elasticsearch.exceptions.RequestError for any request
            error other than the index already existing
        """
        # Elasticsearch 6+ reports the second name for the same error.
        already_exists = ('index_already_exists_exception',
                          'resource_already_exists_exception')

        def safe_create(index, body):
            # Returns True only when the index was really created.
            try:
                self.get_es().indices.create(index=index, body=body)
            except elasticsearch.exceptions.RequestError as e:
                if e.error not in already_exists:
                    raise
                logger.info("Index %s already exists!", index)
                return False
            logger.info("Index %s created", index)
            logger.debug("with params %s", body)
            return True

        created = []
        for model_class in model_classes:
            index_name = model_class.get_index()
            doctype_name = model_class.get_doctype()
            mapping = {
                "properties": model_class.get_es_mapping()
            }
            if safe_create(index=index_name,
                           body={"mappings": {doctype_name: mapping}}):
                created.append(index_name)
        return created

    def delete_index(self, index, timeout=2.0):
        """
        Deletes an index from elasticsearch and blocks until it is
        deleted.

        Unlike the create_mappings and other operations, index deletes
        in elastic_connect *don't* perform any index_name prefix magic.
        All index deletions in elastic_connect are attempted with the
        name provided 'as is'.

        :param index: name of index to be deleted
        :param timeout: if the index is not deleted after the number of
            seconds, Exception is raised. If timeout = 0 doesn't block
            and returns immediately
        :return: none
        :raises: Exception if the index still exists after timeout
        """
        es = self.get_es()
        es.indices.delete(index=index)
        if not timeout:
            return
        # Poll every 0.1 s, i.e. ten checks per second of timeout.
        rep = int(10 * timeout)
        while rep and es.indices.exists(index=index):
            rep -= 1
            time.sleep(0.1)
        if not rep and es.indices.exists(index=index):
            raise Exception(
                "Timeout. Index %s still exists after %s seconds." %
                (index, timeout))
        logger.info("Index %s deleted", index)

    def delete_indices(self, indices):
        """
        Deletes multiple indices, blocks until they are deleted.

        Unlike the create_mappings and other operations, index deletes
        in elastic_connect *don't* perform any index_name prefix magic.
        All index deletions in elastic_connect are attempted with the
        name provided 'as is'.

        :param indices: names of indices to be deleted
        :return: None
        """
        # TODO: delete the indices in parallel
        for index in indices:
            self.delete_index(index)
# Registry keyed by namespace name. It starts with the '_default'
# namespace (no es_conf, empty index prefix); register_namespace() adds
# further entries and refuses duplicates.
_namespaces = {'_default': Namespace(name='_default', es_conf=None,
index_prefix='')}
"""
A singleton dict containing all registered namespaces indexed by their
names.
"""
def register_namespace(namespace: Namespace):
    """
    Register a new namespace. Changing a Namespace's parameters after it
    was registered may do crazy things, don't do it.

    :param namespace: Namespace instance to be registered
    :return: None
    :raises: NamespaceAlreadyExistsError if a Namespace with the same
        name already exists
    """
    if namespace.name in _namespaces:
        raise NamespaceAlreadyExistsError(
            "Namespace " + namespace.name + " already exists!")
    _namespaces[namespace.name] = namespace