深入理解java线程池—ThreadPoolExecutor

几句闲扯:首先,我想说java的线程池真的是很绕,以前一直都感觉新建几个线程一直不退出到底是怎么实现的,也就有了后来学习ThreadPoolExecutor源码。学习源码的过程中,最恶心的其实就是几种状态的转换了,这也是ThreadPoolExecutor的核心。花了将近小一周才大致的弄明白ThreadPoolExecutor的机制,遂记录下来。

线程池有多重要#####

线程是一个程序员一定会涉及到的一个概念,但是线程的创建和切换都是代价比较大的。所以,我们有没有一个好的方案能做到线程的复用呢?这就涉及到一个概念——线程池。合理的使用线程池能够带来3个很明显的好处:
1.降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
2.提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
3.提高线程的可管理性:线程池可以统一管理、分配、调优和监控。

java多线程池的支持——ThreadPoolExecutor#####

java的线程池支持主要通过ThreadPoolExecutor来实现,我们使用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的,所以ThreadPoolExecutor十分重要。要弄明白各种线程池策略,必须先弄明白ThreadPoolExecutor。

1. 实现原理#####

首先看一个线程池的流程图:

img

Paste_Image.png

step1.调用ThreadPoolExecutor的execute提交线程,首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
step2.如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
step3.如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
step4.如果线程数大于等于MaxPoolSize,那么执行拒绝策略。

2.线程池的创建#####

线程池的创建可以通过ThreadPoolExecutor的构造方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

具体解释一下上述参数:

  1. corePoolSize 核心线程池大小
  2. maximumPoolSize 线程池最大容量大小
  3. keepAliveTime 线程池空闲时,线程存活的时间
  4. TimeUnit 时间单位
  5. ThreadFactory 线程工厂
  6. BlockingQueue任务队列
  7. RejectedExecutionHandler 线程拒绝策略
3.线程的提交#####

ThreadPoolExecutor的构造方法如上所示,但是只是做一些参数的初始化,ThreadPoolExecutor被初始化好之后便可以提交线程任务,线程的提交方法主要是execute和submit。这里主要说execute,submit会在后续的博文中分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果当前的线程数小于核心线程池的大小,根据现有的线程作为第一个Worker运行的线程,
* 新建一个Worker,addWorker自动的检查当前线程池的状态和Worker的数量,
* 防止线程池在不能添加线程的状态下添加线程
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果线程入队成功,然后还是要进行double-check的,因为线程池在入队之后状态是可能会发生变化的
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*
* 如果task不能入队(队列满了),这时候尝试增加一个新线程,如果增加失败那么当前的线程池状态变化了或者线程池已经满了
* 然后拒绝task
*/
int c = ctl.get();
//当前的Worker的数量小于核心线程池大小时,新建一个Worker。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//recheck防止线程池状态的突变,如果突变,那么将reject线程,防止workQueue中增加新线程
reject(command);
else if (workerCountOf(recheck) == 0)//上下两个操作都有addWorker的操作,但是如果在workQueue.offer的时候Worker变为0,
//那么将没有Worker执行新的task,所以增加一个Worker.
addWorker(null, false);
}
//如果workQueue满了,那么这时候可能还没到线程池的maxnum,所以尝试增加一个Worker
else if (!addWorker(command, false))
reject(command);//如果Worker数量到达上限,那么就拒绝此线程
}

这里需要明确几个概念:

  1. Worker和Task的区别,Worker是当前线程池中的线程,而task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。
  2. maximumPoolSize和corePoolSize的区别:这个概念很重要,maximumPoolSize为线程池最大容量,也就是说线程池最多能起多少Worker。corePoolSize是核心线程池的大小,当corePoolSize满了时,同时workQueue full(ArrayBolckQueue是可能满的) 那么此时允许新建Worker去处理workQueue中的Task,但是不能超过maximumPoolSize。超过corePoolSize之外的线程会在空闲超时后终止。
核心方法:addWorker#####

Worker的增加和Task的获取以及终止都是在此方法中实现的,也就是这一个方法里面包含了很多东西。在addWorker方法中提到了Status的概念,Status是线程池的核心概念,这里我们先看一段关于status的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 首先ctl是一个原子量,同时它里面包含了两个field,一个是workerCount,另一个是runState
* workerCount表示当前有效的线程数,也就是Worker的数量
* runState表示当前线程池的状态
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* 两者是怎么结合的呢?首先workerCount是占据着一个atomic integer的后29位的,而状态占据了前3位
* 所以,workerCount上限是(2^29)-1。
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* runState是整个线程池的运行生命周期,有如下取值:
* 1. RUNNING:可以新加线程,同时可以处理queue中的线程。
* 2. SHUTDOWN:不增加新线程,但是处理queue中的线程。
* 3.STOP 不增加新线程,同时不处理queue中的线程。
* 4.TIDYING 所有的线程都终止了(queue中),同时workerCount为0,那么此时进入TIDYING
* 5.terminated()方法结束,变为TERMINATED
* The runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* 状态的转化主要是:
* RUNNING -> SHUTDOWN(调用shutdown())
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP(调用shutdownNow())
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING(queue和pool均empty)
* When both queue and pool are empty
* STOP -> TIDYING(pool empty,此时queue已经为empty)
* When pool is empty
* TIDYING -> TERMINATED(调用terminated())
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/

下面是状态的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//利用ctl来保证当前线程池的状态和当前的线程的数量。ps:低29位为线程池容量,高3位为线程状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//设定偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
//确定最大的容量2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//几个状态,用Integer的高三位表示
// runState is stored in the high-order bits
//111
private static final int RUNNING = -1 << COUNT_BITS;
//000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001
private static final int STOP = 1 << COUNT_BITS;
//010
private static final int TIDYING = 2 << COUNT_BITS;
//011
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态,取前三位
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取当前正在工作的worker,主要是取后面29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

接下来贴上addWorker方法看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked, which requires a
* backout of workerCount, and a recheck for termination, in case
* the existence of this worker was holding up termination.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* rs!=Shutdown || fistTask!=null || workCount.isEmpty
* 如果当前的线程池的状态>SHUTDOWN 那么拒绝Worker的add 如果=SHUTDOWN
* 那么此时不能新加入不为null的Task,如果在WorkCount为empty的时候不能加入任何类型的Worker,
* 如果不为empty可以加入task为null的Worker,增加消费的Worker
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

Worker w = new Worker(firstTask);
Thread t = w.thread;

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
/**
* rs!=SHUTDOWN ||firstTask!=null
*
* 同样检测当rs>SHUTDOWN时直接拒绝减小Wc,同时Terminate,如果为SHUTDOWN同时firstTask不为null的时候也要Terminate
*/
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}

workers.add(w);

int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}

t.start();
// It is possible (but unlikely) for a thread to have been
// added to workers, but not yet started, during transition to
// STOP, which could result in a rare missed interrupt,
// because Thread.interrupt is not guaranteed to have any effect
// on a non-yet-started Thread (see Thread#interrupt).
//Stop或线程Interrupt的时候要中止所有的运行的Worker
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}

addWorker中首先进行了一次线程池状态的检测:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
//判断当前线程池的状态是不是已经shutdown,如果shutdown了拒绝线程加入
//(rs!=SHUTDOWN || first!=null || workQueue.isEmpty())
//如果rs不为SHUTDOWN,此时状态是STOP、TIDYING或TERMINATED,所以此时要拒绝请求
//如果此时状态为SHUTDOWN,而传入一个不为null的线程,那么需要拒绝
//如果状态为SHUTDOWN,同时队列中已经没任务了,那么拒绝掉
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

其实是比较难懂的,主要在线程池状态判断条件这里:

  1. 如果是runing,那么跳过if。
  2. 如果rs>=SHUTDOWN,同时不等于SHUTDOWN,即为SHUTDOWN以上的状态,那么不接受新线程。
  3. 如果rs>=SHUTDOWN,同时等于SHUTDOWN,同时first!=null,那么拒绝新线程,如果first==null,那么可能是新增加线程消耗Queue中的线程。但是同时还要检测workQueue是否isEmpty(),如果为Empty,那么队列已空,不需要增加消耗线程,如果队列没有空那么运行增加first=null的Worker。
    从这里是可以看出一些策略的
    首先,在rs>SHUTDOWN时,拒绝一切线程的增加,因为STOP是会终止所有的线程,同时移除Queue中所有的待执行的线程的,所以也不需要增加first=null的Worker了
    其次,在SHUTDOWN状态时,是不能增加first!=null的Worker的,同时即使first=null,但是此时Queue为Empty也是不允许增加Worker的,SHUTDOWN下增加的Worker主要用于消耗Queue中的任务。
    SHUTDOWN状态时,是不允许向workQueue中增加线程的,isRunning(c) && workQueue.offer(command) 每次在offer之前都要做状态检测,也就是线程池状态变为>=SHUTDOWN时不允许新线程进入线程池了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (;;) {
int wc = workerCountOf(c);
//如果当前的数量超过了CAPACITY,或者超过了corePoolSize和maximumPoolSize(试core而定)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS尝试增加线程数,如果失败,证明有竞争,那么重新到retry。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//判断当前线程池的运行状态
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

这段代码做了一个兼容,主要是没有到corePoolSize 或maximumPoolSize上限时,那么允许添加线程,CAS增加Worker的数量后,跳出循环。
接下来实例化Worker,实例化Worker其实是很关键的,后面会说。
因为workers是HashSet线程不安全的,那么此时需要加锁,所以mainLock.lock(); 之后重新检查线程池的状态,如果状态不正确,那么减小Worker的数量,为什么tryTerminate()目前不大清楚。如果状态正常,那么添加Worker到workers。最后:

1
2
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();

注释说的很清楚,为了能及时的中断此Worker,因为线程存在未Start的情况,此时是不能响应中断的,如果此时status变为STOP,则不能中断线程。此处用作中断线程之用。
接下来我们看Worker的方法:

1
2
3
4
5
6
7
8
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

这里可以看出Worker是对firstTask的包装,并且Worker本身就是Runnable的,看上去真心很流氓的感觉~~~
通过ThreadFactory为Worker自己构建一个线程。
因为Worker是Runnable类型的,所以是有run方法的,上面也看到了会调用t.start() 其实就是执行了run方法:

1
2
3
4
/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}

调用了runWorker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
* 1 Worker可能还是执行一个初始化的task——firstTask。
* 但是有时也不需要这个初始化的task(可以为null),只要pool在运行,就会
* 通过getTask从队列中获取Task,如果返回null,那么worker退出。
* 另一种就是external抛出异常导致worker退出。
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
*
* 2 在运行任何task之前,都需要对worker加锁来防止other pool中断worker。
* clearInterruptsForTaskRun保证除了线程池stop,那么现场都没有中断标志
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and
* clearInterruptsForTaskRun called to ensure that unless pool is
* stopping, this thread does not have its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to
* afterExecute. We separately handle RuntimeException, Error
* (both of which the specs guarantee that we trap) and arbitrary
* Throwables. Because we cannot rethrow Throwables within
* Runnable.run, we wrap them within Errors on the way out (to the
* thread's UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
//标识线程是不是异常终止的
boolean completedAbruptly = true;
try {
//task不为null情况是初始化worker时,如果task为null,则去队列中取线程--->getTask()
while (task != null || (task = getTask()) != null) {
w.lock();
//获取woker的锁,防止线程被其他线程中断
clearInterruptsForTaskRun();//清楚所有中断标记
try {
beforeExecute(w.thread, task);//线程开始执行之前执行此方法,可以实现Worker未执行退出,本类中未实现
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现
}
} finally {
task = null;//运行过的task标null
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理worker退出的逻辑
processWorkerExit(w, completedAbruptly);
}
}

从上面代码可以看出,execute的Task是被“包装 ”了一层,线程启动时是内部调用了Task的run方法。
接下来所有的核心集中在getTask()方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*
*
* 队列中获取线程
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
//当前状态为>stop时,不处理workQueue中的任务,同时减小worker的数量所以返回null,如果为shutdown 同时workQueue已经empty了,同样减小worker数量并返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

boolean timed; // Are workers subject to culling?

for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;

if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

这段代码十分关键,首先看几个局部变量:
boolean timedOut = false;
主要是判断后面的poll是否要超时
boolean timed;
主要是标识着当前Worker超时是否要退出。wc > corePoolSize时需要减小空闲的Worker数,那么timed为true,但是wc <= corePoolSize时,不能减小核心线程数timed为false。
timedOut初始为false,如果timed为true那么使用poll取线程。如果正常返回,那么返回取到的task。如果超时,证明worker空闲,同时worker超过了corePoolSize,需要删除。返回r=null。则 timedOut = true。此时循环到wc <= maximumPoolSize && ! (timedOut && timed)时,减小worker数,并返回null,导致worker退出。如果线程数<= corePoolSize,那么此时调用 workQueue.take(),没有线程获取到时将一直阻塞,知道获取到线程或者中断,关于中断后面Shutdown的时候会说。

至此线程执行过程就分析完了~~~~


关于终止线程池#####

我个人认为,如果想了解明白线程池,那么就一定要理解好各个状态之间的转换,想理解转换,线程池的终止机制是很好的一个途径。对于关闭线程池主要有两个方法shutdown()和shutdownNow():
首先从shutdown()方法开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断是否可以操作目标线程
checkShutdownAccess();
//设置线程池状态为SHUTDOWN,此处之后,线程池中不会增加新Task
advanceRunState(SHUTDOWN);
//中断所有的空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//转到Terminate
tryTerminate();
}

shutdown做了几件事:
1. 检查是否能操作目标线程
2. 将线程池状态转为SHUTDOWN
3. 中断所有空闲线程
这里就引发了一个问题,什么是空闲线程?
这需要接着看看interruptIdleWorkers是怎么回事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//这里的意图很简单,遍历workers 对所有worker做中断处理。
// w.tryLock()对Worker加锁,这保证了正在运行执行Task的Worker不会被中断,那么能中断哪些线程呢?
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

这里主要是为了中断worker,但是中断之前需要先获取锁,这就意味着正在运行的Worker不能中断。但是上面的代码有w.tryLock(),那么获取不到锁就不会中断,shutdown的Interrupt只是对所有的空闲Worker(正在从workQueue中取Task,此时Worker没有加锁)发送中断信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
while (task != null || (task = getTask()) != null) {
w.lock();
//获取woker的锁,防止线程被其他线程中断
clearInterruptsForTaskRun();//清楚所有中断标记
try {
beforeExecute(w.thread, task);//线程开始执行之前执行此方法,可以实现Worker未执行退出,本类中未实现
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现
}
} finally {
task = null;//运行过的task标null
w.completedTasks++;
w.unlock();
}
}

在runWorker中,每一个Worker getTask成功之后都要获取Worker的锁之后运行,也就是说运行中的Worker不会中断。因为核心线程一般在空闲的时候会一直阻塞在获取Task上,也只有中断才可能导致其退出。这些阻塞着的Worker就是空闲的线程(当然,非核心线程,并且阻塞的也是空闲线程)。在getTask方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
//当前状态为>stop时,不处理workQueue中的任务,同时减小worker的数量所以返回null,如果为shutdown 同时workQueue已经empty了,同样减小worker数量并返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

boolean timed; // Are workers subject to culling?

for (;;) {
//allowCoreThreadTimeOu是判断CoreThread是否会超时的,true为会超时,false不会超时。默认为false
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;

if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

会有两阶段的Worker:

  1. 刚进入getTask(),还没进行状态判断。
  2. block在poll或者take上的Worker。

当调用ShutDown方法时,首先设置了线程池的状态为ShutDown,此时1阶段的worker进入到状态判断时会返回null,此时Worker退出。
因为getTask的时候是不加锁的,所以在shutdown时可以调用worker.Interrupt.此时会中断退出,Loop到状态判断时,同时workQueue为empty。那么抛出中断异常,导致重新Loop,在检测线程池状态时,Worker退出。如果workQueue不为null就不会退出,此处有些疑问,因为没有看见中断标志位清除的逻辑,那么这里就会不停的循环直到workQueue为Empty退出。
这里也能看出来SHUTDOWN只是清除一些空闲Worker,并且拒绝新Task加入,对于workQueue中的线程还是继续处理的。
对于shutdown中获取mainLock而addWorker中也做了mainLock的获取,这么做主要是因为Works是HashSet类型的,是线程不安全的,我们也看到在addWorker后面也是对线程池状态做了判断,将Worker添加和中断逻辑分离开。
接下来做了tryTerminate()操作,这操作是进行了后面状态的转换,在shutdownNow后面说。
接下来看看shutdownNow:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

shutdownNow和shutdown代码类似,但是实现却很不相同。首先是设置线程池状态为STOP,前面的代码我们可以看到,是对SHUTDOWN有一些额外的判断逻辑,但是对于>=STOP,基本都是reject,STOP也是比SHUTDOWN更加严格的一种状态。此时不会有新Worker加入,所有刚执行完一个线程后去GetTask的Worker都会退出。
之后调用interruptWorkers:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
try {
w.thread.interrupt();
} catch (SecurityException ignore) {
}
}
} finally {
mainLock.unlock();
}
}

这里可以看出来,此方法目的是中断所有的Worker,而不是像shutdown中那样只中断空闲线程。这样体现了STOP的特点,中断所有线程,同时workQueue中的Task也不会执行了。所以接下来drainQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

获取所有没有执行的Task,并且返回。
这也体现了STOP的特点:
拒绝所有新Task的加入,同时中断所有线程,WorkerQueue中没有执行的线程全部抛弃。所以此时Pool是空的,WorkerQueue也是空的。
这之后就是进行到TIDYING和TERMINATED的转化了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

上面的代码其实很有意思有几种状态是不能转化到TIDYING的:

  1. RUNNING状态
  2. TIDYING或TERMINATED
  3. SHUTDOWN状态,但是workQueue不为空

也说明了两点:
1. SHUTDOWN想转化为TIDYING,需要workQueue为空,同时workerCount为0。
2. STOP转化为TIDYING,需要workerCount为0
如果满足上面的条件(一般一定时间后都会满足的),那么CAS成TIDYING,TIDYING也只是个过度状态,最终会转化为TERMINATED。

至此,ThreadPoolExecutor一些核心思想就介绍完了,想分析清楚实在是不容易,对于ThreadPoolExecutor我还是有些不懂地方,以上只是我对源码的片面的见解,如果有不正确之处,希望大神能不吝赐教。同时也希望给正在研究ThreadPoolExecutor的童鞋提供一点帮助。

勿忘初心,方得始终。晚安~~

l

sql难点记录

1.

The expression subject IN (‘Chemistry’,’Physics’) can be used as a value - it will be 0 or 1.

Show the 1984 winners and subject ordered by subject and winner name; but list Chemistry and Physics last.

这个题目没有中文,翻译的大概意思是 按照获奖的科学领域跟获奖者的名字来排序,但是 化学和物理要被排在最后

1
SELECT winner, subject FROM nobel where yr=1984 ORDER BY subject IN ('Physics','Chemistry'),subject asc,winner asc

相当于

1
2
3
4
SELECT winner, subject FROM nobel where yr=1984 
ORDER BY CASE WHEN subject IN ('Physics','Chemistry') THEN 1
ELSE 0 END,
subject asc,winner asc

相当于

1
2
3
4
5
6
SELECT winner, subject FROM nobel where yr=1984 
ORDER BY ( case subject
when 'Chemistry' then 1
when 'Physics' then 1
else 0
end),subject asc,winner asc

这里分析一下,以后也用得上,关键在order by subject IN (‘Physics’,’Chemistry’) ,subject asc,winner asc

后两个比较容易理解 字段名加上asc表示按正常排序,难点在 subject in (xxx)这个表达式,

排除后两个表达式,这是一个分组排序,

subject in(xxx)为0的分成一组 排序

subject in(xxx)为1的分成一组 排序

得到结果连接起来就是新的排序表

subject in(xxx) desc :新的排序表 就在前面

subject in(xxx) asc :新的排序表 就在后面 (默认asc)

l

linux命令which,whereis,locate,find的区别

  1. which:常用于查找可直接执行的命令。只能查找可执行文件,该命令基本只在$PATH路径中搜索,查找范围最小,查找速度快。默认只返回第一个匹配的文件路径,通过选项 -a 可以返回所有匹配结果。
  2. whereis:不只可以查找命令,其他文件类型都可以(man中说只能查命令、源文件和man文件,实际测试可以查大多数文件)。在$PATH路径基础上增加了一些系统目录的查找,查找范围比which稍大,查找速度快。可以通过 -b 选项,限定只搜索二进制文件。
  3. locate:超快速查找任意文件。它会从linux内置的索引数据库查找文件的路径,索引速度超快。刚刚新建的文件可能需要一定时间才能加入该索引数据库,可以通过执行updatedb命令来强制更新一次索引,这样确保不会遗漏文件。该命令通常会返回大量匹配项,可以使用 -r 选项通过正则表达式来精确匹配。
  4. find:直接搜索整个文件目录,默认直接从根目录开始搜索,建议在以上命令都无法解决问题时才用它,功能最强大但速度超慢。除非你指定一个很小的搜索范围。通过 -name 选项指定要查找的文件名,支持通配符。

下面通过一个实际的例子来测试和体会几个命令的差异:

先通过which找到ls命令的位置

1
2
tarena@tedu:/$ which ls
/bin/ls

把ls复制到主目录,并把名称修改为newls

1
2
tarena@tedu:/$ cp /bin/ls ~/newls
tarena@tedu:/$ cd ~

尝试用which和whereis命令查找newls,由于主目录不在$PATH中(除非你恰巧之前你恰巧把~加入$PATH了),所以都无法找到

1
2
3
4
tarena@tedu:~$ whereis newls
newls:
tarena@tedu:~$ which newls
tarena@tedu:~$

执行以下export命令,把~加入$PATH,然后我们cd到根目录,再次尝试查找newls,发现已经可以找到了

1
2
3
4
5
6
7
8
tarena@tedu:~$ export PATH=$PATH:~
tarena@tedu:~$ echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin:/home/tarena
tarena@tedu:~$ cd /
tarena@tedu:/$ which newls
/home/tarena/newls
tarena@tedu:/$ whereis newls
newls: /home/tarena/newls

我们再cd到~,然后取消newls的可执行权限

1
2
tarena@tedu:/$ cd ~
tarena@tedu:~$ chmod u-x newls

然后我们再次尝试使用which和whereis查找newls,我们发现whereis可以找到,而which找不到newls。因为which只能用来查找可执行文件,whereis没有该限制。

1
2
3
4
tarena@tedu:~$ cd /
tarena@tedu:/$ whereis newls
newls: /home/tarena/newls
tarena@tedu:/$ which newls

这时我们再把newls改名为ls,然后我们尝试用locate命令找出系统中存在的两个ls文件,我们发现会找到大量不是我们要的文件(此处已省略了很多),但这些文件路径中确实包含ls。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
tarena@tedu:~$ cd ~
tarena@tedu:~$ mv newls ls
/bin/false
/bin/ls
/bin/lsblk
/bin/lsmod
/bin/ntfsls
/boot/grub/i386-pc/cbls.mod
/boot/grub/i386-pc/command.lst
/boot/grub/i386-pc/crypto.lst
/boot/grub/i386-pc/fs.lst
/boot/grub/i386-pc/ls.mod
/boot/grub/i386-pc/lsacpi.mod
/boot/grub/i386-pc/lsapm.mod
/boot/grub/i386-pc/lsmmap.mod
/boot/grub/i386-pc/lspci.mod
/boot/grub/i386-pc/moddep.lst
/boot/grub/i386-pc/partmap.lst
/boot/grub/i386-pc/parttool.lst
/boot/grub/i386-pc/terminal.lst
/boot/grub/i386-pc/video.lst
...

我们尝试用正则表达式缩小匹配范围

1
2
3
4
5
tarena@tedu:~$ locate -r '\bls$'
/bin/ls
/usr/bin/gvfs-ls
/usr/lib/klibc/bin/ls
/usr/share/bash-completion/completions/gvfs-ls

我们发现只找到了一个ls,另外一个可能因为系统还没有纳入索引数据库,所以没有找到,我们执行updatedb命令,强制更新一下系统索引,然后再执行一遍locate试试,发现现在可以找到了

1
2
3
4
5
6
tarena@tedu:~$ sudo updatedb
/bin/ls
/home/tarena/ls
/usr/bin/gvfs-ls
/usr/lib/klibc/bin/ls
/usr/share/bash-completion/completions/gvfs-ls

find命令全盘查找太慢,所以限制下查找路径,也是同样可以找到

1
2
3
tarena@tedu:~$ find ~ /bin/ -name ls
/home/tarena/ls
/bin/ls
l

tool

软件推荐

文本编辑软件

  • vscode

    • 性能较好,资源占用较低,功能强大,社区活跃,性能稳定,插件丰富
  • notepad++

    • 推荐插件JSTool
    • 性能较差,文本行数多时,操作可能会崩溃
  • Emedit

    • 速度快,多文件查找, 正则处理, 各种字符编码转换。
    • 大文件处理性能好

utools-效率提升工具

  • -- 书签

    • 快捷书签打开,支持拼音、url、书签名搜索
    • 书签很多的时候很好用
  • -- jetbrains

    • 快捷打开jetbrain的项目
  • -- find

    • 内嵌everything
    • 搜索速度很快
  • -- 网页快开

    • 快捷打开某些常用的搜索软件,拼接了查询query,如果百度、google、stackoverflow等
  • -- 剪切板

    • 历史的剪切板内容

draw.io--流程图绘制

开源、功能强大、社区活跃。

丰富的客户端支持,在线、win、mac

支持多种存储方式:云存储、本地、浏览器…

备选:processon、visio

blog&document&note

科学上网

sockboom

樱猫

ftp

  • FileZillaClient

  • FileZillaServer

    • 搭建个人ftp使用,局域网内共享大文件使用

redis

  • Redis DeskTop

    • redis管理终端,使用简单

sql

  • navicat/dataGrip/sqlyog

    • 常用的sql客户端,常用navicat,支持多种数据库如mongo
  • PDMan

    • 可以建立表之间的逻辑关联关系
    • 支持导出建表语句

terminal

  • mobaxterm

    • 免费
    • 直接建立Sftp,方便文件管理
    • 性能稍差
  • xhsell

    • 收费,免费版限制较多,限制终端开启数量
  • Cmder/GitBash

    • windows端的终端
    • 直接一些bash指令,模拟linux操作win,比如tail、less等等,比win cmd及powershell稍微好用些

onedrive + office

  • 家庭版共享性价比很高,1T云存储

 

开发办公

idea

  • 破解和注册
    • 教育邮箱,免费注册专业版本(jetbrain全家桶)
    • 读书成诗公众号,破解补丁
  • 插件推荐
    • 1、easycode
      • sqlmap模板代码生成插件,模板自定义,语法较为简单,上手容易
    • 2、easyyapi

      • 接口文档生成插件
      • 支持http、dubbo接口
      • 可导出至yapi、postman、本地markdown
    • 3、 camelcase

      • 大小写转换工具,驼峰、下划线等多种类型切换
      • 常用于从数据库中复制的字段转换成驼峰
    • 4、alibaba cloud toolkit

      • 远程部署
      • 一般本地测试直接部署到本地tomcat,开发环境联调需要部署,可以使用该功能
    • 5、codeglance

      • 代码概览,类似vscode的右边导览,在文件较长时(看源码)时浏览方便
  • 常用快捷操作和命令
    • 1、文本操作 (常用于数据清洗,批量操作数据或是批量修改代码)

      • 扩大/减小选取,ctrl alt up/down
      • 多光标,alt shift leftmouse
      • 选区转光标,alt shift insert
      • 正则替换
    • 2、重构操作

      • change singature 修改方法签名:alt shift c
      • 重命名变量/方法:alt shift r
  • debug
    • evaluate

    • watch

    • remote debug

      • 远程连接服务器debug,可以用于开发环境debug使用

飞书

  • infobot

    • 消息推送,webhook直接推送,使用简单
  • 捷径

    • 配合infobot可以实现一些个人的快捷功能

远程桌面

  • teamview

    • 免费,一旦检测到商业使用,会限制连接时长,基本无法继续使用
    • 稳定,基于P2P或teamview服务器连接
  • 向日葵

    • 免费、不会商业检测限制
  • remotedesktop

    • 支持多平台客户端
    • 配合vpn或是nat使用效果很好,连接质量比teamview高,基本可以达到与使用本机相同的体验
    • 使用nat时,一定要做好安全控制

 

常用站点

内部

  • yapi

    • 接口文档共享
  • zabbix

    • 生产服务器运行监控,服务器的cpu、内存、tomcat gc等查看
    • 可以查看消息队列的消息长度,用于判断生产消息队列的消费者消费能力
  • kibana

    • 日志检索平台
    • 多台服务器部署的应用,日志查询
  • nexus

    • maven仓库管理
    • 可以查询公司目前已有的jar包,常用于内部jar包版本问题排查
  • rocketchat

  • pinpoint

    • APM监控工具
    • 慢sql排查,服务性能问题排查,服务调用监控
  • apollo配置中心

    • 分布式配置中心
    • 一般用于配置信息查询
  • dubbo admin

    • 查看dubbo服务的注册与订阅信息
    • 查询对应zk上已经注册的服务
    • 服务治理、统计查询、服务mock
  • rabbitMQ

    • rabbitmq管理页面
    • 可以操作队列、查询队列消息长度,消息内容,手动发送消息等
  • jenkins

    • 部署记录、日志查看;部署操作(dev、fat)
  • 备库延迟检测

    • 生产备库延迟检测
    • 有时候更新了生产数据,但发现没有变化,此时可以看一下备库延迟
  • dataquery

    • 数据查询平台,需要找俊博注册开通,查询生产备库数据

其他站点

l

tool

软件推荐

文本编辑软件

vscode/notepad++

-- JSTool

utools-效率提升工具

  • -- 书签

    • 快捷书签打开,支持拼音、url、书签名搜索
    • 书签很多的时候很好用
  • -- jetbrains

    • 快捷打开jetbrain的项目
  • -- find

    • 内嵌everything
    • 搜索速度很快
  • -- 网页快开

    • 快捷打开某些常用的搜索软件,拼接了查询query,如果百度、google、stackoverflow等
  • -- 剪切板

    • 历史的剪切板内容

draw.io--流程图绘制

开源、功能强大、社区活跃。

丰富的客户端支持,在线、win、mac

支持多种存储方式:云存储、本地、浏览器…

备选:processon、visio

blog&document&note

-- https://www.gitbook.com/ -- Typora/markdown -- onenote -- csdn

科学上网

sockboom

樱猫

ftp

  • FileZillaClient

  • FileZillaServer

    • 搭建个人ftp使用,局域网内共享大文件使用

redis

  • Redis DeskTop

    • redis管理终端,使用简单

sql

  • navicat/dataGrip/sqlyog

    • 常用的sql客户端,常用navicat,支持多种数据库如mongo
  • PDMan

    • 可以建立表之间的逻辑关联关系
    • 支持导出建表语句

terminal

  • mobaxterm

    • 免费
    • 直接建立Sftp,方便文件管理
    • 性能稍差
  • xhsell

    • 收费,免费版限制较多,限制终端开启数量
  • Cmder/GitBash

    • windows端的终端
    • 直接一些bash指令,模拟linux操作win,比如tail、less等等,比win cmd及powershell稍微好用些

onedrive + office

  • 家庭版共享性价比很高,1T云存储

开发办公

idea

  • 破解和注册
    • 教育邮箱,免费注册专业版本(jetbrain全家桶)
    • 读书成诗公众号,破解补丁
  • 插件推荐
    • 1、easycode
      • sqlmap模板代码生成插件,模板自定义,语法较为简单,上手容易
    • 2、easyyapi

      • 接口文档生成插件
      • 支持http、dubbo接口
      • 可导出至yapi、postman、本地markdown
    • 3、 camelcase

      • 大小写转换工具,驼峰、下划线等多种类型切换
      • 常用于从数据库中复制的字段转换成驼峰
    • 4、alibaba cloud toolkit

      • 远程部署
      • 一般本地测试直接部署到本地tomcat,开发环境联调需要部署,可以使用该功能
    • 5、codeglance

      • 代码概览,类似vscode的右边导览,在文件较长时(看源码)时浏览方便
  • 常用快捷操作和命令
    • 1、文本操作 (常用于数据清洗,批量操作数据或是批量修改代码)

      • 扩大/减小选取,ctrl alt up/down
      • 多光标,alt shift leftmouse
      • 选区转光标,alt shift insert
      • 正则替换
    • 2、重构操作

      • change singature 修改方法签名:alt shift c
      • 重命名变量/方法:alt shift r
  • debug
    • evaluate

    • watch

    • remote debug

      • 远程连接服务器debug,可以用于开发环境debug使用

飞书

  • infobot

    • 消息推送,webhook直接推送,使用简单
  • 捷径

    • 配合infobot可以实现一些个人的快捷功能

远程桌面

  • teamview

    • 免费,一旦检测到商业使用,会限制连接时长,基本无法继续使用
    • 稳定,基于P2P或teamview服务器连接
  • remotedesktop

    • 支持多平台客户端
    • 配合vpn或是nat使用效果很好,连接质量比teamview高,基本可以达到与使用本机相同的体验
    • 使用nat时,一定要做好安全控制

 

常用站点

内部

  • yapi

    • 接口文档共享
  • zabbix

    • 生产服务器运行监控,服务器的cpu、内存、tomcat gc等查看
    • 可以查看消息队列的消息长度,用于判断生产消息队列的消费者消费能力
  • kibana

    • 日志检索平台
    • 多台服务器部署的应用,日志查询
  • nexus

    • maven仓库管理
    • 可以查询公司目前已有的jar包,常用于内部jar包版本问题排查
  • rocketchat

  • pinpoint

    • APM监控工具
    • 慢sql排查,服务性能问题排查,服务调用监控
  • apollo配置中心

    • 分布式配置中心
    • 一般用于配置信息查询
  • dubbo admin

    • 查看dubbo服务的注册与订阅信息
    • 查询对应zk上已经注册的服务
    • 服务治理、统计查询、服务mock
  • rabbitMQ

    • rabbitmq管理页面
    • 可以操作队列、查询队列消息长度,消息内容,手动发送消息等
  • jenkins

    • 部署记录、日志查看;部署操作(dev、fat)
  • 备库延迟检测

    • 生产备库延迟检测
    • 有时候更新了生产数据,但发现没有变化,此时可以看一下备库延迟
  • dataquery

    • 数据查询平台,需要找俊博注册开通,查询生产备库数据

其他站点

l

1
2
3
4
graph TD;
A-->B;
A-->C;
B-->D;
1
2
3
4
5
6
graph LR
A[方形] -->B(圆角)
B --> C{条件a}
C -->|a=1| D[结果1]
C -->|a=2| E[结果2]
F[横向流程图]
1
2
3
4
5
6
7
graph TD
A[方形] -->B(圆角)
B --> C{条件a}
C -->|a=1| D[结果1]
C -->|a=2| E[结果2]
F[竖向流程图]

1
2
3
4
5
6
7
8
9
st=>start: 开始框
op=>operation: 处理框
cond=>condition: 判断框(是或否?)
sub1=>subroutine: 子流程
io=>inputoutput: 输入输出框
e=>end: 结束框
st->op->cond
cond(yes)->io->e
cond(no)->sub1(right)->op
l

Java中float和double中溢值问题和浮点数的储存问题

Java中float和double中溢值问题和浮点数的储存问题

记录一下初学Java出现的问题。

以为之前是从Python起步的,最初了解到Java的数据类型有float和double这两个东西,就尝试相加这两个

img

这里返回的结果:32.45000076293945

这个返回值看的我一脸懵

要理解这个问题要先知道浮点数在计算机中是以什么形式储存的

首先要知道计算机能懂的只有0和1

每一个0和1都占一个位 bit (比特)(Binary Digits):存放一位二进制数,最小的存储单位。

所以,整数部分:

22 / 2 = 11 余0

11 / 2 = 5 余1

5 / 2 = 2 余1

2 / 2 = 1 余0

1 / 2 = 0 余1

22的二进制转换就是10110

小数部分:

0.45 * 2 = 0.9 0

0.9 * 2 = 1.8 1

0.8 * 2 = 1.6 1

0.6 * 2 = 1.2 1

0.2 * 2 = 0.4 0

0.4 * 2 = 0.8 0

0.8 * 2 = 1.6 1

0.6 * 2 = 1.2 1

0.2 * 2 = 0.4 0

……

我们可以发现0.45转化成二进制的时候是无限循环的

二进制转换完成,22.45 –> 10110.011100110……

得到这个二进制浮点数之后,计算机是怎么把他表示为没有小数点的字符呢?

就要用到小学学过的科学记数法

10110.011100110可以写为

1.0110011100110 [公式] [公式]

过程中我们发现,小数的转换有可能会产生无限循环的情况,想要做的最精确的记录22.45,计算机需要无限大的空间来记录

那么IEEE754标准就规定:

32位单精度(java中的float),使用32位(bit)来存储

64位双精度double), 使用64位储存

那采用什么样的格式呢

|S| Exp | Fraction |

+-+——–+———————–+

S:符号位(正0负1)

EXP:指数位

Fraction:有效数字

img

单精度(float)就是

|1(bit)| 8(bit) | 23(bit) |

+-+——–+———————–+

双精度(double)

|1(bit)| 11(bit) | 52(bit) |

+-+——–+———————–+

以22.45为例:

1.0110011100110 [公式][公式](二进制科学记数)

S = 0

EXP = 4+127 =131 –> 10000011

这里为什么是131而不是直接的4呢?

0000 0000八个位来表示指数,最大值就是1111 1111 –> 十进制就是 255

指数会有正负数两种情况,所以分两半,255 / 2 = 127.5

0~127用于负数

127~255用于正数

127相当于一个指数是0,所以表示正指数就 + 同理 -

Fraction = 0110011100110(这里只有13(bit))

= 01100111001100110011010(接着算了10(bit)补齐23(bit))

所以22.45在计算机里就是 S+EXP+Fraction = 01000001101100111001100110011010

同理双精度(double)有64位来记录

img

System.out.println(Integer.toBinaryString(Float.floatToIntBits(x)));

这行代码可以查看22.45的二进位表达

img

为什么没有0,因为01 跟1是一样的,所以0就不会显示了

这是-22.45

img

=======================================================

了解了IEEE二进位浮点数,知道了float和double记录的浮点精确度不一样我们再看一下问题。

img

从输出的结果看 32.45000076293945 是一个double类型,精确的表示了小数点后14位

所以float + double 是从float赋值到double

img

输出结果:22.450000762939453

可是如果float(单精度)赋值到double(双精度)出现精度丢失可以理解,但是会什么会溢值呢?

我们可以从二进制推回十进制来看一看

0 | 10000011 | 01100111001100110011010

正| 指数=4 |有效数字

1.01100111001100110011010 [公式][公式] = 10110.0111001100110011010

后面这个0.0111001100110011010表示小数位

我们可以通过把0.0111001100110011010换成十进制来看看

小数的二进制到十进制的方法是

从小数点后依次乘以2的负一次方,2的负二次方,2的负三次方等

0.0111001100110011010

[公式]

可以直接用python一算

img

同这个结果,我们可以看出,由于float单精度只能选取 无限循环的二进制小数的23位

导致了十进制浮点数在存储时的不够精准

当我们把已经储存好的32二进制格式转换成64位时

img

输出结果:

img

可以看出Java并没有重新计算小数点后的更多位, 而是用0来补位

所以转换成double后,之前float没有精确到的位数就会显示出来。

要解决这个问题就需要使用java.math中提供的API类BigDecimal

注:(BigDecimal用String 或Integer 初始化,double初始化会有舍入精度问题)

l

线程池

ThreadPool源码学习
zodiac ·2020-12-10 ·20 次阅读

ThreadPoolExecutor
jdk1.5在juc包里提供了方便快捷的线程池api,并提供了基于工厂模式的Executors工具类用于快捷创建线程池,在实际开发过程中,需要使用线程池时,应当优先考虑使用Executors

1、类继承关系

Executor为顶级接口,其主要目标是将任务、任务的提交与任务的执行解耦

ExecutorService接口则定义了正常的线程池应该有的功能与行为,诸如任务提交,异步执行等等

ScheduledExecutorService接口则定义了一些定时的特性

2、Executor
Executor执行提交的任务(Runnable),该接口提供了一种将任务提交与任务执行(包括执行细节:线程、定时等)解耦的途径。

通过Executor包装的线程(Thread)对象,避免直接使用Thread对象来执行任务,可以有效将线程信息屏蔽,避免直接对线程的操作。

Executor本身并不强制要求执行的任务必须是异步执行

用于执行任务的执行器,而Runnable则表示可以用于执行的任务。

/**

  • 在未来某个时刻执行给定的指令,命令可以在新的线程中、线程池或调用线程中执行
  • 实际情况取决于Executor的实现

*/
void execute(Runnable command);
3、ExecutorService
能够管理任务终止、能够产生追踪一个或多个异步任务处理进度的Future的执行器(Executor)

接口的核心定义:

提交任务

Future submit(Callable task);
Future submit(Runnable task, T result);
Future<?> submit(Runnable task);
第一种提交Callable task类型的任务较好理解,任务完成时会将task的结果放至Future中。

第二种方式,Runnable task和 T result作为参数,实际上内部通过包装将入参result作为返回值与Runnable task一同包装为一个Callable,最后任务完成时将Callable结果放至Future中。因此通过这种方式,入参result,返回结果则是可能在任务中被修改的result。

AbstractExecutorService

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);//生成一个RunnableFuture作为任务
execute(ftask);
return ftask;
}

//异步任务的抽象,内部封装了实际的任务、任务状态、真正的执行线程以及等待任务完成的线程等细节
//实际上是调用FutureTask.run()的线程被阻塞,作为真正的执行线程
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return new FutureTask(runnable, value);//创建FutureTask
}
FutureTask

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);//将Runnable适配为Callable
this.state = NEW; // ensure visibility of callable
}
Tips:

FutureTask里有很多特性(比如对等待任务完成的线程进行阻塞,任务完成后对等待线程的唤醒,防止任务被并发调用等等)都可以使用AbstractQueuedSynchronizer,但在JDK1.8中的源码却没有发现AQS的痕迹,想想这是为何?

早期FutureTask确实是使用AQS实现,后续修改为了目前的样子(很多通过内存直接修改对象的操作,Unsafe类),核心是为了性能。这一部分可以再单独深入看看

Executors.RunnableAdapter

//简单的适配,将Runnable包装为Callable
static final class RunnableAdapter implements Callable {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
第三种方式,与第二种类似,会创一个result 为 null的Runnable适配器。

异步执行任务

通过任务提交、invokeAny、invokeAll

等待任一/全部任务执行完成

invokeAny()
一次性提交批量任务,有任一任务完成时返回该任务的处理结果,调用线程阻塞。

private T doInvokeAny(Collection<? extends Callable> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future> futures = new ArrayList<Future>(ntasks);
ExecutorCompletionService ecs =
new ExecutorCompletionService(this);//Wrapper或Decorator模式(组合模式??),增强了对已完成任务的管理能力

try {
    ExecutionException ee = null;
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Iterator<? extends Callable<T>> it = tasks.iterator();
    //1、提交第一个任务
    futures.add(ecs.submit(it.next()));
    --ntasks;
    int active = 1;
    for (; ; ) {
        //2、判断任务是否完成
        Future<T> f = ecs.poll();
        if (f == null) {
            //3、没有完成,且还有任务可以提交时,继续提交
            if (ntasks > 0) {
                --ntasks;
                futures.add(ecs.submit(it.next()));
                ++active;
            } else if (active == 0)//4、没有完成、没有任务可以提交、无处理中任务跳出
                break;
            else if (timed) {//5、无任务提交,任务在处理中时,设置等待任务完成
                f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                if (f == null)
                    throw new TimeoutException();
                nanos = deadline - System.nanoTime();
            } else//6、无限期阻塞,等待完成任务的队列有完成任务可以获得
                f = ecs.take();
        }
        if (f != null) {//7、获取到完成任务
            --active;
            try {
                return f.get();//8、返回完成任务的结果
            } catch (ExecutionException eex) {
                ee = eex;
            } catch (RuntimeException rex) {
                ee = new ExecutionException(rex);
            }
        }
    }
 
    if (ee == null)
        ee = new ExecutionException();
    throw ee;
} finally {//9、最终取消所有任务
    for (int i = 0, size = futures.size(); i < size; i++)
        futures.get(i).cancel(true);//FutureTask.cancel()只有当未NEW状态才会取消
}

}
invokeAll
如果无超时时间(较为简单)

则遍历FutureTask,通过FutureTask.get()方法阻塞调用线程即可。

如果存在超时时间

遍历FutureTask,每提交一次任务检查一次是否超时。任务提交完成后,遍历未结束Future,调用Future.get(timeout),最终返回结果,任务清理。

public List<Future> invokeAll(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future> futures = new ArrayList<Future>(tasks.size());
boolean done = false;
try {
for (Callable t : tasks)
futures.add(newTaskFor(t));

    final long deadline = System.nanoTime() + nanos;
    final int size = futures.size();
 
    // Interleave time checks and calls to execute in case
    // executor doesn't have any/much parallelism.
    for (int i = 0; i < size; i++) {
        execute((Runnable)futures.get(i));//执行任务,executor不保证异步执行
        nanos = deadline - System.nanoTime();//检测超时时间
        if (nanos <= 0L)
            return futures;//超时间内未提交的任务,不会再被执行
    }
 
    for (int i = 0; i < size; i++) {
        Future<T> f = futures.get(i);
        if (!f.isDone()) {//未完成future
            if (nanos <= 0L)//阻塞前先判断一次是否超时
                return futures;
            try {
                f.get(nanos, TimeUnit.NANOSECONDS);//超时时间内获取
            } catch (CancellationException ignore) {
            } catch (ExecutionException ignore) {
            } catch (TimeoutException toe) {
                return futures;
            }
            nanos = deadline - System.nanoTime();//更新超时时间
        }
    }
    done = true;
    return futures;
} finally {
    if (!done)
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);//最终未完成的任务,不会再被执行
}

}
终止任务提交

shutdown()

已提交的任务,仍将被执行,但新的任务不再被接收。如果已经被shutdown,再次调用无影响

终止任务执行

List shutdownNow();

立即尝试停止所有正在执行的任务,返回等待执行的任务,不会等待正在执行的任务终结。

该方法会尝试尽最大努力终结执行中的任务,但无法保证正在执行的任务被终结,因此,如果有任务终结失败,该任务也许永远无法被终止。

等待执行器进入终止状态

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

当调用了showdown()后,在超时间内、调用线程被中断前阻塞调用线程等待已提交任务完成。

4、AbstractExecutorService
通过模板模式的设计模式,对ExecutorService接口中定义的某些方法,进行了通用实现。

public Future<?> submit(Runnable task){…}
public Future submit(Runnable task, T result) {…}
public Future submit(Callable task) {…}

public T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException {…}
public List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException {…}
submit(…):

主要逻辑:

1、包装Callable、Runnable、result为RunnableFuture(实际默认实现是FutureTask,适配Runable接口)

2、调用实现类execute()方法执行RunnableFuture

invokeAny(…)

invokeAll(…)

上述方法的具体源码可以参见ExecutorService部分

5、ThreadPoolExecutor
虽然AbstractExecutorService进行了一些通用实现,但诸如execute()、shutdown()等依赖实际执行路径与执行器内部状态的方法并未被实现,因此这些方法的实现逻辑将是对Executor进行区分的重要因素。

ThreadPoolExecutor直译为线程池执行器,该类通过维护内线程池,最大程度复用线程,减少线程创建、销毁与维护的开销,提高任务的执行效率。通常会作为系统的一个异步处理模块出现,最大程度降低系统对硬件资源的占用。同时也提供了友善的api,通过对线程池核心参数的设计,可以设计出不同类型的线程池,扩展线程池的可应用场景。

基础概念
包含线程控制状态、作业队列、工人(worker)、工人集合(worker set)、工人数量(worker count)、终止条件、线程工厂、任务拒绝处理器、核心池大小、最大池大小等等

控制状态、工人数量

控制状态:

标记了执行器的生命周期,记录线程池的当前状态,分为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED,主要是为了实现shutdown()等涉及到执行器状态的方法,如果不存在控制状态,则无法实现类似:拒绝新任务的添加、终结执行器等功能。

阶段 说明 状态转换
RUNING 任务执行中,此时可以接收新的任务 SHUTDOWN:调用shutdown()
STOP:调用shutdownNow()
SHUTDOWN 任务执行中,但不再接收新的任务 STOP:调用shutdownNow()
TIDYING:任务完成、线程池清空
STOP 执行中的任务将被终止,且不再接收新的任务,不再继续处理任务 TIDYING:线程池清空
TIDYING 整理阶段,主要是回收资源与收尾工作。所有的任务已经结束,工人数量为0(线程已经完成回收),在该阶段对应线程将调用钩子函数terminated()告知子类即将要终结 TERMINATED:terminated()执行后
TERMINATED terminated()完成
ThreadPoolExecutor实现中将线程池状态与工人数量整合到一个Integer内,为了保证并发安全,Integer使用AtomicInteger。控制状态一共5种,在设计中通过保证各个状态在状态空间中的有序,直接使用数值的方式判断当前状态。

工人数量(workerCount):

记录当前线程池中的有效线程数量,主要用于状态变更、判断是否需要新增线程、线程数量是否已达设定值等。

状态字段:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//0001111111111…

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//RUNING状态下,ctl<0
private static final int SHUTDOWN = 0 << COUNT_BITS;//SHUTDOWN状态下,当workercount=0时,ctl=0
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//取高三位的结果
private static int workerCountOf(int c) { return c & CAPACITY; }//低位即wokerCount
private static int ctlOf(int rs, int wc) { return rs | wc; }
作业队列(workQueue)-BlockingQueue

用于保存待处理任务并将任务移交给工作线程。声明类型为阻塞队列(BlockingQueue)

不要求poll()返回null时代表队列为空(isEmpty),在决定是否进行线程池状态转移时(比如由SHUTDOWN转移为TIDYING需要判断任务队列是否为空)使用isEmpty判断队列是否为空。这种设计就可以让workQueue使用一些特殊设计的队列,比如延迟队列(DelayQueue,即便poll()为null,但延迟一段时间后可以返回non-null,即无法通过poll()是否为null判断队列是否为空),ThreadPoolExecutor通过isEmpty而非poll() == null 的方式判断队列是否为空,能够对工作队列的实现类型更加包容。

ThreadPoolExecutor并没有为BlokingQueue提供默认实现(声明时没有指定实例,且所有的构造方法没有为其提供默认实现),但使用Executors创建ThreadPoolExecutors时默认会使用LinkedBlockedQueue作为默认实现。

关于阻塞队列(BlockingQueue)

合适的阻塞队列,当队列空时会阻塞消费者,队列满时阻塞生产者。

如果不适用阻塞队列,可以使用线程安全队列+标志锁实现~(但自己实现的方式可能会存在很多细节问题,有空可以把这个细节深入研究一下)

工人(worker)

保存了所有在线程池中的工作线程的集合

private final HashSet workers = new HashSet();
该声明并未使用线程安全的Set类,而是使用了最为简单的HashSet,因此其内部要求,任何对该集合的访问都需要获取private final ReentrantLock mainLock = new ReentrantLock();的锁权限。

Worker类属于ThreadPoolExecutor的私有内部类,因此只有ThreadPoolExecutor能够创建该类的实例,作为内部类,该类的实例能够直接访问ThreadPoolExecutor的属性与方法。

Woker源码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//通过ThreadPoolExecutor内的ThreadFactory生成的执行线程
//woker持有该thread,主要是为了保证可以获取到是哪个线程是该woker的运行线程
final Thread thread;
//该woker的第一个任务,可能为null
Runnable firstTask;
//该woker完成的任务总量
volatile long completedTasks;

//AQS中state标志位,0:无锁状态,1:被持有锁状态,>=0:可被中断状态
Worker(Runnable firstTask) {
    setState(-1); // 禁止中断该worker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);//worker也是runnable,将worker作为其执行线程的target,当其执行线程start()时,JVM会调用worker.run()
}
 
//该方法表明woker是个Runnable,当其执行线程start时,会调用该方法
public void run() {
    runWorker(this);//将自己作为参数,运行自己
}
 
protected boolean isHeldExclusively() {
    return getState() != 0;
}
 
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
 
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}
 
public void lock() {
    acquire(1);
}
 
public boolean tryLock() {
    return tryAcquire(1);
}
 
public void unlock() {
    release(1);
}
 
public boolean isLocked() {
    return isHeldExclusively();
}
 
//仅当woker线程运行时,允许中断
void interruptIfStarted() {
    Thread t;
    //getState() >= 0 通过判断state是否大于0,初始情况下state=-1
    //当state < 0,即state = -1时,其执行线程尚未start,因此无需中断其执行线程
    //并不要求获取了woker的锁
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();//中断执行线程
        } catch (SecurityException ignore) {
        }
    }
}

}
ThreadPoolExecutor.runWoker()

final void runWorker(ThreadPoolExecutor.Worker w) {
//runWoker()方法只会被Woker.run()调用,而Woker.run()只会在其执行线程start后由jvm调用,因此runWoker()必定只会被对应Worker的执行线程调用
Thread wt = Thread.currentThread();//所以这里当前线程就是w.thread,也就是woker的执行线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();//此时woker的执行线程已经启动,允许外部进行中断,因此将woker的state置为0
boolean completedAbruptly = true;
try {
//getTask()重点,内部其实是从任务队列里获取任务,包含了线程阻塞、挂起、淘汰等一系列逻辑
//正常情况下就是该线程不断从任务队列里取任务
while (task != null || (task = getTask()) != null) {
//这里的这个lock很有意思
//如果仅仅是执行线程执行任务的话,其实执行线程获取woker锁并无太大意义,一方面是因为该方法(runWoker(…))只会运行在执行线程中,不可能有并发情况,另一方面woker本身没有可以共享的资源(这个地方可以再考虑一下:获取的任务、本身执行的线程是否算可共享资源呢),没必要获取这个锁,但这个地方有个隐藏逻辑,就是一旦woker在执行任务,则woker必定是被其执行线程锁住的,因此通过woker锁的状态可以判断woker是否在执行任务。当外部线程想要让worker正常执行完任务,然后再停止woker时,就必须获取该woker的锁,如此就能够保证woker当前正在执行的任务被正常完成
w.lock();
//简短的代码,逻辑复杂,具体分析参见“Worker检测ThreadPoolExecutor是否Stoping及线程状态机制”
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//单个任务执行之前的钩子函数
Throwable thrown = null;
try {
task.run();//真正的执行任务
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);//任务执行之后的钩子函数
}
} finally {
task = null;
w.completedTasks++;//完成任务计数
w.unlock();//释放锁,表明当前worker已经完成某个任务的执行
}
}
completedAbruptly = false;//表明线程正常执行完任务,如果为true,则说明执行形成可能中途被中断,或是用户任务发生了异常
} finally {
/* *处理将要结束的worker的收尾工作。
* 1、将当前worker从workerSet中移除
* 2、尝试将线程池过度为Terminated状态
* 3、在线程池仍然需要执行任务的状态下(RUNING、SHUTDOWN),判断是否需要添加新的worker至线程池,添加条件为:1)当前worker为突然终止;2)当前线程池的线程数量小于最小需求线程数量
* 4、走到这行代码一般有两种场景:1)无任务可做,且不会有新任务来了;2)用户任务执行期间发生了异常
*/
processWorkerExit(w, completedAbruptly);
}
}
Worker检测ThreadPoolExecutor是否Stoping及线程状态机制

1、如果ThreadPool处于Stop、Tidying或Terminated,且当前线程未被中断,则中断当前线程;2、如果处理Runing、Shutdown状态,则清除当前中断标志位,返回之前的中断标志,如果之前是中断的,且ThreadPool状态变为了Stop、Tidying或Terminated,则再次中断执行线程;3、这段逻辑保证了:如果当前ThreadPoolExecutor处于RUNING、SHUTDOWN,则中断标志位被清除;否则,直接中断执行线程。4、进一步思考:如果不这样实现会有什么问题?

下图为实际的判断逻辑,如果没有红框部分逻辑,则之前的清除中断标志位可能会导致在箭头处插入的ShutdownNow事件被忽略。

处理woker结束的工作

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn’t adjusted
//正常结束不需要再去减少workerCount,原因是在getTask()的地方已经预先减过了
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
} finally {
    mainLock.unlock();
}
 
tryTerminate();
 
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}

}
线程池里的线程创建与销毁

失效场景:1、空闲worker(当前worker数量超过corePoolSize,且线程keepAliveTime时间内未获取到任务,这也是线程池控制worker超时失效的机制);2、用户程序异常;3、任务完全处理完成

线程池在处理失效线程时,如果仍然需要增加线程,那么会再次通过ThreadFactory创建一个新的线程。

假设如下场景:线程池持续接收新任务,如果新的任务正常执行,那么执行该任务的线程还会继续服役,处理后续的新任务,但是如果新的任务无法正常执行,抛出了异常,那么该线程将由于该异常而而终结,线程池会创建新的线程继续处理后续任务。所以当所有的新任务都抛出异常时,可能会导致线程池单个线程的寿命极短,频繁创建线程,事实上导致线程池失效。

实际上呢?

通过submit()提交给线程池的RunnableTask都被包装为FutureTask了,而FutureTask在run()的时候,对异常进行包装,将outCome输出为Exception,也就是说正常的用户异常根本不会抛出到Worker的runWoker loop里…

但是如果直接调用ThreadPoolExecutor.execute()方法,则不会使用FutureTask包装该Runnable了,包装的过程都是在AbstractExecutorService里完成的,直接通过execute执行任务的话,则会产生之前所述的场景,线程将频繁创建

ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5, new NamedThreadFactory());
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(() -> {//使用Executor.execute()执行,则会出现频繁创建线程池的情况,使用submit()则不会,但其实execute与submit都是异步执行
throw new RuntimeException();
});
}
6、总结
线程池的核心其实是一个基于BlockingQueue的任务生产消费模型,任务的生产方为ThreadPool的使用方,通过submit与execute生产Runnable任务,任务的消费方则为ThreadPool内部的Worker。TheadPool做了很多关于线程池特性的控制:比如核心线程池大小、最大线程池大小、空闲线程的最大存活时间等等,主要就是通过Worker从BlockingQueue中获取任务的状态来控制Worker的销毁。

源码十分精巧,部分代码写的相当简练,但包含了很深的关于状态定义、任务生产、任务消费、超时控制、同步控制、空闲线程销毁、新增线程等一系列特性的实现逻辑,每一步的动作都值得进一步思考。

如果让自己去实现一遍这样的线程池,该如何去实现呢?

最后附上向ThreadPoolExecutor submit任务的执行流程图~

l

问答系统总结

1.开发工具GIt,idea

  • 我是把代码放到github上的,以便于代码版本管理,不能拷贝来拷贝去
  • java语言/规范最基础,要研究jvm,垃圾回收算法思路,根据什么样的规则,怎么回收申请的一些对象;参数可以配置,怎么配置,对应的后台运行又是怎么样的

    2.Spring Boot,Velocity

  • spring框架:spring是怎么做的,view、controller、service是怎么连在一起的:spring是控制反转、依赖注入,解决了数据数据初始化一些问题,能够吧代码写得很简洁,然后脱颖而出,核心是怎么做的,数据初始化是怎么做的。面向切面的编程可以用在什么地方,spring的一般框架是mvc
    :层是controller、中间层是service、底层是dao。为什么这么分?他比其他的区别是什么?对一些request包装了
    找spring一两点深入研究:我不仅仅是用了spring,我还研究了他某一个组件,他是怎么实现的?讲讲(闪光点)

  • velocity(模板语言)
    我用了velocity,我发现了velocity里的很多东西和我学的java、c++都是一样的,他这个框架他把一些公用的都提取出来,前面的spring boot是个java框架,为什么要用velocity,因为我要前后端分离,view和后面的数据要分离,data是怎么传过来的,怎么解析的,他支持什么东西?(不仅仅用了这个东西,还有思想,解耦)

    3.mybatis

  • 怎么把数据库的一些前后的读取给做了,怎么把xml里的多重条件,怎么做文法的解析,然后把这些条件给处理掉。

    4.登录/注册

  • 网站安全(salt):密码为什么加了一个salt就变得安全?

  • 通过拦截器来实现的:拦截器的思想、框架,留好接口;拦截器实现登录注册:我在cookie里放了一个token,token怎么处理用户登录注册的:在用户登录注册的时候,会下发一个token,把token与用户信息关联起来,关联起来之后我为了优化token信息,把token放到数据库里(redis),设计一个分布式的统一登录系统,现在的互联网产品都是统一登录的,比如,登录qq之后,登录网页就不需要登录了,qq登录过的token直接注入到网页上去。这是个ssension共享问题:、

  • 保证数据安全:验证(邮件激活)

    5.前缀树

  • 构造一个前缀树,通过一个有限状态机来实现一传文本是不是包含敏感词,繁杂度是多少 很重要:优点有哪些?为什么不用kmp,文本查找算法:
    可以很快的加一些词汇过来;有扩展性,以及性能更提高

    6.redis

  • 数据结构:跳列表,哈希,优先队列,list:我了解redis底层是怎么实现的,为什么他的效率很高,他的字符串是怎么保存的,做这个工程的时候我用在的异步队列上、排序上、异步框架

    7.异步框架

  • 思路:我这个网站附带的每一步操作可能附带的操作都非常多,为了更快的吧结果返回给用户,所以采用异步框架,自己写的,数据结构:使用redis的队列,因为redis能够保证线程同步;除了用队列,我还想过用有优先队列,这样我的异步框架能够把紧急的任务线处理掉。我这个异步框架:有消息的发射,消息的处理,事件的模型定义以及具体执行的eventhandle,我定义了一些公共接口把这些实现了。

    8.邮件(smtp协议)

  • 做了一个简单邮件,怎么连接上服务器,我当时做这个的时候,ssl问题
    ,ssl理解,服务器需要ssl链接,为了安全服务器是怎么做的;java sdk 1.7 1.8的问题,1.8是需要换一个jar包的

  • 豆瓣电影排序:好的问题能挑选出来,互动越多,时间越新,评分越高。

    9.timeline(时间轴)

  • 肯定会问:为什么用推拉模式,用推实时性高能让好友快速得到消息,用拉能节省僵尸号、不是活跃用户的存储空间。怎么区分?最后把timeline组合起来‘timeline模板系统,每个新鲜事展向不一样,和velocity结合起来,后台存储的都是核心数据,每个数据对应的是一个模板,我把模板结合起来,我就能快速的把时间轴展示出来。

    10.爬虫

    11.solr搜索

  • 搜索去重:对比相似度,敏感哈希算法,哈希算法:两个字符串稍微有一点点不一样,结构就是不一样的。可能头尾是不一样的,内容一样:采用敏感哈希算法把相似度求出来,区别:敏感哈希算法两个文档相似度很高,他生成的哈希值的比例是很相似的。

    12.单元测试/部署

  • 部署:运维,llinux nigix反向代理,与正向对比。负载均衡:为什么要负载均衡。

l

问答系统总结2

1.开发工具GIt,idea

  • 我是把代码放到github上的,以便于代码版本管理,不能拷贝来拷贝去
  • java语言/规范最基础,要研究jvm,垃圾回收算法思路,根据什么样的规则,怎么回收申请的一些对象;参数可以配置,怎么配置,对应的后台运行又是怎么样的

    2.Spring Boot,Velocity

  • spring框架:spring是怎么做的,view、controller、service是怎么连在一起的:spring是控制反转、依赖注入,解决了数据数据初始化一些问题,能够吧代码写得很简洁,然后脱颖而出,核心是怎么做的,数据初始化是怎么做的。面向切面的编程可以用在什么地方,spring的一般框架是mvc
    :层是controller、中间层是service、底层是dao。为什么这么分?他比其他的区别是什么?对一些request包装了
    找spring一两点深入研究:我不仅仅是用了spring,我还研究了他某一个组件,他是怎么实现的?讲讲(闪光点)
  • velocity(模板语言)
    我用了velocity,我发现了velocity里的很多东西和我学的java、c++都是一样的,他这个框架他把一些公用的都提取出来,前面的spring boot是个java框架,为什么要用velocity,因为我要前后端分离,view和后面的数据要分离,data是怎么传过来的,怎么解析的,他支持什么东西?(不仅仅用了这个东西,还有思想,解耦)

    3.mybatis

  • 怎么把数据库的一些前后的读取给做了,怎么把xml里的多重条件,怎么做文法的解析,然后把这些条件给处理掉。

    4.登录/注册

  • 网站安全(salt):密码为什么加了一个salt就变得安全?
  • 通过拦截器来实现的:拦截器的思想、框架,留好接口;拦截器实现登录注册:我在cookie里放了一个token,token怎么处理用户登录注册的:在用户登录注册的时候,会下发一个token,把token与用户信息关联起来,关联起来之后我为了优化token信息,把token放到数据库里(redis),设计一个分布式的统一登录系统,现在的互联网产品都是统一登录的,比如,登录qq之后,登录网页就不需要登录了,qq登录过的token直接注入到网页上去。这是个ssension共享问题:、
  • 保证数据安全:验证(邮件激活)

    5.前缀树

  • 构造一个前缀树,通过一个有限状态机来实现一传文本是不是包含敏感词,繁杂度是多少 很重要:优点有哪些?为什么不用kmp,文本查找算法:
    可以很快的加一些词汇过来;有扩展性,以及性能更提高

    6.redis

  • 数据结构:跳列表,哈希,优先队列,list:我了解redis底层是怎么实现的,为什么他的效率很高,他的字符串是怎么保存的,做这个工程的时候我用在的异步队列上、排序上、异步框架

    7.异步框架

*思路:我这个网站附带的每一步操作可能附带的操作都非常多,为了更快的吧结果返回给用户,所以采用异步框架,自己写的,数据结构:使用redis的队列,因为redis能够保证线程同步;除了用队列,我还想过用有优先队列,这样我的异步框架能够把紧急的任务线处理掉。我这个异步框架:有消息的发射,消息的处理,事件的模型定义以及具体执行的eventhandle,我定义了一些公共接口把这些实现了。

8.邮件(smtp协议)

  • 做了一个简单邮件,怎么连接上服务器,我当时做这个的时候,ssl问题
    ,ssl理解,服务器需要ssl链接,为了安全服务器是怎么做的;java sdk 1.7 1.8的问题,1.8是需要换一个jar包的
  • 豆瓣电影排序:好的问题能挑选出来,互动越多,时间越新,评分越高。

    9.timeline(时间轴)

  • 肯定会问:为什么用推拉模式,用推实时性高能让好友快速得到消息,用拉能节省僵尸号、不是活跃用户的存储空间。怎么区分?最后把timeline组合起来‘timeline模板系统,每个新鲜事展向不一样,和velocity结合起来,后台存储的都是核心数据,每个数据对应的是一个模板,我把模板结合起来,我就能快速的把时间轴展示出来。

    10.爬虫

    11.solr搜索

  • 搜索去重:对比相似度,敏感哈希算法,哈希算法:两个字符串稍微有一点点不一样,结构就是不一样的。可能头尾是不一样的,内容一样:采用敏感哈希算法把相似度求出来,区别:敏感哈希算法两个文档相似度很高,他生成的哈希值的比例是很相似的。

    12.单元测试/部署

  • 部署:运维,llinux nigix反向代理,与正向对比。负载均衡:为什么要负载均衡。
l