优秀的编程知识分享平台

网站首页 > 技术文章 正文

聊聊sentinel的SimpleHttpCommandCenter

nanyue 2024-07-29 01:30:21 技术文章 5 ℃

本文主要研究一下sentinel的SimpleHttpCommandCenter

SimpleHttpCommandCenter

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/SimpleHttpCommandCenter.java

/**
 * The simple command center provides service to exchange information.
 *
 * <p>It binds a blocking {@link ServerSocket} on a background thread, hands the bound
 * socket to a {@code ServerThread} running on a single-thread executor, and keeps a
 * static registry that maps command names to their {@link CommandHandler}s.
 *
 * @author youji.zj
 */
public class SimpleHttpCommandCenter implements CommandCenter {

    /** Runtime port value published through {@code TransportConfig} while no socket is bound. */
    private static final int PORT_UNINITIALIZED = -1;

    /** Read timeout, in milliseconds, applied to accepted sockets. */
    private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;

    /** Port used when the configured port cannot be parsed as an integer. */
    private static final int DEFAULT_PORT = 8719;

    /** Registry of command name to handler, shared by every instance of this class. */
    private static final Map<String, CommandHandler> handlerMap = new HashMap<String, CommandHandler>();

    /** Single-thread executor that runs the accept loop. */
    private ExecutorService executor = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-command-center-executor"));

    /** Worker pool for request handling; created in {@link #start()}, {@code null} before that. */
    private ExecutorService bizExecutor;

    /**
     * The bound server socket, or {@code null} while none is bound. It is written by the
     * initialization thread and read by the thread calling {@link #stop()}, hence volatile.
     */
    private volatile ServerSocket socketReference;

    /**
     * Registers every named handler supplied by {@code CommandHandlerProvider}.
     */
    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    /**
     * Creates the worker pool and starts a background thread that binds the server socket.
     *
     * <p>The worker pool has one thread per available processor and a bounded queue of 10
     * tasks; a task that cannot be queued is logged and rejected with a
     * {@link RejectedExecutionException}. This method returns without waiting for the
     * socket to be bound.
     */
    @Override
    public void start() throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            new NamedThreadFactory("sentinel-command-center-service-executor"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    CommandCenterLog.info("EventTask rejected");
                    throw new RejectedExecutionException();
                }
            });

        Runnable serverInitTask = new Runnable() {

            /** Configured base port; falls back to the default when it cannot be parsed. */
            int port;

            {
                try {
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                int repeat = 0;
                int tmpPort = port;
                boolean success = false;

                // Keep trying until a socket is bound or this thread is interrupted.
                while (true) {
                    ServerSocket serverSocket = null;
                    try {
                        serverSocket = new ServerSocket(tmpPort);
                    } catch (IOException ex) {
                        CommandCenterLog.info(
                            String.format("IO error occurs, port: %d, repeat times: %d", tmpPort, repeat), ex);

                        tmpPort = adjustPort(repeat);
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException e1) {
                            // Restore the flag so the interruption stays observable, then give up.
                            Thread.currentThread().interrupt();
                            break;
                        }
                        repeat++;
                    }

                    if (serverSocket != null) {
                        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                        socketReference = serverSocket;
                        executor.submit(new ServerThread(serverSocket));
                        success = true;
                        break;
                    }
                }

                if (!success) {
                    tmpPort = PORT_UNINITIALIZED;
                }

                TransportConfig.setRuntimePort(tmpPort);
                // No further tasks are submitted; an already submitted accept loop keeps running.
                executor.shutdown();
            }

            /**
             * Adjust the port to settle down.
             *
             * <p>The candidate port is {@code port + repeat / 5}, so it advances by one for
             * every five retries.
             */
            private int adjustPort(int repeat) {
                int mod = repeat / 5;
                return port + mod;
            }
        };

        new Thread(serverInitTask).start();
    }

    /**
     * Closes the server socket, shuts down both executors, resets the runtime port and
     * clears the handler registry.
     *
     * <p>Safe to call before {@link #start()}: the worker pool is skipped while it has not
     * been created yet.
     */
    @Override
    public void stop() throws Exception {
        ServerSocket serverSocket = socketReference;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                CommandCenterLog.warn("Error when releasing the server socket", e);
            }
        }

        // bizExecutor only exists after start(); guard against stop() being called first.
        if (bizExecutor != null) {
            bizExecutor.shutdownNow();
        }
        executor.shutdownNow();

        TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
        handlerMap.clear();
    }

    /**
     * Get the name set of all registered commands.
     *
     * @return the key set of the handler registry (a live view of the underlying map)
     */
    public static Set<String> getCommands() {
        return handlerMap.keySet();
    }

    /**
     * Look up the handler registered for a command.
     *
     * @param commandName name of the command
     * @return the registered handler, or {@code null} when the name is not registered
     */
    public static CommandHandler getHandler(String commandName) {
        return handlerMap.get(commandName);
    }

    /**
     * Register every entry of the given map; a {@code null} map is ignored.
     *
     * @param handlerMap command name to handler mappings to register
     */
    public static void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }

    /**
     * Register a single command handler.
     *
     * <p>Empty command names are ignored. A name that is already registered keeps its
     * existing handler and the attempt is only logged.
     *
     * @param commandName name of the command
     * @param handler     handler to invoke for the command
     */
    public static void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName)) {
            return;
        }

        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }

        handlerMap.put(commandName, handler);
    }

    /**
     * Avoid server thread hang, 3 seconds timeout by default.
     *
     * @param socket accepted socket; {@code null} is ignored
     * @throws SocketException if the timeout cannot be applied
     */
    private void setSocketSoTimeout(Socket socket) throws SocketException {
        if (socket != null) {
            socket.setSoTimeout(DEFAULT_SERVER_SO_TIMEOUT);
        }
    }
}

  • 这里直接使用的是java的ServerSocket(bio)构建的tcp服务
  • 启动之前先调用CommandHandlerProvider.getInstance().namedHandlers()获取支持的命令及handler,然后调用registerCommands进行注册
  • 之后在start方法中创建bizExecutor,并新起一个线程执行serverInitTask;serverInitTask绑定端口成功后,再把ServerThread提交给单线程的executor异步执行

ServerThread

class ServerThread extends Thread {

private ServerSocket serverSocket;

ServerThread(ServerSocket s) {

this.serverSocket = s;

setName("sentinel-courier-server-accept-thread");

}

@Override

public void run() {

while (true) {

Socket socket = null;

try {

socket = this.serverSocket.accept();

setSocketSoTimeout(socket);

HttpEventTask eventTask = new HttpEventTask(socket);

bizExecutor.submit(eventTask);

} catch (Exception e) {

CommandCenterLog.info("Server error", e);

if (socket != null) {

try {

socket.close();

} catch (Exception e1) {

CommandCenterLog.info("Error when closing an opened socket", e1);

}

}

try {

// In case of infinite log.

Thread.sleep(10);

} catch (InterruptedException e1) {

// Indicates the task should stop.

break;

}

}

}

}

}

  • ServerThread采取的是worker线程池模式,接收一个请求之后,包装为HttpEventTask,然后提交给bizExecutor

HttpEventTask

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/http/HttpEventTask.java

/**
 * Handles one accepted connection: reads the HTTP request line, dispatches it to the
 * matching {@code CommandHandler} and writes a minimal HTTP response back.
 */
public class HttpEventTask implements Runnable {

    private static final String SERVER_ERROR_MESSAGE = "Command server error";

    private final Socket socket;

    /** Whether a status line has already been written to the client. */
    private boolean writtenHead = false;

    public HttpEventTask(Socket socket) {
        this.socket = socket;
    }

    /**
     * Closes the underlying socket.
     */
    public void close() throws Exception {
        socket.close();
    }

    /**
     * Processes the request and always closes the reader, the writer and the socket.
     *
     * <p>Only the first line of the request is read. A blank or unknown command gets a
     * {@code 400} response; any {@link Throwable} raised while handling is logged and
     * reported to the client with the generic server error message.
     */
    @Override
    public void run() {
        if (socket == null) {
            return;
        }

        BufferedReader in = null;
        PrintWriter printWriter = null;
        try {
            long start = System.currentTimeMillis();
            in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
            OutputStream outputStream = socket.getOutputStream();

            printWriter = new PrintWriter(
                new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

            String line = in.readLine();
            CommandCenterLog.info("[CommandCenter] socket income:" + line
                + "," + socket.getInetAddress());
            CommandRequest request = parseRequest(line);

            // Validate the target command.
            String commandName = HttpCommandUtils.getTarget(request);
            if (StringUtil.isBlank(commandName)) {
                badRequest(printWriter, "Invalid command");
                return;
            }

            // Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {
                CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter, outputStream);
            } else {
                // No matching command handler.
                badRequest(printWriter, "Unknown command `" + commandName + '`');
            }
            printWriter.flush();

            long cost = System.currentTimeMillis() - start;
            CommandCenterLog.info("[CommandCenter] deal a socket task:" + line
                + "," + socket.getInetAddress() + ", time cost=" + cost + " ms");
        } catch (Throwable e) {
            CommandCenterLog.info("CommandCenter error", e);
            try {
                if (printWriter != null) {
                    String errorMessage = SERVER_ERROR_MESSAGE;
                    if (!writtenHead) {
                        internalError(printWriter, errorMessage);
                    } else {
                        // The status line is already out; only the message can still be appended.
                        printWriter.println(errorMessage);
                    }
                    printWriter.flush();
                }
            } catch (Exception e1) {
                // Log the failure of the error reporting itself, not the original error again.
                CommandCenterLog.info("CommandCenter close serverSocket failed", e1);
            }
        } finally {
            closeResource(in);
            closeResource(printWriter);
            closeResource(socket);
        }
    }

    /**
     * Closes a resource if it is non-null; close failures are only logged.
     */
    private void closeResource(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                CommandCenterLog.info("CommandCenter close resource failed", e);
            }
        }
    }

    /**
     * Writes the handler's response to the client.
     *
     * <p>A successful response produces a {@code 200} status line followed by the result's
     * {@code toString()} encoded with the configured charset (no body when the result is
     * {@code null}). A failed response produces a {@code 400} with the exception message,
     * or the generic server error message when there is no exception.
     *
     * @param response        response returned by the command handler
     * @param printWriter     writer used for the status line and text messages
     * @param rawOutputStream raw socket stream used for the encoded body
     * @throws Exception if encoding or writing the body fails
     */
    private void handleResponse(CommandResponse<?> response, /*@NonNull*/ final PrintWriter printWriter,
                                /*@NonNull*/ final OutputStream rawOutputStream) throws Exception {
        if (!response.isSuccess()) {
            String msg = SERVER_ERROR_MESSAGE;
            if (response.getException() != null) {
                msg = response.getException().getMessage();
            }
            badRequest(printWriter, msg);
            return;
        }

        Object result = response.getResult();

        // Write 200 OK status line.
        writeOkStatusLine(printWriter);
        if (result == null) {
            return;
        }

        // Here we directly use `toString` to encode the result to plain text.
        byte[] buffer = result.toString().getBytes(SentinelConfig.charset());
        rawOutputStream.write(buffer);
        rawOutputStream.flush();
    }

    /**
     * Write `400 Bad Request` HTTP response status line and message body, then flush.
     */
    private void badRequest(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 400 Bad Request\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `500 Internal Server Error` HTTP response status line and message body, then flush.
     */
    private void internalError(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 500 Internal Server Error\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `200 OK` HTTP response status line and flush.
     */
    private void writeOkStatusLine(/*@NonNull*/ final PrintWriter out) {
        out.print("HTTP/1.1 200 OK\r\n"
            + "Connection: close\r\n\r\n");
        out.flush();
        writtenHead = true;
    }

    /**
     * Parse raw HTTP request line to a {@link CommandRequest}.
     *
     * <p>The target is the text between the first {@code '/'} and the first {@code '?'}
     * (or the last space when there is no query string). Query parameters are split on
     * {@code '&'} and {@code '='}; entries that do not split into exactly a key and a value
     * are skipped. A blank line, or a line whose target boundaries are inconsistent, yields
     * a request without a target instead of an exception.
     *
     * @param line HTTP request line
     * @return parsed command request
     */
    private CommandRequest parseRequest(String line) {
        CommandRequest request = new CommandRequest();
        if (StringUtil.isBlank(line)) {
            return request;
        }

        int start = line.indexOf('/');
        int space = line.lastIndexOf(' ');
        int question = line.indexOf('?');
        int ask = question == -1 ? space : question;

        int targetStart = start != -1 ? start + 1 : 0;
        int targetEnd = ask != -1 ? ask : line.length();
        if (targetStart > targetEnd) {
            // Malformed line (e.g. the first '/' only appears after the '?'): no target.
            return request;
        }
        String target = line.substring(targetStart, targetEnd);
        request.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        if (ask == -1 || ask == space) {
            return request;
        }

        // The query string ends at the last space only when that space follows the '?';
        // otherwise (no trailing protocol version) it runs to the end of the line.
        int parameterEnd = space > ask ? space : line.length();
        String parameterStr = line.substring(ask + 1, parameterEnd);
        for (String parameter : parameterStr.split("&")) {
            if (StringUtil.isBlank(parameter)) {
                continue;
            }

            String[] keyValue = parameter.split("=");
            if (keyValue.length != 2) {
                continue;
            }

            String value = StringUtil.trim(keyValue[1]);
            try {
                value = URLDecoder.decode(value, SentinelConfig.charset());
            } catch (UnsupportedEncodingException e) {
                // Keep the raw value, but leave a trace instead of failing silently.
                CommandCenterLog.info("[CommandCenter] failed to decode request parameter", e);
            }

            request.addParam(StringUtil.trim(keyValue[0]), value);
        }
        return request;
    }
}

  • 这里直接readLine读取请求的第一行(即HTTP请求行),解析为CommandRequest,然后调用SimpleHttpCommandCenter.getHandler获取对应的handler,执行后把结果写回socket
  • 可以看到这里是手工解析http协议

CommandHandlerProvider

sentinel-transport-common-0.1.1-sources.jar!/com/alibaba/csp/sentinel/command/CommandHandlerProvider.java

/**
 * Loads {@link CommandHandler} implementations through {@link ServiceLoader} and exposes
 * those that carry a {@link CommandMapping} annotation, keyed by command name.
 *
 * @author Eric Zhao
 */
public class CommandHandlerProvider implements Iterable<CommandHandler> {

    private static final CommandHandlerProvider INSTANCE = new CommandHandlerProvider();

    private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);

    /**
     * @return the shared provider instance
     */
    public static CommandHandlerProvider getInstance() {
        return INSTANCE;
    }

    /**
     * Collect every loaded handler whose {@link CommandMapping} declares a non-empty name.
     *
     * @return a new map from command name to handler; when two handlers declare the same
     *         name, the one iterated later replaces the earlier one
     */
    public Map<String, CommandHandler> namedHandlers() {
        Map<String, CommandHandler> named = new HashMap<String, CommandHandler>();
        Iterator<CommandHandler> loaded = serviceLoader.iterator();
        while (loaded.hasNext()) {
            CommandHandler candidate = loaded.next();
            String commandName = parseCommandName(candidate);
            if (StringUtil.isEmpty(commandName)) {
                // Handlers without a usable command name are left out.
                continue;
            }
            named.put(commandName, candidate);
        }
        return named;
    }

    /**
     * Read the command name from the handler class's {@link CommandMapping} annotation.
     *
     * @return the annotated name, or {@code null} when the annotation is absent
     */
    private String parseCommandName(CommandHandler handler) {
        CommandMapping mapping = handler.getClass().getAnnotation(CommandMapping.class);
        return mapping == null ? null : mapping.name();
    }

    /**
     * @return an iterator over all loaded handlers, named or not
     */
    @Override
    public Iterator<CommandHandler> iterator() {
        return serviceLoader.iterator();
    }
}

  • 这个类采用单例模式,以及SPI机制,来动态加载CommandHandler的实现,然后namedHandlers方法提供一个命令名及handler的map

com.alibaba.csp.sentinel.command.CommandHandler

sentinel-transport-common-0.1.1.jar!/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler

com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler

com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler

com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler

com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler

com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler

com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler

com.alibaba.csp.sentinel.command.handler.VersionCommandHandler

  • 这里在META-INF的services目录,使用SPI机制,在com.alibaba.csp.sentinel.command.CommandHandler文件下记录了各种实现

小结

  • sentinel的transport部分,有一个common基础包,然后就是simple-http以及netty的实现。
  • simple-http采用的是bio外加工作线程池模式,来一个请求,往线程池丢一个HttpEventTask。
  • HttpEventTask解析请求的命令然后获取相应的handler执行返回
  • commandHandler采用的是SPI的模式进行动态加载

doc

  • SimpleHttpCommandCenter

Tags:

最近发表
标签列表