API Docs for: 2.2.2
Show:

File: lib/core/riaknode.js

'use strict';

var async = require('async');
var backoff = require('backoff');
var events = require('events');
var Joi = require('joi');
var logger = require('winston');
var util = require('util');

var RiakConnection = require('./riakconnection');
var Ping = require('../commands/ping');
var utils = require('./utils');

var rpb = require('../protobuf/riakprotobuf');
var rpbErrorRespCode = rpb.getCodeFor('RpbErrorResp');

var nid = 0;

/**
 * @module Core
 */

/**
 * Provides the RiakNode class and its Builder.
 *
 * Instances of this class maintain connections to and execute commands on
 * a Riak node in a Riak cluster.
 *
 * __options__ is an object with the following defaults:
 *
 *     {
 *       remoteAddress: '127.0.0.1',
 *       remotePort: 8087,
 *       maxConnections: 256,
 *       minConnections: 1,
 *       idleTimeout: 10000,
 *       connectionTimeout: 3000,
 *       requestTimeout: 5000,
 *       cork: true,
 *       externalLoadBalancer: false
 *     }
 *
 * As a convenience a builder class is provided;
 *
 *     var newNode = new RiakNode.Builder().withMinConnections(10).build();
 *
 * See {{#crossLink "RiakNode.Builder"}}RiakNode.Builder{{/crossLink}}
 *
 * @class RiakNode
 * @constructor
 * @param {Object} options The options for this RiakNode.
 * @param {String} [options.remoteAddress=127.0.0.1] The address for this node. Can also be a FQDN.
 * @param {Number} [options.remotePort=8087] The port to connect to.
 * @param {Number} [options.minConnections=1] Set the minimum number of active connections to maintain.
 * @param {Number} [options.maxConnections=256] Set the maximum number of connections allowed.
 * @param {Number} [options.idleTimeout=10000] Set the idle timeout used to reap inactive connections.
 * @param {Number} [options.connectionTimeout=3000] Set the connection timeout used when making new connections.
 * @param {Number} [options.requestTimeout=5000] Set the timeout used when executing commands.
 * @param {Object} [options.healthCheck] The command handed to new connections made during a health check. Default is a Ping.
 * @param {Object} [options.auth] Set the authentication information for connections made by this node.
 * @param {Boolean} [options.cork] Use 'cork' on all sockets. Default is true.
 * @param {Boolean} [options.externalLoadBalancer] This RiakNode object connects to a load balancer. Default is false.
 * @param {String} options.auth.user Riak username.
 * @param {String} [options.auth.password] Riak password. Not required if using user cert.
 * @param {String|Buffer} [options.auth.pfx] A string or buffer holding the PFX or PKCS12 encoded private key, certificate and CA certificates.
 * @param {String|Buffer} [options.auth.key] A string holding the PEM encoded private key.
 * @param {String} [options.auth.passphrase]  A string of passphrase for the private key or pfx.
 * @param {String|Buffer} [options.auth.cert]  A string holding the PEM encoded certificate.
 * @param {String|String[]|Buffer[]} [options.auth.ca] Either a string or list of strings of PEM encoded CA certificates to trust.
 * @param {String|String[]|Buffer[]} [options.auth.crl] Either a string or list of strings of PEM encoded CRLs (Certificate Revocation List).
 * @param {Boolean} [options.auth.rejectUnauthorized] A boolean indicating whether a server should automatically reject clients with invalid certificates. Only applies to servers with requestCert enabled.
 *
 */
function RiakNode(options) {
    events.EventEmitter.call(this);

    var self = this;

    if (options === undefined) {
        options = {};
    }

    // Validate the options and copy the validated values (defaults included)
    // onto this instance. A validation error is thrown to the caller.
    Joi.validate(options, schema, function (err, options) {
        if (err) {
            throw err;
        }
        self.remoteAddress = options.remoteAddress;
        self.remotePort = options.remotePort;
        self.minConnections = options.minConnections;
        self.maxConnections = options.maxConnections;
        self.idleTimeout = options.idleTimeout;
        self.connectionTimeout = options.connectionTimeout;
        self.requestTimeout = options.requestTimeout;
        self.state = State.CREATED;
        self.auth = options.auth;
        self.cork = options.cork;
        self.externalLoadBalancer = options.externalLoadBalancer;
        self.healthCheck = options.healthCheck;
    });

    // Incremented / decremented by the prototype methods as commands are
    // handed to connections and as they complete.
    this.executeCount = 0;

    // This is to facilitate debugging
    this.name = util.format('[RiakNode] (%s:%d-%d)',
        this.remoteAddress, this.remotePort, nid);
    nid++;

    // Note: useful for debugging event issues
    // this.setMaxListeners(1);

    // private data
    // currentNumConnections counts connections created by this node that have
    // not yet been accounted for as closed; available holds pooled connections.
    var currentNumConnections = 0;
    var available = [];

    // protected funcs

    // Takes the first pooled connection that is not closed, discarding any
    // closed ones found on the way. Returns null when the pool is exhausted.
    this._getConnectionFromPool = function (cmd_name) {
        var conn = null;
        do {
            if (available.length === 0) {
                break;
            }
            conn = available.shift();
            if (conn.closed) {
                /*
                 * NB: this is expected as connection callbacks can close connections
                 * while they remain within the connection pool
                 */
                logger.debug('%s execute of %s attempted on closed connection', this.name, cmd_name);
                conn.executeDone();
                conn = null;
            }
        } while (!conn);
        return conn;
    };

    // Puts a connection at the front of the pool, or, once the node state has
    // reached SHUTTING_DOWN, closes it and decrements the connection count.
    this._returnConnectionToPool = function (conn) {
        if (this.state < State.SHUTTING_DOWN) {
            conn.executeDone();
            available.unshift(conn);
            logger.debug("%s _returnConnectionToPool a: %d, cnc: %d",
                this.name, available.length, currentNumConnections);
        } else {
            logger.debug('%s connection returned to pool during shutdown.', this.name);
            this._decrementConnectionCount();
            conn.close();
        }
    };

    // Decrements the connection count; logs (but does not prevent) a decrement
    // that would take the count below zero.
    this._decrementConnectionCount = function () {
        if ((currentNumConnections - 1) < 0) {
            logger.debug("%s cnc will decrement less than zero! (%d)",
                this.name, currentNumConnections);
        }
        currentNumConnections--;
    };

    // Commands may only be executed while the node is RUNNING.
    this._executeAllowed = function () {
        return this.state === State.RUNNING;
    };

    // New connections may only be made while RUNNING and below maxConnections.
    this._createNewConnectionAllowed = function () {
        return this.state === State.RUNNING &&
               currentNumConnections < this.maxConnections;
    };

    // Rebuilds the pool, closing connections that have been idle for at least
    // idleTimeout ms. Must be invoked with `this` bound to the node (see
    // _startIdleExpiration) because it calls this._decrementConnectionCount().
    function expireIdleConnections(name, minConnections, idleTimeout) {
        logger.debug("%s expiring idle connections", name);
        var now = Date.now();
        var count = 0;
        var na = [];
        var conn = null;
        while (available.length) {
            conn = available.shift();
            // NB: don't keep any closed connections around.
            if (conn.closed) {
                conn.executeDone();
                conn = null;
                continue;
            }

            // NB: if a connection is executing, keep it around
            if (conn.inFlight) {
                na.push(conn);
                continue;
            }

            // NB: don't expire past minConnections
            if (currentNumConnections <= minConnections) {
                na.push(conn);
                continue;
            }

            if ((now - conn.lastUsed) >= idleTimeout) {
                /* jshint validthis:true */
                this._decrementConnectionCount();
                conn.close();
                count++;
                continue;
            }

            na.push(conn);
        }
        available = na;
        logger.debug("%s expired %d connections.", name, count);
    }

    var expireTimer = null;

    // Runs expireIdleConnections every 5000 ms with the node's current name,
    // minConnections and idleTimeout captured at the time of this call.
    this._startIdleExpiration = function () {
        var cb = expireIdleConnections.bind(this, this.name, this.minConnections, this.idleTimeout);
        expireTimer = setInterval(cb, 5000);
    };

    this._stopIdleExpiration = function () {
        clearInterval(expireTimer);
    };

    // Creates and connects a new RiakConnection, counting it immediately.
    //   postConnectFunc(conn) - called once the connection reports 'connected'
    //   postFailFunc(err)     - called when the connection reports 'connectFailed'
    //   healthCheck           - optional command passed through to the connection
    this._createNewConnection = function (postConnectFunc, postFailFunc, healthCheck) {
        currentNumConnections++;

        var conn = new RiakConnection({
            remoteAddress : this.remoteAddress,
            remotePort : this.remotePort,
            connectionTimeout : this.connectionTimeout,
            requestTimeout : this.requestTimeout,
            auth: this.auth,
            healthCheck: healthCheck,
            cork: this.cork
        });

        var self = this;

        // NB: inside the two handlers below `this` is the emitting connection,
        // not the node; the node is reached through `self`.
        conn.on('connected', function (conn) {
            logger.debug("%s conn.on-connected", this.name);
            conn.on('responseReceived', self._responseReceived.bind(self));
            conn.on('connectionClosed', self._connectionClosed.bind(self));
            postConnectFunc(conn);
        });

        conn.on('connectFailed', function (conn, err){
            // NB: when connectFailed is raised, conn is already closed
            logger.debug("%s conn.on-connectFailed", this.name);
            self._decrementConnectionCount();
            postFailFunc(err);
        });

        conn.connect();
    };

    // Returns an async-style task (function (async_cb)) that creates one
    // connection for `node`, pools it on success and reports (err, false)
    // on failure.
    this._makeNewConnectionFunc = function (node) {
        var f = function (async_cb) {
            var postConnect = function (conn) {
                node._returnConnectionToPool(conn);
                async_cb(null, true);
            };
            var postFail = function (err) {
                async_cb(err, false);
            };
            node._createNewConnection(postConnect, postFail);
        };
        return f;
    };

    // Health-check retry schedule.
    var hcb = backoff.fibonacci({
        initialDelay: 10,
        maxDelay: 5000
    });

    // Wires the health-check backoff: every 'ready' attempts a new connection
    // carrying the health check command; success puts the node back into
    // RUNNING, failure schedules another attempt.
    function initHealthChecker(self) {
        logger.debug("%s initializing health checker", self.name);
        var hcFunc = self.healthCheck;

        function hcSuccess(newConn) {
            hcb.reset();
            self._returnConnectionToPool(newConn);
            self.state = State.RUNNING;
            logger.debug("%s health check success", self.name);
            self.emit(EVT_SC, self, self.state);
        }

        function hcFailed(err) {
            // The node name must fill the %s placeholder; the error follows it.
            logger.debug("%s failed health check:", self.name, err);
            // NB: healthcheck interval *should* be less than re-try interval
            hcb.backoff();
        }

        function hcReady(n, d) {
            self._createNewConnection(hcSuccess, hcFailed, hcFunc);
        }

        function hcBackoffHandler(n, d) {
            logger.debug("%s running health check %d, delay: %d", self.name, n, d);
        }

        hcb.on('ready', hcReady);
        hcb.on('backoff', hcBackoffHandler);
    }
    initHealthChecker(this);

    // Moves a RUNNING node into HEALTH_CHECKING and starts the backoff, unless
    // the node fronts an external load balancer, in which case it only logs.
    this._doHealthCheck = function () {
        switch (this.state) {
            case State.HEALTH_CHECKING:
                if (this.externalLoadBalancer) {
                    logger.error('%s already in health check, external load balancer:',
                        this.name, this.externalLoadBalancer);
                } else {
                    logger.debug("%s already in health check", this.name);
                }
                break;
            case State.RUNNING:
                if (this.externalLoadBalancer) {
                    logger.warn('%s would have health checked, external load balancer:',
                        this.name, this.externalLoadBalancer);
                } else {
                    this.state = State.HEALTH_CHECKING;
                    this.emit(EVT_SC, this, this.state);
                    hcb.backoff();
                }
                break;
            default:
                logger.warn('%s health check attempted in state:',
                    this.name, stateNames[this.state]);
        }
    };

    // Shutdown retry schedule; gives up waiting after 10 backoffs.
    var sdb = backoff.fibonacci({
        initialDelay: 125,
        maxDelay: 1000
    });
    sdb.failAfter(10);
    var sdbHandlersInitialized = false;

    // Closes pooled connections and waits (via sdb) until every connection is
    // closed and the connection count is zero, then marks the node SHUTDOWN,
    // emits the state change, removes all listeners and invokes
    // callback(null, state) when a callback was supplied.
    function runShutdown(self, callback) {
        var conns = [];
        function closeConnections() {
            logger.debug("%s closing %d connections",
                self.name, currentNumConnections);
            var conn = null;
            while (available.length) {
                conn = available.shift();
                if (conn.closed) {
                    conn.executeDone();
                    conn = null;
                } else {
                    self._decrementConnectionCount();
                    conn.close();
                    conns.push(conn);
                }
            }
            logger.debug("%s closed connections (cnc: %d)",
                self.name, currentNumConnections);
        }

        function areConnsClosed() {
            closeConnections();
            var conns_closed = true;
            for (var i = 0; i < conns.length; i++) {
                var c = conns[i];
                if (!c.closed) {
                    conns_closed = false;
                    break;
                }
            }
            // Connections still checked out of the pool keep the count above zero.
            if (currentNumConnections > 0) {
                conns_closed = false;
            }
            return conns_closed;
        }

        function nodeShutdown() {
            sdb.reset();
            self.state = State.SHUTDOWN;
            logger.debug("%s shut down.", self.name);
            if (self.executeCount !== 0) {
                logger.warn('%s execution count (%d) NOT ZERO at shutdown',
                    self.name, self.executeCount);
            }
            if (currentNumConnections !== 0) {
                logger.warn('%s connection count (%d) NOT ZERO at shutdown',
                    self.name, currentNumConnections);
            }
            self.emit(EVT_SC, self, self.state);
            self.removeAllListeners();
            if (callback) {
                callback(null, self.state);
            }
        }

        function sdReady() {
            if (areConnsClosed()) {
                nodeShutdown();
            } else {
                logger.debug("%s connections still not closed", self.name);
                sdb.backoff();
            }
        }

        function sdBackoffHandler(n, d) {
            logger.debug("%s running shutdown %d, delay: %d", self.name, n, d);
        }

        function sdFailHandler() {
            logger.warn("%s shutting down after max tries", self.name);
            nodeShutdown();
        }

        // Register the backoff handlers only once per node.
        if (!sdbHandlersInitialized) {
            sdb.on('ready', sdReady);
            sdb.on('backoff', sdBackoffHandler);
            sdb.on('fail', sdFailHandler);
            sdbHandlersInitialized = true;
        }

        if (areConnsClosed()) {
            nodeShutdown();
        } else {
            sdb.backoff();
        }
    }

    this._shutdown = function(callback) {
        runShutdown(this, callback);
    };

    // Delegates to utils.stateCheck with this node's name and current state.
    this._stateCheck = function (allowedStates) {
        return utils.stateCheck(this.name, this.state,
            allowedStates, stateNames);
    };
}

util.inherits(RiakNode, events.EventEmitter);

/**
 * Start this RiakNode.
 *
 * Opens __minConnections__ connections in parallel. Once every attempt has
 * finished the idle-expiration timer is started and the node transitions to
 * RUNNING and emits a stateChange event; this happens even when a connection
 * attempt failed, in which case the error is logged and handed to the callback.
 * @method start
 * @param {Function} callback - a callback for when node is started; invoked as callback(err, node).
 */
RiakNode.prototype.start = function(callback) {
    var node = this;

    node._stateCheck([State.CREATED]);
    logger.debug('%s starting', node.name);

    // One connection task per required minimum connection.
    var connectTasks = [];
    while (connectTasks.length < node.minConnections) {
        connectTasks.push(node._makeNewConnectionFunc(node));
    }

    function onPoolPrimed(err, results) {
        if (err) {
            logger.error('%s (%d) error during start:', node.name, node.executeCount, err);
        }
        node._startIdleExpiration();
        node.state = State.RUNNING;
        logger.debug('%s started', node.name);
        node.emit(EVT_SC, node, node.state);
        if (callback) {
            callback(err, node);
        }
    }

    async.parallel(connectTasks, onPoolPrimed);
};

/**
 * Stop this RiakNode.
 *
 * Stops idle-connection expiration, moves the node to SHUTTING_DOWN (emitting
 * a stateChange event) and then begins closing its connections.
 * @param {Function} callback - called when node completely stopped.
 * @method stop
 */
RiakNode.prototype.stop = function(callback) {
    var node = this;
    var stoppableStates = [State.RUNNING, State.HEALTH_CHECKING];

    node._stateCheck(stoppableStates);
    node._stopIdleExpiration();

    node.state = State.SHUTTING_DOWN;
    logger.debug("%s shutting down", node.name);
    node.emit(EVT_SC, node, node.state);

    node._shutdown(callback);
};

/**
 * Execute a command on this RiakNode.
 *
 * A pooled connection is used when one is available; otherwise a new
 * connection is created if the node is allowed to do so. Because both paths
 * are asynchronous, `true` only means the command was accepted, not that it
 * has completed.
 * @method execute
 * @param {Object} command - a command to execute.
 * @return {Boolean} - if this RiakNode accepted the command for execution.
 */
RiakNode.prototype.execute = function (command) {
    var node = this;

    node._stateCheck([State.RUNNING, State.HEALTH_CHECKING]);

    logger.debug("%s executing command '%s'", node.name, command.name);

    if (!node._executeAllowed()) {
        return false;
    }

    // pooled is null when there are no available connections.
    var pooled = node._getConnectionFromPool(command.name);
    if (pooled) {
        logger.debug("%s executing command '%s' (existing connection %d)",
            node.name, command.name, node.executeCount);
        if (pooled.execute(command)) {
            node.executeCount++;
            logger.debug("%s (%d) executed command: '%s'",
                node.name, node.executeCount, command.name);
        }
        // NB: executing a command is async, so acceptance is all that can be reported
        return true;
    }

    if (!node._createNewConnectionAllowed()) {
        logger.debug('%s all connections in use and at max', node.name);
        return false;
    }

    function runOnNewConnection(newConn) {
        logger.debug("%s executing command '%s' (new connection %d)",
            node.name, command.name, node.executeCount);
        // NB: state could have transitioned out of RUNNING in the time
        // connection is established
        // NOTE(review): when execution is refused here the new connection is
        // neither pooled nor closed by this function - confirm this is handled elsewhere.
        if (node._executeAllowed() && newConn.execute(command)) {
            node.executeCount++;
            logger.debug("%s (%d) executed command: '%s'",
                node.name, node.executeCount, command.name);
        }
    }

    function connectionFailed(err) {
        logger.debug("%s (%d) command execution failed '%s'",
            node.name, node.executeCount, command.name);
        node._doHealthCheck();
        node._maybeRetryCommand(command, function () {
            command.onError(err);
        });
    }

    node._createNewConnection(runOnNewConnection, connectionFailed);

    // NB: creating a new connection is async, so acceptance is all that can be reported
    return true;
};

// Handles a 'responseReceived' event from a connection: hands the raw
// response to utils.handleRiakResponse together with the error and success
// continuations defined below.
RiakNode.prototype._responseReceived = function (conn, command, code, decoded) {
    // NB: this function is similar to _receiveHealthCheck in RiakConnection
    logger.debug("%s command '%s' _responseReceived: %d", this.name, command.name, code);

    var node = this;

    // The command is finished with this connection: account for it and
    // make the connection available again.
    function releaseConnection() {
        node.executeCount--;
        node._returnConnectionToPool(conn);
    }

    function reportFailure() {
        // Called by _maybeRetryCommand only when no tries remain.
        return function (err) {
            releaseConnection();
            node._maybeRetryCommand(command, function () {
                if (err.riakError) {
                    command.onRiakError(decoded);
                } else {
                    command.onError(err.msg);
                }
            });
        };
    }

    function reportSuccess() {
        // Streaming responses arrive as several protobuf messages and carry a
        // "done" field; the connection is only released once it is truthy.
        // Responses without the field (including empty, null bodies) are
        // complete as soon as they arrive.
        var isStreamed = decoded ? decoded.hasOwnProperty('done') : false;
        var isComplete = !isStreamed || decoded.done;
        if (isComplete) {
            releaseConnection();
            logger.debug('%s command %s complete (%d)',
                node.name, command.name, node.executeCount);
        }
        command.onSuccess(decoded);
    }

    utils.handleRiakResponse({
        conn: conn,
        command: command,
        code: code,
        decoded: decoded,
        shouldCallback: false
    }, reportFailure(), reportSuccess);
};

// Handles a 'connectionClosed' event from a connection: the connection is no
// longer counted, a command that was in flight on it is retried or failed,
// and a health check is requested.
RiakNode.prototype._connectionClosed = function (conn) {
    var node = this;

    node._decrementConnectionCount();

    // The command (if any) that the connection was handling when it closed
    var interrupted = conn.command;
    logger.debug("%s connection closed; command: '%s', in-flight: '%s'",
        node.name, interrupted.name, conn.inFlight);

    function failInterrupted() {
        interrupted.onError("Connection closed while executing command");
    }

    // NB: inFlight is false when no command was executing on this connection
    if (conn.inFlight) {
        node.executeCount--;
        node._maybeRetryCommand(interrupted, failInterrupted);
    }

    node._doHealthCheck();
};

// Consumes one of the command's remaining tries. While tries remain the
// 'retryCommand' event is emitted with (command, node); otherwise errfunc is
// called so the caller can report the failure to the command.
RiakNode.prototype._maybeRetryCommand = function (command, errfunc) {
    var triesBefore = command.remainingTries;
    command.remainingTries -= 1;

    logger.debug("%s command %s remaining tries %d -> %d",
        this.name, command.name, triesBefore, command.remainingTries);

    if (command.remainingTries > 0) {
        this.emit(EVT_RC, command, this);
        return;
    }
    errfunc();
};

/**
 * The state of this node.
 *
 * If listening for stateChange events, a numeric value will be sent that
 * can be compared to:
 *
 *     RiakNode.State.CREATED
 *     RiakNode.State.RUNNING
 *     RiakNode.State.HEALTH_CHECKING
 *     RiakNode.State.SHUTTING_DOWN
 *     RiakNode.State.SHUTDOWN
 *
 * The values increase through the node's life cycle; the node compares them
 * numerically (a state below SHUTTING_DOWN still accepts pooled connections).
 *
 * See: {{#crossLink "RiakNode/stateChange:event"}}stateChange{{/crossLink}}
 *
 * @property State
 * @type {Object}
 * @static
 * @final
 */
var State = Object.freeze({
    CREATED: 0,
    RUNNING: 1,
    HEALTH_CHECKING: 2,
    SHUTTING_DOWN: 3,
    SHUTDOWN: 4
});

// Reverse lookup (numeric state -> name), derived from State so the two
// can never disagree. Used when reporting states in log messages.
var stateNames = Object.freeze(
    Object.keys(State).reduce(function (namesByValue, stateName) {
        namesByValue[State[stateName]] = stateName;
        return namesByValue;
    }, {})
);

// Default values for the RiakNode options. The validation schema applies
// these whenever the corresponding option is omitted. Exported as
// RiakNode.consts.
var consts = Object.freeze({
    DefaultRemoteAddress : '127.0.0.1',
    DefaultRemotePort : 8087,
    DefaultMinConnections : 1,
    DefaultMaxConnections : 256,
    // Timeouts are in milliseconds.
    DefaultIdleTimeout : 10000,
    DefaultConnectionTimeout : 3000,
    DefaultRequestTimeout : 5000,
    // A Ping command with a no-op callback. NB: this single instance is
    // created once when the module loads and is the default for every node.
    DefaultHealthCheck : new Ping(function (){})
});

// Validation schema for the RiakNode options. Omitted options receive the
// defaults from consts; cork defaults to true and externalLoadBalancer to
// false. The auth object is optional, but when given:
//   - user is required
//   - exactly one of password, cert or pfx must be supplied (xor)
//   - cert requires key as well (with)
//   - password may not be combined with cert or pfx (without)
var schema = Joi.object().keys({
    remoteAddress: Joi.string().default(consts.DefaultRemoteAddress),
    remotePort: Joi.number().min(1).default(consts.DefaultRemotePort),
    minConnections: Joi.number().min(0).default(consts.DefaultMinConnections),
    maxConnections: Joi.number().min(0).default(consts.DefaultMaxConnections),
    idleTimeout: Joi.number().min(1000).default(consts.DefaultIdleTimeout),
    connectionTimeout: Joi.number().min(1).default(consts.DefaultConnectionTimeout),
    requestTimeout: Joi.number().min(1).default(consts.DefaultRequestTimeout),
    healthCheck: Joi.object().default(consts.DefaultHealthCheck),
    cork: Joi.boolean().default(true),
    externalLoadBalancer: Joi.boolean().default(false),
    auth: Joi.object().optional().keys({
        user: Joi.string().required(),
        // An empty password is accepted.
        password: Joi.string().allow(''),
        // The remaining keys mirror the TLS secure-context options:
        // https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_details
        pfx: [Joi.string(), Joi.binary()],
        key: [Joi.string(), Joi.binary()],
        passphrase: Joi.string(),
        cert: [Joi.string(), Joi.binary()],
        ca: [Joi.string(), Joi.array().items(Joi.string(), Joi.binary())],
        crl: [Joi.string(), Joi.array().items(Joi.string(), Joi.binary())],
        rejectUnauthorized: Joi.boolean()
    }).xor('password', 'cert', 'pfx')
      .with('cert', 'key')
      .without('password', ['cert', 'pfx'])
});

/**
 * This event is fired whenever the state of the RiakNode changes.
 * Listeners receive the node followed by its new state.
 * @event stateChange
 * @param {Object} node - the RiakNode object whose state changed
 * @param {Number} state - the {{#crossLink "RiakNode/State:property"}}RiakNode.State{{/crossLink}}
 */
var EVT_SC = 'stateChange';

/**
 * This event is fired whenever a command fails and needs to be retried,
 * i.e. the failed command still has tries remaining.
 * @event retryCommand
 * @param {Object} command - the command to retry
 * @param {RiakNode} node - this RiakNode
 */
var EVT_RC = 'retryCommand';

/**
 * A Builder for constructing RiakNode instances.
 *
 * Rather than having to manually construct the __options__ and instantiating
 * a RiakNode directly, this builder may be used. Each __with...()__ method
 * records one option on the builder and returns the builder so calls can be
 * chained:
 *
 *      var riakNode = new RiakNode.Builder().withRemotePort(9999).build();
 *
 * @class RiakNode.Builder
 * @constructor
 */
function Builder() {}

// Creates a chainable builder method that stores its single argument on the
// builder under optionName and returns the builder itself.
function makeOptionSetter(optionName) {
    return function (value) {
        this[optionName] = value;
        return this;
    };
}

Builder.prototype = {
    /**
     * Set the remote address for the RiakNode.
     * @method withRemoteAddress
     * @param {String} address - IP or hostname of the Riak node (__default:__ 127.0.0.1)
     * @return {RiakNode.Builder}
     */
    withRemoteAddress : makeOptionSetter('remoteAddress'),
    /**
     * Set the remote port for this RiakNode.
     * @method withRemotePort
     * @param {Number} port - remote port of the Riak node (__default:__ 8087)
     * @return {RiakNode.Builder}
     */
    withRemotePort : makeOptionSetter('remotePort'),
    /**
     * Set the minimum number of active connections to maintain.
     * These connections are exempt from the idle timeout.
     * @method withMinConnections
     * @param {Number} minConnections - number of connections to maintain (__default:__ 1)
     * @return {RiakNode.Builder}
     */
    withMinConnections : makeOptionSetter('minConnections'),
    /**
     * Set the maximum number of connections allowed.
     * @method withMaxConnections
     * @param {Number} maxConnections - maximum number of connections to allow (__default:__ 256)
     * @return {RiakNode.Builder}
     */
    withMaxConnections : makeOptionSetter('maxConnections'),
    /**
     * Set the idle timeout used to reap inactive connections.
     * Any connection that has been idle for this amount of time
     * becomes eligible to be closed and discarded excluding the number
     * set via __withMinConnections()__.
     * @method withIdleTimeout
     * @param {Number} idleTimeout - the timeout in milliseconds (__default:__ 10000)
     * @return {RiakNode.Builder}
     */
    withIdleTimeout : makeOptionSetter('idleTimeout'),
    /**
     * Set the connection timeout used when making new connections.
     * @method withConnectionTimeout
     * @param {Number} connectionTimeout - timeout in milliseconds (__default:__ 3000).
     * @return {RiakNode.Builder}
     */
    withConnectionTimeout : makeOptionSetter('connectionTimeout'),
    /**
     * Set the request timeout used when executing commands.
     * @method withRequestTimeout
     * @param {Number} requestTimeout - timeout in milliseconds (__default:__ 5000).
     * @return {RiakNode.Builder}
     */
    withRequestTimeout : makeOptionSetter('requestTimeout'),
    /**
     * Set whether to use the cork/uncork socket functions.
     *
     * @method withCork
     * @param {Boolean} [cork=true] use cork/uncork. Default is true.
     * @chainable
     */
    withCork : makeOptionSetter('cork'),
    /**
     * Set whether this RiakNode connects to an external load balancer.
     *
     * @method withExternalLoadBalancer
     * @param {Boolean} [externalLoadBalancer=false] connects to external load balancer. Default is false.
     * @chainable
     */
    withExternalLoadBalancer : makeOptionSetter('externalLoadBalancer'),
    /**
     * Set the authentication information for connections made by this node.
     * @method withAuth
     * @param {Object} auth Set the authentication information for connections made by this node.
     * @param {String} auth.user Riak username.
     * @param {String} [auth.password] Riak password. Not required if using user cert.
     * @param {String|Buffer} [auth.pfx] A string or buffer holding the PFX or PKCS12 encoded private key, certificate and CA certificates.
     * @param {String|Buffer} [auth.key] A string holding the PEM encoded private key.
     * @param {String} [auth.passphrase]  A string of passphrase for the private key or pfx.
     * @param {String|Buffer} [auth.cert]  A string holding the PEM encoded certificate.
     * @param {String|String[]|Buffer[]} [auth.ca] Either a string or list of strings of PEM encoded CA certificates to trust.
     * @param {String|String[]|Buffer[]} [auth.crl] Either a string or list of strings of PEM encoded CRLs (Certificate Revocation List).
     * @param {Boolean} [auth.rejectUnauthorized] A boolean indicating whether a server should automatically reject clients with invalid certificates. Only applies to servers with requestCert enabled.
     * @return {RiakNode.Builder}
     */
    withAuth : makeOptionSetter('auth'),
    /**
     * Set the command to be used for a health check.
     *
     * If this RiakNode performs a health check, a new connection is made and
     * a command performed. The default is to send a {{#crossLink "Ping"}}{{/crossLink}}
     * command but any command can be used. If it completes successfully the
     * health check is considered a success.
     * @method withHealthCheck
     * @param {Object} healthCheck - a command to execute as a health check.
     * @chainable
     */
    withHealthCheck : makeOptionSetter('healthCheck'),
    /**
     * Builds a RiakNode instance, passing this builder as the options object.
     * @method build
     * @return {RiakNode}
     */
    build : function () {
        return new RiakNode(this);
    }
};

/**
 * Static factory for constructing a set of RiakNodes.
 *
 * To create a set of RiakNodes with the same options:
 *
 *      var options = new RiakNode.Builder().withMinConnections(10);
 *      var nodes = RiakNode.buildNodes(['192.168.1.1', '192.168.1.2'], options);
 *
 * __options__ can be manually constructed or an instance of the Builder.
 * It is never modified: every node receives its own copy of the options with
 * __remoteAddress__ (and __remotePort__, when the address has the form
 * __host:port__) filled in. An address without a port uses the port from
 * __options__, or the default port when __options__ has none.
 *
 * @static
 * @method buildNodes
 * @param {String[]} addresses - an array of IP|hostname[:port]
 * @param {Object} [options] - the options to use for all RiakNodes.
 * @return {Array/RiakNode}
 */
var buildNodes = function (addresses, options) {
    var riakNodes = [];
    var sharedOptions = options || {};

    for (var i = 0; i < addresses.length; i++) {
        // Work on a per-node copy so that a port parsed from one address
        // cannot leak into the next node or into the caller's object.
        var nodeOptions = copyNodeOptions(sharedOptions);
        var split = addresses[i].split(':');
        nodeOptions.remoteAddress = split[0];
        if (split.length === 2) {
            nodeOptions.remotePort = split[1];
        }
        riakNodes.push(new RiakNode(nodeOptions));
    }

    return riakNodes;
};

// Returns a plain object holding a shallow copy of the own enumerable
// properties of source (works for plain option objects and Builder instances).
function copyNodeOptions(source) {
    var copy = {};
    Object.keys(source).forEach(function (key) {
        copy[key] = source[key];
    });
    return copy;
}

// Public API: the RiakNode constructor is the module itself; the node factory,
// the Builder, the State values and the default option values hang off it.
module.exports = RiakNode;
module.exports.buildNodes = buildNodes;
module.exports.Builder = Builder;
module.exports.State = State;
module.exports.consts = consts;