public class MultiFetchObject<T> extends Object implements RiakOperation<List<MultiFetchFuture<T>>>
Use the Bucket.multiFetch(java.lang.String[]) or Bucket.multiFetch(java.util.List, java.lang.Class) methods to create a multi-fetch operation.
Riak itself does not support pipelining of requests. MultiFetchObject addresses this by using a thread pool to parallelize the fetch operations for a given set of keys.
The result of calling execute() is a List of MultiFetchFuture objects, each one representing a fetch operation. The simplest use is a loop that iterates through the futures and waits for each to complete:
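    // Assumes keys is a List<String>; the Class argument selects the result type.
    List<MultiFetchFuture<MyPojo>> futures = bucket.multiFetch(keys, MyPojo.class).execute();
    List<MyPojo> myResults = new ArrayList<MyPojo>();
    for (MultiFetchFuture<MyPojo> f : futures)
    {
        try
        {
            MyPojo mp = f.get();
            myResults.add(mp);
        }
        catch (ExecutionException e)
        {
            // log error, etc.
        }
        catch (InterruptedException e)
        {
            // restore the interrupt flag and stop waiting
            Thread.currentThread().interrupt();
            break;
        }
    }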
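The approach described above, one task per key submitted to a shared pool with the futures handed back to the caller, can be sketched with plain java.util.concurrent classes. This is a minimal illustration and not the library's implementation: ParallelFetchSketch, fetchOne, fetchAll, and awaitAll are hypothetical names, and fetchOne merely stands in for a real single-key fetch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFetchSketch {
    // One static pool shared by every call, mirroring the shared pool described above.
    // Daemon threads are an assumption made here so the sketch never blocks JVM exit.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() * 2,
        r -> { Thread t = new Thread(r); t.setDaemon(true); return t; });

    // Hypothetical stand-in for fetching a single key from the store.
    static String fetchOne(String key) {
        return "value-for-" + key;
    }

    // Submits one task per key and returns the futures in key order.
    static List<Future<String>> fetchAll(List<String> keys) {
        List<Future<String>> futures = new ArrayList<>();
        for (String key : keys) {
            futures.add(POOL.submit(() -> fetchOne(key)));
        }
        return futures;
    }

    // Waits for each future in turn; failed fetches are logged and skipped.
    static List<String> awaitAll(List<Future<String>> futures) {
        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) {
            try {
                results.add(f.get());
            } catch (ExecutionException e) {
                System.err.println("fetch failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Future<String>> futures = fetchAll(Arrays.asList("k1", "k2", "k3"));
        System.out.println(awaitAll(futures));
    }
}
```

Because the futures are returned in the same order as the keys, the caller can pair each result with its key by index even though the fetches themselves complete in any order.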
Thread Pool:
The internal ThreadPoolExecutor is static; all multi-fetch operations performed by a single instance of the client use the same pool. This is to prevent resource starvation in the case of multiple simultaneous multi-fetch operations. Idle threads (including core threads) are timed out after 5 seconds.
The default for corePoolSize is determined by the Java Runtime using:
    Runtime.getRuntime().availableProcessors() * 2;
Advanced users can tune this via the setCorePoolSize(int) method; the value is passed directly to its counterpart in the ThreadPoolExecutor. The queue feeding the thread pool is unbounded; therefore ThreadPoolExecutor.setMaximumPoolSize(int) has no effect and is simply set to match.
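The pool settings described above can be reproduced with the standard ThreadPoolExecutor API. The following is a sketch under stated assumptions, not the client's actual code: PoolConfigSketch, newPool, and resize are hypothetical names. It shows a pool with equal core and maximum sizes, an unbounded queue, a 5 second keep-alive that also applies to core threads, and a resize helper that keeps the two sizes matched.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigSketch {
    // Same formula the documentation gives for the default size.
    static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    // Builds a pool whose core and maximum sizes match and whose queue is unbounded.
    // With an unbounded queue the executor never grows past the core size,
    // which is why the maximum is simply kept equal to it.
    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            DEFAULT_SIZE, DEFAULT_SIZE,
            5L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
        // Let idle core threads time out too, as described above.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    // Keeps core == maximum. The order of the two setters matters because
    // recent JDKs reject a core size above the maximum, and every JDK
    // rejects a maximum below the core size. The size must be positive.
    static void resize(ThreadPoolExecutor pool, int size) {
        if (size > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(size);
            pool.setCorePoolSize(size);
        } else {
            pool.setCorePoolSize(size);
            pool.setMaximumPoolSize(size);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        resize(pool, 4);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```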
Be aware that, because requests are being parallelized, performance also depends on the client's underlying connection pool. If no connections are available, performance will suffer initially because connections must be established or, worse, they could time out.
See Also: Bucket, RiakFactory
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_POOL_MAX_SIZE: The initial value for both corePoolSize and maximumPoolSize. |
Constructor and Description |
---|
MultiFetchObject(RawClient client, String bucket, List<String> keys, Retrier retrier): Create a new MultiFetchObject that delegates to the given client to fetch the data from bucket for keys using retrier. |
Modifier and Type | Method and Description |
---|---|
MultiFetchObject<T> | basicQuorum(boolean basicQuorum) |
List<MultiFetchFuture<T>> | execute(): Attempts to fetch the data for all the keys, convert it with Converter and resolve any siblings with ConflictResolver. |
static int | getCorePoolSize(): Returns the core number of threads from the internal ThreadPoolExecutor. |
MultiFetchObject<T> | headOnly(): Causes the client to retrieve only the metadata and not the value of this object. |
MultiFetchObject<T> | modifiedSince(Date modifiedSince): *NOTE* HTTP Only. |
MultiFetchObject<T> | notFoundOK(boolean notFoundOK) |
MultiFetchObject<T> | pr(int pr) |
MultiFetchObject<T> | pr(Quora pr) |
MultiFetchObject<T> | pr(Quorum pr) |
MultiFetchObject<T> | r(int r): The read quorum for this fetch operation. |
MultiFetchObject<T> | r(Quora r): The read quorum for this fetch operation. |
MultiFetchObject<T> | r(Quorum r): The read quorum for this fetch operation. |
MultiFetchObject<T> | returnDeletedVClock(boolean returnDeletedVClock) |
static void | setCorePoolSize(int size): Sets the core number of threads in the internal ThreadPoolExecutor. |
MultiFetchObject<T> | timeout(int timeout): Set an operation timeout in milliseconds to be sent to Riak. As of 1.4, Riak allows a timeout to be sent for get, put, and delete operations. |
MultiFetchObject<T> | withConverter(Converter<T> converter): A Converter to use to convert the data fetched to some other type. |
MultiFetchObject<T> | withResolver(ConflictResolver<T> resolver): Sets the ConflictResolver to use for this multi-fetch operation. |
MultiFetchObject<T> | withRetrier(Retrier retrier): A Retrier to use. |

public static final int DEFAULT_POOL_MAX_SIZE
The initial value for both corePoolSize and maximumPoolSize, determined by Runtime.getRuntime().availableProcessors() * 2.
See Also: ThreadPoolExecutor

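public List<MultiFetchFuture<T>> execute()
Attempts to fetch the data for all the keys, convert it with Converter and resolve any siblings with ConflictResolver.
Specified by: execute in interface RiakOperation<List<MultiFetchFuture<T>>>
Returns: a List of MultiFetchFuture objects.

public static void setCorePoolSize(int size)
Sets the core number of threads in the internal ThreadPoolExecutor.
Parameters:
size - the new core size
See Also: ThreadPoolExecutor.setCorePoolSize(int)
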
public static int getCorePoolSize()
Returns the core number of threads from the internal ThreadPoolExecutor.
See Also: ThreadPoolExecutor.getCorePoolSize()

public MultiFetchObject<T> withResolver(ConflictResolver<T> resolver)
Sets the ConflictResolver to use for this multi-fetch operation.
Parameters:
resolver

public MultiFetchObject<T> r(int r)
The read quorum for this fetch operation.
Parameters:
r - an Integer for the read quorum

public MultiFetchObject<T> r(Quora r)
The read quorum for this fetch operation.
Parameters:
r - a Quora for the read quorum

public MultiFetchObject<T> r(Quorum r)
The read quorum for this fetch operation.
Parameters:
r - a Quorum for the read quorum

public MultiFetchObject<T> pr(int pr)
Parameters:
pr
See Also: FetchMeta.Builder.pr(int)

public MultiFetchObject<T> pr(Quora pr)
Parameters:
pr
See Also: FetchMeta.Builder.pr(Quora)

public MultiFetchObject<T> pr(Quorum pr)
Parameters:
pr
See Also: FetchMeta.Builder.pr(Quora)

public MultiFetchObject<T> notFoundOK(boolean notFoundOK)
Parameters:
notFoundOK
See Also: FetchMeta.Builder.notFoundOK(boolean)

public MultiFetchObject<T> basicQuorum(boolean basicQuorum)
Parameters:
basicQuorum
See Also: FetchMeta.Builder.basicQuorum(boolean)

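public MultiFetchObject<T> timeout(int timeout)
Set an operation timeout in milliseconds to be sent to Riak. As of 1.4, Riak allows a timeout to be sent for get, put, and delete operations.
Parameters:
timeout - the timeout in milliseconds

public MultiFetchObject<T> returnDeletedVClock(boolean returnDeletedVClock)
Parameters:
returnDeletedVClock
See Also: FetchMeta.Builder.returnDeletedVClock(boolean)
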
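public MultiFetchObject<T> modifiedSince(Date modifiedSince)
*NOTE* HTTP Only.
Parameters:
modifiedSince - a last modified date.

public MultiFetchObject<T> headOnly()
Causes the client to retrieve only the metadata and not the value of this object. The Converter being used must be able to handle an empty value.
See Also: FetchMeta.Builder.headOnly(boolean headOnly)
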
public MultiFetchObject<T> withConverter(Converter<T> converter)
A Converter to use to convert the data fetched to some other type.
Parameters:
converter

public MultiFetchObject<T> withRetrier(Retrier retrier)
A Retrier to use.
Parameters:
retrier

Copyright © 2013. All Rights Reserved.