Source code for asyncflux.client

# -*- coding: utf-8 -*-
"""Connection to InfluxDB"""
import json
import sys
try:
    from urlparse import urlparse
except ImportError:  # pragma: no cover
    from urllib.parse import urlparse    # pragma: no cover
if sys.version_info[0] >= 3:
    basestring = str  # pragma: no cover
else:
    basestring = basestring  # pragma: no cover

from tornado import gen, httpclient, httputil, ioloop

from asyncflux import clusteradmin, database, shardspace
from asyncflux.errors import AsyncfluxError
from asyncflux.util import asyncflux_coroutine, snake_case_dict


[docs]class AsyncfluxClient(object): HOST = 'localhost' PORT = 8086 USERNAME = 'root' PASSWORD = 'root' def __init__(self, host=None, port=None, username=None, password=None, is_secure=False, io_loop=None, **kwargs): scheme = 'https' if is_secure else 'http' host = host or self.HOST port = port or self.PORT username = username or self.USERNAME password = password or self.PASSWORD if not isinstance(port, int): raise TypeError("port must be an instance of int") if '://' in host: if host.startswith(('http://', 'https://')): result = urlparse(host) scheme = result.scheme host = result.hostname port = result.port or port username = result.username or username password = result.password or password else: index = host.find("://") raise ValueError('Invalid URL scheme: %s' % host[:index]) self.__scheme = scheme self.__host = host self.__port = port self.__username = username self.__password = password self.__json = kwargs.get('json_module', json) self.io_loop = io_loop or ioloop.IOLoop.current() self.http_client = httpclient.AsyncHTTPClient(self.io_loop) @property def host(self): return self.__host @property def port(self): return self.__port @property def base_url(self): return '%s://%s:%s' % (self.__scheme, self.host, self.port, ) @property def username(self): return self.__username @username.setter def username(self, value): self.__username = value @property def password(self): return self.__password @password.setter def password(self, value): self.__password = value def __getattr__(self, name): return database.Database(self, name) def __getitem__(self, name): return self.__getattr__(name) @asyncflux_coroutine
[docs] def request(self, path, path_params=None, qs=None, body=None, method='GET', auth_username=None, auth_password=None): try: path_params = path_params or {} qs = qs or {} auth_username = auth_username or self.username auth_password = auth_password or self.password url = (self.base_url + path) % path_params if isinstance(body, dict): body = self.__json.dumps(body) response = yield self.http_client.fetch( httputil.url_concat(url, qs), body=body, method=method, auth_username=auth_username, auth_password=auth_password) if hasattr(response, 'body') and response.body: raise gen.Return(self.__json.loads(response.body)) except httpclient.HTTPError as e: raise AsyncfluxError(e.response)
@asyncflux_coroutine
[docs] def ping(self): status = yield self.request('/ping') raise gen.Return(status)
@asyncflux_coroutine
[docs] def get_databases(self): dbs = yield self.request('/db') databases = [database.Database(self, db['name']) for db in dbs] raise gen.Return(databases)
@asyncflux_coroutine
[docs] def get_database_names(self): databases = yield self.request('/db') raise gen.Return([db['name'] for db in databases])
@asyncflux_coroutine
[docs] def create_database(self, name_or_database): name = name_or_database if isinstance(name, database.Database): name = name_or_database.name if not isinstance(name, basestring): raise TypeError("name_or_database must be an instance of " "%s or Database" % (basestring.__name__,)) yield self.request('/db', body={'name': name}, method='POST') new_database = database.Database(self, name) raise gen.Return(new_database)
@asyncflux_coroutine
[docs] def delete_database(self, name_or_database): name = name_or_database if isinstance(name, database.Database): name = name_or_database.name if not isinstance(name, basestring): raise TypeError("name_or_database must be an instance of " "%s or Database" % (basestring.__name__,)) yield self.request('/db/%(database)s', {'database': name}, method='DELETE')
@asyncflux_coroutine
[docs] def get_cluster_admin_names(self): admins = yield self.request('/cluster_admins') raise gen.Return([a['name'] for a in admins])
@asyncflux_coroutine
[docs] def get_cluster_admins(self): cas = yield self.request('/cluster_admins') admins = [clusteradmin.ClusterAdmin(self, ca['name']) for ca in cas] raise gen.Return(admins)
@asyncflux_coroutine
[docs] def create_cluster_admin(self, username, password): yield self.request('/cluster_admins', method='POST', body={'name': username, 'password': password}) new_cluster_admin = clusteradmin.ClusterAdmin(self, username) raise gen.Return(new_cluster_admin)
@asyncflux_coroutine
[docs] def change_cluster_admin_password(self, username, new_password): yield self.request('/cluster_admins/%(username)s', {'username': username}, method='POST', body={'password': new_password})
@asyncflux_coroutine
[docs] def delete_cluster_admin(self, username): yield self.request('/cluster_admins/%(username)s', {'username': username}, method='DELETE')
@asyncflux_coroutine
[docs] def authenticate_cluster_admin(self, username, password): try: yield self.request('/cluster_admins/authenticate', auth_username=username, auth_password=password) except AsyncfluxError: raise gen.Return(False) raise gen.Return(True)
@asyncflux_coroutine
[docs] def get_shard_spaces(self): spaces = yield self.request('/cluster/shard_spaces') shard_spaces = [ shardspace.ShardSpace(self, **snake_case_dict(s)) for s in spaces ] raise gen.Return(shard_spaces)
def __repr__(self): return "AsyncfluxClient(%r, %r)" % (self.host, self.port)