序
本文主要研究一下sentinel的SimpleHttpCommandCenter
SimpleHttpCommandCenter
sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/SimpleHttpCommandCenter.java
/***
* The simple command center provides service to exchange information.
*
* @author youji.zj
*/
public class SimpleHttpCommandCenter implements CommandCenter {
private static final int PORT_UNINITIALIZED = -1;
private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8719;
private static final Map<String, CommandHandler> handlerMap = new HashMap<String, CommandHandler>();
private ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-command-center-executor"));
private ExecutorService bizExecutor;
private ServerSocket socketReference;
@Override
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
int repeat = 0;
int tmpPort = port;
boolean success = false;
while (true) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(tmpPort);
} catch (IOException ex) {
CommandCenterLog.info(
String.format("IO error occurs, port: %d, repeat times: %d", tmpPort, repeat), ex);
tmpPort = adjustPort(repeat);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e1) {
break;
}
repeat++;
}
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
break;
}
}
if (!success) {
tmpPort = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(tmpPort);
executor.shutdown();
}
/**
* Adjust the port to settle down.
*/
private int adjustPort(int repeat) {
int mod = repeat / 5;
return port + mod;
}
};
new Thread(serverInitTask).start();
}
@Override
public void stop() throws Exception {
if (socketReference != null) {
try {
socketReference.close();
} catch (IOException e) {
CommandCenterLog.warn("Error when releasing the server socket", e);
}
}
bizExecutor.shutdownNow();
executor.shutdownNow();
TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
handlerMap.clear();
}
/**
* Get the name set of all registered commands.
*/
public static Set<String> getCommands() {
return handlerMap.keySet();
}
public static CommandHandler getHandler(String commandName) {
return handlerMap.get(commandName);
}
public static void registerCommands(Map<String, CommandHandler> handlerMap) {
if (handlerMap != null) {
for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
registerCommand(e.getKey(), e.getValue());
}
}
}
public static void registerCommand(String commandName, CommandHandler handler) {
if (StringUtil.isEmpty(commandName)) {
return;
}
if (handlerMap.containsKey(commandName)) {
CommandCenterLog.info("Register failed (duplicate command): " + commandName);
return;
}
handlerMap.put(commandName, handler);
}
/**
* Avoid server thread hang, 3 seconds timeout by default.
*/
private void setSocketSoTimeout(Socket socket) throws SocketException {
if (socket != null) {
socket.setSoTimeout(DEFAULT_SERVER_SO_TIMEOUT);
}
}
}
- 这里直接使用的是java的ServerSocket(bio)构建的tcp服务
- 启动之前先调用CommandHandlerProvider.getInstance().namedHandlers()获取支持的命令及handler,然后调用registerCommands进行注册
- 之后创建bizExecutor以及异步serverInitTask线程,里头再异步启动ServerThread
ServerThread
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
- ServerThread采取的是worker线程池模式,接收一个请求之后,包装为HttpEventTask,然后提交给bizExecutor
HttpEventTask
sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/http/HttpEventTask.java
public class HttpEventTask implements Runnable {
private final Socket socket;
private boolean writtenHead = false;
public HttpEventTask(Socket socket) {
this.socket = socket;
}
public void close() throws Exception {
socket.close();
}
@Override
public void run() {
if (socket == null) {
return;
}
BufferedReader in = null;
PrintWriter printWriter = null;
try {
long start = System.currentTimeMillis();
in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String line = in.readLine();
CommandCenterLog.info("[CommandCenter] socket income:" + line
+ "," + socket.getInetAddress());
CommandRequest request = parseRequest(line);
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
badRequest(printWriter, "Invalid command");
return;
}
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter, outputStream);
} else {
// No matching command handler.
badRequest(printWriter, "Unknown command `" + commandName + '`');
}
printWriter.flush();
long cost = System.currentTimeMillis() - start;
CommandCenterLog.info("[CommandCenter] deal a socket task:" + line
+ "," + socket.getInetAddress() + ", time cost=" + cost + " ms");
} catch (Throwable e) {
CommandCenterLog.info("CommandCenter error", e);
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
if (!writtenHead) {
internalError(printWriter, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
CommandCenterLog.info("CommandCenter close serverSocket failed", e);
}
} finally {
closeResource(in);
closeResource(printWriter);
closeResource(socket);
}
}
private void closeResource(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
CommandCenterLog.info("CommandCenter close resource failed", e);
}
}
}
private void handleResponse(CommandResponse response, /*@NonNull*/ final PrintWriter printWriter,
/*@NonNull*/ final OutputStream rawOutputStream) throws Exception {
if (response.isSuccess()) {
if (response.getResult() == null) {
writeOkStatusLine(printWriter);
return;
}
// Write 200 OK status line.
writeOkStatusLine(printWriter);
// Here we directly use `toString` to encode the result to plain text.
byte[] buffer = response.getResult().toString().getBytes(SentinelConfig.charset());
rawOutputStream.write(buffer);
rawOutputStream.flush();
} else {
String msg = SERVER_ERROR_MESSAGE;
if (response.getException() != null) {
msg = response.getException().getMessage();
}
badRequest(printWriter, msg);
}
}
/**
* Write `400 Bad Request` HTTP response status line and message body, then flush.
*/
private void badRequest(/*@NonNull*/ final PrintWriter out, String message) {
out.print("HTTP/1.1 400 Bad Request\r\n"
+ "Connection: close\r\n\r\n");
out.print(message);
out.flush();
writtenHead = true;
}
/**
* Write `500 Internal Server Error` HTTP response status line and message body, then flush.
*/
private void internalError(/*@NonNull*/ final PrintWriter out, String message) {
out.print("HTTP/1.1 500 Internal Server Error\r\n"
+ "Connection: close\r\n\r\n");
out.print(message);
out.flush();
writtenHead = true;
}
/**
* Write `200 OK` HTTP response status line and flush.
*/
private void writeOkStatusLine(/*@NonNull*/ final PrintWriter out) {
out.print("HTTP/1.1 200 OK\r\n"
+ "Connection: close\r\n\r\n");
out.flush();
writtenHead = true;
}
/**
* Parse raw HTTP request line to a {@link CommandRequest}.
*
* @param line HTTP request line
* @return parsed command request
*/
private CommandRequest parseRequest(String line) {
CommandRequest request = new CommandRequest();
if (StringUtil.isBlank(line)) {
return request;
}
int start = line.indexOf('/');
int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?');
int space = line.lastIndexOf(' ');
String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
request.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
if (ask == -1 || ask == space) {
return request;
}
String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length());
for (String parameter : parameterStr.split("&")) {
if (StringUtil.isBlank(parameter)) {
continue;
}
String[] keyValue = parameter.split("=");
if (keyValue.length != 2) {
continue;
}
String value = StringUtil.trim(keyValue[1]);
try {
value = URLDecoder.decode(value, SentinelConfig.charset());
} catch (UnsupportedEncodingException e) {
}
request.addParam(StringUtil.trim(keyValue[0]), value);
}
return request;
}
private static final String SERVER_ERROR_MESSAGE = "Command server error";
}
- 这里直接readLine,然后解析为CommandRequest,然后调用SimpleHttpCommandCenter.getHandler获取处理程序,执行返回结果
- 可以看到这里是手工解析http协议
CommandHandlerProvider
sentinel-transport-common-0.1.1-sources.jar!/com/alibaba/csp/sentinel/command/CommandHandlerProvider.java
/**
* Provides and filters command handlers registered via SPI.
*
* @author Eric Zhao
*/
public class CommandHandlerProvider implements Iterable<CommandHandler> {
private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);
/**
* Get all command handlers annotated with {@link CommandMapping} with command name.
*
* @return list of all named command handlers
*/
public Map<String, CommandHandler> namedHandlers() {
Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
for (CommandHandler handler : serviceLoader) {
String name = parseCommandName(handler);
if (!StringUtil.isEmpty(name)) {
map.put(name, handler);
}
}
return map;
}
private String parseCommandName(CommandHandler handler) {
CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class);
if (commandMapping != null) {
return commandMapping.name();
} else {
return null;
}
}
@Override
public Iterator<CommandHandler> iterator() {
return serviceLoader.iterator();
}
private static final CommandHandlerProvider INSTANCE = new CommandHandlerProvider();
public static CommandHandlerProvider getInstance() {
return INSTANCE;
}
}
- 这个类采用单例模式,以及SPI机制,来动态加载CommandHandler的实现,然后namedHandlers方法提供一个命令名及handler的map
com.alibaba.csp.sentinel.command.CommandHandler
sentinel-transport-common-0.1.1.jar!/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
- 这里在META-INF的services目录,使用SPI机制,在com.alibaba.csp.sentinel.command.CommandHandler文件下记录了各种实现
小结
- sentinel的transport部分,有一个common基础包,然后就是simple-http以及netty的实现。
- simple-http采用的是bio外加工作线程池模式,来一个请求,往线程池丢一个HttpEventTask。
- HttpEventTask解析请求的命令然后获取相应的handler执行返回
- commandHandler采用的是SPI的模式进行动态加载
doc
- SimpleHttpCommandCenter