victory的博客

长安一片月,万户捣衣声

0%

并发 | 基于线程池技术的简单Web服务器

基于线程池技术的简单Web服务器

基于线程池技术的简单Web服务器,这个Web服务器用来处理HTTP请求(目前只能处理简单的文本和JPG图片内容)。这个服务器使用
main线程不断地接受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求。

Web服务器示例代码

package concurrency.threadPool;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * A minimal static-file HTTP server.
 *
 * <p>{@link #start()} accepts client connections on the calling thread and
 * hands each accepted socket to {@link #threadPool} wrapped in an
 * {@link HttpRequestHandler}. Files whose path ends in {@code jpg} or
 * {@code ico} are sent as binary with a {@code Content-Length}; everything
 * else is sent line by line as {@code text/html}. Any failure while handling
 * a request results in an {@code HTTP/1.1 500} response.
 */
public class SimpleHttpServer {
    // Thread pool that runs the HttpRequestHandler jobs
    static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1);
    
    // Root directory of the files served by SimpleHttpServer
    static String basePath;
    
    static ServerSocket serverSocket;
    
    // Port the server listens on
    static int port = 8080;
    
    /**
     * Sets the listening port. Non-positive values are ignored.
     *
     * @param port port to listen on; only applied when greater than zero
     */
    public static void setPort(int port){
        if(port > 0){
            SimpleHttpServer.port = port;
        }
    }
    
    /**
     * Sets the directory that requested paths are resolved against. The value
     * is ignored unless it names an existing directory.
     *
     * @param basePath candidate root directory
     */
    public static void setBasePath(String basePath){
        if(basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()){
            SimpleHttpServer.basePath = basePath;
        }
    }
    
    /**
     * Starts the server: binds the server socket and loops accepting clients,
     * submitting one {@link HttpRequestHandler} per connection to the pool.
     *
     * @throws IOException if the server socket cannot be opened or accepting fails
     */
    public static void start() throws IOException{
        serverSocket = new ServerSocket(port);
        try{
            Socket socket = null;
            while((socket = serverSocket.accept()) != null){
                // Wrap each accepted client socket in a handler and queue it on the pool
                threadPool.execute(new HttpRequestHandler(socket));
            }
        }finally{
            // Release the listening port even when accept() fails
            serverSocket.close();
        }
    }
    
    /**
     * Maps the request line to a file under {@link #basePath}.
     *
     * @param requestLine first line of the HTTP request, e.g. {@code GET /index.html HTTP/1.1}
     * @return the canonical file the request refers to
     * @throws IOException if the request line is missing or malformed, the base
     *         path is not configured, or the resolved file lies outside the base directory
     */
    private static File resolveRequestedFile(String requestLine) throws IOException{
        if(requestLine == null){
            throw new IOException("empty request");
        }
        String[] parts = requestLine.split(" ");
        if(parts.length < 2){
            throw new IOException("malformed request line: " + requestLine);
        }
        if(basePath == null){
            throw new IOException("base path is not configured");
        }
        File base = new File(basePath).getCanonicalFile();
        File target = new File(base, parts[1]).getCanonicalFile();
        String basePrefix = base.getPath();
        if(!basePrefix.endsWith(File.separator)){
            basePrefix += File.separator;
        }
        // Canonicalisation collapses ".." segments, so a simple prefix check
        // is enough to reject requests that climb out of the base directory.
        if(!target.equals(base) && !target.getPath().startsWith(basePrefix)){
            throw new IOException("requested path escapes the base directory: " + parts[1]);
        }
        return target;
    }
    
    /**
     * Handles a single client connection: reads the request line, writes the
     * requested file (or a 500 response) and closes the socket.
     */
    static class HttpRequestHandler implements Runnable{
        private Socket socket;
        public HttpRequestHandler(Socket socket){
            this.socket = socket;
        }
        
        @Override
        public void run() {
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try{
                // Create the writer first so that the catch block can still
                // answer with a 500 when reading or parsing the request fails.
                out = new PrintWriter(socket.getOutputStream());
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                // Turn the relative request path into a file under basePath
                File target = resolveRequestedFile(header);
                String filePath = target.getPath();
                // jpg and ico resources are read fully and sent as binary
                if(filePath.endsWith("jpg") || filePath.endsWith("ico")){
                    in = new FileInputStream(target);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    byte[] buffer = new byte[4096];
                    int read;
                    while((read = in.read(buffer)) != -1){
                        baos.write(buffer, 0, read);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: "+array.length);
                    out.println("");
                    // The body bypasses the PrintWriter, so the buffered
                    // headers must be flushed before the raw bytes are written.
                    out.flush();
                    socket.getOutputStream().write(array, 0, array.length);
                    socket.getOutputStream().flush();
                }else{
                    // NOTE(review): the file is decoded and re-encoded with the
                    // platform default charset although the header says UTF-8.
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(target)));
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    String line;
                    while((line = br.readLine()) != null){
                        out.println(line);
                    }
                }
                out.flush();
            }catch(Exception ex){
                // out is null only when the socket's output stream could not be obtained
                if(out != null){
                    out.println("HTTP/1.1 500");
                    out.println("");
                    out.flush();
                }
            }finally{
                // Close the writer before the socket's reader so nothing is
                // written after the socket has been shut by the reader's close.
                close(br, in, out, reader, socket);
            }
        }
        
    }
    
    /**
     * Closes every non-null stream or socket passed in. An {@link IOException}
     * from one resource is printed and does not stop the remaining ones from
     * being closed.
     *
     * @param closeables resources to close; the array and its entries may be null
     */
    private static void close(Closeable... closeables){
        if(closeables != null){
            for(Closeable closeable : closeables){
                if(closeable == null){
                    // Resources of the branch that was not taken are never opened
                    continue;
                }
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

ThreadPool.java

package concurrency.threadPool;

/**
 * Contract of a simple worker-thread pool.
 *
 * @param <Job> type of the submitted tasks; must implement {@link Runnable}
 */
public interface ThreadPool<Job extends Runnable> {
    /**
     * Submits a job for execution.
     *
     * @param job the task to execute
     */
    void execute(Job job);

    /**
     * Returns the number of jobs that are waiting to be executed.
     *
     * @return count of pending jobs
     */
    int getJobSize();

    /**
     * Adds worker threads to the pool.
     *
     * @param num number of workers to add
     */
    void addWorkers(int num);

    /**
     * Removes worker threads from the pool. The misspelt name is kept because
     * it is part of the published interface.
     *
     * @param num number of workers to remove
     */
    void remvoeWorkers(int num);

    /** Shuts the pool down. */
    void shutdown();
}

DefaultThreadPool.java

package concurrency.threadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A basic thread pool built on {@code wait}/{@code notify}.
 *
 * <p>Submitted jobs are appended to a shared queue; each {@link Worker} runs
 * on its own thread, takes jobs from the head of the queue and executes them.
 * The monitor of the job queue guards the queue, the worker list and the
 * worker count.
 *
 * @param <Job> type of the submitted tasks
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // Upper bound on the number of workers
    private static final int MAX_WORKER_NUMBERS = 10;
    
    // Number of workers created by the no-argument constructor
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    
    // Lower bound on the number of workers
    private static final int MIN_WORKER_NUMBERS = 1;
    
    // Queue of pending jobs; execute() appends, workers remove from the head
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    
    // Workers that consume the job queue
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    
    // Current number of workers
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    
    // Generates the numeric suffix of worker thread names
    private final AtomicLong threadNum = new AtomicLong();
    
    /** Consumer that repeatedly takes a job from the queue and runs it. */
    class Worker implements Runnable{
        // Cleared by shutdown() to make the worker leave its loop
        private volatile boolean running = true;
        @Override
        public void run(){
            while(running){
                Job job = null;
                synchronized(jobs){
                    // Park while there is nothing to do; re-check running so
                    // that shutdown() can release a parked worker.
                    while(running && jobs.isEmpty()){
                        try{
                            jobs.wait();
                        }catch(InterruptedException ex){
                            // Interrupted from outside: restore the flag and stop
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if(!running){
                        return;
                    }
                    // Take the next job
                    job = jobs.removeFirst();
                }
                if(job != null){
                    try{
                        job.run();
                    }catch(Exception ignored){
                        // Deliberately ignored so a failing job does not end the worker
                    }
                }
            }
        }
        
        /**
         * Asks this worker to stop. A job that is already running is allowed
         * to finish; a worker waiting for work is woken so it can exit.
         */
        public void shutdown(){
            synchronized(jobs){
                running = false;
                // notifyAll, not notify: the stopped worker must be among the
                // threads woken, and the others simply go back to waiting.
                jobs.notifyAll();
            }
        }
    }
    
    // Creates and starts num workers, each on its own named thread
    private void initializeWorkers(int num){
        for(int i = 0; i < num; i++){
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }
    
    /** Creates a pool with the default number of workers. */
    public DefaultThreadPool(){
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }
    
    /**
     * Creates a pool with the requested number of workers, clamped to the
     * allowed minimum and maximum.
     *
     * @param num requested number of workers
     */
    public DefaultThreadPool(int num){
        workerNum = Math.max(MIN_WORKER_NUMBERS, Math.min(MAX_WORKER_NUMBERS, num));
        initializeWorkers(workerNum);
    }
    
    /**
     * Queues a job and wakes one waiting worker. A null job is ignored.
     *
     * @param job the task to run
     */
    @Override
    public void execute(Job job) {
        if(job != null){
            synchronized(jobs){
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    /** Asks every worker to stop; jobs still in the queue are not executed. */
    @Override
    public void shutdown() {
        // Every mutation of the worker list happens under the jobs monitor,
        // so holding it here makes the iteration safe.
        synchronized(jobs){
            for(Worker worker : workers){
                worker.shutdown();
            }
        }
    }

    /**
     * Adds up to {@code num} workers without exceeding the maximum pool size.
     * Non-positive requests, and requests made when the pool is already at
     * its maximum, add nothing.
     *
     * @param num number of workers to add
     */
    @Override
    public void addWorkers(int num) {
        synchronized(jobs){
            // Compare against the remaining capacity instead of adding first,
            // so a huge num cannot overflow past the limit.
            int capacity = MAX_WORKER_NUMBERS - this.workerNum;
            if(num > capacity){
                num = capacity;
            }
            if(num <= 0){
                return;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    /**
     * Stops and removes {@code num} workers.
     *
     * @param num number of workers to remove; must be smaller than the current worker count
     * @throws IllegalArgumentException if {@code num} is not smaller than the current worker count
     */
    @Override
    public void remvoeWorkers(int num) {
        synchronized(jobs){
            if(num >= this.workerNum){
                throw new IllegalArgumentException("beyond workNum");
            }
            // Always take the head: the list shrinks on every removal, so an
            // increasing index would skip workers and run past the end.
            int count = 0;
            while(count < num){
                Worker worker = workers.remove(0);
                worker.shutdown();
                count++;
            }
            this.workerNum -= count;
        }
    }

    /**
     * Returns the number of jobs waiting in the queue.
     *
     * @return count of pending jobs
     */
    @Override
    public int getJobSize() {
        synchronized(jobs){
            return jobs.size();
        }
    }
}