Source code for asyncflux.database

# -*- coding: utf-8 -*-
"""Database level operations"""
from tornado import gen
from tornado.util import basestring_type

from influxdb.line_protocol import make_lines

from asyncflux import retentionpolicy, user
from asyncflux.util import asyncflux_coroutine, batches


[docs]class Database(object): BATCH_SIZE = 5000 def __init__(self, client, name): self.__client = client self.__name = name @property def client(self): return self.__client @property def name(self): return self.__name @asyncflux_coroutine
[docs] def query(self, query, params=None, raise_errors=True): result_set = yield self.client.query(query, params, database=self.name, raise_errors=raise_errors) raise gen.Return(result_set)
@asyncflux_coroutine
[docs] def write(self, data, retention_policy=None, precision=None, consistency=None): qs = {'db': self.name} if retention_policy: qs['rp'] = retention_policy if precision: qs['precision'] = precision if consistency: qs['consistency'] = consistency body = make_lines(data, precision) yield self.client.request('/write', method='POST', qs=qs, body=body)
@asyncflux_coroutine
[docs] def write_points(self, measurement, points, tags=None, retention_policy=None, precision=None, consistency=None, batch_size=None): batch_size = batch_size or self.BATCH_SIZE for batch in batches(points, batch_size): data = { 'measurement': measurement, 'points': batch } if tags: data['tags'] = tags yield self.write(data, retention_policy=retention_policy, precision=precision, consistency=consistency)
@asyncflux_coroutine
[docs] def get_measurements(self, tags=None, limit=None, offset=None): tags = tags or {} query_list = ['SHOW MEASUREMENTS'] if tags: tags_str = ' and '.join(["{}='{}'".format(k, v) for k, v in tags.items()]) query_list.append('WHERE {}'.format(tags_str)) if limit: query_list.append('LIMIT {}'.format(limit)) if offset: query_list.append('OFFSET {}'.format(offset)) result_set = yield self.query(' '.join(query_list)) measurements = [ point['name'] for point in result_set[0].get_points() ] raise gen.Return(measurements)
@asyncflux_coroutine
[docs] def get_tag_keys(self, measurement=None): query_list = ['SHOW TAG KEYS'] if measurement: query_list.append('FROM {}'.format(measurement)) result_set = yield self.query(' '.join(query_list)) tag_keys = [ (item[0][0], [x['tagKey'] for x in item[1]]) for item in result_set[0].items() ] raise gen.Return(tag_keys)
@asyncflux_coroutine
[docs] def get_tag_values(self, key_or_keys, measurement=None): query_list = ['SHOW TAG VALUES'] if measurement: query_list.append('FROM "{}"'.format(measurement)) keys = ( key_or_keys if isinstance(key_or_keys, (list, tuple)) else [key_or_keys] ) keys_str = ','.join('"%s"' % key for key in keys) query_list.append('WITH KEY IN ({})'.format(keys_str)) result_set = yield self.query(' '.join(query_list)) values = [ (i['columns'][0], sum(i['values'], [])) for i in result_set[0].raw.get('series', []) ] raise gen.Return(values)
@asyncflux_coroutine
[docs] def get_series(self, limit=None, offset=None): query_list = ['SHOW SERIES'] if limit: query_list.append('LIMIT {}'.format(limit)) if offset: query_list.append('OFFSET {}'.format(offset)) result_set = yield self.query(' '.join(query_list)) series = [] for serie in result_set[0].items(): series.append({'name': serie[0][0], 'tags': list(serie[1])}) raise gen.Return(series)
@asyncflux_coroutine
[docs] def drop_series(self, measurement=None, tags=None): query_list = ['DROP SERIES'] if measurement: query_list.append('FROM "{}"'.format(measurement)) if tags: tags_str = ' and '.join(["{}='{}'".format(k, v) for k, v in tags.items()]) query_list.append('WHERE {}'.format(tags_str)) yield self.query(' '.join(query_list))
def __get_username(self, username_or_user): username = username_or_user if isinstance(username, user.User): username = username_or_user.name if not isinstance(username, basestring_type): raise TypeError("username_or_user must be an instance of " "%s or User" % (basestring_type.__name__,)) return username @asyncflux_coroutine
[docs] def grant_privilege_to(self, privilege, username_or_user): username = self.__get_username(username_or_user) yield self.client.grant_privilege(privilege, username, self.name)
@asyncflux_coroutine
[docs] def revoke_privilege_from(self, privilege, username_or_user): username = self.__get_username(username_or_user) yield self.client.revoke_privilege(privilege, username, self.name)
@asyncflux_coroutine
[docs] def get_retention_policies(self): query_str = 'SHOW RETENTION POLICIES ON {}'.format(self.name) result_set = yield self.client.query(query_str) retention_policies = [ retentionpolicy.RetentionPolicy(self, point['name'], point['duration'], point['replicaN'], point['default']) for point in result_set[0].get_points() ] raise gen.Return(retention_policies)
@asyncflux_coroutine
[docs] def get_retention_policy_names(self): query_str = 'SHOW RETENTION POLICIES ON {}'.format(self.name) result_set = yield self.client.query(query_str) retention_policies = [ point['name'] for point in result_set[0].get_points() ] raise gen.Return(retention_policies)
@asyncflux_coroutine
[docs] def create_retention_policy(self, retention_name, duration, replication, default=False): query_format = ('CREATE RETENTION POLICY {} ON {} ' 'DURATION {} REPLICATION {}') query_list = [ query_format.format(retention_name, self.name, duration, replication) ] if default: query_list.append('DEFAULT') yield self.client.query(' '.join(query_list)) new_retention_policy = retentionpolicy.RetentionPolicy(self, retention_name, duration, replication, default) raise gen.Return(new_retention_policy)
@asyncflux_coroutine
[docs] def alter_retention_policy(self, retention_name, duration=None, replication=None, default=False): query_list = ['ALTER RETENTION POLICY {} ON {}'.format(retention_name, self.name)] if duration: query_list.append('DURATION {}'.format(duration)) if replication: query_list.append('REPLICATION {}'.format(replication)) if default: query_list.append('DEFAULT') yield self.client.query(' '.join(query_list))
@asyncflux_coroutine
[docs] def drop_retention_policy(self, retention_name): query_str = 'DROP RETENTION POLICY {} ON {}'.format(retention_name, self.name) yield self.client.query(query_str)
@asyncflux_coroutine
[docs] def drop(self): yield self.client.drop_database(self.name)
def __repr__(self): return "Database(%r, %r)" % (self.client, self.name)