API Docs for: 2.2.2
Show:

File: lib/commands/kv/secondaryindexquery.js

'use strict';

var CommandBase = require('../commandbase');
var inherits = require('util').inherits;
var Joi = require('joi');
var RpbIndexReq = require('../../protobuf/riakprotobuf').getProtoFor('RpbIndexReq');
var RpbPair = require('../../protobuf/riakprotobuf').getProtoFor('RpbPair');
var RiakObject = require('./riakobject');

/**
 * Provides the SecondaryIndexQuery class and its builder.
 * @module KV
 */

/**
 * Command used to perform a secondary index query.
 *
 * As a convenience, a builder class is provided:
 *
 *     var query = new SecondaryIndexQuery.Builder()
 *                  .withBucket('myBucket')
 *                  .withIndexName('email_bin')
 *                  .withIndexKey('roach@basho.com')
 *                  .withCallback(myCallback)
 *                  .build();
 *
 * See {{#crossLink "SecondaryIndexQuery.Builder"}}SecondaryIndexQuery.Builder{{/crossLink}}
 *
 * @class SecondaryIndexQuery
 * @constructor
 * @param {Object} options The options for this command.
 * @param {String} [options.bucketType=default] The bucket type in riak. If not supplied 'default' is used.
 * @param {String} options.bucket The bucket in riak.
 * @param {String} options.indexName The secondary index name to query
 * @param {String|Number} [options.indexKey] A single index key to query
 * @param {String|Number} [options.rangeStart] Starting key for a range query
 * @param {String|Number} [options.rangeEnd] Ending key for a range query
 * @param {Boolean} [options.returnKeyAndIndex=false] Return the index keys along with the object keys
 * @param {Number} [options.maxResults] Limit the results returned and paginate if necessary
 * @param {Buffer} [options.continuation] A continuation returned from a previous query that set maxResults. Used for pagination.
 * @param {Boolean} [options.stream=true] Whether to stream or accumulate the result before calling callback.
 * @param {Number} [options.timeout] Set a timeout for this operation.
 * @param {Boolean} [options.paginationSort=false] True to sort a non-paginated query.
 * @param {Function} callback the callback to be executed by the command.
 * @param {String} callback.err  An error message. Will ne null if no error.
 * @param {Object} callback.response The response from Riak.
 * @param {Object[]} callback.response.values Object keys returned by the query, and optionally the index keys.
 * @param {Boolean} callback.response.done True if you have received all the results.
 * @param {Buffer} callback.response.continuation The continuation if a continuation was returned.
 * @param {Object} callback.data additional error data. Will be null if no error.
 * @extends CommandBase
 */
function SecondaryIndexQuery(options, callback) {
    CommandBase.call(this, 'RpbIndexReq', 'RpbIndexResp', callback);
    this.validateOptions(options, schema);

    if (this.options.indexKey === null && (this.options.rangeStart === null || this.options.rangeEnd === null)) {
        throw new Error("either 'indexKey' or 'rangeStart' + 'rangeEnd' are required");
    }

    if (!this.options.stream) {
        this.responses = [];
    }
}

inherits(SecondaryIndexQuery, CommandBase);

SecondaryIndexQuery.prototype.constructPbRequest = function() {

    var protobuf = this.getPbReqBuilder();
    // We always stream from Riak.
    protobuf.setStream(true);

    protobuf.setBucket(new Buffer(this.options.bucket));
    protobuf.setType(new Buffer(this.options.bucketType));
    protobuf.setIndex(new Buffer(this.options.indexName));
    protobuf.setReturnTerms(this.options.returnKeyAndIndex);

    if (this.options.indexKey !== null) {
        // _int index requests take ... strings, not numbers
        protobuf.setKey(new Buffer(this.options.indexKey.toString()));
        protobuf.setQtype(RpbIndexReq.IndexQueryType.eq);
    } else {
        protobuf.setRangeMin(new Buffer(this.options.rangeStart.toString()));
        protobuf.setRangeMax(new Buffer(this.options.rangeEnd.toString()));
        protobuf.setQtype(RpbIndexReq.IndexQueryType.range);
    }

    if (!this.options.maxResults && this.options.paginationSort) {
        protobuf.setPaginationSort(this.options.paginationSort);
    }
    protobuf.setMaxResults(this.options.maxResults);
    protobuf.setContinuation(this.options.continuation);
    protobuf.setTimeout(this.options.timeout);

    return protobuf;
};

SecondaryIndexQuery.prototype.onSuccess = function(rpbIndexResp) {

    /*
    * @private
    * The 2i API is inconsistent on the Riak side. If it's not
    * a range query, return_terms is ignored it only returns the
    * list of object keys and you have to have
    * preserved the index key if you want to return it to the user
    * with the results.
    *
    * Also, the $key index queries just ignore return_terms altogether.
    */

    var i, resultsToReturn;
    if (rpbIndexResp.results.length) {
        // Index keys and object keys were returned
        resultsToReturn = new Array(rpbIndexResp.results.length);
        for (i = 0; i < rpbIndexResp.results.length; i++) {
            var iKey = rpbIndexResp.results[i].key.toString('utf8');
            if (RiakObject.isIntIndex(this.options.indexName)) {
                iKey = parseInt(iKey);
            }
            resultsToReturn[i] = { indexKey : iKey,
                                  objectKey : rpbIndexResp.results[i].value.toString('utf8') };
        }
    } else {
        // only object keys were returned
        resultsToReturn = new Array(rpbIndexResp.keys.length);
        for (i = 0; i < rpbIndexResp.keys.length; i++) {
            var key = null;
            if (this.options.returnKeyAndIndex) {
                // this is only possible if this was a single key query
                key = this.options.indexKey;
            }
            resultsToReturn[i] = { indexKey: key, objectKey: rpbIndexResp.keys[i].toString('utf8') };
        }
    }

    var continuation = rpbIndexResp.continuation;

    if (this.options.stream) {
        this._callback(null, { values: resultsToReturn, done: rpbIndexResp.done, continuation: continuation });
    } else {
        Array.prototype.push.apply(this.responses, resultsToReturn);
        if (rpbIndexResp.done) {
            this._callback(null, { values: this.responses, done : true, continuation: continuation });
        }
    }

    return rpbIndexResp.done;

};

// Explicitly set null on options for protobufs and for conditionals
var schema = Joi.object().keys({
    bucket: Joi.string().required(),
    bucketType: Joi.string().default('default'),
    indexName: Joi.string().required(),
    indexKey: Joi.alternatives().try(
            Joi.number().options({ convert: false }), Joi.string()
        ).default(null).optional(),
    rangeStart: Joi.alternatives().try(
            Joi.number().options({ convert: false }), Joi.string()
        ).default(null).optional(),
    rangeEnd: Joi.alternatives().try(
            Joi.number().options({ convert: false }), Joi.string()
        ).default(null).optional(),
    returnKeyAndIndex: Joi.boolean().default(null).optional(),
    maxResults: Joi.number().default(null).optional(),
    continuation: Joi.any().default(null).optional(),
    timeout: Joi.number().default(null).optional(),
    stream: Joi.boolean().default(true).optional(),
    paginationSort: Joi.boolean().default(null).optional()
});

/**
 * A builder for constructing SecondaryIndexquery instances.
 *
 * Rather than having to manually construct the __options__ and instantiating
 * a SecondaryIndexQuery directly, this builder may be used.
 *
 *     var query = new SecondaryIndexQuery.Builder()
 *                  .withBucket('myBucket')
 *                  .withIndexName('email_bin')
 *                  .withIndexKey('roach@basho.com')
 *                  .withCallback(myCallback)
 *                  .build();
 *
 * @class SecondaryIndexQuery.Builder
 * @constructor
 */
function Builder() {}

Builder.prototype = {

    /**
     * Set the bucket.
     * @method withBucket
     * @param {String} bucket the bucket in Riak
     * @chainable
     */
    withBucket : function(bucket) {
        this.bucket = bucket;
        return this;
    },
    /**
     * Set the bucket type.
     * If not supplied, 'default' is used.
     * @method withBucketType
     * @param {String} bucketType the bucket type in riak
     * @chainable
     */
    withBucketType : function(bucketType) {
        this.bucketType = bucketType;
        return this;
    },
    /**
     * Set the index name
     * @method
     * @param {String} indexName the index to query
     * @chainable
     */
    withIndexName : function(indexName) {
        this.indexName = indexName;
        return this;
    },
    /**
     * Set a single secondary index key to use for query.
     * Note that you can only set a single key, or a range.
     * @method withIndexKey
     * @param {String|Number} indexKey the secondary index key.
     * @chainable
     */
    withIndexKey : function(indexKey) {
        this.indexKey = indexKey;
        return this;
    },
    /**
     * Set a range for the query
     * @method withRange
     * @param {String|Number} start the start of the range
     * @param {String|Number} end the end of the range
     * @chainable
     */
    withRange : function(start, end) {
        this.rangeStart = start;
        this.rangeEnd = end;
        return this;
    },
    /**
     * Set whether to return the index keys with the Riak object keys.
     * Setting this to true will return both the index key and the Riak
     * object's key. The default is false (only to return the Riak object keys).
     * @method withReturnKeyAndIndex
     * @param {Boolean} returnKeyAndIndex whether to return the index keys as well
     * @chainable
     */
    withReturnKeyAndIndex : function(returnKeyAndIndex) {
        this.returnKeyAndIndex = returnKeyAndIndex;
        return this;
    },
    /**
     * Set the maximum number of results returned by the query.
     *
     * When asking for large result sets, it is often desirable to ask the
     * servers to return chunks of results instead of a firehose.
     * You can do so using this method, where maxResults is the number of
     * results you'd like to receive.
     *
     * Assuming more keys are available, a continuation value will be included
     * in the results to allow the client to request the next page.
     * @method withMaxResults
     * @param {Number} maxResults the max number of results to return.
     * @chainable
     */
    withMaxResults : function(maxResults) {
        this.maxResults = maxResults;
        return this;
    },
    /**
     * Set the continuation for this query.
     * If using pagination via maxResults, this will have been returned
     * by a previous query.
     * @method withContinuation
     * @param {Buffer} continuation the continuation from a previous query
     * @chainable
     */
    withContinuation : function(continuation) {
        this.continuation = continuation;
        return this;
    },
    /**
    * Set a timeout for this operation.
    * @method withTimeout
    * @param {Number} timeout a timeout in milliseconds.
    * @chainable
    */
    withTimeout : function(timeout) {
        this.timeout = timeout;
        return this;
    },
    /**
     * Set the callback to be executed when the operation completes.
     * @method withCallback
     * @param {Function} callback the callback to be executed by the command.
     * @param {String} callback.err  An error message. Will ne null if no error.
     * @param {Object} callback.response The response from Riak.
     * @param {Object[]} callback.response.values Object keys returned by the query, and optionally the index keys.
     * @param {Boolean} callback.response.done True if you have received all the results.
     * @param {Buffer} callback.response.continuation The continuation if a continuation was returned.
     * @chainable
     */
    withCallback : function(callback) {
        this.callback = callback;
        return this;
    },
    /**
     * Stream the results.
     * Setting this to true will cause you callback to be called as the results
     * are returned from Riak. Set to false the result set will be buffered and
     * delevered via a single call to your callback. Note that on large result sets
     * this is very memory intensive.
     * @method withStreaming
     * @param {Boolean} [stream=true] Set whether or not to stream the results
     * @chainable
     */
    withStreaming : function(stream) {
        this.stream = stream;
        return this;
    },
    /**
     * Set whether to sort the results of a non-paginated 2i query.
     * If you are not using pagination (setting withMaxResults()) the
     * default behavior in Riak is to not sort the result set.
     * Setting this to true will sort the results before returning them.
     * __Note that this is not recommended for queries that could return a large
     * result set; the overhead in Riak is substantial. __
     * @method withPaginationSort
     * @param {Boolean} paginationSort true to sort the results of a non-paginated query
     * @chainable
     */
    withPaginationSort : function(paginationSort) {
        this.paginationSort = paginationSort;
        return this;
    },
    /**
     * Construct a SecondaryIndexQuery instance
     * @method build
     * @return {SecondaryIndexQuery}
     */
    build : function() {
        var cb = this.callback;
        delete this.callback;
        return new SecondaryIndexQuery(this, cb);
    }

};

module.exports = SecondaryIndexQuery;
module.exports.Builder = Builder;