基于线程池技术的简单Web服务器
基于线程池技术的简单Web服务器,这个Web服务器用来处理HTTP请求(目前只能处理简单的文本和JPG图片内容)。这个服务器使用
main线程不断地接受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求。
Web服务器示例代码
package concurrency.threadPool;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * A minimal HTTP server: the calling thread accepts client sockets and hands
 * each connection to a thread pool as an {@link HttpRequestHandler} job.
 *
 * <p>Only files below the configured base path are served. Paths ending in
 * {@code jpg} or {@code ico} are sent as binary images; everything else is
 * sent line by line as UTF-8 HTML text.
 */
public class SimpleHttpServer {
    // Thread pool that executes the HttpRequestHandler jobs.
    static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1);
    // Root directory of SimpleHttpServer; request paths are resolved against it.
    static String basePath;
    static ServerSocket serverSocket;
    // Port the server listens on.
    static int port = 8080;

    /**
     * Sets the listening port; non-positive values are ignored.
     *
     * @param port the port to listen on
     */
    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }

    /**
     * Sets the root directory; the value is ignored unless it names an
     * existing directory.
     *
     * @param basePath path of the directory to serve files from
     */
    public static void setBasePath(String basePath) {
        if (basePath != null && new File(basePath).isDirectory()) {
            SimpleHttpServer.basePath = basePath;
        }
    }

    /**
     * Starts SimpleHttpServer: opens the server socket and submits every
     * accepted connection to the thread pool. Blocks the calling thread.
     *
     * @throws IOException if the server socket cannot be opened or accepting fails
     */
    public static void start() throws IOException {
        serverSocket = new ServerSocket(port);
        try {
            Socket socket = null;
            while ((socket = serverSocket.accept()) != null) {
                // Wrap each accepted client socket in a handler and queue it on the pool.
                threadPool.execute(new HttpRequestHandler(socket));
            }
        } finally {
            // Release the listening port even when accept() fails.
            serverSocket.close();
        }
    }

    /** Handles a single HTTP request on one client socket. */
    static class HttpRequestHandler implements Runnable {
        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String line = null;
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                // Create the writer up front so the error path can always answer.
                // It encodes as UTF-8 to match the charset declared in the text response.
                out = new PrintWriter(new java.io.OutputStreamWriter(
                        socket.getOutputStream(), java.nio.charset.StandardCharsets.UTF_8));
                String header = reader.readLine();
                if (header == null) {
                    // The client closed the connection without sending a request line.
                    return;
                }
                // Turn the relative request path into a file below the base path.
                File file = resolve(header.split(" ")[1]);
                String filePath = file.getPath();
                // jpg and ico resources are read fully and sent as binary.
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in = new FileInputStream(file);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    byte[] buffer = new byte[4096];
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        baos.write(buffer, 0, read);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: " + (filePath.endsWith("ico") ? "image/x-icon" : "image/jpeg"));
                    out.println("Content-Length: " + array.length);
                    out.println("");
                    // The headers are buffered in the writer; push them out before
                    // writing the body directly to the underlying stream.
                    out.flush();
                    socket.getOutputStream().write(array, 0, array.length);
                    socket.getOutputStream().flush();
                } else {
                    br = new BufferedReader(new InputStreamReader(
                            new FileInputStream(file), java.nio.charset.StandardCharsets.UTF_8));
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception ex) {
                // The writer is null only if obtaining the socket streams failed.
                if (out != null) {
                    out.println("HTTP/1.1 500");
                    out.println("");
                    out.flush();
                }
            } finally {
                // Close the writer before the reader: closing the socket's input
                // stream closes the socket itself.
                close(br, in, out, reader, socket);
            }
        }
    }

    /**
     * Resolves a request path against the base path and rejects any result
     * that lies outside the base directory (for example via "..").
     *
     * @param requestPath the path taken from the request line
     * @return the canonical file the request refers to
     * @throws IOException if the path escapes the base directory or cannot be canonicalized
     */
    private static File resolve(String requestPath) throws IOException {
        File root = new File(basePath).getCanonicalFile();
        File target = new File(root, requestPath).getCanonicalFile();
        if (!target.toPath().startsWith(root.toPath())) {
            throw new IOException("Request path escapes base path: " + requestPath);
        }
        return target;
    }

    /**
     * Closes streams or sockets. Null entries are skipped, and a failure to
     * close one resource does not prevent closing the remaining ones.
     *
     * @param closeables resources to close; may be null or contain nulls
     */
    private static void close(Closeable... closeables) {
        if (closeables == null) {
            return;
        }
        for (Closeable closeable : closeables) {
            if (closeable == null) {
                continue;
            }
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
ThreadPool.java
package concurrency.threadPool;
// Contract for a simple thread pool whose jobs are Runnable instances.
// The type parameter Job is the concrete job type accepted by the pool.
public interface ThreadPool<Job extends Runnable> {
// Executes a job; the job type must implement Runnable.
void execute(Job job);
// Shuts the thread pool down.
void shutdown();
// Adds worker threads; num is the number of workers to add.
void addWorkers(int num);
// Removes worker threads; num is the number of workers to remove.
// NOTE: the method name is misspelled ("remvoe"); it is part of the public
// interface and is overridden by DefaultThreadPool, so it is left unchanged.
void remvoeWorkers(int num);
// Returns the number of jobs that are waiting to be executed.
int getJobSize();
}
DefaultThreadPool.java
package concurrency.threadPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Default {@link ThreadPool} implementation built on a job queue guarded by
 * its own monitor: producers append jobs and notify, worker threads wait on
 * the queue and consume jobs from its head.
 *
 * <p>All changes to the job queue and to the worker count happen while
 * holding the monitor of the job queue.
 *
 * @param <Job> the concrete job type, which must be a {@link Runnable}
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // Maximum number of workers in the pool.
    private static final int MAX_WORKER_NUMBERS = 10;
    // Default number of workers in the pool.
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // Minimum number of workers in the pool.
    private static final int MIN_WORKER_NUMBERS = 1;
    // Queue of pending jobs; producers append to it.
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // The workers (consumers) currently owned by the pool.
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    // Current number of workers.
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // Generator for worker thread numbers.
    private final AtomicLong threadNum = new AtomicLong();

    /** A worker that repeatedly takes jobs off the queue and runs them. */
    class Worker implements Runnable {
        // Whether this worker should keep working.
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                Job job;
                synchronized (jobs) {
                    // Wait while there is nothing to do. The running flag is
                    // re-checked after every wake-up so a stopped worker leaves
                    // instead of waiting for a job that may never arrive.
                    while (running && jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException ex) {
                            // The worker thread was interrupted from outside:
                            // restore the interrupt status and exit.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if (!running) {
                        return;
                    }
                    // Take one job from the head of the queue.
                    job = jobs.removeFirst();
                }
                try {
                    job.run();
                } catch (Exception ignored) {
                    // Exceptions thrown by a job are deliberately ignored so
                    // that one failing job does not kill the worker.
                }
            }
        }

        /** Asks this worker to stop after its current job, if any. */
        public void shutdown() {
            running = false;
        }
    }

    // Creates num workers, registers them and starts one thread per worker.
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    /** Creates a pool with the default number of workers. */
    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    /**
     * Creates a pool with the given number of workers, clamped to the
     * minimum and maximum worker counts.
     *
     * @param num requested number of workers
     */
    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }

    /**
     * Queues a job and wakes one waiting worker. Null jobs are ignored.
     *
     * @param job the job to execute
     */
    @Override
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    /**
     * Asks every worker to stop and wakes the ones that are waiting for jobs
     * so that their threads can exit. Jobs still in the queue are not run.
     */
    @Override
    public void shutdown() {
        synchronized (jobs) {
            // Iterating a synchronizedList requires holding its lock manually.
            synchronized (workers) {
                for (Worker worker : workers) {
                    worker.shutdown();
                }
            }
            jobs.notifyAll();
        }
    }

    /**
     * Adds workers, never exceeding the maximum worker count. Requests that
     * would add no worker (non-positive num, or a pool already at the
     * maximum) leave the pool unchanged.
     *
     * @param num number of workers to add
     */
    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            int allowed = Math.min(num, MAX_WORKER_NUMBERS - this.workerNum);
            if (allowed <= 0) {
                return;
            }
            initializeWorkers(allowed);
            this.workerNum += allowed;
        }
    }

    /**
     * Stops and removes the given number of workers.
     *
     * @param num number of workers to remove; must be smaller than the current worker count
     * @throws IllegalArgumentException if num is not smaller than the current worker count
     */
    @Override
    public void remvoeWorkers(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            int count = 0;
            while (count < num) {
                // Always take the head: indices shift down after each removal.
                Worker worker = workers.remove(0);
                worker.shutdown();
                count++;
            }
            if (count > 0) {
                // Wake waiting workers so the stopped ones can exit.
                jobs.notifyAll();
            }
            this.workerNum -= count;
        }
    }

    /**
     * Returns the number of jobs waiting to be executed.
     *
     * @return the current size of the job queue
     */
    @Override
    public int getJobSize() {
        synchronized (jobs) {
            return jobs.size();
        }
    }
}