'use strict';
var RiakObject = require('./riakobject');
var CommandBase = require('../commandbase');
var inherits = require('util').inherits;
var Joi = require('joi');
/**
* Provides the StoreValue class, its builder, and its response.
* @module KV
*/
/**
* Command used to store data in Riak.
*
* As a convenience, a builder class is provided:
*
* var storeValue = new StoreValue.Builder()
* .withBucket('myBucket')
* .withKey('myKey')
* .withContent(myObj)
* .build();
*
* See {{#crossLink "StoreValue.Builder"}}StoreValue.Builder{{/crossLink}}
*
* @class StoreValue
* @constructor
* @param {Object} options The options for this command.
* @param {String} [options.bucketType=default] The bucket type in riak. If not supplied 'default' is used.
* @param {String} options.bucket The bucket in riak.
* @param {String} [options.key] The key for the object you want to store.
* @param {Buffer} [options.vclock] The vector clock to use.
* @param {RiakObject|String|Buffer|Object} options.value The value to store in Riak. JS Objects will be passed to JSON.stringify().
* @param {Number} [options.w] The W value to use.
* @param {Number} [options.dw] The DW value to use.
* @param {Number} [options.pw] The PW value to use.
* @param {Boolean} [options.returnBody] Return the stored object and meta (incl. siblings)
* @param {Boolean} [options.returnHead] Return the metatdata only for the stored object.
* @param {Boolean} [options.convertToJs] Convert the returned value(s) to JS objects using JSON.parse()
* @param {Function} [options.conflictResolver] A function used to resolve siblings to a single object.
* @param {RiakObject[]|Object[]} options.conflictResolver.objects The array of objects returned from Riak.
* @param {Number} [options.timeout] Set a timeout for this command.
* @param {Boolean} [options.ifNotModified] The if_not_modified flag.
* @param {Boolean} [options.ifNoneMatch] The if_none_match flag.
* @param {Function} callback The allback to be executed when the operation completes.
* @param {String} callback.err An error message. Will be null if no error.
* @param {Object} callback.response The response from Riak
* @param {Buffer} callback.response.vclock The vector clock for this object (and its siblings)
* @param {Object[]|RiakObject[]} callback.response.values An array of one or more values. Either RiakObjects or JS objects if convertToJs was used.
* @param {String} callback.response.generatedKey The key generated by Riak if one was not supplied.
* @param {Object} callback.data additional error data. Will be null if no error.
* @extends CommandBase
*/
function StoreValue(options, callback) {
CommandBase.call(this, 'RpbPutReq', 'RpbPutResp', callback);
this.validateOptions(options, schema);
var hasBucket = this.options.bucket ||
(RiakObject.isRiakObject(options.value) && this.options.value.bucket);
if (!hasBucket) {
throw new Error('Must supply bucket directly or via a RiakObject');
}
}
inherits(StoreValue, CommandBase);
StoreValue.prototype.constructPbRequest = function() {
var protobuf = this.getPbReqBuilder();
// RiakObject values take precidence
var value = this.options.value;
if (RiakObject.isRiakObject(value)) {
if (value.hasOwnProperty('vclock')) {
this.options.vclock = value.vclock;
}
if (value.hasOwnProperty('bucket')) {
this.options.bucket = value.bucket;
}
if (value.hasOwnProperty('bucketType')) {
this.options.bucketType = value.bucketType;
}
if (value.hasOwnProperty('key')) {
this.options.key = value.key;
}
} else {
value = new RiakObject().setValue(this.options.value);
}
protobuf.setBucket(new Buffer(this.options.bucket));
protobuf.setType(new Buffer(this.options.bucketType));
if (this.options.key) {
protobuf.setKey(new Buffer(this.options.key));
}
protobuf.setVclock(this.options.vclock);
protobuf.setW(this.options.w);
protobuf.setDw(this.options.dw);
protobuf.setPw(this.options.pw);
protobuf.setReturnBody(this.options.returnBody);
protobuf.setReturnHead(this.options.returnHead);
protobuf.setTimeout(this.options.timeout);
protobuf.setIfNotModified(this.options.ifNotModified);
protobuf.setIfNoneMatch(this.options.ifNoneMatch);
var rpbContent = RiakObject.populateRpbContentFromRiakObject(value);
protobuf.setContent(rpbContent);
return protobuf;
};
StoreValue.prototype.onSuccess = function(rpbPutResp) {
// unless returnHead or returnMeta were specified, or if no key was supplied
// Riak doesn't return any data
var response = {};
if (rpbPutResp) {
var responseKey = null;
if (rpbPutResp.getKey()) {
responseKey = rpbPutResp.getKey().toString('utf8');
}
var values;
var vclock = null;
if (rpbPutResp.getContent().length) {
var pbContentArray = rpbPutResp.getContent();
values = new Array(pbContentArray.length);
vclock = rpbPutResp.getVclock().toBuffer();
for (var i = 0; i < pbContentArray.length; i++) {
var riakObject = RiakObject.createFromRpbContent(pbContentArray[i], this.options.convertToJs);
riakObject.vclock = vclock;
riakObject.bucket = this.options.bucket;
riakObject.bucketType = this.options.bucketType;
riakObject.key = responseKey ? responseKey : this.options.key;
values[i] = riakObject;
}
if (this.options.conflictResolver) {
values = [this.options.conflictResolver(values)];
}
} else {
values =[];
}
response = { generatedKey: responseKey, vclock: vclock, values: values };
} else {
response = { generatedKey: null, vclock: null, values: [] };
}
this._callback(null, response);
return true;
};
var schema = Joi.object().keys({
bucket: Joi.string().optional(),
bucketType: Joi.string().default('default'),
key: Joi.binary().default(null).optional(),
value: Joi.any().required(),
w: Joi.number().default(null).optional(),
dw: Joi.number().default(null).optional(),
pw: Joi.number().default(null).optional(),
returnBody: Joi.boolean().default(false).optional(),
returnHead: Joi.boolean().default(false).optional(),
timeout: Joi.number().default(null).optional(),
ifNotModified: Joi.boolean().default(false).optional(),
ifNoneMatch: Joi.boolean().default(false).optional(),
vclock: Joi.binary().default(null).optional(),
convertToJs: Joi.boolean().default(false).optional(),
conflictResolver: Joi.func().default(null).optional()
});
/**
* A builder for constructing StoreValue instances.
*
* Rather than having to manually construct the __options__ and instantiating
* a StoreValue directly, this builder may be used.
*
* var storeValue = new StoreValue.Builder()
* .withBucket('myBucket')
* .withKey('myKey')
* .withContent(myObj);
* .build();
*
* @class StoreValue.Builder
* @constructor
*/
function Builder() {}
Builder.prototype = {
/**
* Set the bucket.
* Note that this can also be provided via RiakObject passed in via withContent() and
* doing so will take precidence over this method.
* @method withBucket
* @param {String} bucket the bucket in Riak
* @chainable
*/
withBucket : function(bucket) {
this.bucket = bucket;
return this;
},
/**
* Set the bucket type.
* If not supplied, 'default' is used.
* Note that this can also be provided via RiakObject passed in via withContent() and
* doing so will take precidence over this method.
* @method withBucketType
* @param {String} bucketType the bucket type in riak
* @chainable
*/
withBucketType : function(bucketType) {
this.bucketType = bucketType;
return this;
},
/**
* Set the key.
* If not set, riak will generate and return a key.
* Note that this can also be provided via RiakObject passed in via withContent() and
* doing so will take precidence over this method.
* @method withKey
* @param {String} key the key in riak.
* @chainable
*/
withKey : function(key) {
this.key = key;
return this;
},
/**
* Set the value and its metadata to be stored in Riak.
* If a JS object is supplied, it will be converted to JSON
* using JSON.stringify()
* @method withContent
* @param {RiakObject|String|Buffer|Object} value the data to store in Riak
* @chainable
*/
withContent : function(value) {
this.value = value;
return this;
},
/**
* Set the vector clock.
* Convenience method if a RiakObject is not supplied as content.
* Note that a vclock supplied via RiakObject in withContent() will have precendence over this.
* @method withVClock
* @param {Buffer} vclock a vector clock returned from a previous fetch
*/
withVClock : function(vclock) {
this.vclock = vclock;
return this;
},
/**
* Set the W value.
* How many replicas to write to before returning a successful response.
* If not set the bucket default is used.
* @method withW
* @param {number} w the W value.
* @chainable
*/
withW : function(w) {
this.w = w ;
return this;
},
/**
* Set the DW value.
* How many replicas to commit to durable storage before returning a successful response.
* If not set the bucket default is used.
* @method withDw
* @param {number} dw the DW value.
* @chainable
*/
withDw : function(dw) {
this.dw = dw;
return this;
},
/**
* Set the PW value.
* How many primary nodes must be up when the write is attempted.
* If not set the bucket default is used.
* @method withPw
* @param {number} pw the PW value.
* @chainable
*/
withPw : function(pw) {
this.pw = pw;
return this;
},
/**
* Return the object after storing (including any siblings).
* If siblings are present, an optional conflictResolver can be
* provided to resolve them.
* @method withReturnBody
* @param {boolean} returnBody true to return the object.
* @param {boolean} [convertToJs] convert the returned value(s) to JS objects using JSON.parse()
* @param {Function} [conflictResolver] the conflict resolver
* @param {RiakObject[]} conflictResolver.riakobjects array of RiakObjects returned by Riak.
* @chainable
*/
withReturnBody: function(returnBody, convertToJs, conflictResolver) {
this.returnBody = arguments[0];
for (var i = 1; i < arguments.length; i++) {
var arg = arguments[i];
if (typeof arg === 'boolean') {
this.convertToJs = arg;
} else {
this.conflictResolver = arg;
}
}
return this;
},
/**
* Return the metadata after storing the value.
*
* Causes Riak to only return the metadata for the object.
* If siblings are present, an optional conflictResolver can be
* provided to resolve them.
* @method withReturnHead
* @param {Function} [conflictResolver] the conflict resolver
* @param {RiakObject[]} conflictResolver.riakobjects array of RiakObjects returned by Riak.
* @param {boolean} returnHead true to return only metadata.
* @chainable
*/
withReturnHead : function(returnHead, conflictResolver) {
this.returnHead = returnHead;
this.conflictResolver = conflictResolver;
return this;
},
/**
* Set the if_not_modified flag.
*
* Setting this to true means to store the value only if the
* supplied vclock matches the one in the database.
*
* Be aware there are several cases where this may not actually happen.
* Use of this feature is discouraged.
* @method withIfNotModified
* @param {Boolean} ifNotModified the if_not_modified value.
* @chainable
*/
withIfNotModified : function(ifNotModified) {
this.ifNotModified = ifNotModified;
return this;
},
/**
* Set the if_none_match flag.
*
* Setting this to true means store the value only if this
* bucket/key combination are not already defined.
*
* Be aware that there are several cases where
* this may not actually happen. Use of this option is discouraged.
* @method withIfNoneMatch
* @param {boolean} ifNoneMatch the if_non-match value.
* @chainable
*/
withIfNoneMatch : function(ifNoneMatch) {
this.ifNoneMatch = ifNoneMatch;
return this;
},
/**
* Set a timeout for this operation.
* @method withTimeout
* @param {number} timeout a timeout in milliseconds.
* @chainable
*/
withTimeout : function(timeout) {
this.timeout = timeout;
return this;
},
/**
* Set the callback to be executed when the operation completes.
* @method withCallback
* @param {Function} callback The allback to be executed when the operation completes.
* @param {String} callback.err An error message. Will be null if no error.
* @param {Object} callback.response The response from Riak
* @param {Buffer} callback.response.vclock The vector clock for this object (and its siblings)
* @param {Object[]|RiakObject[]} callback.response.values An array of one or more values. Either RiakObjects or JS objects if convertToJs was used.
* @param {String} callback.response.generatedKey The key generated by Riak if one was not supplied.
* @chainable
*/
withCallback : function(callback) {
this.callback = callback;
return this;
},
/**
* Construct a StoreValue instance.
* @method build
* @return {StoreValue} a StoreValue instance
*/
build : function() {
var cb = this.callback;
delete this.callback;
return new StoreValue(this, cb);
}
};
module.exports = StoreValue;
module.exports.Builder = Builder;