# -*- coding: utf-8 -*-
"""Database level operations"""
from tornado import gen
from tornado.util import basestring_type
from influxdb.line_protocol import make_lines
from asyncflux import retentionpolicy, user
from asyncflux.util import asyncflux_coroutine, batches
class Database(object):
    """Handle for one database, bound to an asyncflux client.

    Every network operation is delegated to ``client``; this class only
    builds the query strings / request parameters and reshapes the
    results.  Methods decorated with ``asyncflux_coroutine`` are
    generator-based coroutines that hand their result back through
    ``gen.Return``.
    """

    # Number of points sent per request by ``write_points`` when the caller
    # does not pass ``batch_size``.
    BATCH_SIZE = 5000

    def __init__(self, client, name):
        """Bind the database called ``name`` to ``client``."""
        self.__client = client
        self.__name = name

    @property
    def client(self):
        """Client object used to talk to the server (read-only)."""
        return self.__client

    @property
    def name(self):
        """Name of the database (read-only)."""
        return self.__name

    @staticmethod
    def _where_clause(tags):
        """Return ``WHERE k='v' and ...`` for a non-empty tag mapping.

        Keys and values are interpolated verbatim; nothing is escaped.
        """
        conditions = ["{}='{}'".format(key, value)
                      for key, value in tags.items()]
        return 'WHERE {}'.format(' and '.join(conditions))

    @staticmethod
    def _paging_clauses(limit, offset):
        """Return the ``LIMIT``/``OFFSET`` fragments for truthy values."""
        clauses = []
        if limit:
            clauses.append('LIMIT {}'.format(limit))
        if offset:
            clauses.append('OFFSET {}'.format(offset))
        return clauses

    @asyncflux_coroutine
    def query(self, query, params=None, raise_errors=True):
        """Run ``query`` against this database.

        ``params`` and ``raise_errors`` are forwarded unchanged to
        ``client.query``; the coroutine resolves to whatever that call
        yields.
        """
        result_set = yield self.client.query(query, params,
                                             database=self.name,
                                             raise_errors=raise_errors)
        raise gen.Return(result_set)

    @asyncflux_coroutine
    def write(self, data, retention_policy=None, precision=None,
              consistency=None):
        """POST ``data`` to the ``/write`` endpoint for this database.

        ``data`` is serialised with ``make_lines(data, precision)``.  The
        optional arguments are sent as the ``rp``, ``precision`` and
        ``consistency`` query-string parameters, and only when truthy.
        """
        qs = {'db': self.name}
        if retention_policy:
            qs['rp'] = retention_policy
        if precision:
            qs['precision'] = precision
        if consistency:
            qs['consistency'] = consistency
        body = make_lines(data, precision)
        yield self.client.request('/write', method='POST', qs=qs, body=body)

    @asyncflux_coroutine
    def write_points(self, measurement, points, tags=None,
                     retention_policy=None, precision=None, consistency=None,
                     batch_size=None):
        """Write ``points`` for ``measurement`` in sequential batches.

        ``points`` is split with ``batches(points, batch_size)`` and each
        batch is sent through ``write`` before the next one starts.
        ``batch_size`` falls back to ``BATCH_SIZE`` when falsy; ``tags`` is
        attached to every batch when truthy.
        """
        batch_size = batch_size or self.BATCH_SIZE
        for batch in batches(points, batch_size):
            # A fresh payload per batch, so nothing is shared across writes.
            data = {
                'measurement': measurement,
                'points': batch
            }
            if tags:
                data['tags'] = tags
            yield self.write(data, retention_policy=retention_policy,
                             precision=precision, consistency=consistency)

    @asyncflux_coroutine
    def get_measurements(self, tags=None, limit=None, offset=None):
        """Resolve to the list of measurement names (``SHOW MEASUREMENTS``).

        ``tags`` (a mapping) adds a ``WHERE`` filter joined with ``and``;
        ``limit`` and ``offset`` add ``LIMIT``/``OFFSET`` when truthy.
        """
        query_list = ['SHOW MEASUREMENTS']
        if tags:
            query_list.append(self._where_clause(tags))
        query_list.extend(self._paging_clauses(limit, offset))
        result_set = yield self.query(' '.join(query_list))
        measurements = [
            point['name']
            for point in result_set[0].get_points()
        ]
        raise gen.Return(measurements)

    @asyncflux_coroutine
    def get_tag_keys(self, measurement=None):
        """Resolve to ``[(name, [tag_key, ...]), ...]`` (``SHOW TAG KEYS``).

        NOTE: ``measurement`` is interpolated without surrounding quotes
        here, unlike ``get_tag_values`` and ``drop_series``.
        """
        query_list = ['SHOW TAG KEYS']
        if measurement:
            query_list.append('FROM {}'.format(measurement))
        result_set = yield self.query(' '.join(query_list))
        tag_keys = [
            (item[0][0], [x['tagKey'] for x in item[1]])
            for item in result_set[0].items()
        ]
        raise gen.Return(tag_keys)

    @asyncflux_coroutine
    def get_tag_values(self, key_or_keys, measurement=None):
        """Resolve to ``[(first_column, [value, ...]), ...]``.

        Runs ``SHOW TAG VALUES ... WITH KEY IN (...)``.  ``key_or_keys``
        may be a single key or a list/tuple of keys; each key is wrapped in
        double quotes.  The result is read from the raw ``series`` entries
        of the first result, with every row of ``values`` flattened into
        one list.
        """
        query_list = ['SHOW TAG VALUES']
        if measurement:
            query_list.append('FROM "{}"'.format(measurement))
        keys = (
            key_or_keys
            if isinstance(key_or_keys, (list, tuple))
            else [key_or_keys]
        )
        keys_str = ','.join('"%s"' % key for key in keys)
        query_list.append('WITH KEY IN ({})'.format(keys_str))
        result_set = yield self.query(' '.join(query_list))
        # Flatten row by row; repeated list concatenation via ``sum`` would
        # copy the accumulated list once per row.
        values = [
            (serie['columns'][0],
             [value for row in serie['values'] for value in row])
            for serie in result_set[0].raw.get('series', [])
        ]
        raise gen.Return(values)

    @asyncflux_coroutine
    def get_series(self, limit=None, offset=None):
        """Resolve to ``[{'name': ..., 'tags': [...]}, ...]``.

        Runs ``SHOW SERIES`` with optional ``LIMIT``/``OFFSET`` (added only
        when truthy).
        """
        query_list = ['SHOW SERIES']
        query_list.extend(self._paging_clauses(limit, offset))
        result_set = yield self.query(' '.join(query_list))
        series = [
            {'name': item[0][0], 'tags': list(item[1])}
            for item in result_set[0].items()
        ]
        raise gen.Return(series)

    @asyncflux_coroutine
    def drop_series(self, measurement=None, tags=None):
        """Run ``DROP SERIES``, optionally narrowed down.

        ``measurement`` adds a double-quoted ``FROM`` clause and ``tags``
        (a mapping) adds a ``WHERE`` filter joined with ``and``.
        """
        query_list = ['DROP SERIES']
        if measurement:
            query_list.append('FROM "{}"'.format(measurement))
        if tags:
            query_list.append(self._where_clause(tags))
        yield self.query(' '.join(query_list))

    def __get_username(self, username_or_user):
        """Return the user name for a ``user.User`` or a plain string.

        Raises ``TypeError`` when the argument is neither.
        """
        username = username_or_user
        if isinstance(username, user.User):
            username = username_or_user.name
        if not isinstance(username, basestring_type):
            raise TypeError("username_or_user must be an instance of "
                            "%s or User" % (basestring_type.__name__,))
        return username

    @asyncflux_coroutine
    def grant_privilege_to(self, privilege, username_or_user):
        """Grant ``privilege`` on this database via the client.

        Raises ``TypeError`` if ``username_or_user`` is neither a string
        nor a ``user.User``.
        """
        username = self.__get_username(username_or_user)
        yield self.client.grant_privilege(privilege, username, self.name)

    @asyncflux_coroutine
    def revoke_privilege_from(self, privilege, username_or_user):
        """Revoke ``privilege`` on this database via the client.

        Raises ``TypeError`` if ``username_or_user`` is neither a string
        nor a ``user.User``.
        """
        username = self.__get_username(username_or_user)
        yield self.client.revoke_privilege(privilege, username, self.name)

    @asyncflux_coroutine
    def get_retention_policies(self):
        """Resolve to a list of ``RetentionPolicy`` objects.

        Each one is built from the ``name``, ``duration``, ``replicaN`` and
        ``default`` fields returned by ``SHOW RETENTION POLICIES``.
        """
        query_str = 'SHOW RETENTION POLICIES ON {}'.format(self.name)
        result_set = yield self.client.query(query_str)
        retention_policies = [
            retentionpolicy.RetentionPolicy(self, point['name'],
                                            point['duration'],
                                            point['replicaN'],
                                            point['default'])
            for point in result_set[0].get_points()
        ]
        raise gen.Return(retention_policies)

    @asyncflux_coroutine
    def get_retention_policy_names(self):
        """Resolve to the list of retention policy names."""
        query_str = 'SHOW RETENTION POLICIES ON {}'.format(self.name)
        result_set = yield self.client.query(query_str)
        retention_policies = [
            point['name']
            for point in result_set[0].get_points()
        ]
        raise gen.Return(retention_policies)

    @asyncflux_coroutine
    def create_retention_policy(self, retention_name, duration, replication,
                                default=False):
        """Create a retention policy and resolve to its ``RetentionPolicy``.

        ``DEFAULT`` is appended to the statement when ``default`` is
        truthy.  The returned object is built from the arguments, not from
        a server response.
        """
        query_format = ('CREATE RETENTION POLICY {} ON {} '
                        'DURATION {} REPLICATION {}')
        query_list = [
            query_format.format(retention_name, self.name, duration,
                                replication)
        ]
        if default:
            query_list.append('DEFAULT')
        yield self.client.query(' '.join(query_list))
        new_retention_policy = retentionpolicy.RetentionPolicy(
            self, retention_name, duration, replication, default)
        raise gen.Return(new_retention_policy)

    @asyncflux_coroutine
    def alter_retention_policy(self, retention_name, duration=None,
                               replication=None, default=False):
        """Alter a retention policy.

        Only the clauses whose argument is truthy (``DURATION``,
        ``REPLICATION``, ``DEFAULT``) are added to the statement.
        """
        query_list = ['ALTER RETENTION POLICY {} ON {}'.format(retention_name,
                                                               self.name)]
        if duration:
            query_list.append('DURATION {}'.format(duration))
        if replication:
            query_list.append('REPLICATION {}'.format(replication))
        if default:
            query_list.append('DEFAULT')
        yield self.client.query(' '.join(query_list))

    @asyncflux_coroutine
    def drop_retention_policy(self, retention_name):
        """Drop the retention policy ``retention_name`` on this database."""
        query_str = 'DROP RETENTION POLICY {} ON {}'.format(retention_name,
                                                            self.name)
        yield self.client.query(query_str)

    @asyncflux_coroutine
    def drop(self):
        """Drop this database through ``client.drop_database``."""
        yield self.client.drop_database(self.name)

    def __repr__(self):
        return "Database(%r, %r)" % (self.client, self.name)