victory的博客

长安一片月,万户捣衣声

0%

线程池

线程池的实现原理

线程池的使用
(1)线程池的创建

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,unit,workQueue, handler)

Parameters:
    corePoolSize: the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
    maximumPoolSize: the maximum number of threads to allow in the pool
    keepAliveTime: when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
    unit the time: unit for the keepAliveTime argument
    workQueue:the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
    threadFactory: set thread factory
    handler: the handler to use when execution is blocked because the thread bounds and queue capacities are reached
    
workQueue:
    ArrayBlockingQueue:基于数组结构的有界阻塞队列
    LinkedBlockingQueue:基于链表结果的阻塞队列
    SynchronousQueue:不存储元素的阻塞队列
    PriorityBlockingQueue:具有优先级的无限阻塞队列

handler:
    AbortPolicy:直接抛出RejectedExecutionException异常
    CallerRunsPolicy:使用调用者所在线程来执行任务
    DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务
    DiscardPolicy:默认丢弃任务,不进行任何通知

创建线程池的7种方式
Executor框架

阅读全文 »

Exchanger

Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程
通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法,当
两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
应用场景
(1)遗传算法
遗传算法里需要选出两个人作为交配对象,这时候会交换两个人的数据,并使用交叉规则得出2个交配结果。
(2)校对工作
例如,我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两个人进行录入,录入到
Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。
示例代码:

package concurrency.exchanger;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();
    
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
    
    public static void main(String[] args) {
        threadPool.execute(new Runnable(){
            @Override
            public void run() {
                try{
                    String A = "银行流水A";//A录入银行流水数据
                    exgr.exchange(A);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
            
        });
        
        threadPool.execute(new Runnable(){
            @Override
            public void run() {
                try{
                    String B = "银行流水B";//B录入银行流水数据
                    String A = exgr.exchange(B);
                    System.out.println("A和B数据是否一致:"+A.equals(B)+",A录入的是:"+A+",B录入的是:"+B);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
            
        });
    }
}

Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的数量,它通过协调各个线程,以保证合理的使用公共资源。
应用场景:
Semaphore可以用于做流量控制,特别是公共资源优先的应用场景,比如数据库连接
示例代码:

package concurrency.semaphore;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    
    private static Semaphore s = new Semaphore(10);
    
    public static void main(String[] args) {
        for(int i = 0; i < THREAD_COUNT; i++){//虽然有30个线程在执行,但是只允许10个并发执行
            threadPool.execute(new Runnable(){
                @Override
                public void run(){
                    try{
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

Java中的13个原子操作类

在java.util.concurrent.atomic包中包含了12个原子类,属于四种类型的原子更新方式,分别是:
(1)原子更新基本类型
AtomicInteger:原子更新整型;
AtomicBoolean:原子更新布尔类型;
AtomicLong:源自更新长整型。
(2)原子更新数组
AtomicIntegerArray:原子更新整型数组里的元素;
AtomicLongArray:原子更新长整型数组里的元素;
AtomicReferenceArray:原子更新引用类型数组里的元素
(3)原子更新引用
AtomicReference:原子更新引用类型;
AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
AtomicMarkableReference:原子更新带有标记位的引用类型(可以原子更新一个布尔类型的标记位和引用类型)。
(4)原子更新属性(字段)
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器;
AtomicLongFieldUpdater:原子更新长整型的字段的更新器;
AtomicStampedReference:原子更新带有版本号的引用类型,能够解决使用CAS进行原子更新时可能出现的ABA问题。
以下给出每个类型的原子更新的一个示例代码:

阅读全文 »

CyclicBarrier用于多线程计算数据并合并计算结果

CyclicBarrier
CyclicBarrier让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,
所有被屏障拦截的线程才会继续执行。
注:CyclicBarrier的计数器可以使用reset()方法重置

阅读全文 »

使用ForkJoin框架计算整数相加的结果

需求: 使用Fork/Join框架计算1+2+3+4的结果。
分析: 使用Fork/Join框架首先要考虑到的是如何分割任务,如果希望每个子任务最多执行两个数的相加
,那么我们设置分割的阈值是2,由于是四个数字相加,所以Fork/Join框架会把这个任务fork成两个子
任务,子任务1负责计算1+2,子任务2负责3+4,然后再join两个子任务的结果。因为是有结果的任务,所
以必须继承RecursiveTask。
实现代码:

package concurrency.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer>{
    private static final int THRESHOLD = 2;//阈值
    private int start;
    private int end;
    
    public CountTask(int start, int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if(canCompute){
            for(int i = start; i <= end; i++){
                sum += i;
            }
        }else{
            //如果任务大虚与之,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }
    
    public static void main(String[] args){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一个计算任务,负责计算1+2+3+4
        CountTask task = new CountTask(1, 4);
        //执行一个任务
        Future<Integer> result = forkJoinPool.submit(task);
        try{
            System.out.println(result.get());
        }catch(InterruptedException e){
            e.printStackTrace();
        }catch(ExecutionException e){
            e.printStackTrace();
        }
    }
}

读写锁+HashMap实现线程安全的HashMap

ReadWriteLockCache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证ReadWriteLockCaChe是线程安全的。
代码:

package concurrency.AQS; 

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockCache {
    public static void main(String[] args) {
        ReadWriteLockCache.put("user1", "123");
        ReadWriteLockCache.put("user2", "456");
        ReadWriteLockCache.put("user3", "789");
        for(Entry entry : ReadWriteLockCache.map.entrySet()){
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
        System.out.println("==========================");
        ReadWriteLockCache.put("user1", "1233");
        for(Entry entry : ReadWriteLockCache.map.entrySet()){
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
        System.out.println("==========================");
        System.out.println("user1" + ReadWriteLockCache.get("user1"));
        System.out.println("==========================");
        ReadWriteLockCache.clear();
        for(Entry entry : ReadWriteLockCache.map.entrySet()){
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
        
    }
    
    static Map<String, Object> map = new HashMap<String, Object>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();
    
    //获取一个key对应的value
    public static final Object get(String key){
        r.lock();
        try{
            return map.get(key);
        }finally{
            r.unlock();
        }
    }
    
    //设置key对应的value,并返回旧的value
    public static final Object put(String key, Object value){
        w.lock();
        try{
            return map.put(key, value);
        }finally{
            w.unlock();
        }
    }
    
    //清空所有内容
    public static final void clear(){
        w.lock();
        try{
            map.clear();
        }finally{
            w.unlock();
        }
    }
}

基于线程池技术的简单Web服务器

基于线程池技术的简单Web服务器,这个Web服务器用来处理HTTP请求(目前智能处理简单的文本和JPG图片内容)。这个服务器使用
main线程不断地接受可换段Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求。

阅读全文 »

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。
例如:

CountDownLatch c = new CountDownLatch(2);

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N.
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前
线程,直到N变为0。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以
是1个线程里的N个步骤

注:CountDownLatch的计数器智能使用一次

阅读全文 »