Source code for asyncflux.util

# -*- coding: utf-8 -*-
"""General-purpose utilities"""
import functools

from collections import OrderedDict

from tornado import gen, httputil
from tornado.escape import parse_qs_bytes
try:
    from urlparse import urlparse
except ImportError:  # pragma: no cover
    from urllib.parse import urlparse  # pragma: no cover
try:
    xrange
except NameError:  # pragma: no cover
    xrange = range  # pragma: no cover


def asyncflux_coroutine(f):
    """Decorate ``f`` as a coroutine that accepts an optional callback.

    ``f`` is wrapped with ``gen.coroutine``. The returned wrapper pops the
    ``callback`` keyword argument before delegating, so ``f`` never sees it.

    Given a callback, the wrapper returns None, and the callback is run with
    ``(result, None)`` on success or ``(None, error)`` on failure once the
    coroutine's future is done. Without a callback (or with a falsy one) the
    wrapper returns the Future.

    The wrapper raises ``TypeError`` if a truthy ``callback`` that is not
    callable is supplied.
    """
    coro = gen.coroutine(f)

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        callback = kwargs.pop('callback', None)
        if callback and not callable(callback):
            raise TypeError("callback must be a callable")
        future = coro(*args, **kwargs)
        if not callback:
            return future

        def _callback(future):
            # Only guard future.result(): if the user's callback itself
            # raises, it must not be invoked a second time with that error.
            try:
                result = future.result()
            except Exception as e:
                callback(None, e)
            else:
                callback(result, None)

        future.add_done_callback(_callback)

    return wrapper


def sanitize_url(url):
    """Return ``url`` rebuilt with its query parameters in a stable order.

    The query string is parsed with ``parse_qs_bytes``, the parameters are
    sorted by their values, and the result is appended with
    ``httputil.url_concat`` to the part of ``url`` that precedes the first
    ``?``.

    :param url: URL string, with or without a query string.
    :returns: the value produced by ``httputil.url_concat``.
    """
    # partition() returns the whole URL when there is no '?'. Slicing with
    # url.find('?') would use -1 in that case and drop the last character.
    base_url = url.partition('?')[0]
    qs = parse_qs_bytes(urlparse(url).query)
    ordered_qs = OrderedDict(sorted(qs.items(), key=lambda x: x[1]))
    return httputil.url_concat(base_url, ordered_qs)


def batches(source, batch_size):
    """Yield successive lists of up to ``batch_size`` items from ``source``.

    Items are pulled lazily from ``iter(source)``. Every batch holds exactly
    ``batch_size`` items except possibly the last one, which holds whatever
    remained. An empty ``source`` yields nothing.

    :param source: any iterable.
    :param batch_size: maximum number of items per batch; must be >= 1.
    :raises ValueError: on first iteration, if ``batch_size`` is less than 1
        (it would otherwise yield empty batches forever).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    iterator = iter(source)
    while True:
        batch = []
        for item in iterator:
            batch.append(item)
            if len(batch) >= batch_size:
                break
        else:
            # The iterator ran dry before the batch filled up: emit the
            # leftover items, if any, and finish. A plain ``return`` is
            # required here; raising StopIteration inside a generator
            # becomes RuntimeError under PEP 479.
            if batch:
                yield batch
            return
        yield batch