首页 > 编程学习 > vertx源码分析(一)---------------vertx启动流程分析

环境准备

源码下载

国内:https://gitee.com/mirrors/vertx

github: https://github.com/eclipse-vertx/vert.x

解决依赖报错

本系列以3.8为基础进行分析,所以请讲源码调整到3.8分支,
并修改pom.xml:

<stack.version>3.8.0</stack.version>

由于maven仓库的3.8.0不存在后边的SNAPSHOT,如果不修改则maven找不到相应的配置文件

启动流程

初始化vertx

@Overridepublic Vertx vertx() {return vertx(new VertxOptions());}@Overridepublic Vertx vertx(VertxOptions options) {if (options.getEventBusOptions().isClustered()) {throw new IllegalArgumentException("Please use Vertx.clusteredVertx() to create a clustered Vert.x instance");}return VertxImpl.vertx(options);}

VertxOptions配置请查看另外一篇博客

static VertxImpl vertx(VertxOptions options) {VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport()));vertx.init();return vertx;}
//transport 与netty的相同,具体请自行学习netty相关知识private VertxImpl(VertxOptions options, Transport transport) {// Sanity checkif (Vertx.currentContext() != null) {log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");}//关闭vertx时触发的handlercloseHooks = new CloseHooks(log);//阻塞线程检查(检查Map<VertxThread, Object> threads中注册的线程,线程通过registerThread来注册)checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit());//通过VertxThreadFactory创建EventLoop线程池eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());//根据eventLoopThreadFactory的配置真正的创建eventLoopGroupeventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);// //创建Acceptor线程池 类似与多reactor模型中的mainReactor,即netty中的bossGroupThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());// The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections// under a lot of loadacceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100);metrics = initialiseMetrics(options);//Vertx会初始化两个线程池workerExec和internalBlockingExecExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));//线程池的metricsPoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);namedWorkerPools = new HashMap<>();workerPool = new WorkerPool(workerExec, workerPoolMetrics);defaultWorkerPoolSize = options.getWorkerPoolSize();defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime();defaultWorkerMaxExecTimeUnit = options.getMaxWorkerExecuteTimeUnit();this.transport = transport;this.fileResolver = new FileResolver(options.getFileSystemOptions());this.addressResolverOptions = options.getAddressResolverOptions();this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());this.deploymentManager = new DeploymentManager(this);if (options.getEventBusOptions().isClustered()) {this.clusterManager = getClusterManager(options);this.eventBus = new ClusteredEventBus(this, options, clusterManager);} else {this.clusterManager = null;this.eventBus = new EventBusImpl(this);}this.sharedData = new SharedDataImpl(this, clusterManager);}

VertxThreadFactory的构建方法:

VertxThreadFactory(String prefix, BlockedThreadChecker checker, boolean worker, long maxExecTime, TimeUnit maxExecTimeUnit) {this.prefix = prefix;this.checker = checker;this.worker = worker;this.maxExecTime = maxExecTime;this.maxExecTimeUnit = maxExecTimeUnit;}public Thread newThread(Runnable runnable) {VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime, maxExecTimeUnit);// Vert.x threads are NOT daemons - we want them to prevent JVM exit so embededd user doesn't// have to explicitly prevent JVM from exiting.if (checker != null) {checker.registerThread(t);}addToMap(t);// I know the default is false anyway, but just to be explicit-  Vert.x threads are NOT daemons// we want to prevent the JVM from exiting until Vert.x instances are closedt.setDaemon(false);return t;}

transport.eventLoopGroup:

public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);eventLoopGroup.setIoRatio(ioRatio);return eventLoopGroup;}

deployVerticle

 @Overridepublic void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {boolean closed;synchronized (this) {closed = this.closed;}if (closed) {if (completionHandler != null) {completionHandler.handle(Future.failedFuture("Vert.x closed"));}} else {deploymentManager.deployVerticle(verticleSupplier, options, completionHandler);}}

DeploymentOptions:

DataObject(generateConverter = true, publicConverter = false)
public class DeploymentOptions {public static final boolean DEFAULT_WORKER = false;public static final boolean DEFAULT_MULTI_THREADED = false;public static final String DEFAULT_ISOLATION_GROUP = null;public static final boolean DEFAULT_HA = false;public static final int DEFAULT_INSTANCES = 1;private JsonObject config;private boolean worker;private boolean multiThreaded;private String isolationGroup;private String workerPoolName;private int workerPoolSize;private long maxWorkerExecuteTime;private boolean ha;private List<String> extraClasspath;private int instances;private List<String> isolatedClasses;private TimeUnit maxWorkerExecuteTimeUnit;/*** Default constructor*/public DeploymentOptions() {this.worker = DEFAULT_WORKER;this.config = null;this.multiThreaded = DEFAULT_MULTI_THREADED;this.isolationGroup = DEFAULT_ISOLATION_GROUP;this.ha = DEFAULT_HA;this.instances = DEFAULT_INSTANCES;this.workerPoolName = null;this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE;this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME;this.maxWorkerExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT;}

deploymentManager.deployVerticle:

public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {if (options.getInstances() < 1) {throw new IllegalArgumentException("Can't specify < 1 instances to deploy");}if (options.isMultiThreaded() && !options.isWorker()) {throw new IllegalArgumentException("If multi-threaded then must be worker too");}if (options.getExtraClasspath() != null) {throw new IllegalArgumentException("Can't specify extraClasspath for already created verticle");}if (options.getIsolationGroup() != null) {throw new IllegalArgumentException("Can't specify isolationGroup for already created verticle");}if (options.getIsolatedClasses() != null) {throw new IllegalArgumentException("Can't specify isolatedClasses for already created verticle");}//初始化contextContextInternal currentContext = vertx.getOrCreateContext();ClassLoader cl = getClassLoader(options);//将注册的verticle放入set中,并校验传入的verticle是否满足数量要求int nbInstances = options.getInstances();Set<Verticle> verticles = Collections.newSetFromMap(new IdentityHashMap<>());for (int i = 0; i < nbInstances; i++) {Verticle verticle;try {verticle = verticleSupplier.get();} catch (Exception e) {if (completionHandler != null) {completionHandler.handle(Future.failedFuture(e));}return;}if (verticle == null) {if (completionHandler != null) {completionHandler.handle(Future.failedFuture("Supplied verticle is null"));}return;}verticles.add(verticle);}if (verticles.size() != nbInstances) {if (completionHandler != null) {completionHandler.handle(Future.failedFuture("Same verticle supplied more than once"));}return;}Verticle[] verticlesArray = verticles.toArray(new Verticle[verticles.size()]);String verticleClass = verticlesArray[0].getClass().getName();//开始部署doDeploy("java:" + verticleClass, options, currentContext, currentContext, completionHandler, cl, verticlesArray);}

VertxImpl.getOrCreateContext:

  public ContextImpl getOrCreateContext() {ContextImpl ctx = getContext();if (ctx == null) {// We are running embedded - Create a contextctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());}return ctx;}public ContextImpl getContext() {ContextImpl context = (ContextImpl) context();if (context != null && context.owner == this) {return context;}return null;}public static Context context() {Thread current = Thread.currentThread();if (current instanceof VertxThread) {return ((VertxThread) current).getContext();}return null;}//创建context  EventLoopContext继承了ContextImpl,这里保存了这个eventloop所有的信息//这里创建EventLoopContext的同时会调用ContextImpl.getEventLoop //从EventLoopGroup中获取到下一个eventloop并赋予context@Override public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);}

ContextImpl.getEventLoop:

 private static EventLoop getEventLoop(VertxInternal vertx) {EventLoopGroup group = vertx.getEventLoopGroup();if (group != null) {return group.next();} else {return null;}}protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) {this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl);}protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) {if (DISABLE_TCCL && tccl != ClassLoader.getSystemClassLoader()) {log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");}this.deploymentID = deploymentID;this.config = config;this.eventLoop = eventLoop;this.tccl = tccl;this.owner = vertx;this.workerPool = workerPool;this.internalBlockingPool = internalBlockingPool;this.orderedTasks = new TaskQueue();this.internalOrderedTasks = new TaskQueue();this.closeHooks = new CloseHooks(log);}
deploymentManager.doDeploy
private void doDeploy(String identifier,DeploymentOptions options,ContextInternal parentContext,ContextInternal callingContext,Handler<AsyncResult<String>> completionHandler,ClassLoader tccl, Verticle... verticles) {JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy(); // Copy itString poolName = options.getWorkerPoolName();Deployment parent = parentContext.getDeployment();String deploymentID = generateDeploymentID();DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);AtomicInteger deployCount = new AtomicInteger();AtomicBoolean failureReported = new AtomicBoolean();for (Verticle verticle: verticles) {WorkerExecutorInternal workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()) : null;WorkerPool pool = workerExec != null ? workerExec.getPool() : null;//创建verticle的context 原理和上面的一样ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :vertx.createEventLoopContext(deploymentID, pool, conf, tccl);if (workerExec != null) {context.addCloseHook(workerExec);}context.setDeployment(deployment);deployment.addVerticle(new VerticleHolder(verticle, context));context.runOnContext(v -> {try {verticle.init(vertx, context);Future<Void> startFuture = Future.future();verticle.start(startFuture);startFuture.setHandler(ar -> {if (ar.succeeded()) {if (parent != null) {if (parent.addChild(deployment)) {deployment.child = true;} else {// Orphandeployment.undeploy(null);return;}}VertxMetrics metrics = vertx.metricsSPI();if (metrics != null) {metrics.verticleDeployed(verticle);}deployments.put(deploymentID, deployment);if (deployCount.incrementAndGet() == verticles.length) {reportSuccess(deploymentID, callingContext, completionHandler);}} else if (failureReported.compareAndSet(false, true)) {deployment.rollback(callingContext, completionHandler, context, ar.cause());}});} catch (Throwable t) {if (failureReported.compareAndSet(false, true))deployment.rollback(callingContext, completionHandler, context, t);}});}}

DeploymentImpl初始化:

private DeploymentImpl(Deployment parent, String deploymentID, String verticleIdentifier, DeploymentOptions options) {this.parent = parent;this.deploymentID = deploymentID;//Deployment的识别码(java)this.verticleIdentifier = verticleIdentifier;this.options = options;}

runOnContext:

@Overridepublic void runOnContext(Handler<Void> task) {try {executeAsync(task);} catch (RejectedExecutionException ignore) {// Pool is already shut down}}void executeAsync(Handler<Void> task) {nettyEventLoop().execute(() -> executeTask(null, task));}public EventLoop nettyEventLoop() {return eventLoop;}

本文链接:https://www.ngui.cc/zz/1443310.html
Copyright © 2010-2022 ngui.cc 版权所有 |关于我们| 联系方式| 豫B2-20100000