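This post focuses on how a Node is created and started. Other steps mentioned along the way will be analyzed in their own posts later.
Picking up from yesterday's post on Bootstrap initialization, we start at the org.elasticsearch.bootstrap.Bootstrap#setup method. The relevant snippet: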
`// Create the Node instance`
`node = new Node(environment) {`
`@Override`
`protected void validateNodeBeforeAcceptingRequests(`
`final BootstrapContext context,`
`final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {`
`BootstrapChecks.check(context, boundTransportAddress, checks);`
`}`
`};`
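The snippet creates the node through an anonymous subclass that overrides a protected hook, so the bootstrap code decides what validation means while the base class decides when it runs. A minimal, self-contained sketch of that pattern follows. `SketchNode`, its `events` list, and the check names are hypothetical stand-ins, not Elasticsearch classes, and the empty default hook is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Node: start() calls a protected hook at a fixed point.
class SketchNode {
    final List<String> events = new ArrayList<>();

    // Hook meant to be overridden; this sketch's default performs no checks.
    protected void validateBeforeAcceptingRequests(List<String> checks) {
    }

    SketchNode start() {
        events.add("services started");
        validateBeforeAcceptingRequests(List.of("heap size", "file descriptors"));
        events.add("accepting requests");
        return this;
    }
}

class HookOverrideDemo {
    // Same shape as the Bootstrap#setup snippet: an anonymous subclass supplies the validation.
    static SketchNode createNodeWithChecks() {
        return new SketchNode() {
            @Override
            protected void validateBeforeAcceptingRequests(List<String> checks) {
                for (String check : checks) {
                    events.add("checked " + check);
                }
            }
        };
    }

    public static void main(String[] args) {
        createNodeWithChecks().start().events.forEach(System.out::println);
    }
}
```

The base class never needs to know about the bootstrap checks; it only guarantees that the hook runs after the services are started and before requests are accepted.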
The rest of this post walks through the construction and startup of Node.
Node construction
Here is the constructor:
`public Node(Environment environment) {`
`this(environment, Collections.emptyList(), true);`
`}`
The constructor it delegates to is long, so we will go through it in several pieces.
I. Loading plugins and environment configuration
`protected Node(`
`final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {`
`// -------- some code omitted --------`
`Settings tmpSettings = Settings.builder().put(environment.settings())`
`.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();`
`// Create the node environment object, including initialization of the node's Lucene file locks`
`nodeEnvironment = new NodeEnvironment(tmpSettings, environment);`
`resourcesToClose.add(nodeEnvironment);`
`// -------- some code omitted --------`
`// Service class whose main job is loading plugins`
`this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),`
`environment.pluginsFile(), classpathPlugins);`
`final Settings settings = pluginsService.updatedSettings();`
`// Node roles`
`final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(`
`//public static Set<DiscoveryNodeRole> BUILT_IN_ROLES = Set.of(DATA_ROLE, INGEST_ROLE, MASTER_ROLE);`
`DiscoveryNodeRole.BUILT_IN_ROLES.stream(),`
`pluginsService.filterPlugins(Plugin.class)`
`.stream()`
`.map(Plugin::getRoles)`
`.flatMap(Set::stream))`
`.collect(Collectors.toSet());`
`DiscoveryNode.setPossibleRoles(possibleRoles);`
`// Create the local node factory from the node's settings and its nodeId`
`localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());`
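This part creates the NodeEnvironment and PluginsService instances from the settings, collects the set of possible DiscoveryNodeRole values (built-in roles plus roles contributed by plugins), and creates the LocalNodeFactory instance. Most of it simply assigns member fields of the Node instance.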
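The role collection is a plain stream merge: built-in roles are concatenated with whatever each plugin contributes, and collecting into a set removes duplicates. A minimal sketch, using strings instead of DiscoveryNodeRole and a hypothetical `RolePlugin` interface in place of Plugin:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RoleCollectionSketch {
    // Hypothetical stand-in for a plugin that may contribute extra node roles.
    interface RolePlugin {
        Set<String> getRoles();
    }

    // Mirrors the BUILT_IN_ROLES constant quoted in the snippet, with strings as role names.
    static final Set<String> BUILT_IN_ROLES = Set.of("data", "ingest", "master");

    // Concatenate the built-in roles with every plugin's roles; the set drops duplicates.
    static Set<String> possibleRoles(List<RolePlugin> plugins) {
        return Stream.concat(
                BUILT_IN_ROLES.stream(),
                plugins.stream().map(RolePlugin::getRoles).flatMap(Set::stream))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<RolePlugin> plugins = List.of(() -> Set.of("ml"), () -> Set.of("data", "transform"));
        System.out.println(possibleRoles(plugins));
    }
}
```

A plugin that repeats a built-in role, as the second one does with "data", does not change the result.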
II. Creating the thread pool
The snippet:
`// create the environment based on the finalized (processed) view of the settings`
`// this is just to makes sure that people get the same settings, no matter where they ask them from`
`this.environment = new Environment(settings, environment.configFile());`
`Environment.assertEquivalent(environment, this.environment);`
`// Get the executor builders contributed by plugins`
`final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);`
`// Thread pool`
`final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));`
`resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));`
`// adds the context to the DeprecationLogger so that it does not need to be injected everywhere`
`DeprecationLogger.setThreadContext(threadPool.getThreadContext());`
`resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));`

`final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());`
`final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());`
`for (final ExecutorBuilder<?> builder : threadPool.builders()) {`
`additionalSettings.addAll(builder.getRegisteredSettings());`
`}`
`client = new NodeClient(settings, threadPool);`
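- pluginsService.getExecutorBuilders(settings) loads the list of ExecutorBuilder instances contributed by plugins.
- The ThreadPool instance is created from executorBuilders. This part is fairly involved and will get its own post later.
- resourcesToClose is a List of Closeable instances, which makes it easy to release every resource that needs closing at the end.
- Some settings information is initialized: the plugin settings and settings filters are gathered, and the settings registered by each ExecutorBuilder are added to them.
- A NodeClient instance is created and assigned to the client field of the Node instance.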
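The resourcesToClose idea can be sketched on its own: every resource is registered as soon as it exists, so a failure later in construction can still release whatever was already created. The sketch below is an illustration under assumptions, not the Node code itself. The `success` flag, the reverse closing order, and the `log` list are choices made for this example, since the part of the constructor that consumes the list is omitted above.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class ResourcesToCloseSketch {
    static final List<String> log = new ArrayList<>();

    // Builds two fake resources; if construction fails part-way, closes what was registered.
    static void construct(boolean failHalfway) {
        List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            resourcesToClose.add(() -> log.add("closed nodeEnvironment"));
            if (failHalfway) {
                throw new IllegalStateException("simulated failure");
            }
            resourcesToClose.add(() -> log.add("closed threadPool"));
            success = true;
        } finally {
            if (!success) {
                closeAll(resourcesToClose);
            }
        }
    }

    // Closes in reverse registration order (a choice of this sketch) and keeps going on errors.
    static void closeAll(List<Closeable> resources) {
        for (int i = resources.size() - 1; i >= 0; i--) {
            try {
                resources.get(i).close();
            } catch (IOException e) {
                log.add("close failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        try {
            construct(true);
        } catch (IllegalStateException e) {
            System.out.println(log);
        }
    }
}
```

Because Closeable has a single method, lambdas such as the ThreadPool.terminate call in the snippet can be registered alongside real Closeable objects.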
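III. Creating and loading the node's modules and services
Responsibilities inside an Elasticsearch node are clearly divided: different kinds of work are handed to different modules and their services. Here is the next part of the Node constructor: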
`// Service that watches resource files for changes`
`final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);`
`// Load the script module from the plugins service`
`final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));`
`// Analysis module, mainly for Lucene indexing, tokenization, and similar operations`
`AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));`
`// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool`
`// so we might be late here already`
`// Setting upgraders contributed by plugins`
`final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)`
`.stream()`
`.map(Plugin::getSettingUpgraders)`
`.flatMap(List::stream)`
`.collect(Collectors.toSet());`
`// Settings module`
`final SettingsModule settingsModule =`
`new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);`
`// Register cluster settings listeners`
`scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());`
`// Add resourceWatcherService to the resourcesToClose list`
`resourcesToClose.add(resourceWatcherService);`
`// Load the network service from plugins`
`final NetworkService networkService = new NetworkService(`
`getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));`
`// Load cluster plugins`
`List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);`
`// Create the cluster service`
`final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);`
`// Add a cluster state applier`
`clusterService.addStateApplier(scriptModule.getScriptService());`
`resourcesToClose.add(clusterService);`
`// Add a local-node-master listener`
`clusterService.addLocalNodeMasterListener(`
`new ConsistentSettingsService(settings, clusterService, settingsModule.getConsistentSettings())`
`.newHashPublisher());`
`// Instantiate the ingest service`
`final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,`
`scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),`
`pluginsService.filterPlugins(IngestPlugin.class), client);`
`// Create the cluster info service`
`final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);`
`// Service for collecting and viewing node usage statistics`
`final UsageService usageService = new UsageService();`

`ModulesBuilder modules = new ModulesBuilder();`
`// Node monitoring service`
`final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);`
`// Cluster module`
`ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);`
`modules.add(clusterModule);`
`// Indices module`
`IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));`
`modules.add(indicesModule);`
`// Search module`
`SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));`
`// Circuit breaker service`
`CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),`
`settingsModule.getClusterSettings());`
`resourcesToClose.add(circuitBreakerService);`
`// Add the gateway module`
`modules.add(new GatewayModule());`
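See the code comments for details. This section loads plugin configuration and creates the script, analysis, settings, cluster, indices, search, and gateway modules, along with services for node monitoring, circuit breaking, and more.
The code continues by creating more base services: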
`// Page cache recycler`
`PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);`
`// Create BigArrays, used by the persistence service below`
`BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);`
`// Add the settings module`
`modules.add(settingsModule);`
`// Gather the NamedWriteable entries of each module, e.g. snapshots and restore in ClusterModule`
`List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(`
`NetworkModule.getNamedWriteables().stream(),`
`IndicesModule.getNamedWriteables().stream(),`
`searchModule.getNamedWriteables().stream(),`
`pluginsService.filterPlugins(Plugin.class).stream()`
`.flatMap(p -> p.getNamedWriteables().stream()),`
`ClusterModule.getNamedWriteables().stream())`
`.flatMap(Function.identity()).collect(Collectors.toList());`
`// Create the NamedWriteableRegistry from namedWriteables`
`final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);`
`// Load each module's namedXContents and create the NamedXContentRegistry`
`NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(`
`NetworkModule.getNamedXContents().stream(),`
`IndicesModule.getNamedXContents().stream(),`
`searchModule.getNamedXContents().stream(),`
`pluginsService.filterPlugins(Plugin.class).stream()`
`.flatMap(p -> p.getNamedXContent().stream()),`
`ClusterModule.getNamedXWriteables().stream())`
`.flatMap(Function.identity()).collect(toList()));`
`// Metadata state service`
`final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);`
`// Service that persists the cluster state`
`final PersistedClusterStateService lucenePersistedStateFactory`
`= new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),`
`threadPool::relativeTimeInMillis);`
This mainly creates the metadata state service and the persisted cluster state service, in preparation for the services created next.
The next snippet:
`// collect engine factory providers from server and from plugins`
`final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);`
`// Build the list of engine factory providers`
`final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =`
`Stream.concat(`
`indicesModule.getEngineFactories().stream(),`
`enginePlugins.stream().map(plugin -> plugin::getEngineFactory))`
`.collect(Collectors.toList());`

`// Collect the index store (directory) factories from plugins`
`final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =`
`pluginsService.filterPlugins(IndexStorePlugin.class)`
`.stream()`
`.map(IndexStorePlugin::getDirectoryFactories)`
`.flatMap(m -> m.entrySet().stream())`
`.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));`
`// Map of system index descriptors`
`final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService`
`.filterPlugins(SystemIndexPlugin.class)`
`.stream()`
`.collect(Collectors.toUnmodifiableMap(`
`plugin -> plugin.getClass().getSimpleName(),`
`plugin -> plugin.getSystemIndexDescriptors()));`
`SystemIndexDescriptor.checkForOverlappingPatterns(systemIndexDescriptorMap);`
`// Flatten the map into a list`
`final List<SystemIndexDescriptor> systemIndexDescriptors = systemIndexDescriptorMap.values().stream()`
`.flatMap(Collection::stream)`
`.collect(Collectors.toList());`
`// Indices service`
`final IndicesService indicesService =`
`new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),`
`clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,`
`threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),`
`clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories);`
`// Alias validator`
`final AliasValidator aliasValidator = new AliasValidator();`
`// Service that creates indices in the cluster metadata`
`final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(`
`settings,`
`clusterService,`
`indicesService,`
`clusterModule.getAllocationService(),`
`aliasValidator,`
`environment,`
`settingsModule.getIndexScopedSettings(),`
`threadPool,`
`xContentRegistry,`
`systemIndexDescriptors,`
`forbidPrivateIndexSettings);`
`// Components created by plugins`
`Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()`
`.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,`
`scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,`
`namedWriteableRegistry).stream())`
`.collect(Collectors.toList());`
`// Action module`
`ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),`
`settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),`
`threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, clusterService);`
`modules.add(actionModule);`
`// Get the restController from the action module`
`final RestController restController = actionModule.getRestController();`
`// Network module`
`final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),`
`threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,`
`networkService, restController, clusterService.getClusterSettings());`
`Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =`
`pluginsService.filterPlugins(Plugin.class).stream()`
`.map(Plugin::getIndexTemplateMetaDataUpgrader)`
`.collect(Collectors.toList());`
`final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(indexTemplateMetaDataUpgraders);`
`// Service that upgrades index metadata`
`final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,`
`indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings());`
`new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetaDataUpgraders);`
`// Low-level transport object`
`final Transport transport = networkModule.getTransportSupplier().get();`
`Set<String> taskHeaders = Stream.concat(`
`pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),`
`Stream.of(Task.X_OPAQUE_ID)`
`).collect(Collectors.toSet());`
`// Transport service`
`final TransportService transportService = newTransportService(settings, transport, threadPool,`
`networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);`
`// Gateway metadata state`
`final GatewayMetaState gatewayMetaState = new GatewayMetaState();`
`// Response collector service`
`final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);`
`// Search transport service`
`final SearchTransportService searchTransportService = new SearchTransportService(transportService,`
`SearchExecutionStatsCollector.makeWrapper(responseCollectorService));`
`// HTTP server transport instance`
`final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);`
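This part loads further modules from plugins and creates more services, covering indices, network transport, search, HTTP, and so on. The code comments have the details.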
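One detail of the snippet is worth spelling out. The index store factories are merged with the two-argument Collectors.toMap, which throws IllegalStateException when it meets a duplicate key. So, as far as the code shown goes, two plugins cannot register a factory under the same store type. A minimal sketch of that behavior, using plain string maps as hypothetical stand-ins for each plugin's getDirectoryFactories() result:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StoreFactoryMergeSketch {
    // Each map stands for one plugin's contribution: store type -> factory name.
    static Map<String, String> merge(List<Map<String, String>> perPlugin) {
        return perPlugin.stream()
            .flatMap(m -> m.entrySet().stream())
            // No merge function is given, so a repeated key raises IllegalStateException.
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(Map.of("typeA", "factoryA"), Map.of("typeB", "factoryB"))));
        try {
            merge(List.of(Map.of("typeA", "factoryA"), Map.of("typeA", "otherFactory")));
        } catch (IllegalStateException e) {
            System.out.println("duplicate store type rejected");
        }
    }
}
```

Failing fast here is a reasonable design: silently letting one plugin's factory overwrite another's would be much harder to diagnose than an exception during node construction.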
The code continues:
`// Repositories module`
`RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,`
`pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry);`
`// Repositories service`
`RepositoriesService repositoryService = repositoriesModule.getRepositoryService();`
`// Snapshots service`
`SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,`
`clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool);`
`// Shard-level snapshot service`
`SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,`
`threadPool, transportService, indicesService, actionModule.getActionFilters(),`
`clusterModule.getIndexNameExpressionResolver());`
`// Restore service for data and indices`
`RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),`
`metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());`
`// Reroute service`
`final RerouteService rerouteService`
`= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);`
`// Disk threshold monitor`
`final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,`
`clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);`
`clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);`
`// Discovery module`
`final DiscoveryModule discoveryModule = new DiscoveryModule(settings, transportService, namedWriteableRegistry,`
`networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),`
`clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),`
`clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);`
`// Node service`
`this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),`
`transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),`
`httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,`
`searchTransportService);`
`// Search service`
`final SearchService searchService = newSearchService(clusterService, indicesService,`
`threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),`
`responseCollectorService, circuitBreakerService);`
`// Persistent task executors`
`final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService`
`.filterPlugins(PersistentTaskPlugin.class).stream()`
`.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))`
`.flatMap(List::stream)`
`.collect(toList());`
`// Registry of persistent task executors`
`final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);`
`// Persistent tasks cluster service`
`final PersistentTasksClusterService persistentTasksClusterService =`
`new PersistentTasksClusterService(settings, registry, clusterService, threadPool);`
`resourcesToClose.add(persistentTasksClusterService);`
`// Persistent tasks service`
`final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);`
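This still creates base services: repositories, snapshots, rerouting, restore, search, and so on.
IV. The IOC part of Elasticsearch
Continuing down the constructor, we reach a very important part of Elasticsearch, which is effectively its IOC container. The code: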
`// This is Elasticsearch's own IOC management mechanism`
`// The lambda creates a Module object; its body is the configure(Binder binder) method, which holds the injection logic`
`modules.add(b -> {`
`b.bind(Node.class).toInstance(this);`
`b.bind(NodeService.class).toInstance(nodeService);`
`b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);`
`b.bind(PluginsService.class).toInstance(pluginsService);`
`b.bind(Client.class).toInstance(client);`
`b.bind(NodeClient.class).toInstance(client);`
`b.bind(Environment.class).toInstance(this.environment);`
`b.bind(ThreadPool.class).toInstance(threadPool);`
`b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);`
`b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);`
`b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);`
`b.bind(BigArrays.class).toInstance(bigArrays);`
`b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);`
`b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());`
`b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());`
`b.bind(IngestService.class).toInstance(ingestService);`
`b.bind(UsageService.class).toInstance(usageService);`
`b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);`
`b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);`
`b.bind(MetaStateService.class).toInstance(metaStateService);`
`b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);`
`b.bind(IndicesService.class).toInstance(indicesService);`
`b.bind(AliasValidator.class).toInstance(aliasValidator);`
`b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService);`
`b.bind(SearchService.class).toInstance(searchService);`
`b.bind(SearchTransportService.class).toInstance(searchTransportService);`
`b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::createReduceContext));`
`b.bind(Transport.class).toInstance(transport);`
`b.bind(TransportService.class).toInstance(transportService);`
`b.bind(NetworkService.class).toInstance(networkService);`
`b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService()));`
`b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);`
`b.bind(ClusterInfoService.class).toInstance(clusterInfoService);`
`b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);`
`b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());`
`{`
`RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());`
`processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);`
`b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,`
`indicesService, recoverySettings));`
`b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,`
`transportService, recoverySettings, clusterService));`
`}`
`b.bind(HttpServerTransport.class).toInstance(httpServerTransport);`
`pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));`
`b.bind(PersistentTasksService.class).toInstance(persistentTasksService);`
`b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);`
`b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);`
`b.bind(RepositoriesService.class).toInstance(repositoryService);`
`b.bind(SnapshotsService.class).toInstance(snapshotsService);`
`b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);`
`b.bind(RestoreService.class).toInstance(restoreService);`
`b.bind(RerouteService.class).toInstance(rerouteService);`
`}`
`);`
Here a lambda creates a Module object whose configure(Binder binder) method holds the injection logic: its bind calls place the services created earlier in the constructor into a single container.
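To make the mechanism concrete, here is a deliberately tiny container in the same spirit. It is a sketch under assumptions, not the Guice API: `bindInstance` flattens the real `bind(...).toInstance(...)` chain into one call, only instance bindings are supported, and every type in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MiniInjectorSketch {
    // Receives bindings from modules.
    interface Binder {
        <T> void bindInstance(Class<T> type, T instance);
    }

    // Single abstract method, so a lambda such as b -> {...} can act as a Module.
    interface Module {
        void configure(Binder binder);
    }

    static final class Injector {
        private final Map<Class<?>, Object> instances = new HashMap<>();

        <T> T getInstance(Class<T> type) {
            Object instance = instances.get(type);
            if (instance == null) {
                throw new IllegalStateException("no binding for " + type.getName());
            }
            return type.cast(instance);
        }
    }

    static final class ModulesBuilder {
        private final List<Module> modules = new ArrayList<>();

        ModulesBuilder add(Module module) {
            modules.add(module);
            return this;
        }

        // In this sketch, configure runs here, so bindings are recorded at injector creation.
        Injector createInjector() {
            Injector injector = new Injector();
            Binder binder = new Binder() {
                @Override
                public <T> void bindInstance(Class<T> type, T instance) {
                    injector.instances.put(type, instance);
                }
            };
            for (Module module : modules) {
                module.configure(binder);
            }
            return injector;
        }
    }

    public static void main(String[] args) {
        StringBuilder service = new StringBuilder("nodeService");
        Injector injector = new ModulesBuilder()
            .add(b -> b.bindInstance(StringBuilder.class, service))
            .createInjector();
        System.out.println(injector.getInstance(StringBuilder.class) == service);
    }
}
```

Binding pre-built instances, as the Node constructor does for almost everything, keeps construction order explicit in ordinary code and uses the container mostly as a lookup table.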
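The actual injection goes through the Guice implementation that Elasticsearch carries in its own code base; Google's Guice is the well-known library for this style of IOC management. Let's continue with the code: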
`// Create the injector; the instances bound above can then be fetched from the IOC container`
`injector = modules.createInjector();`

`// TODO hack around circular dependencies problems in AllocationService`
`clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));`
`// Plugin lifecycle components`
`List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()`
`.filter(p -> p instanceof LifecycleComponent)`
`.map(p -> (LifecycleComponent) p).collect(Collectors.toList());`
`resourcesToClose.addAll(pluginLifecycleComponents);`
`resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));`
`this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);`
`// Initialize the client`
`client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),`
`() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());`
`this.namedWriteableRegistry = namedWriteableRegistry;`

`logger.debug("initializing HTTP handlers ...");`
`actionModule.initRestHandlers(() -> clusterService.state().nodes());`
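This creates the injector. From then on, the instances bound above can be fetched from the IOC container; the client is initialized with one of them, and actionModule registers its REST handlers.
That completes the construction of the Node. Next, the startup flow.
A node is started by its start method. The code of org.elasticsearch.node.Node#start: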
`/**`
`* Start the node. If the node is already started, this method is no-op.`
`*/`
`public Node start() throws NodeValidationException {`
`if (!lifecycle.moveToStarted()) {`
`return this;`
`}`

`logger.info("starting ...");`
`// Start the plugin lifecycle components`
`pluginLifecycleComponents.forEach(LifecycleComponent::start);`
`// Get the MappingUpdatedAction instance from the IOC container and set its client`
`injector.getInstance(MappingUpdatedAction.class).setClient(client);`
`// Get the indices service from the IOC container and start it`
`injector.getInstance(IndicesService.class).start();`
`// Get the indices cluster state service from the IOC container and start it; under the hood it is processed periodically via the thread pool`
`injector.getInstance(IndicesClusterStateService.class).start();`
`// Get the snapshots service from the IOC container and start it`
`injector.getInstance(SnapshotsService.class).start();`
`// Get the shard snapshot service from the IOC container and start it`
`injector.getInstance(SnapshotShardsService.class).start();`
`// Get the repositories service from the IOC container and start it`
`injector.getInstance(RepositoriesService.class).start();`
`// Get the search service from the IOC container and start it`
`injector.getInstance(SearchService.class).start();`
`// Start the monitor service`
`nodeService.getMonitorService().start();`
`// Get the cluster service instance from the IOC container`
`final ClusterService clusterService = injector.getInstance(ClusterService.class);`
`// Get the node connections service from the container`
`final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);`
`nodeConnectionsService.start();`
`clusterService.setNodeConnectionsService(nodeConnectionsService);`
`// Start the resource watcher service`
`injector.getInstance(ResourceWatcherService.class).start();`
`// Start the gateway service`
`injector.getInstance(GatewayService.class).start();`
`Discovery discovery = injector.getInstance(Discovery.class);`
`clusterService.getMasterService().setClusterStatePublisher(discovery::publish);`

`// Start the transport service now so the publish address will be added to the local disco node in ClusterService`
`TransportService transportService = injector.getInstance(TransportService.class);`
`transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));`
`// Start the transport service`
`transportService.start();`
`assert localNodeFactory.getNode() != null;`
`assert transportService.getLocalNode().equals(localNodeFactory.getNode())`
`: "transportService has a different local node than the factory provided";`
`injector.getInstance(PeerRecoverySourceService.class).start();`

`// Load (and maybe upgrade) the metadata stored on disk`
`final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);`
`// Start the gateway metadata state service`
`gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),`
`injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class),`
`injector.getInstance(PersistedClusterStateService.class));`
`if (Assertions.ENABLED) {`
`// -------- some code omitted --------`
`}`
`// we load the global state here (the persistent part of the cluster state stored on disk) to`
`// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.`
`final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData();`
`assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null`
`// Validate the node before it accepts requests`
`validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),`
`pluginsService.filterPlugins(Plugin.class).stream()`
`.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));`
`// Register the task manager as a cluster state applier`
`clusterService.addStateApplier(transportService.getTaskManager());`
`// start after transport service so the local disco is known`
`// Start discovery`
`discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService`
`// Start the cluster service`
`clusterService.start();`
`assert clusterService.localNode().equals(localNodeFactory.getNode())`
`: "clusterService has a different local node than the factory provided";`
`transportService.acceptIncomingRequests();`
`discovery.startInitialJoin();`
`final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());`
`configureNodeAndClusterIdStateListener(clusterService);`

`if (initialStateTimeout.millis() > 0) {`
`final ThreadPool thread = injector.getInstance(ThreadPool.class);`
`ClusterState clusterState = clusterService.state();`
`// Observer`
`ClusterStateObserver observer =`
`new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());`
`// When there is no master node yet`
`if (clusterState.nodes().getMasterNodeId() == null) {`
`logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);`
`// Latch: later steps run only after the initial cluster state has been synchronized`
`final CountDownLatch latch = new CountDownLatch(1);`
`observer.waitForNextChange(new ClusterStateObserver.Listener() {`
`@Override`
`public void onNewClusterState(ClusterState state) { latch.countDown(); }`

`@Override`
`public void onClusterServiceClose() {`
`latch.countDown();`
`}`

`@Override`
`public void onTimeout(TimeValue timeout) {`
`logger.warn("timed out while waiting for initial discovery state - timeout: {}",`
`initialStateTimeout);`
`latch.countDown();`
`}`
`}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);`

`try {`
`// Block until the listener fires (new state, service close, or timeout) before continuing`
`latch.await();`
`} catch (InterruptedException e) {`
`throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");`
`}`
`}`
`}`
`// Start the HTTP server transport`
`injector.getInstance(HttpServerTransport.class).start();`
`// Whether to write the ports files`
`if (WRITE_PORTS_FILE_SETTING.get(settings())) {`
`TransportService transport = injector.getInstance(TransportService.class);`
`// Write the ports files`
`writePortsFile("transport", transport.boundAddress());`
`HttpServerTransport http = injector.getInstance(HttpServerTransport.class);`
`writePortsFile("http", http.boundAddress());`
`}`

`logger.info("started");`

`pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);`

`return this;`
`}`
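See the code comments for the details. start mainly fetches from Elasticsearch's IOC container the base services that were initialized while the Node was constructed, and starts them in order. The startup details of the individual services cover a lot of ground and will be analyzed later.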
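Two control-flow ideas in start are easy to miss among the service calls: the method is a no-op when the node is already started, and the starting thread blocks on a latch until a master is known or the wait times out. The sketch below isolates both. It is an illustration under assumptions, not the Node code: an AtomicBoolean stands in for lifecycle.moveToStarted(), the timeout is implemented with CountDownLatch.await(timeout, unit) rather than a ClusterStateObserver callback, and all names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class StartSequenceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    final AtomicInteger startCount = new AtomicInteger();
    private final AtomicReference<String> masterNodeId = new AtomicReference<>();
    private final CountDownLatch masterElected = new CountDownLatch(1);

    // Called by the (hypothetical) discovery layer once a master is known.
    void onMasterElected(String nodeId) {
        masterNodeId.set(nodeId);
        masterElected.countDown();
    }

    // Returns true if a master is known when the method returns, false otherwise.
    boolean start(long timeoutMillis) {
        // Only the first caller performs the start work; later calls return immediately.
        if (!started.compareAndSet(false, true)) {
            return masterNodeId.get() != null;
        }
        startCount.incrementAndGet();
        if (masterNodeId.get() == null) {
            try {
                // Block until a master appears or the timeout elapses.
                masterElected.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return masterNodeId.get() != null;
    }

    public static void main(String[] args) {
        StartSequenceSketch node = new StartSequenceSketch();
        // Simulate discovery electing a master around the time start() begins waiting.
        new Thread(() -> node.onMasterElected("node-1")).start();
        System.out.println("master known: " + node.start(5_000));
    }
}
```

As in the real method, a timeout does not abort startup; the caller simply continues without a known master.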
That wraps up the Node construction and startup flow. Note that this post only traces the flow; the details are left for later posts.