victory的博客

长安一片月,万户捣衣声

0%

线程池

1.线程池的作用
对于服务器端的程序,如果服务端每接收到一个任务,创建一个线程,然后进行执行;这样的做法在面对成千上万的任务递交进服务器时,
那么会创建数以万记的线程,这样会时操作系统频繁的进行上下文切换,无故增加系统的负载,而线程的创建和小王都是需要好耗费
系统资源的,也无疑浪费了系统资源。线程池技术(通过使用固定或较为固定数目的线程来完成任务的执行)能够很好地解决这个问题,
消除频繁创建和消亡线程的系统资源开销。
2.线程池的本质
使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作
队列取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者
线程,随着大量任务被提交,更多的工作者线程会被唤醒。
3.线程池示例
ThreadPool.java

package concurrency.threadPool;

public interface ThreadPool<Job extends Runnable> {
    //执行一个Job,这个Job需要实现Runnable
    void execute(Job job);
    
    //关闭线程池
    void shutdown();
    
    //增加工作者线程
    void addWorkers(int num);
    
    //减少工作者线程
    void remvoeWorkers(int num);
    
    //得到正在等待执行的任务数量
    int getJobSize();
}

DefaultThreadPool.java

package concurrency.threadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    //线程池最大数量
    private static final int MAX_WORKER_NUMBERS = 10;
    
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    
    //线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    
    //这是一个工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    
    //工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    
    //线程编号的生成
    private AtomicLong threadNum = new AtomicLong();
    
    //工作者,负责消费任务
    class Worker implements Runnable{
        //是否工作
        private volatile boolean running = true;
        @Override
        public void run(){
            while(running){
                Job job = null;
                synchronized(jobs){
                    //如果工作者列表是空的,那么就wait
                    while(jobs.isEmpty()){
                        try{
                            jobs.wait();
                        }catch(InterruptedException ex){
                            //感知到外部对WorkerThread的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //取出一个Job
                    job = jobs.removeFirst();
                }
                if(job != null){
                    try{
                        job.run();
                    }catch(Exception ex){
                        //忽略Job执行中的Exception
                    }
                }
            }
        }
        
        public void shutdown(){
            running = false;
        }
    }
    
    //初始化线程工作者
    private void initializeWorkers(int num){
        for(int i = 0; i < num; i++){
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }
    
    public DefaultThreadPool(){
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }
    
    public DefaultThreadPool(int num){
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }
    
    @Override
    public void execute(Job job) {
        if(job != null){
            //添加一个工作,然后进行通知
            synchronized(jobs){
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        for(Worker worker : workers){
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized(jobs){
            //限制新增的Worker数量不能超过最大值
            if(num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void remvoeWorkers(int num) {
        synchronized(jobs){
            if(num >= this.workerNum){
                throw new IllegalArgumentException("beyond workNum");
            }
            //按照给定的数量停止Worker
            int count = 0;
            while(count < num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }
}

等待/通知机制

等待通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或着notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。
示例代码

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();
    
    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }
    
    static class Wait implements Runnable{
        @Override
        public void run(){
            //加锁,用于lock的Monitor
            synchronized(lock){
                //当条件不满足时,继续wait,同时释放了lock的锁
                while(flag){
                    try {
                        System.out.println(Thread.currentThread()+" flag is true. wait@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                //条件满足时,完成工作
                System.out.println(Thread.currentThread() + "flag is false. running@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }
    
    static class Notify implements Runnable{
        @Override
        public void run() {
            synchronized(lock){
                //获取lock的锁,然后进行通知,通知时不会释放lock的锁
                //直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
            
            //再次加锁
            synchronized(lock){
                System.out.println(Thread.currentThread()+ "hold lock again. sleep@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
            
        }
    }
}

等待超时模式构造简单的数据库连接池

ConnectionPool.java

package concurrency.connetionPool;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

class ConnectionDriver{
    static class ConnectionHandler implements InvocationHandler{
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if(method.getName().equals("commit")){
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }
    
    //创建一个Connection的代理,在commit时休眠100毫秒
    public static final Connection createConnection(){
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), new Class<?>[] { Connection.class }, new ConnectionHandler());
    }
}

public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<Connection>();
    
    public ConnectionPool(int initialSize){
        if(initialSize > 0){
            for(int i = 0; i < initialSize; i++){
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }
    
    public void releaseConnection(Connection connection){
        if(connection != null){
            synchronized(pool){
                //连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }
    
    //在mills内无法获取到连接,将会返回null
    public Connection fetchConnection(long mills) throws InterruptedException{
        synchronized(pool){
            //完全超时
            if(mills < 0){
                while(pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();//Removes and returns the first element from this list.
            }else{
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while(pool.isEmpty() && remaining > 0){
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if(!pool.isEmpty()){
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}

ConnectionPoolTest.java

package concurrency.connetionPool;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolTest {
    static ConnectionPool pool = new ConnectionPool(10);
    
    //保证所有ConnectionRunner能够同时开始
    static CountDownLatch start = new CountDownLatch(1);
    
    //main线程将会等待所有ConnectionRunner结束后才能继续执行
    static CountDownLatch end;
    
    public static void main(String[] args) throws InterruptedException{
        //线程数量,可以修改线程数量进行观察
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for(int i = 0; i < threadCount; i++){
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();//使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。
        end.await();//使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection: " + got);
        System.out.println("not got connection: " + notGot);
    }
    static class ConnectionRunner implements Runnable{
        int count;
        AtomicInteger got;
        AtomicInteger notGot;
        
        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot){
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }
        
        public void run(){
            try{
                start.await();
            }catch(Exception ex){
                
            }
            while(count > 0){
                try{
                    //从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                    //分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if(connection != null){
                        try{
                            connection.createStatement();
                            connection.commit();
                        }finally{
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    }else{
                        notGot.incrementAndGet();
                    }
                }catch(Exception ex){
                    
                }finally{
                    count--;
                }
                
            }
            end.countDown();
        }
    }
}

volatile写读的内存语义和锁的释放获取的内存语义

volatile写、读的内存语义

当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主存。

当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。

volatile内存语义的实现

编译器生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
JMM基于保守策略的JMM内存屏障插入策略如下:
(1)在每个volatile写操作的前面插入一个StoreStore屏障。
(2)在每个volatile写操作的后面插入一个StoreLoad屏障。
(3)在每个volatile读操作的后面插入一个LoadLoad屏障。
(4)在每个volatile读操作的后面插入一个LoadStore屏障。

锁的释放、获取的内存语义

当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。
当线程获取锁时,JMM会把该线程对应的本地内存置为无效。从而使得被监视器保护的临界区代码必须从主内存中读取共享变量。
注:锁释放与volatile写有相同的内存语义;锁获取与volatile读具有相同的内存语义。

volatile、synchronized和原子操作

volatile

volatile是轻量级的synchronized,它在多处理器并发中保证了共享变量的“可见性”。可见性是指当一个线程修改一个共享变量时
,另外一个线程能读到这个修改的值(Java内存模型确保所有线程看到这个变量的值是一致的)。

volatile的两条实现原则

(1)Lock前缀指令会引起处理器缓存回写到内存。
(2)一个处理器的缓存回写到内存会导致其他处理器的缓存无效。

volatile的使用优化

使用追加到64字节的方式来填满高速缓冲区的缓存行,避免头节点和尾节点加载到同一个缓存行,使头、尾节点在修改时不会互相锁定。

synchronized实现原理

JVM基于进入和推出Monitor对象来实现方法同步和代码块同步,但两者实现细节不一样。
代码块同步使用monitorenter和monitorexit指令实现的。monitorenter指令在编译后插入到同步代码块的开始位置,而monitorexit是
插入到方法结束处和异常处,JVM要博爱正每个monitorenter必须有对应的monitorexit与之配对。任何对象都有一个monitor与之关联,
当且一个monitor被持有后,它将处于锁定状态。线程执行到monitorenter指令时,将会尝试获取对象monitor的所有权,即尝试获得对象的锁。

原子操作

java如何实现原子操作?
通过循环CAS的方式实现原子操作。JVM中CAS操作使用处理器提供的CMPXCHG指令实现。自旋CAS实现的基本思路就是循环及进行CAS操作指导成功为止。

使用CAS实现线程安全计数器

package concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private AtomicInteger atomicI = new AtomicInteger(0);
    private int i = 0;
    
    public static void main(String[] args) {
        final Counter cas = new Counter();
        List<Thread> ts = new ArrayList<Thread>(600);
        long start = System.currentTimeMillis();
        for(int j = 0; j < 100; j++){
            Thread t = new Thread(new Runnable(){
                @Override
                public void run(){
                    for(int i = 0; i < 10000; i++){
                        cas.count();
                        cas.safeCount();
                    }
                }
            });
            ts.add(t);
        }
        for(Thread t : ts){
            t.start();
        }
        //等待所有线程执行完成
        for(Thread t : ts){
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(cas.i);
        System.out.println(cas.atomicI.get());
        System.out.println(System.currentTimeMillis() - start);
    }
    
    //使用CAS实现线程安全计数器
    private void safeCount(){
        for(;;){
            int i = atomicI.get();
            boolean suc = atomicI.compareAndSet(i, ++i);
            if(suc){
                break;
            }
        }
    }
    
    //非线程安全计数器
    private void count(){
        i++;
    }
}

CAS实现原子操作的三大问题

(1)ABA问题。可以使用版本号解决。JDK Atomic AtomicStampedReference可以解决ABA问题。
(2)循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
(3)只能保证一个共享变量的原子操作。对多个共享变量操作可以使用锁/将多个共享变量合并成一个共享变量来操作(两个共享变量i=2,j=a,合并以下ij=2a,然后采用CAS来操作)。

上下文切换和死锁

上下文切换

CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。但是,再切换前会保存上一个任务的
状态,以便切换回这个任务时,可以再加载这个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换。频繁的上下文
切换会影响多线程的执行速度。

如何减少上下文切换

(1)无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁(如采用分段锁,不同的线程处理不同段的数据)
(2)CAS算法。
(3)使用最小线程。避免创建不需要的线程(任务很少,创建了很多线程,造成大量线程阻塞等待)

死锁

死锁产生的条件

(1)互斥。一个资源同一时刻只能被一个线程拥有。
(2)请求和保持。线程在请求新的资源时,不释放已经拥有的资源。
(3)不剥夺条件。进程所获得的资源在未使用完之前,不被其他的线程强行剥夺。
(4)循环等待。竞争资源的各个线程形成一个线程等待环路。

避免死锁

破坏产生死锁的条件:
(2)在进程开始执行时就申请他所需的全部资源
(3)一个进程不能获得所需要的全部资源时便处于等待状态,等待期间他占有的资源将被隐式的释放重新加入到系统的资源列表中
,可以被其他的进程使用,而等待的进程只有重新获得自己原有的资源以及新申请的资源才可以重新启动、执行。
(4)资源有序分配(银行家算法)

死锁Demo

package concurrency;

public class DeadLockDemo {
    private static String A = "A";
    private static String B = "B";
    
    public static void main(String[] args) {
        new DeadLockDemo().deadLock();
    }
    
    private void deadLock(){
        Thread t1 = new Thread(new Runnable(){
            @Override
            public void run(){
                synchronized(A){
                    try {
                        Thread.currentThread().sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized(B){
                        System.out.println("1");
                    }
                }
            }
        });
        
        Thread t2 = new Thread(new Runnable(){
            @Override
            public void run() {
                synchronized(B){
                    synchronized(A){
                        System.out.println("2");
                    }
                }
            }
        });
        
        t1.start();
        t2.start();
    }
}

缓存和数据库的一致性问题

1、想要提高应用的性能,可以引入「缓存」来解决
2、引入缓存后,需要考虑缓存和数据库一致性问题,可选的方案有:「更新数据库 + 更新缓存」、「更新数据库 + 删除缓存」
3、更新数据库 + 更新缓存方案,在「并发」场景下无法保证缓存和数据一致性,且存在「缓存资源浪费」和「机器性能浪费」的情况发生
4、在更新数据库 + 删除缓存的方案中,「先删除缓存,再更新数据库」在「并发」场景下依旧有数据不一致问题,解决方案是「延迟双删」,但这个延迟时间很难评估,所以推荐用「先更新数据库,再删除缓存」的方案
5、在「先更新数据库,再删除缓存」方案下,为了保证两步都成功执行,需配合「消息队列」或「订阅变更日志」的方案来做,本质是通过「重试」的方式保证数据一致性
6、在「先更新数据库,再删除缓存」方案下,「读写分离 + 主从库延迟」也会导致缓存和数据库不一致,缓解此问题的方案是「延迟双删」,凭借经验发送「延迟消息」到队列中,延迟删除缓存,同时也要控制主从库延迟,尽可能降低不一致发生的概率
参考资料

Redis集群搭建

1.下载Redis并将redis放到linux目录/opt下
2.在opt下创建myRedis文件夹,并备份redis.conf到此处
3.安装ruby环境
(1)yum install ruby
(2)yum install rubygems
4.创建6个实例(此处使用相同主机的不同端口来模拟多个节点)
通过include引入备份文件redis.conf(redis.conf中需要修改配置:protected-mode no daemonize yes appendonly no)的内容并修改每个节点不同的信息
(1)redis6379.conf

include /opt/myRedis/redis.conf
pidfile "/var/run/redis6379.pid"
port 6379
dbfilename "dump6379.rdb"
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000

(2)redis6380.conf

include /opt/myRedis/redis.conf
pidfile "/var/run/redis6380.pid"
port 6380
dbfilename "dump6380.rdb"
cluster-enabled yes
cluster-config-file nodes-6380.conf
cluster-node-timeout 15000

(3)redis6381.conf

include /opt/myRedis/redis.conf
pidfile "/var/run/redis6381.pid"
port 6381
dbfilename "dump6381.rdb"
cluster-enabled yes
cluster-config-file nodes-6381.conf
cluster-node-timeout 15000

(4)redis6389.conf

include /opt/myRedis/redis.conf
pidfile "/var/run/redis6389.pid"
port 6389
dbfilename "dump6389.rdb"
cluster-enabled yes
cluster-config-file nodes-6389.conf
cluster-node-timeout 15000

(5)redis6390.conf

include /opt/myRedis/redis.conf
pidfile "/var/run/redis6390.pid"
port 6390
dbfilename "dump6390.rdb"
cluster-enabled yes
cluster-config-file nodes-6390.conf
cluster-node-timeout 15000

(6)redis6391.conf

include /opt/myRedis/redis.conf
pidfile "/var/run/redis6391.pid"
port 6391
dbfilename "dump6391.rdb"
cluster-enabled yes
cluster-config-file nodes-6391.conf
cluster-node-timeout 15000

5.启动所有redis实例,nodes-xxxx.conf文件都生成正常。

6.将六个实例组合到一个集群当中

redis-cli --cluster create 192.168.1.108:6379 192.168.1.108:6380 192.168.1.108:6381 192.168.1.108:6389 192.168.1.108:6390 192.168.1.108:6391 --cluster-replicas 1

7.启动客户端并查看集群

Redis关闭服务报错

在linux中执行redis-cli shutdown命令来关闭redis服务时报错:

(error) ERR Errors trying to SHUTDOWN. Check logs.

解决办法

1)执行vim命令打开redis.conf文件

vim redis.conf

2)在配置文件中找到logfile “”,并修改为 logfile “/opt/myRedis/redis_log.log”
3)修改日志文件redis_log.log的文件权限

sudo chmod 777 /opt/myRedis/redis_log.log

4)强制关闭redis服务

kill -9 当前redis服务的进程号

5)重启redis服务(案例将redis的配置文件做了备份,放在/opt/myRedis/目录下)

redis-server redis.conf