victory的博客

长安一片月,万户捣衣声

0%

Redis集群

  1. 问题
    1)服务器内存容量不够,redis如何进行扩容?
    2)大量并发写操作, redis如何分摊?
  2. 集群
    Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N
    注:Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。
  3. slots
    一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个, 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
    集群中的每个节点负责处理一部分插槽。举个例子, 如果一个集群可以有主节点,其中:节点A负责处理0号至5500号插槽,节点B负责处理5501号至11000号插槽,节点C负责处理11001号至16383号插槽。
  4. 集群的优点
    1)实现扩容
    2)分摊压力
    3)无中心配置相对简单
  5. 集群的缺点
    1)多键操作是不被支持的(这些键有可能存在不同的服务器上,可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。)
    2)多键的Redis事务是不被支持的。lua脚本不被支持。
    3)由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

Redis主从复制

  1. 主从复制
    主机数据更新后根据配置和策略,自动同步到备机的master/slaver机制,master以写为主,slave以读为主
  2. 复制原理
    (1)每次从机联通后,都会给主机发送sync指令
    (2)主机立刻进行存盘操作并发送RDB文给从机
    (3)从机收到RDB文件后,进行全盘加载
    (4)之后每次主机的写操作,都会立刻发送给从机,从机执行相同的命令
  3. 哨兵模式
    能够后台监控主机是否故障,如果故障了根据投票数自动将从机转换为主机
  4. 故障恢复
    (1)如果发生故障,从下线的主服务的所有从服务里挑选一个从服务,将其转成主服务
    选择条件:
    1)选择优先级靠前的(优先级可以在redis.conf中设置slave-priority)
    2)选择偏移量最大的(偏移量最大指获得原主服务数据最多的服务)
    3)选择runid最小的从服务(每个Redis实例启动后都会随机生成一个40为的runid)
  5. 优点
    (1)读写分离
    (2)容灾快速恢复

Redis持久化方式

  1. 两个不同形式得持久化方式
    RDB(Redis DataBase)
    AOF(Append Of File)
  2. RDB
  3. 1 RDB
    在指定的时间间隔内将内存中的数据集快照(Snapshot)写入磁盘,恢复时将快照文件直接读到内存中。
  4. 2 备份
    Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能;
  5. 3 适用
    大规模数据的恢复且对于数据恢复的完整性不是非常敏感
  6. 4 优点
    1)节省磁盘空间
    2)恢复速度快
  7. 5 缺点
    1)虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
    2)在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
  8. AOF
  9. 1 AOF
    以日志的形式来记录每个写操作,将Redis执行过的所有写指令记录下来(读操作不记录),只许追加文件但不可以改写文件,Redis启动之初会读取该文件重新构建数据,换言之,Redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
  10. 2 AOF同步频率设置:
    1)始终同步
    2)每秒同步
    3)不主动进行同步
  11. 3 Rewrite:
    AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof。

如何实现重写?
AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,而是将整个内存中的数据库内容用命令的方式重写了一个新的aof文件,这点和快照有点类似。
何时重写
系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,如果Redis的AOF当前大小>= base_size +base_size*100% (默认)或当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。
3.4 AOF的优点:
1)备份机制更稳健,丢失数据概率更低。
2)可读的日志文本,通过操作AOF文件,可以处理误操作。
3.5 AOF的缺点:
1)比起RDB占用更多的磁盘空间。
2)恢复备份速度要慢。
3)每次读写都同步的话,有一定的性能压力。
4)存在个别Bug,造成不能恢复。
注意:
1)AOF和RDB同时开启,系统默认取AOF的数据
2)推荐AOF和RDB都使用
3)如果对数据不敏感,可以选单独用RDB
4)不建议单独用 AOF,因为可能会出现Bug
5)如果只是做纯内存缓存,可以都不用

秒杀

秒杀需要解决的问题

解决商品库存计数器和秒杀成功的用户记录的事务操作

秒杀遇到的三个问题

1.连接超时问题 —> 使用Redis连接池
2.超卖(卖出的商品数量超过商品库存数量) —> 使用事务
3.库存遗留问题(并发的请求中只有一个请求能够秒杀成功造成库存遗留) —> 使用lua脚本

阅读全文 »

事务

Redis中事务的定义

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断
Redis事务的主要作用就是串联多个命令防止别的命令插队

事务相关的命令(multi、exec、discard)

1.从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,至到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
2.组队的过程中可以通过discard来放弃组队。

阅读全文 »

创建线程的三种方式

创建线程有三种方式,分别是继承Thread类、实现Runnable接口、实现Callable接口。

1.通过继承Thread类来创建并启动线程的步骤如下:
1.1定义Thread类的子类,并重写该类的run()方法,该run()方法将作为线程执行体。
1.2创建Thread子类的实例,即创建了线程对象。
1.3调用线程对象的start()方法来启动该线程。
2.通过实现Runnable接口来创建并启动线程的步骤如下:
2.1定义Runnable接口的实现类,并实现该接口的run()方法,该run()方法将作为线程执行体。
2.2创建Runnable实现类的实例,并将其作为Thread的target来创建Thread对象,Thread对象为线程对象。
2.3调用线程对象的start()方法来启动该线程。
3.通过实现Callable接口来创建并启动线程的步骤如下:
3.1创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,且该call()方法有返回值。然后再创建Callable实现类的实例。
3.2使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。
3.3使用FutureTask对象作为Thread对象的target创建并启动新线程。
3.4调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。

阅读全文 »

创建线程池的7种方式

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolCreationTest {
    public static void fixedThreadPool(){
//        ExecutorService threadPool = Executors.newFixedThreadPool(2);
//        
//        Runnable runnable = new Runnable(){
//            @Override
//            public void run(){
//                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
//            }
//        };
//        
//        Future<?> submit = threadPool.submit(runnable);
////        System.out.println(submit);
////        System.out.println(submit.isDone());
//        threadPool.execute(runnable);
//        threadPool.execute(runnable);
//        threadPool.execute(runnable);
        
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for(int i = 0; i < 4; i++){
            threadPool.execute(() -> {
                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            });
        }
    }
    
    public static void cachedThreadPool(){
        ExecutorService threadPool = Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            threadPool.execute(() -> {
                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            });
        }
    }
    
    public static void singleThreadPool(){
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        for(int i = 0; i < 10; i++){
            threadPool.execute(() -> {
                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            });
        }
    }
    
    public static void scheduledThreadPool(){
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
        System.out.println("添加任务,时间:"+new Date());
        threadPool.schedule(()->{
            System.out.println("任务被执行,时间:"+new Date());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);
    }
    
    public static void singleThreadScheduledPool(){
        ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
        System.out.println("添加任务,时间:"+new Date());
        threadPool.schedule(()->{
            System.out.println("任务被执行,时间:"+new Date());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 2, TimeUnit.SECONDS);
    }
    
    public static void workStealingPool(){
        ExecutorService threadPool = Executors.newWorkStealingPool();
        for(int i = 0; i < 10; i++){
            final int index = i;
            threadPool.execute(() ->{
                System.out.println(index + "被执行,线程名"+Thread.currentThread().getName());
            });
        }
        while(!threadPool.isTerminated()){
            
        }
    }
    
    //推荐使用
    //阿里巴巴开发手册:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    public static void myThreadPoolExecutor(){
        final int CORE_POOL_SIZE = 5;  
        final int MAX_POOL_SIZE = 10;
        final int QUEUE_CAPACITY = 100;
        final Long KEEP_ALIVE_TIME = 1L;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy()
                );
        for(int i = 0; i < 10; i++){
            final int index = i;
            threadPool.execute(() -> {
                System.out.println(index+"被执行,线程名:"+Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
    }
    
    
    public static void main(String[] args) {
        //fixedThreadPool();
        //cachedThreadPool();
        //singleThreadPool();
        //scheduledThreadPool();
        //singleThreadScheduledPool();
        //workStealingPool();
        myThreadPoolExecutor();
    }
}

ThreadLocal示例

ThreadLocal:
通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢? JDK 中提供的ThreadLocal类正是为了解决这样的问题。
ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。
如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用 get() 和 set() 方法来获取默认值或将修改其值,从而避免了线程安全问题。

代码:

import java.text.SimpleDateFormat;
import java.util.Random;

public class ThreadLocalTest implements Runnable{

    //private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(()->new SimpleDateFormat("yyyyMMdd HHmm"));
    private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>(){
        @Override
        protected SimpleDateFormat initialValue(){
            return new SimpleDateFormat("yyyyMMdd HHmm");
        }
    };
    
    @Override
    public void run() {
        System.out.println("Thread Name="+Thread.currentThread().getName()+" default formatter="+formatter.get().toPattern());
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        formatter.set(new SimpleDateFormat());
        System.out.println("Thread Name="+Thread.currentThread().getName()+" current formatter="+formatter.get().toPattern());
        
    }
    
    public static void main(String[] args) throws InterruptedException{
        ThreadLocalTest obj = new ThreadLocalTest();
        for(int i=0;i<10;i++){
            Thread t = new Thread(obj, ""+i);
            Thread.sleep(new Random().nextInt(1000));
            t.start();
        }
    }
    
}

参考资料

package SingletonTest;

class Singleton{
    private volatile static Singleton instance;
    
    private Singleton(){
        
    }
    
    public static Singleton getInstance(){
        if(instance == null){
            synchronized (Singleton.class) {
                if(instance == null){
                    instance = new Singleton();
                }
                //instance = new Singleton();
            }
        }
        return instance;
    }
}

public class MyThread extends Thread{
    @Override
    public void run(){
        System.out.println(Singleton.getInstance().hashCode());
    }
    
    public static void main(String[] args){
        MyThread[] myThread = new MyThread[10];
        for(int i=0;i<myThread.length;i++){
            myThread[i] = new MyThread();
        }
        
        for(int i=0;i<myThread.length;i++){
            myThread[i].start();
        }
    }
}

双重锁的运行结果:

1156205522
1156205522
1156205522
1156205522
1156205522
1156205522
1156205522
1156205522
1156205522
1156205522

去掉第二重锁的运行结果(产生了多例):

1700548907
1486202908
1486202908
1700548907
1486202908
1486202908
1700548907
1486202908
1486202908
1700548907