@Service(value="blDatabaseSystemEventNode") @ManagedResource(objectName="org.broadleafcommerce:name=DatabaseSystemEventNode", description="Database System Event Node", currencyTimeLimit=15) public class DatabaseSystemEventNodeImpl extends Object implements DatabaseSystemEventNode, org.springframework.context.ApplicationContextAware, org.springframework.context.SmartLifecycle
Modifier and Type | Field and Description |
---|---|
protected CacheInvalidationHelper |
cacheInvalidationHelper |
protected ConsumeWorker |
consumeWorker |
protected org.springframework.context.ApplicationContext |
context |
protected ConsumeWorker |
durableConsumeWorker |
protected int |
durableEventsPerPoll
Deprecated.
Not used internally
|
protected org.quartz.Scheduler |
durableScheduler |
protected Map<String,List<SystemEventConsumer>> |
eventConsumers |
protected boolean |
eventConsumptionEnabled
Whether or not the schedulers for event consumption are initialized at startup.
|
protected EventLockUtil |
eventLockUtil |
protected int |
eventNodeLogInterval
The interval for DEBUG level logging in this class.
|
protected int |
eventsPerPoll
The number of
EventScopeType.VM events to retrieve from the datastore and queue up for consumption during
a polling attempt (see pollingIntervalSeconds ). |
protected EventWorkerType |
eventWorkerType |
protected ConsumeWorker |
globalConsumeWorker |
protected int |
globalEventsPerPoll
The number of
EventScopeType.GLOBAL events to retrieve from the datastore and queue up for consumption during a polling attempt
(globalPollingIntervalSeconds ). |
protected long |
globalJobThreadPoolCount
The quantity of threads used to consume
EventScopeType.GLOBAL events. |
protected long |
globalPollingIntervalSeconds
Period of time between attempts to retrieve a new batch of
EventScopeType.GLOBAL events
from the database. |
protected org.quartz.Scheduler |
globalScheduler |
protected long |
jobThreadPoolCount
The quantity of threads used to consume
EventScopeType.VM events. |
protected org.quartz.Scheduler |
keepAliveScheduler |
protected ConsumeWorker |
keepAliveWorker |
protected long |
missedDurableEventsPollingThresholdSeconds
The maximum age of a durable event (in seconds) that a node will consume.
|
protected boolean |
pause |
protected long |
pollingIntervalSeconds
Period of time between attempts to retrieve a new batch of
EventScopeType.VM events
from the database. |
protected ConsumeWorker |
priorityConsumeWorker |
protected int |
priorityEventsPerPoll
The number of priority events to retrieve from the datastore and queue up for consumption during a polling attempt
(
priorityPollingIntervalSeconds ). |
protected long |
priorityJobThreadPoolCount
The quantity of threads used to consume priority events.
|
protected long |
priorityPollingIntervalSeconds
Period of time between attempts to retrieve a new batch of priority events
from the database.
|
protected org.quartz.Scheduler |
priorityScheduler |
protected ProcessMonitorStatusManager |
processMonitorStatusManager |
protected ProcessStatusManager |
processStatusManager |
protected org.quartz.Scheduler |
scheduler |
protected SystemEventCompressionHelper |
systemEventCompressionHelper |
protected DatabaseSystemEventDao |
systemEventDao |
protected SystemEventSender |
systemEventSender |
protected org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor |
taskExecutor |
protected List<ScheduledJobThreadInitializer> |
threadInitializers |
protected org.broadleafcommerce.common.util.StreamingTransactionCapableUtil |
transUtil |
protected List<SystemEventConsumer> |
unsortedConsumers |
Constructor and Description |
---|
DatabaseSystemEventNodeImpl() |
Modifier and Type | Method and Description |
---|---|
protected void |
asyncExecuteEvent(boolean durable,
QueueStatusThreadPoolTaskExecutor executor,
Map<String,List<SystemEventConsumer>> consumers,
SystemEvent event) |
protected void |
buildEventScheduler() |
protected void |
clearLog() |
void |
consumeLocalEvent(SystemEvent event)
Cause asynchronous consumption of this event to occur in the same JVM, rather than provide the event to the
cluster.
|
EventWorkerType |
determineHostWorkerType()
Determine if this is an ADMIN or SITE node
|
protected void |
executeEvent(SystemEventConsumer consumer,
SystemEvent lockedEvent) |
void |
executeEvents(List<SystemEvent> events,
boolean durable,
QueueStatusThreadPoolTaskExecutor executor)
Iterate through a list of events and provide those events to the appropriate
SystemEventConsumer
instances. |
int |
getDurableEventsPerPoll()
The number of events of type DURABLE_GLOBAL to retrieve in each poll.
|
Map<String,List<SystemEventConsumer>> |
getEventConsumers()
All the
SystemEventConsumer instances registered in Spring. |
int |
getEventsPerPoll()
The number of
EventScopeType.VM events to retrieve in each poll. |
int |
getGlobalEventsPerPoll()
The number of
EventScopeType.GLOBAL events to retrieve in each poll. |
long |
getGlobalJobThreadPoolCount()
The number of threads actively polling for available
EventScopeType.GLOBAL events. |
long |
getGlobalPollingIntervalSeconds() |
protected String |
getHostIpAddress()
Retrieve the IP address for the current host.
|
long |
getJobThreadPoolCount()
The number of threads actively polling for available
EventScopeType.VM events. |
long |
getMissedDurableEventsPollingThresholdSeconds()
The maximum amount of time into the past that durable events should be retrieved.
|
String |
getNodeId() |
RegisteredNode |
getNodeRegistration()
Get the
RegisteredNode associated. |
int |
getPhase() |
long |
getPollingIntervalSeconds()
The wait time between each polling attempt for available events.
|
int |
getPriorityEventsPerPoll()
The number of priority events to retrieve in each poll.
|
long |
getPriorityJobThreadPoolCount()
The number of threads actively polling for priority events (
EventScopeType.VM or EventScopeType.GLOBAL ). |
long |
getPriorityPollingIntervalSeconds() |
DatabaseSystemEventDao |
getSystemEventDao() |
protected void |
incrementLog() |
void |
invalidateCacheElement(String entityClass,
String id) |
void |
invalidateCacheElement(String entityClass,
String property,
String id)
Utility method for initiating a cache invalidation event.
|
void |
invalidateCacheRegion(String region) |
boolean |
isAutoStartup() |
boolean |
isCreated() |
protected boolean |
isDebugEnabled() |
protected boolean |
isDebugEnabled(boolean checkInterval) |
protected boolean |
isLogEnabled(int intervalCount) |
boolean |
isPause()
Whether or not this node is paused.
|
boolean |
isRunning() |
protected void |
removeUniversalDuplicates(List<SystemEvent> events) |
void |
setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void |
setDurableEventsPerPoll(int durableEventsPerPoll) |
void |
setEventConsumers(Map<String,List<SystemEventConsumer>> eventConsumers) |
void |
setEventsPerPoll(int eventsPerPoll) |
void |
setGlobalEventsPerPoll(int globalEventsPerPoll) |
void |
setGlobalJobThreadPoolCount(long globalJobThreadPoolCount) |
void |
setGlobalPollingIntervalSeconds(long globalPollingIntervalSeconds) |
void |
setJobThreadPoolCount(long jobThreadPoolCount) |
void |
setMissedDurableEventsPollingThresholdSeconds(long missedDurableEventsPollingThresholdSeconds) |
void |
setPause(boolean pause) |
void |
setPollingIntervalSeconds(long pollingIntervalSeconds) |
void |
setPriorityEventsPerPoll(int priorityEventsPerPoll) |
void |
setPriorityJobThreadPoolCount(long priorityJobThreadPoolCount) |
void |
setPriorityPollingIntervalSeconds(long priorityPollingIntervalSeconds) |
void |
start() |
void |
stop() |
void |
stop(Runnable callback) |
@Value(value="${database.event.polling.interval.seconds:5}") protected long pollingIntervalSeconds
EventScopeType.VM
events
from the database. If the last batch of events has not been depleted yet by the configured
worker threads (see jobThreadPoolCount
) at the time of polling, the current polling round is skipped.
Default is 5. Can be set via property ${database.event.polling.interval.seconds}.ConsumeWorkerImpl
@Value(value="${database.event.quantity.per.poll:5}") protected int eventsPerPoll
EventScopeType.VM
events to retrieve from the datastore and queue up for consumption during
a polling attempt (see pollingIntervalSeconds
).
Default is 5. Can be set via property ${database.event.quantity.per.poll}.ConsumeWorkerImpl
@Value(value="${database.priority.event.polling.interval.seconds:5}") protected long priorityPollingIntervalSeconds
priorityJobThreadPoolCount
) at the time of polling, the current polling round is skipped.
See PriorityConsumeWorkerImpl
for more information on priority events.
Default is 5. Can be set via property ${database.priority.event.polling.interval.seconds}.PriorityConsumeWorkerImpl
@Value(value="${database.priority.event.quantity.per.poll:20}") protected int priorityEventsPerPoll
priorityPollingIntervalSeconds
). See PriorityConsumeWorkerImpl
for more information on priority events.
Default is 20. Can be set via property ${database.priority.event.quantity.per.poll}.PriorityConsumeWorkerImpl
@Value(value="${database.global.event.polling.interval.seconds:5}") protected long globalPollingIntervalSeconds
EventScopeType.GLOBAL
events
from the database. If the last batch of events has not been depleted yet by the configured
worker threads (see globalJobThreadPoolCount
) at the time of polling, the current polling round is skipped.
See GlobalConsumeWorkerImpl
for more information.
Default is 5. Can be set via property ${database.global.event.polling.interval.seconds}.GlobalConsumeWorkerImpl
@Value(value="${database.global.event.quantity.per.poll:10}") protected int globalEventsPerPoll
EventScopeType.GLOBAL
events to retrieve from the datastore and queue up for consumption during a polling attempt
(globalPollingIntervalSeconds
). See GlobalConsumeWorkerImpl
for more information.
Default is 10. Can be set via property ${database.global.event.quantity.per.poll}.GlobalConsumeWorkerImpl
@Deprecated @Value(value="${database.durable.event.quantity.per.poll:250}") protected int durableEventsPerPoll
@Value(value="${database.missed.durable.event.polling.threshold.seconds:604800}") protected long missedDurableEventsPollingThresholdSeconds
@Value(value="${database.event.consumption.enabled:true}") protected boolean eventConsumptionEnabled
@Value(value="${database.event.node.log.interval:-1}") protected int eventNodeLogInterval
@Value(value="${database.job.thread.pool.count:2}") protected long jobThreadPoolCount
EventScopeType.VM
events. This can be increased to increase
potential throughput via multiple, concurrent threads consuming events, at the possible cost of additional CPU
usage. Since consuming events is often not the primary work of a given node in the cluster, it is important
to throttle event consumption appropriately to not interfere with the node's primary purpose.
The default is 2. Can be set via property ${database.job.thread.pool.count}.
See eventsPerPoll
and pollingIntervalSeconds
for other properties that can be used to affect
throughput, possibly without changing the job thread pool count.
@Value(value="${database.priority.job.thread.pool.count:4}") protected long priorityJobThreadPoolCount
SystemEvent.getPriority()
value is less than 0.
This property can be increased to increase potential throughput via multiple, concurrent threads consuming events,
at the possible cost of additional CPU usage. Since consuming events is often not the primary work of a given node
in the cluster, it is important to throttle event consumption appropriately to not interfere with the node's
primary purpose.
The default is 4. Can be set via property ${database.priority.job.thread.pool.count}.
See priorityEventsPerPoll
and priorityPollingIntervalSeconds
for other properties that can be
used to affect throughput, possibly without changing the job thread pool count.
@Value(value="${database.global.job.thread.pool.count:3}") protected long globalJobThreadPoolCount
EventScopeType.GLOBAL
events. This can be increased to increase
potential throughput via multiple, concurrent threads consuming events, at the possible cost of additional CPU
usage. Since consuming events is often not the primary work of a given node in the cluster, it is important
to throttle event consumption appropriately to not interfere with the node's primary purpose.
The default is 3. Can be set via property ${database.global.job.thread.pool.count}.
See eventsPerPoll
and pollingIntervalSeconds
for other properties that can be used to affect
throughput, possibly without changing the job thread pool count.
protected org.broadleafcommerce.common.util.StreamingTransactionCapableUtil transUtil
protected DatabaseSystemEventDao systemEventDao
protected ProcessStatusManager processStatusManager
protected SystemEventCompressionHelper systemEventCompressionHelper
protected ProcessMonitorStatusManager processMonitorStatusManager
protected ConsumeWorker consumeWorker
protected ConsumeWorker priorityConsumeWorker
protected ConsumeWorker globalConsumeWorker
protected ConsumeWorker durableConsumeWorker
protected ConsumeWorker keepAliveWorker
protected SystemEventSender systemEventSender
protected CacheInvalidationHelper cacheInvalidationHelper
protected org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor taskExecutor
protected EventLockUtil eventLockUtil
protected List<ScheduledJobThreadInitializer> threadInitializers
protected List<SystemEventConsumer> unsortedConsumers
protected Map<String,List<SystemEventConsumer>> eventConsumers
protected org.quartz.Scheduler scheduler
protected org.quartz.Scheduler priorityScheduler
protected org.quartz.Scheduler globalScheduler
protected org.quartz.Scheduler durableScheduler
protected org.quartz.Scheduler keepAliveScheduler
protected boolean pause
protected EventWorkerType eventWorkerType
protected org.springframework.context.ApplicationContext context
public boolean isAutoStartup()
isAutoStartup
in interface org.springframework.context.SmartLifecycle
public void stop(Runnable callback)
stop
in interface org.springframework.context.SmartLifecycle
public void start()
start
in interface org.springframework.context.Lifecycle
public boolean isRunning()
isRunning
in interface org.springframework.context.Lifecycle
public int getPhase()
getPhase
in interface org.springframework.context.Phased
getPhase
in interface org.springframework.context.SmartLifecycle
public void consumeLocalEvent(SystemEvent event)
DatabaseSystemEventNode
consumeLocalEvent
in interface DatabaseSystemEventNode
public void stop()
stop
in interface org.springframework.context.Lifecycle
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
setApplicationContext
in interface org.springframework.context.ApplicationContextAware
org.springframework.beans.BeansException
public EventWorkerType determineHostWorkerType()
DatabaseSystemEventNode
determineHostWorkerType
in interface DatabaseSystemEventNode
public void executeEvents(List<SystemEvent> events, boolean durable, QueueStatusThreadPoolTaskExecutor executor)
DatabaseSystemEventNode
SystemEventConsumer
instances.executeEvents
in interface DatabaseSystemEventNode
durable
- Whether or not these events are durable (i.e. GLOBAL_DURABLE)
public RegisteredNode getNodeRegistration()
DatabaseSystemEventNode
RegisteredNode
associated. Every instance of DatabaseSystemEventNode
creates a registration in the database.getNodeRegistration
in interface DatabaseSystemEventNode
@ManagedAttribute(description="Whether or not event processing is currently paused.", currencyTimeLimit=15) public boolean isPause()
DatabaseSystemEventNode
isPause
in interface DatabaseSystemEventNode
@ManagedAttribute(description="Whether or not event processing is currently paused.", currencyTimeLimit=15) public void setPause(boolean pause)
setPause
in interface DatabaseSystemEventNode
@ManagedOperation(description="Invalidate the cache for an entity collection property", currencyTimeLimit=15) public void invalidateCacheElement(String entityClass, String property, String id)
DatabaseSystemEventNode
invalidateCacheElement
in interface DatabaseSystemEventNode
@ManagedOperation(description="Invalidate the cache for an entity", currencyTimeLimit=15) public void invalidateCacheElement(String entityClass, String id)
invalidateCacheElement
in interface DatabaseSystemEventNode
@ManagedOperation(description="Invalidate an entire cache region", currencyTimeLimit=15) public void invalidateCacheRegion(String region)
invalidateCacheRegion
in interface DatabaseSystemEventNode
public DatabaseSystemEventDao getSystemEventDao()
getSystemEventDao
in interface DatabaseSystemEventNode
public long getPollingIntervalSeconds()
DatabaseSystemEventNode
getPollingIntervalSeconds
in interface DatabaseSystemEventNode
public void setPollingIntervalSeconds(long pollingIntervalSeconds)
setPollingIntervalSeconds
in interface DatabaseSystemEventNode
public long getPriorityPollingIntervalSeconds()
public void setPriorityPollingIntervalSeconds(long priorityPollingIntervalSeconds)
public long getGlobalPollingIntervalSeconds()
public void setGlobalPollingIntervalSeconds(long globalPollingIntervalSeconds)
public int getEventsPerPoll()
DatabaseSystemEventNode
EventScopeType.VM
events to retrieve in each poll. This can be increased to process more events at any given time
at the cost of prolonged CPU consumption. It is likely advantageous to throttle event consumption to avoid overly taxing
the system and penalizing other operations. Also, smaller event processing groups allow the system to respond more
quickly to events that utilize the priority feature (see SystemEvent.getPriority()
).
The default value is 5. This does not include events of type GLOBAL_DURABLE.
See the discussion on DatabaseSystemEventNode.getPriorityEventsPerPoll()
for similar configuration related to special priority events.getEventsPerPoll
in interface DatabaseSystemEventNode
public void setEventsPerPoll(int eventsPerPoll)
setEventsPerPoll
in interface DatabaseSystemEventNode
public int getPriorityEventsPerPoll()
DatabaseSystemEventNode
SystemEvent.getPriority()
less than zero.
Priority events are generally considered to be important functional events that consume very quickly (e.g. cache invalidations).
Processing these events in their own queue attempts to guarantee a more timely processing, instead of having them
queue up with standard, possibly long-running, events in the standard queue.
The priority events per poll can be increased to process more events at any given time
at the cost of prolonged CPU consumption. It is likely advantageous to throttle event consumption to avoid overly taxing
the system and penalizing other operations. The default value is 20. This does not include events of type GLOBAL_DURABLE.
getPriorityEventsPerPoll
in interface DatabaseSystemEventNode
public void setPriorityEventsPerPoll(int priorityEventsPerPoll)
setPriorityEventsPerPoll
in interface DatabaseSystemEventNode
public int getGlobalEventsPerPoll()
DatabaseSystemEventNode
EventScopeType.GLOBAL
events to retrieve in each poll. This can be increased to process more events at any given time
at the cost of prolonged CPU consumption. It is likely advantageous to throttle event consumption to avoid overly taxing
the system and penalizing other operations. Also, smaller event processing groups allow the system to respond more
quickly to events that utilize the priority feature (see SystemEvent.getPriority()
).
The default value is 10. This does not include events of type GLOBAL_DURABLE.
See the discussion on DatabaseSystemEventNode.getPriorityEventsPerPoll()
for similar configuration related to special priority events.getGlobalEventsPerPoll
in interface DatabaseSystemEventNode
public void setGlobalEventsPerPoll(int globalEventsPerPoll)
setGlobalEventsPerPoll
in interface DatabaseSystemEventNode
public long getJobThreadPoolCount()
DatabaseSystemEventNode
EventScopeType.VM
events. Defaults to 2. See jobThreadPoolCount
for more info.getJobThreadPoolCount
in interface DatabaseSystemEventNode
public void setJobThreadPoolCount(long jobThreadPoolCount)
setJobThreadPoolCount
in interface DatabaseSystemEventNode
public long getPriorityJobThreadPoolCount()
DatabaseSystemEventNode
EventScopeType.VM
or EventScopeType.GLOBAL
). See priorityJobThreadPoolCount
for more info. Default is 4.getPriorityJobThreadPoolCount
in interface DatabaseSystemEventNode
public void setPriorityJobThreadPoolCount(long priorityJobThreadPoolCount)
setPriorityJobThreadPoolCount
in interface DatabaseSystemEventNode
public long getGlobalJobThreadPoolCount()
DatabaseSystemEventNode
EventScopeType.GLOBAL
events. Defaults to 3. See globalJobThreadPoolCount
for more info.getGlobalJobThreadPoolCount
in interface DatabaseSystemEventNode
public void setGlobalJobThreadPoolCount(long globalJobThreadPoolCount)
setGlobalJobThreadPoolCount
in interface DatabaseSystemEventNode
public Map<String,List<SystemEventConsumer>> getEventConsumers()
DatabaseSystemEventNode
SystemEventConsumer
instances registered in Spring.getEventConsumers
in interface DatabaseSystemEventNode
public void setEventConsumers(Map<String,List<SystemEventConsumer>> eventConsumers)
setEventConsumers
in interface DatabaseSystemEventNode
public long getMissedDurableEventsPollingThresholdSeconds()
DatabaseSystemEventNode
getMissedDurableEventsPollingThresholdSeconds
in interface DatabaseSystemEventNode
public int getDurableEventsPerPoll()
DatabaseSystemEventNode
SystemEvent.getPriority()
.getDurableEventsPerPoll
in interface DatabaseSystemEventNode
public void setDurableEventsPerPoll(int durableEventsPerPoll)
setDurableEventsPerPoll
in interface DatabaseSystemEventNode
public void setMissedDurableEventsPollingThresholdSeconds(long missedDurableEventsPollingThresholdSeconds)
setMissedDurableEventsPollingThresholdSeconds
in interface DatabaseSystemEventNode
public boolean isCreated()
isCreated
in interface DatabaseSystemEventNode
public String getNodeId()
getNodeId
in interface DatabaseSystemEventNode
protected void asyncExecuteEvent(boolean durable, QueueStatusThreadPoolTaskExecutor executor, Map<String,List<SystemEventConsumer>> consumers, SystemEvent event)
protected void executeEvent(SystemEventConsumer consumer, SystemEvent lockedEvent)
protected void removeUniversalDuplicates(List<SystemEvent> events)
protected boolean isLogEnabled(int intervalCount)
protected void clearLog()
protected void incrementLog()
protected String getHostIpAddress()
InetAddress.getLocalHost().getHostAddress()
.
Subclasses may override for different IP address determination.
protected boolean isDebugEnabled()
protected boolean isDebugEnabled(boolean checkInterval)
protected void buildEventScheduler() throws IOException, org.quartz.SchedulerException
IOException
org.quartz.SchedulerException
Copyright © 2020. All rights reserved.