Class AbstractPipeliningExecutor<O,P>
- java.lang.Object
- java.util.concurrent.AbstractExecutorService
- java.util.concurrent.ThreadPoolExecutor
- de.aristaflow.adept2.util.threading.AFThreadPoolExecutor
- de.aristaflow.adept2.util.threading.CachedThreadPoolExecutor
- de.aristaflow.adept2.util.threading.executor.AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
- de.aristaflow.adept2.util.threading.executor.AbstractPipeliningExecutor<O,P>
- Type Parameters:
O
- The type of object for which work is registered.
P
- The type identifying a pipeline. This can be the same as <O> or an appropriate hash value for the registered object.
- All Implemented Interfaces:
Executor, ExecutorService
- Direct Known Subclasses:
LimitedPipelineExecutor, UnlimitedPipelineExecutor
public abstract class AbstractPipeliningExecutor<O,P> extends AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
A pipelining executor handles work in pipelines. That is, registered work is assigned to a specific pipeline. The work in each pipeline will be executed in the order it arrived while the work of different pipelines will be executed in parallel (or at least concurrently).
How the work of an object is assigned to a pipeline depends on the hashing that creates the index objects. This allows setting a fixed number of pipelines and distributing the work of the registered objects across the available pipelines.
As an object-specific executor, the indexed objects are the pipelines, while the runtime data are queues of runnables that belong to the corresponding pipeline and need to be executed. There is exactly one thread in the pool per pipeline.
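The distribution of registered objects onto a fixed number of pipelines can be illustrated with a small sketch. It is not taken from this library: the class `PipelineIndexSketch` and the helper `pipelineIndexFor` are hypothetical names, and using `hashCode()` modulo the pipeline count is only one possible hashing scheme.

```java
import java.util.List;

public class PipelineIndexSketch {
    /** Maps an object to one of pipelineCount pipeline indexes (hypothetical helper). */
    static int pipelineIndexFor(Object registeredObject, int pipelineCount) {
        // floorMod keeps the result in [0, pipelineCount) even for negative hash codes.
        return Math.floorMod(registeredObject.hashCode(), pipelineCount);
    }

    public static void main(String[] args) {
        int pipelineCount = 4;
        // The same object always lands in the same pipeline, so its work stays ordered.
        for (String id : List.of("order-1", "order-2", "order-3", "order-1")) {
            System.out.println(id + " -> pipeline " + pipelineIndexFor(id, pipelineCount));
        }
    }
}
```

Objects that hash to the same index share a pipeline, so their work is serialised even though they are different objects.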
Note that the assignment of pipelines to threads is not fixed. As long as there is work in a pipeline, it will be executed by the same thread. As soon as a pipeline is empty, new work in this pipeline may be done by a different thread.
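The behaviour described above can be approximated with plain `java.util.concurrent` classes. The following is a minimal sketch under stated assumptions, not the implementation of this class: `PipelineSketch`, `execute`, `drain` and `demo` are hypothetical names. Each pipeline is a queue of runnables. A drainer is started only when a pipeline becomes non-empty, and the pipeline is removed again as soon as it is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PipelineSketch<P> {
    private final Map<P, Queue<Runnable>> pipelines = new HashMap<>();
    private final ExecutorService pool;

    PipelineSketch(ExecutorService pool) {
        this.pool = pool;
    }

    /** Appends the task to its pipeline; starts a drainer only for a new pipeline. */
    void execute(P pipelineId, Runnable task) {
        synchronized (pipelines) {
            Queue<Runnable> queue = pipelines.get(pipelineId);
            if (queue != null) {
                queue.add(task); // the active drainer picks this up in order
                return;
            }
            queue = new ArrayDeque<>();
            queue.add(task);
            pipelines.put(pipelineId, queue);
        }
        pool.execute(() -> drain(pipelineId));
    }

    /** Runs the work of one pipeline in arrival order until the pipeline is empty. */
    private void drain(P pipelineId) {
        while (true) {
            Runnable next;
            synchronized (pipelines) {
                next = pipelines.get(pipelineId).poll();
                if (next == null) {
                    pipelines.remove(pipelineId); // later work may use another thread
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // a failing task must not stall the rest of its pipeline
            }
        }
    }

    /** Submits work to two pipelines and returns the order seen by pipeline "a". */
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        PipelineSketch<String> executor = new PipelineSketch<>(pool);
        List<Integer> seenByA = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 5; i++) {
            int n = i;
            executor.execute("a", () -> seenByA.add(n));
            executor.execute("b", () -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seenByA;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2, 3, 4]
    }
}
```

Work within pipeline "a" keeps its submission order, while pipeline "b" may run concurrently on the other pool thread.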
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
-
-
Field Summary
-
Fields inherited from class de.aristaflow.adept2.util.threading.executor.AbstractObjectSpecificExecutor
abortCount, flushPending, logger, objects, shutdownPending
-
Fields inherited from class de.aristaflow.adept2.util.threading.CachedThreadPoolExecutor
fullSize, realCoreSize, realMaxSize, workSize
-
Fields inherited from class de.aristaflow.adept2.util.threading.AFThreadPoolExecutor
activeThreads, completedTasks, HYSTERESIS, submittedTasks
-
-
Constructor Summary
Constructors
protected AbstractPipeliningExecutor(int maxPoolSize, Adept2ThreadFactory threadFactory)
Creates a new thread pool executor with the designated maximum number of threads for different objects using the designated thread factory for creating threads.
protected AbstractPipeliningExecutor(Adept2ThreadFactory threadFactory)
Deprecated, for removal: This API element is subject to removal in a future version. Use AbstractPipeliningExecutor(int, Adept2ThreadFactory) instead.
-
Method Summary
protected ObjectRunnable<O,P> addRunnable(O registeredObject, P indexObject, Runnable task)
Adds the designated runnable for the designated object and index object to this executor.
protected <T> Pair<Future<T>,Map<P,Queue<Runnable>>> cancelAndFlush(boolean shutdownRequest, Map<P,Queue<Runnable>> pending, Callable<T> task, Collection<ObjectRunnable<O,P>> objectRunnables)
Cancels the designated pending work (if appropriate), flushes (i.e. waits for or aborts the designated runnables) and executes the designated task.
protected void clearAbortedPendingWork(boolean shutdownRequest)
Removes all pending work that should be aborted for an aborting flush, that is, it removes the work from the data structures, e.g. objects.
protected PipelineRunnable<O,P> createPipelineRunnable(O object, P pipelineId, Queue<Runnable> pipelines)
Creates a new pipeline runnable.
protected void executeShutdown(Runnable shutdown)
Executes the designated shutdown task which contains a flush calling ThreadPoolExecutor.shutdown() without abortion.
protected List<Runnable> getOutstandingRunnables(Map<P,Queue<Runnable>> outstandingWork)
Gets a list of runnables which have not been executed based on the designated outstanding work.
-
Methods inherited from class de.aristaflow.adept2.util.threading.executor.AbstractObjectSpecificExecutor
execute, flushAndExecute, flushAndExecuteUnchecked, getIndexObjectFor, isShutdown, restart, restartPendingWork, shutdown, shutdownNow, shutdownRunnableRun, submit, submit, submitAndFlush
-
Methods inherited from class de.aristaflow.adept2.util.threading.CachedThreadPoolExecutor
afterExecute, execute
-
Methods inherited from class de.aristaflow.adept2.util.threading.AFThreadPoolExecutor
beforeExecute, logPoolStatistics, newTaskFor, newTaskFor, recalculateLogLimits, setCorePoolSize, setLogExecuteCallStack, setMaximumPoolSize, updateQueueLimit
-
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, remove, setKeepAliveTime, setRejectedExecutionHandler, setThreadFactory, terminated, toString
-
-
-
-
Constructor Detail
-
AbstractPipeliningExecutor
@Deprecated(since="14.4.0", forRemoval=true) protected AbstractPipeliningExecutor(Adept2ThreadFactory threadFactory)
Deprecated, for removal: This API element is subject to removal in a future version. Use AbstractPipeliningExecutor(int, Adept2ThreadFactory) instead.
Creates a new thread pool executor with an unlimited number of pipelines for different objects using the designated thread factory for creating threads. The threads will stay idle for 60 seconds before they terminate.
- Parameters:
threadFactory
- The factory to use when the executor creates a new thread.
-
AbstractPipeliningExecutor
protected AbstractPipeliningExecutor(int maxPoolSize, Adept2ThreadFactory threadFactory)
Creates a new thread pool executor with the designated maximum number of threads for different objects using the designated thread factory for creating threads. The threads will stay idle for 60 seconds before they terminate.
- Parameters:
maxPoolSize
- The maximum number of threads allowed in the pool. Set this to the number of pipelines or less.
threadFactory
- The factory to use when the executor creates a new thread.
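For comparison, a pool with a maximum size and a 60 second idle timeout can be built directly on `java.util.concurrent.ThreadPoolExecutor`. This is only a sketch of the general technique. How `CachedThreadPoolExecutor` configures its pool internally is not documented here, and `BoundedPoolSketch` and `newBoundedPool` are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {
    /** Creates a pool of at most maxPoolSize threads that terminate after 60 s idle. */
    static ThreadPoolExecutor newBoundedPool(int maxPoolSize, ThreadFactory threadFactory) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxPoolSize, maxPoolSize,      // core size equals maximum size
                60L, TimeUnit.SECONDS,         // idle time before a thread terminates
                new LinkedBlockingQueue<>(),   // unbounded queue for waiting work
                threadFactory);
        // Without this, core threads would stay alive even when idle.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(4, Executors.defaultThreadFactory());
        System.out.println(pool.getMaximumPoolSize() + " threads max, keep-alive "
                + pool.getKeepAliveTime(TimeUnit.SECONDS) + " s");
        pool.shutdown();
    }
}
```

With more threads than pipelines, the surplus threads would never find work, which is presumably why the pool size should not exceed the number of pipelines.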
-
-
Method Detail
-
addRunnable
protected ObjectRunnable<O,P> addRunnable(O registeredObject, P indexObject, Runnable task)
Description copied from class: AbstractObjectSpecificExecutor
Adds the designated runnable for the designated object and index object to this executor.
Implementations may store the work internally for later execution or assign it to an already existing active element (usually a thread). If they need a new active element for the designated runnable, they have to return a new runnable, which will then be added to this executor using the normal CachedThreadPoolExecutor.execute(Runnable). If they return null, they have to assign the runnable to an active element eventually and arbitrarily.
The caller has the lock on objects.
- Specified by:
addRunnable in class AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
- Parameters:
registeredObject
- The object for which this runnable has been registered.
indexObject
- The object with which this runnable is managed and executed within the corresponding object-specific executor service.
task
- The task to execute.
- Returns:
- A runnable to add to the executor so that this gets a new active element (thread). This will be handled via CachedThreadPoolExecutor.execute(Runnable), which means the runnable will be executed independently of other work and a new thread may be started for it.
-
createPipelineRunnable
protected PipelineRunnable<O,P> createPipelineRunnable(O object, P pipelineId, Queue<Runnable> pipelines)
Creates a new pipeline runnable. This method allows subclasses to create more specific runnables.
- Parameters:
object
- The object for which to create a pipeline runnable.
pipelineId
- The identifier of the pipeline.
pipelines
- The data structures containing all pipelines from which the corresponding pipeline will be removed as soon as it is empty.
- Returns:
- A new runnable for the designated object and pipeline.
-
clearAbortedPendingWork
protected void clearAbortedPendingWork(boolean shutdownRequest)
Description copied from class: AbstractObjectSpecificExecutor
Removes all pending work that should be aborted for an aborting flush, that is, it removes the work from the data structures, e.g. objects. This method only affects data structures; it does not affect any executed work. The caller will take care of this shortly.
The caller has the lock on objects.
- Specified by:
clearAbortedPendingWork in class AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
- Parameters:
shutdownRequest
- Whether this flush is caused by AbstractObjectSpecificExecutor.shutdownNow(). In this case all work has to be aborted, even the work pending after a flush.
-
cancelAndFlush
protected <T> Pair<Future<T>,Map<P,Queue<Runnable>>> cancelAndFlush(boolean shutdownRequest, Map<P,Queue<Runnable>> pending, Callable<T> task, Collection<ObjectRunnable<O,P>> objectRunnables)
Description copied from class: AbstractObjectSpecificExecutor
Cancels the designated pending work (if appropriate), flushes (i.e. waits for or aborts the designated runnables) and executes the designated task.
The caller has the lock on objects.
- Specified by:
cancelAndFlush in class AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
- Parameters:
shutdownRequest
- Whether this flush is caused by AbstractObjectSpecificExecutor.shutdownNow(). In this case all work has to be aborted, even the work pending after a flush.
pending
- The pending work that will not be executed due to abort, or null if running work should not be aborted.
task
- The task to execute without any object work running.
objectRunnables
- The currently running work. This will either be aborted or awaited for flushing.
- Returns:
- A pair containing the future representing the designated task and, as second element, either null in case no abortion is requested, or otherwise the managed objects and the runtime data of the aborted work. Note that the runtime data may contain work that is not cancelled by the flush. This depends on the runtime data.
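The flush semantics (either await or abort the running work, then run a task with nothing else running) can be sketched with standard futures. This is an illustration under assumptions and not the method above: `FlushSketch`, `flushAndCall` and `demo` are hypothetical names, the sketch runs the task in the calling thread instead of returning a future, and it does not model the pending work map.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class FlushSketch {
    /** Cancels (abort) or awaits (no abort) the running work, then calls the task. */
    static <T> T flushAndCall(Collection<? extends Future<?>> running, boolean abort,
            Callable<T> task) {
        for (Future<?> work : running) {
            if (abort) {
                // Requests interruption; does not wait for the worker to stop.
                work.cancel(true);
            } else {
                try {
                    work.get(); // blocks until this piece of work has finished
                } catch (ExecutionException | CancellationException e) {
                    // failed or cancelled work no longer blocks the flush
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while flushing", e);
                }
            }
        }
        try {
            return task.call();
        } catch (Exception e) {
            throw new IllegalStateException("Flush task failed", e);
        }
    }

    /** Submits three increments, flushes without abort and returns the counter. */
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        List<Future<?>> running = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            running.add(pool.submit(() -> { done.incrementAndGet(); }));
        }
        int seen = flushAndCall(running, false, () -> done.get());
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

Because the flush waits for all three increments before calling the task, the task observes the final counter value.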
-
executeShutdown
protected void executeShutdown(Runnable shutdown)
Description copied from class: AbstractObjectSpecificExecutor
Executes the designated shutdown task which contains a flush calling ThreadPoolExecutor.shutdown() without abortion.
This method is only called if there is pending work. So the implementation may decide on how and when to execute the shutdown runnable.
The caller has the lock on objects.
- Specified by:
executeShutdown in class AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
- Parameters:
shutdown
- The task to perform for shutting down.
-
getOutstandingRunnables
protected List<Runnable> getOutstandingRunnables(Map<P,Queue<Runnable>> outstandingWork)
Description copied from class: AbstractObjectSpecificExecutor
Gets a list of runnables which have not been executed based on the designated outstanding work.
This method will be called after an immediate shutdown which aborts pending work. Depending on the implementation, the designated runtime data may contain runnables which have not been executed and should therefore be returned to conform to the interface of an executor service.
The caller does not have the lock on objects.
- Specified by:
getOutstandingRunnables in class AbstractObjectSpecificExecutor<O,P,Queue<Runnable>>
- Parameters:
outstandingWork
- The pending work that has been aborted due to the immediate shutdown.
- Returns:
- A list of runnables stemming from the designated outstanding work.
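Since the runtime data of a pipeline is a queue of runnables, collecting the outstanding work amounts to flattening the map of queues into one list. The following sketch shows one plausible way to do this. `OutstandingSketch` and `flatten` are hypothetical names, and the actual implementation may differ, for instance in how it orders the pipelines.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class OutstandingSketch {
    /** Collects all runnables of all pipelines, keeping the order within each pipeline. */
    static <P> List<Runnable> flatten(Map<P, Queue<Runnable>> outstandingWork) {
        List<Runnable> result = new ArrayList<>();
        for (Queue<Runnable> pipeline : outstandingWork.values()) {
            result.addAll(pipeline); // ArrayDeque iterates from head to tail
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Queue<Runnable>> outstanding = new LinkedHashMap<>();
        Queue<Runnable> a = new ArrayDeque<>();
        a.add(() -> System.out.println("a1"));
        a.add(() -> System.out.println("a2"));
        Queue<Runnable> b = new ArrayDeque<>();
        b.add(() -> System.out.println("b1"));
        outstanding.put("a", a);
        outstanding.put("b", b);
        System.out.println(flatten(outstanding).size()); // prints 3
    }
}
```

The returned list plays the same role as the result of `ExecutorService.shutdownNow()`, which reports the tasks that never started.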
-
-