"""
Cluster Supporting Redis Client
"""
import collections
import logging
import hiredis
from tornado import concurrent
from tornado import ioloop
from tornado import locks
from tornado import iostream
from tornado import tcpclient
from tredis import common
from tredis import crc16
from tredis import exceptions
from tredis import cluster
from tredis import connection
from tredis import geo
from tredis import hashes
from tredis import hyperloglog
from tredis import keys
from tredis import lists
from tredis import pubsub
from tredis import scripting
from tredis import server
from tredis import sets
from tredis import sortedsets
from tredis import strings
from tredis import transactions
LOGGER = logging.getLogger(__name__)

CRLF = b'\r\n'

DEFAULT_HOST = 'localhost'
"""The default host to connect to"""

DEFAULT_PORT = 6379
"""The default port to connect to"""

DEFAULT_DB = 0
"""The default database number to use"""

HASH_SLOTS = 16384
"""Redis Cluster Hash Slots Value"""

# Python 2 support for ascii(). The builtin is probed by name instead of
# searching ``dir(__builtins__)``: in an imported module ``__builtins__`` is
# a plain dict rather than the builtins module, so the ``dir()`` lookup never
# finds ``ascii`` and the compat shim would be imported on Python 3 as well.
try:
    ascii
except NameError:  # pragma: nocover
    from tredis.compat import ascii

# Everything needed to run one command: the encoded payload, the connection
# it is sent on, the optional expected response and the optional callback
# used to format the response.
Command = collections.namedtuple(
    'Command', ['command', 'connection', 'expectation', 'callback'])
class _Connection(object):
    """Manages the redis TCP connection.

    :param str host: The hostname to connect to
    :param int port: The port to connect on
    :param int db: The database number to use; a falsy value selects
        ``DEFAULT_DB``
    :param method on_written: Called as ``on_written(command, future)``
        after a command has been written to the stream
    :param method on_close: The method to call if the connection is closed
    :param io_loop: The IOLoop used to schedule future callbacks
    :type io_loop: tornado.ioloop.IOLoop
    :param bool cluster_node: Flag recording that this is a cluster node
    :param bool read_only: Flag recording that the node is read-only
    :param list slots: The slot values served by the node

    """

    def __init__(self,
                 host,
                 port,
                 db,
                 on_written,
                 on_close,
                 io_loop,
                 cluster_node=False,
                 read_only=False,
                 slots=None):
        super(_Connection, self).__init__()
        self.connected = False
        self.io_loop = io_loop
        self.host = host
        self.port = port
        self.database = int(db or DEFAULT_DB)
        self._client = tcpclient.TCPClient()
        self._cluster_node = cluster_node
        self._read_only = read_only
        self._slots = slots or []
        self._stream = None
        self._on_connect = None
        self._on_close = on_close
        self._on_written = on_written

    def close(self):
        """Close the stream.

        :raises: :class:`tredis.exceptions.ConnectionError` if the
            stream is not currently connected

        """
        if self._stream is None:
            raise exceptions.ConnectionError('Not connected')
        self._stream.close()

    def connect(self):
        """Connect to the Redis server if necessary.

        The returned future resolves to this connection once the socket
        stream has been established.

        :rtype: :class:`~tornado.concurrent.Future`
        :raises: :class:`~tredis.exceptions.ConnectError` if the connection
            is already established

        """
        if self.connected:
            raise exceptions.ConnectError('already connected')
        future = concurrent.Future()
        LOGGER.debug('%s connecting', self.name)
        self.io_loop.add_future(
            self._client.connect(self.host, self.port),
            lambda f: self._on_connected(f, future))
        return future

    def execute(self, command, future):
        """Execute a command after connecting if necessary.

        :param Command command: command to execute after the connection
            is established
        :param tornado.concurrent.Future future: future to resolve
            when the command's response is received.

        """
        LOGGER.debug('execute(%r, %r)', command, future)
        if self.connected:
            self._write(command, future)
            return

        def on_connected(cfuture):
            # A failed connection attempt is reported on the command future
            if cfuture.exception():
                return future.set_exception(cfuture.exception())
            self._write(command, future)

        self.io_loop.add_future(self.connect(), on_connected)

    @property
    def name(self):
        """Return the connection name as it is returned in the cluster nodes
        command.

        :rtype: str

        """
        return '{}:{}'.format(self.host, self.port)

    def read(self, callback):
        """Issue a read on the stream, invoke callback when completed.

        :param method callback: Invoked with the bytes that were read
        :raises: :class:`tredis.exceptions.ConnectionError` if the
            stream is not currently connected

        """
        if self._stream is None:
            raise exceptions.ConnectionError('Not connected')
        self._stream.read_bytes(65536, callback, None, True)

    def set_read_only(self, read_only):
        """Change the connection's read-only flag in the client.

        :param bool read_only: Value to set

        """
        self._read_only = read_only

    def set_slots(self, slots):
        """Change the connection's slot list in the client.

        :param list slots: The updated slot values

        """
        self._slots = slots

    @property
    def slots(self):
        """Return the connection's slot values for clustering.

        :rtype: list

        """
        return self._slots

    def _on_closed(self):
        """Invoked when the connection is closed"""
        LOGGER.error('Redis connection closed')
        self.connected = False
        try:
            self._on_close()
        finally:
            # The close callback is allowed to raise; drop the dead stream
            # regardless so the connection never keeps a stale reference.
            self._stream = None

    def _on_connected(self, stream_future, connect_future):
        """Invoked when the socket stream has connected, setting up the
        stream close callback and resolving the connect future.

        A failure to connect is reported on ``connect_future`` wrapped in
        :exc:`tredis.exceptions.ConnectError`.

        :param stream_future: The connection socket future
        :type stream_future: :class:`~tornado.concurrent.Future`
        :param connect_future: The connection response future
        :type connect_future: :class:`~tornado.concurrent.Future`

        """
        error = stream_future.exception()
        if error:
            connect_future.set_exception(exceptions.ConnectError(error))
            return
        self._stream = stream_future.result()
        self._stream.set_close_callback(self._on_closed)
        self.connected = True
        connect_future.set_result(self)

    def _write(self, command, future):
        """Write a command to the socket

        Write failures are reported on ``future`` as
        :exc:`tredis.exceptions.ConnectionError`.

        :param Command command: the Command data structure
        :param tornado.concurrent.Future future: the execution future

        """

        def on_written():
            self._on_written(command, future)

        try:
            self._stream.write(command.command, callback=on_written)
        except iostream.StreamClosedError as error:
            future.set_exception(exceptions.ConnectionError(error))
        except Exception as error:
            LOGGER.exception('unhandled write failure - %r', error)
            future.set_exception(exceptions.ConnectionError(error))
class Client(server.ServerMixin, keys.KeysMixin, strings.StringsMixin,
             geo.GeoMixin, hashes.HashesMixin, hyperloglog.HyperLogLogMixin,
             lists.ListsMixin, sets.SetsMixin, sortedsets.SortedSetsMixin,
             pubsub.PubSubMixin, connection.ConnectionMixin,
             cluster.ClusterMixin, scripting.ScriptingMixin,
             transactions.TransactionsMixin):
    """Asynchronous Redis client that supports Redis with master/slave failover
    and clustering. When ``clustering`` is ``True``, the client will
    automatically discover all of the nodes in the cluster and connect to them.

    The ``hosts`` argument should contain a list of Redis servers to connect
    to. The connection information for the server should be a :class:`dict`. In
    the following example, the client will connect to Redis running at
    ``127.0.0.1`` on port ``6379`` using database # ``2``:

    .. code:: python

        class RequestHandler(web.RequestHandler):

            @gen.coroutine
            def connect_to_redis(self):
                client = tredis.Client([{
                    'host': '127.0.0.1', 'port': 6379, 'db': 2
                }], auto_connect=False, clustering=True)
                yield client.connect()

    When ``auto_connect`` is set to ``True``, the connection to the Redis
    server or the Redis cluster starts on creation of the client. You should be
    aware that this will not block on creation and the connection will be
    established asynchronously in the background. Any requests made with the
    client while it is connecting will block until the connection is available.

    When ``auto_connect`` is set to ``False``, you will need to invoke the
    :meth:`~tredis.Client.connect` method, yielding to the
    :class:`~tornado.concurrent.Future` that it returns.

    .. versionadded:: 0.7.0

    :param hosts: A list of host connection values.
    :type hosts: list(dict)
    :param io_loop: Override the current Tornado IOLoop instance
    :type io_loop: tornado.ioloop.IOLoop
    :param method on_close: The method to call if the connection is closed
    :param bool clustering: Toggle the cluster support in the client
    :param bool auto_connect: Toggle the auto-connect on creation feature

    """

    def __init__(self,
                 hosts,
                 on_close=None,
                 io_loop=None,
                 clustering=False,
                 auto_connect=True):
        """Create a new instance of the ``Client`` class.

        :param hosts: A list of host connection values.
        :type hosts: list(dict)
        :param io_loop: Override the current Tornado IOLoop instance
        :type io_loop: tornado.ioloop.IOLoop
        :param method on_close: The method to call if the connection is closed
        :param bool clustering: Toggle the cluster support in the client
        :param bool auto_connect: Toggle the auto-connect on creation feature
        :raises: :exc:`ValueError` if more than one host is given while
            clustering is disabled

        """
        self._buffer = bytes()
        self._busy = locks.Lock()
        self._closing = False
        self._cluster = {}
        self._clustering = clustering
        self._connected = locks.Event()
        self._connect_future = concurrent.Future()
        self._connection = None
        self._discovery = False
        self._hosts = hosts
        self._on_close_callback = on_close
        self._reader = hiredis.Reader()
        self.io_loop = io_loop or ioloop.IOLoop.current()
        if not self._clustering:
            if len(hosts) > 1:
                raise ValueError('Too many hosts for non-clustering mode')
        if auto_connect:
            LOGGER.debug('Auto-connecting')
            self.connect()

    def connect(self):
        """Connect to the Redis server or Cluster.

        Only the first entry in ``hosts`` is connected to here.

        :rtype: tornado.concurrent.Future

        """
        host = self._hosts[0]
        database = host.get('db', DEFAULT_DB)
        LOGGER.debug('Creating a%s connection to %s:%s (db %s)',
                     ' cluster node' if self._clustering else '',
                     host['host'], host['port'], database)
        self._connect_future = concurrent.Future()
        conn = _Connection(
            host['host'],
            host['port'],
            database,
            self._read,
            self._on_closed,
            self.io_loop,
            cluster_node=self._clustering)
        self.io_loop.add_future(conn.connect(), self._on_connected)
        return self._connect_future

    def close(self):
        """Close any open connections to Redis.

        :raises: :exc:`tredis.exceptions.ConnectionError`

        """
        if not self._connected.is_set():
            raise exceptions.ConnectionError('not connected')
        # Flag the close as intentional so _on_closed stays quiet
        self._closing = True
        if self._clustering:
            for conn in self._cluster.values():
                conn.close()
        elif self._connection:
            self._connection.close()

    @property
    def ready(self):
        """Indicates that the client is connected to the Redis server or
        cluster and is ready for use.

        In clustering mode at least one node must be known and every known
        node must be connected.

        :rtype: bool

        """
        if self._clustering:
            return bool(self._cluster) and all(
                conn.connected for conn in self._cluster.values())
        return bool(self._connection and self._connection.connected)

    def _build_command(self, parts):
        """Build the command that will be written to Redis via the socket

        :param list parts: The list of strings for building the command
        :rtype: bytes
        :raises: :exc:`ValueError` if a part has an unsupported type

        """
        return self._encode_resp(parts)

    def _create_cluster_connection(self, node):
        """Create a connection to a Redis server.

        :param node: The node to connect to
        :type node: tredis.cluster.ClusterNode

        """
        LOGGER.debug('Creating a cluster connection to %s:%s', node.ip,
                     node.port)
        conn = _Connection(
            node.ip,
            node.port,
            0,
            self._read,
            self._on_closed,
            self.io_loop,
            cluster_node=True,
            read_only='slave' in node.flags,
            slots=node.slots)
        self.io_loop.add_future(conn.connect(), self._on_connected)

    def _encode_resp(self, value):
        """Dynamically build the RESP payload based upon the list provided.

        Bytes are emitted as a bulk string, text is UTF-8 encoded first,
        numbers are sent as their ``ascii()`` representation and lists are
        encoded recursively as an array.

        :param mixed value: The list of command parts to encode
        :rtype: bytes
        :raises: :exc:`ValueError` if the value has an unsupported type

        """
        if isinstance(value, bytes):
            return b''.join(
                [b'$',
                 ascii(len(value)).encode('ascii'), CRLF, value, CRLF])
        elif isinstance(value, str):  # pragma: nocover
            return self._encode_resp(value.encode('utf-8'))
        elif isinstance(value, (int, float)):
            return self._encode_resp(ascii(value).encode('ascii'))
        elif isinstance(value, list):
            output = [b'*', ascii(len(value)).encode('ascii'), CRLF]
            output.extend(self._encode_resp(item) for item in value)
            return b''.join(output)
        raise ValueError('Unsupported type: {0}'.format(type(value)))

    @staticmethod
    def _eval_expectation(command, response, future):
        """Evaluate the response from Redis to see if it matches the expected
        response.

        When the expectation is an :class:`int` greater than ``1`` the future
        is set to ``True`` on a match and to the response itself otherwise.
        For every other expectation the future is set to the result of the
        equality comparison.

        :param command: The command that is being evaluated
        :type command: tredis.client.Command
        :param bytes response: The response value to check
        :param future: The future representing the execution of the command
        :type future: tornado.concurrent.Future

        """
        if isinstance(command.expectation, int) and command.expectation > 1:
            future.set_result(response == command.expectation or response)
        else:
            future.set_result(response == command.expectation)

    def _execute(self, parts, expectation=None, format_callback=None):
        """Really execute a redis command

        Commands are serialized with a lock that is held until the returned
        future completes.

        :param list parts: The list of command parts
        :param mixed expectation: Optional response expectation
        :param method format_callback: Optional callback used to format the
            response before it is set as the future's result
        :rtype: :class:`~tornado.concurrent.Future`

        """
        future = concurrent.TracebackFuture()

        try:
            command = self._build_command(parts)
        except ValueError as error:
            future.set_exception(error)
            return future

        def on_locked(_):
            if not self.ready:
                LOGGER.critical('Lock released & not ready, aborting command')
                # Fail the command: leaving the future pending would hang
                # the caller and keep the lock held forever, since the lock
                # is only released when the future completes.
                future.set_exception(
                    exceptions.ConnectionError('not connected'))
                return
            if self._clustering:
                cmd = Command(command, self._pick_cluster_host(parts),
                              expectation, format_callback)
            else:
                LOGGER.debug('Connection: %r', self._connection)
                cmd = Command(command, self._connection, expectation,
                              format_callback)
            LOGGER.debug('_execute(%r, %r, %r) on %s', cmd.command,
                         expectation, format_callback, cmd.connection.name)
            cmd.connection.execute(cmd, future)

        # Wait until the cluster is ready, letting cluster discovery through
        if not self.ready and not self._connected.is_set():
            self.io_loop.add_future(
                self._connected.wait(),
                lambda f: self.io_loop.add_future(self._busy.acquire(),
                                                  on_locked))
        else:
            self.io_loop.add_future(self._busy.acquire(), on_locked)

        # Release the lock when the future is complete
        self.io_loop.add_future(future, lambda r: self._busy.release())
        return future

    def _on_cluster_discovery(self, future):
        """Invoked when the Redis server has responded to the ``CLUSTER_NODES``
        command.

        Known nodes have their slots and read-only flag refreshed; a new
        connection is started for every other node.

        :param future: The future containing the response from Redis
        :type future: tornado.concurrent.Future

        """
        LOGGER.debug('_on_cluster_discovery(%r)', future)
        common.maybe_raise_exception(future)
        for node in future.result():
            name = '{}:{}'.format(node.ip, node.port)
            if name in self._cluster:
                LOGGER.debug('Updating cluster connection info for %s:%s',
                             node.ip, node.port)
                self._cluster[name].set_slots(node.slots)
                self._cluster[name].set_read_only('slave' in node.flags)
            else:
                self._create_cluster_connection(node)
        self._discovery = True

    def _on_closed(self):
        """Invoked by connections when they are closed.

        :raises: :exc:`tredis.exceptions.ConnectionError` if the close was
            not requested by the client and no ``on_close`` callback is set

        """
        self._connected.clear()
        if self._closing:
            return
        if self._on_close_callback:
            self._on_close_callback()
        else:
            raise exceptions.ConnectionError('closed')

    def _on_cluster_data_moved(self, response, command, future):
        """Process the ``MOVED`` response from a Redis cluster node.

        The command is re-issued on the node named in the response. If that
        node is not connected, the execution future is failed with
        :exc:`tredis.exceptions.ConnectionError`.

        :param str response: The error message from the Redis server
        :param command: The command that was being executed
        :type command: tredis.client.Command
        :param future: The execution future
        :type future: tornado.concurrent.Future

        """
        LOGGER.debug('on_cluster_data_moved(%r, %r, %r)', response, command,
                     future)
        parts = response.split(' ')
        name = '{}:{}'.format(*common.split_connection_host_port(parts[2]))
        LOGGER.debug('Moved to %r', name)
        if name not in self._cluster:
            # Report through the future: this runs inside an IOLoop
            # callback, where a raised exception never reaches the caller
            # and would leave the command (and its lock) pending.
            future.set_exception(
                exceptions.ConnectionError(
                    '{} is not connected'.format(name)))
            return
        target = self._cluster[name]
        target.execute(command._replace(connection=target), future)

    def _on_connected(self, future):
        """Invoked when connections have been established. If the client is
        in clustering mode, it will kick off the discovery step if needed. If
        not, it will select the configured database.

        :param future: The connection future
        :type future: tornado.concurrent.Future

        """
        if future.exception():
            self._connect_future.set_exception(future.exception())
            return

        conn = future.result()
        LOGGER.debug('Connected to %s (%r, %r, %r)', conn.name,
                     self._clustering, self._discovery, self._connected)

        if self._clustering:
            self._cluster[conn.name] = conn
            if not self._discovery:
                self.io_loop.add_future(self.cluster_nodes(),
                                        self._on_cluster_discovery)
            elif self.ready:
                LOGGER.debug('Cluster nodes all connected')
                if not self._connect_future.done():
                    self._connect_future.set_result(True)
                self._connected.set()
            return

        def on_selected(sfuture):
            LOGGER.debug('Initial setup and selection processed')
            if sfuture.exception():
                self._connect_future.set_exception(sfuture.exception())
            else:
                self._connect_future.set_result(True)
                self._connected.set()

        select_future = concurrent.Future()
        self.io_loop.add_future(select_future, on_selected)
        self._connection = conn
        # SELECT is sent directly on the connection rather than through
        # _execute, which waits for the client to be marked as connected.
        cmd = Command(
            self._build_command(['SELECT', str(conn.database)]),
            self._connection, None, None)
        cmd.connection.execute(cmd, select_future)

    def _on_read_only_error(self, command, future):
        """Invoked when a Redis node returns an error indicating it's in
        read-only mode. It will use the ``INFO REPLICATION`` command to
        attempt to find the master server and failover to that, reissuing
        the command to that server.

        :param command: The command that was being executed
        :type command: tredis.client.Command
        :param future: The execution future
        :type future: tornado.concurrent.Future

        """
        failover_future = concurrent.TracebackFuture()

        def on_replication_info(_):
            common.maybe_raise_exception(failover_future)
            LOGGER.debug('Failover closing current read-only connection')
            # Mark the close as intentional so _on_closed stays quiet
            self._closing = True
            database = self._connection.database
            self._connection.close()
            self._connected.clear()
            self._connect_future = concurrent.Future()

            info = failover_future.result()
            LOGGER.debug('Failover connecting to %s:%s', info['master_host'],
                         info['master_port'])
            self._connection = _Connection(
                info['master_host'], info['master_port'], database,
                self._read, self._on_closed, self.io_loop, self._clustering)

            # When the connection is re-established, re-run the command
            self.io_loop.add_future(
                self._connect_future,
                lambda f: self._connection.execute(
                    command._replace(connection=self._connection), future))

            # Use the normal connection processing flow when connecting
            self.io_loop.add_future(self._connection.connect(),
                                    self._on_connected)

        if self._clustering:
            command.connection.set_read_only(True)

        LOGGER.debug('%s is read-only, need to failover to new master',
                     command.connection.name)

        # NOTE(review): the failover path works on self._connection, which
        # is only assigned in non-clustering mode -- confirm the intended
        # behaviour when clustering is enabled.
        cmd = Command(
            self._build_command(['INFO', 'REPLICATION']), self._connection,
            None, common.format_info_response)
        self.io_loop.add_future(failover_future, on_replication_info)
        cmd.connection.execute(cmd, failover_future)

    def _read(self, command, future):
        """Invoked when a command is executed to read and parse its results.
        It will loop on the IOLoop until the response is complete and then
        set the value of the response in the execution future.

        :param command: The command that was being executed
        :type command: tredis.client.Command
        :param future: The execution future
        :type future: tornado.concurrent.Future

        """
        response = self._reader.gets()
        if response is False:
            # No complete reply is buffered yet; read more and try again

            def on_data(data):
                self._reader.feed(data)
                self._read(command, future)

            command.connection.read(on_data)
            return

        if isinstance(response, hiredis.ReplyError):
            message = response.args[0]
            if message.startswith('MOVED '):
                self._on_cluster_data_moved(message, command, future)
            elif message.startswith('READONLY '):
                self._on_read_only_error(command, future)
            else:
                future.set_exception(exceptions.RedisError(response))
        elif command.callback is not None:
            future.set_result(command.callback(response))
        elif command.expectation is not None:
            self._eval_expectation(command, response, future)
        else:
            future.set_result(response)

    def _pick_cluster_host(self, value):
        """Selects the Redis cluster host for the specified value.

        The second command part is hashed to a slot and the first node with
        a matching slot range is returned. When no node matches, or the
        command has no second part to hash, the connection with the lowest
        name is returned.

        :param mixed value: The value to use when looking for the host
        :rtype: tredis.client._Connection

        """
        # Commands without an argument have nothing to hash on
        if len(value) > 1:
            # NOTE(review): the slot is computed from the RESP encoded
            # argument rather than the raw key -- confirm against crc16.
            crc = crc16.crc16(self._encode_resp(value[1])) % HASH_SLOTS
            for conn in self._cluster.values():
                for slot in conn.slots:
                    if slot[0] <= crc <= slot[1]:
                        return conn
        LOGGER.debug('Host not found for %r, returning first connection',
                     value)
        return self._cluster[min(self._cluster)]
class RedisClient(Client):
    """This is provided for backwards compatibility for versions < 0.7.

    The individual connection values are wrapped into the single-entry
    ``hosts`` list that :class:`Client` takes.

    .. deprecated:: 0.7

    :param str host: The hostname to connect to
    :param int port: The port to connect on
    :param int db: The database number to use
    :param method on_close: The method to call if the connection is closed
    :param bool clustering: Toggle the cluster support in the client
    :param bool auto_connect: Toggle the auto-connect on creation feature
    :param io_loop: Override the current Tornado IOLoop instance
    :type io_loop: tornado.ioloop.IOLoop

    """

    def __init__(self,
                 host=DEFAULT_HOST,
                 port=DEFAULT_PORT,
                 db=DEFAULT_DB,
                 on_close=None,
                 clustering=False,
                 auto_connect=True,
                 io_loop=None):
        # io_loop is last so existing positional callers are unaffected
        super(RedisClient, self).__init__(
            [{
                'host': host,
                'port': port,
                'db': db
            }],
            on_close,
            io_loop=io_loop,
            clustering=clustering,
            auto_connect=auto_connect)