Source code for asyncflux.client

# -*- coding: utf-8 -*-
"""Connection to InfluxDB"""
import json
try:
    from urlparse import urlparse
except ImportError:  # pragma: no cover
    from urllib.parse import urlparse    # pragma: no cover

from tornado import gen, httpclient, httputil, ioloop
from tornado.util import basestring_type
from influxdb.resultset import ResultSet

from asyncflux import database, user
from asyncflux.util import asyncflux_coroutine


[docs]class AsyncfluxClient(object): HOST = 'localhost' PORT = 8086 USERNAME = 'root' PASSWORD = 'root' def __init__(self, host=None, port=None, username=None, password=None, is_secure=False, io_loop=None, **kwargs): scheme = 'https' if is_secure else 'http' host = host or self.HOST port = port or self.PORT username = username or self.USERNAME password = password or self.PASSWORD if not isinstance(port, int): raise TypeError("port must be an instance of int") if '://' in host: if host.startswith(('http://', 'https://')): result = urlparse(host) scheme = result.scheme host = result.hostname port = result.port or port username = result.username or username password = result.password or password else: index = host.find("://") raise ValueError('Invalid URL scheme: %s' % host[:index]) self.__scheme = scheme self.__host = host self.__port = port self.__username = username self.__password = password self.__json = kwargs.get('json_module', json) self.io_loop = io_loop or ioloop.IOLoop.current() self.http_client = httpclient.AsyncHTTPClient(self.io_loop) @property def host(self): return self.__host @property def port(self): return self.__port @property def base_url(self): return '%s://%s:%s' % (self.__scheme, self.host, self.port, ) @property def username(self): return self.__username @username.setter def username(self, value): self.__username = value @property def password(self): return self.__password @password.setter def password(self, value): self.__password = value def __getattr__(self, name): return database.Database(self, name) def __getitem__(self, name): return self.__getattr__(name) @asyncflux_coroutine
[docs] def request(self, path, path_params=None, qs=None, body=None, method='GET', auth_username=None, auth_password=None): try: path_params = path_params or {} qs = qs or {} auth_username = auth_username or self.username auth_password = auth_password or self.password url = (self.base_url + path) % path_params if isinstance(body, dict): body = self.__json.dumps(body) response = yield self.http_client.fetch( httputil.url_concat(url, qs), body=body, method=method, auth_username=auth_username, auth_password=auth_password) if hasattr(response, 'body') and response.body: raise gen.Return(self.__json.loads(response.body)) except httpclient.HTTPError: raise
@asyncflux_coroutine
[docs] def ping(self): yield self.request('/ping')
@asyncflux_coroutine
[docs] def query(self, query, params=None, database=None, raise_errors=True): params = params or {} params['q'] = query if database: params['db'] = database response = yield self.request('/query', qs=params) result_set = [ ResultSet(result, raise_errors=raise_errors) for result in response.get('results', []) ] raise gen.Return(result_set)
@asyncflux_coroutine
[docs] def get_databases(self): result_set = yield self.query('SHOW DATABASES') databases = [ database.Database(self, raw['name']) for raw in result_set[0].get_points() ] raise gen.Return(databases)
@asyncflux_coroutine
[docs] def get_database_names(self): result_set = yield self.query('SHOW DATABASES') databases = [ raw['name'] for raw in result_set[0].get_points() ] raise gen.Return(databases)
@asyncflux_coroutine
[docs] def create_database(self, name_or_database): name = name_or_database if isinstance(name, database.Database): name = name_or_database.name if not isinstance(name, basestring_type): raise TypeError("name_or_database must be an instance of " "%s or Database" % (basestring_type.__name__,)) yield self.query('CREATE DATABASE {}'.format(name)) new_database = database.Database(self, name) raise gen.Return(new_database)
@asyncflux_coroutine
[docs] def drop_database(self, name_or_database): name = name_or_database if isinstance(name, database.Database): name = name_or_database.name if not isinstance(name, basestring_type): raise TypeError("name_or_database must be an instance of " "%s or Database" % (basestring_type.__name__,)) yield self.query('DROP DATABASE {}'.format(name))
@asyncflux_coroutine
[docs] def get_users(self): result_set = yield self.query('SHOW USERS') databases = [ user.User(self, point['user'], point['admin']) for point in result_set[0].get_points() ] raise gen.Return(databases)
@asyncflux_coroutine
[docs] def get_user_names(self): result_set = yield self.query('SHOW USERS') databases = [ point['user'] for point in result_set[0].get_points() ] raise gen.Return(databases)
@asyncflux_coroutine
[docs] def create_user(self, username, password, admin=False): query_list = ["CREATE USER {} WITH PASSWORD '{}'".format(username, password)] if admin: query_list.append('WITH ALL PRIVILEGES') yield self.query(' '.join(query_list)) new_user = user.User(self, username, admin) raise gen.Return(new_user)
@asyncflux_coroutine
[docs] def change_user_password(self, username, password): query_str = "SET PASSWORD FOR {} = '{}'".format(username, password) yield self.query(query_str)
@asyncflux_coroutine
[docs] def drop_user(self, username): query_str = 'DROP USER {}'.format(username) yield self.query(query_str)
@asyncflux_coroutine
[docs] def grant_privilege(self, privilege, username, database=None): query_list = ['GRANT {}'.format(privilege)] if database: query_list.append('ON {}'.format(database)) query_list.append('TO {}'.format(username)) yield self.query(' '.join(query_list))
@asyncflux_coroutine
[docs] def revoke_privilege(self, privilege, username, database=None): query_list = ['REVOKE {}'.format(privilege)] if database: query_list.append('ON {}'.format(database)) query_list.append('FROM {}'.format(username)) yield self.query(' '.join(query_list))
@asyncflux_coroutine
[docs] def grant_admin_privileges(self, username): yield self.grant_privilege('ALL PRIVILEGES', username)
@asyncflux_coroutine
[docs] def revoke_admin_privileges(self, username): yield self.revoke_privilege('ALL PRIVILEGES', username)
def __repr__(self): return "AsyncfluxClient(%r, %r)" % (self.host, self.port)