/**
 * Application entry point: reads {@code config.yml}, initializes plugins and starts all services.
 *
 * <p>If the configuration file cannot be read, the failure is logged (including the exception)
 * and the JVM exits with status 1.
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(String[] args) {
  LOGGER.info("{} {} starting", NAME, VERSION);

  // Initialised up front so the variable is definitely assigned after the try/catch: the
  // compiler does not treat System.exit as ending the catch branch.
  Configuration configuration = new Configuration();
  try {
    configuration = DataBindingUtils.readConfiguration(new File("config.yml"));
  } catch (IOException e) {
    // Hand the exception itself to the logger so the stack trace and cause chain are kept,
    // rather than only its (possibly null) message.
    LOGGER.error("Unable to read configuration, exiting.", e);
    System.exit(1);
  }

  final ServiceFactory configurationAwareServiceFactory = new ServiceFactory(configuration);
  configurationAwareServiceFactory.initializePlugins();

  final ServiceManager serviceManager =
      new ServiceManager(configurationAwareServiceFactory.getServices());
  LOGGER.info("Starting services");
  // Start is asynchronous; this method does not wait for the services to become healthy.
  serviceManager.startAsync();
}
/**
 * Creates a launcher that runs a job locally and brings up the services it depends on.
 *
 * <p>The setup is wrapped in a {@code JOB_LOCAL_SETUP} timing event.
 *
 * @param jobProps job configuration properties
 * @throws Exception if setup fails, e.g. the services are not healthy within 5 seconds
 */
public LocalJobLauncher(Properties jobProps) throws Exception {
  super(jobProps);

  TimingEvent setupTimer =
      this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.JOB_LOCAL_SETUP);

  this.taskExecutor = new TaskExecutor(jobProps);
  this.taskStateTracker = new LocalTaskStateTracker(jobProps, this.taskExecutor);

  // Executor first, tracker second: the order matters due to dependencies between services.
  this.serviceManager =
      new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));

  // Start all dependent services, then block until every one of them is healthy.
  this.serviceManager.startAsync();
  this.serviceManager.awaitHealthy(5, TimeUnit.SECONDS);

  startCancellationExecutor();

  setupTimer.stop();
}
/**
 * Creates a launcher that runs a job locally, using the given instance-level resources broker.
 *
 * <p>The setup is wrapped in a {@code JOB_LOCAL_SETUP} timing event.
 *
 * @param jobProps job configuration properties
 * @param instanceBroker broker passed through to the parent launcher
 * @throws Exception if setup fails, e.g. the services are not healthy within 5 seconds
 */
public LocalJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker)
    throws Exception {
  super(jobProps, ImmutableList.<Tag<?>> of(), instanceBroker);
  log.debug("Local job launched with properties: {}", jobProps);

  TimingEvent setupTimer =
      this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.JOB_LOCAL_SETUP);

  this.taskExecutor = new TaskExecutor(jobProps);
  this.taskStateTracker = new LocalTaskStateTracker(
      jobProps, this.jobContext.getJobState(), this.taskExecutor, this.eventBus);

  // Executor first, tracker second: the order matters due to dependencies between services.
  this.serviceManager =
      new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));

  // Start all dependent services, then block until every one of them is healthy.
  this.serviceManager.startAsync();
  this.serviceManager.awaitHealthy(5, TimeUnit.SECONDS);

  startCancellationExecutor();

  setupTimer.stop();
}
/**
 * Builds the driver from its components and groups the component services under one manager.
 *
 * <p>The job catalog, scheduler and launcher are each passed to {@code checkComponentService}
 * together with a shared list, which is then also handed to {@code createPlugins}. A
 * {@link ServiceManager} is only created when that list ends up non-empty.
 *
 * @param instanceName name forwarded to the parent driver
 * @param sysConfig system configuration forwarded to the parent driver
 * @param jobCatalog job catalog component
 * @param jobScheduler job scheduler component
 * @param jobLauncher job launcher component
 * @param instanceMetricContext optional metric context forwarded to the parent driver
 * @param log optional logger forwarded to the parent driver
 * @param plugins factories for the plugins to create
 * @param instanceBroker broker forwarded to the parent driver
 */
protected StandardGobblinInstanceDriver(String instanceName, Configurable sysConfig, JobCatalog jobCatalog,
    JobSpecScheduler jobScheduler, JobExecutionLauncher jobLauncher,
    Optional<MetricContext> instanceMetricContext, Optional<Logger> log,
    List<GobblinInstancePluginFactory> plugins, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
  super(instanceName, sysConfig, jobCatalog, jobScheduler, jobLauncher, instanceMetricContext, log,
      instanceBroker);

  List<Service> subservices = new ArrayList<>();
  checkComponentService(getJobCatalog(), subservices);
  checkComponentService(getJobScheduler(), subservices);
  checkComponentService(getJobLauncher(), subservices);

  _plugins = createPlugins(plugins, subservices);

  // With nothing to manage, _subservices is left unassigned by this constructor.
  if (!subservices.isEmpty()) {
    _subservices = new ServiceManager(subservices);
  }
}
/**
 * Copies the job configuration files into a fresh temp directory and starts a Quartz-backed
 * job scheduler that reads from it.
 *
 * @throws Exception if the files cannot be prepared or the services are not healthy in 10 seconds
 */
@BeforeClass
public void setUp() throws Exception {
  String tempDirPrefix = String.format("gobblin-test_%s_job-conf", this.getClass().getSimpleName());
  this.jobConfigDir = Files.createTempDirectory(tempDirPrefix).toString();

  File jobConfigDirFile = new File(this.jobConfigDir);
  FileUtils.forceDeleteOnExit(jobConfigDirFile);
  FileUtils.copyDirectory(new File(JOB_CONFIG_FILE_DIR), jobConfigDirFile);

  Properties schedulerProps = new Properties();
  try (Reader propsReader = new FileReader("gobblin-test/resource/gobblin.test.properties")) {
    schedulerProps.load(propsReader);
  }
  // Point the scheduler at the temp copy and poll it every 1000 (value of the interval key).
  schedulerProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, this.jobConfigDir);
  schedulerProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir);
  schedulerProps.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY, "1000");
  schedulerProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");

  SchedulerService quartzService = new SchedulerService(new Properties());
  this.jobScheduler = new JobScheduler(schedulerProps, quartzService);

  this.serviceManager = new ServiceManager(Lists.newArrayList(quartzService, this.jobScheduler));
  this.serviceManager.startAsync();
  this.serviceManager.awaitHealthy(10, TimeUnit.SECONDS);
}
@SuppressWarnings({"unchecked", "rawtypes"}) private static void guavaServices(final Env env, final Binder binder, final Set<Class<Service>> serviceTypes) { Consumer<Class> guavaService = klass -> { binder.bind(klass).asEagerSingleton(); serviceTypes.add(klass); }; serviceTypes.forEach(guavaService); // lazy service manager AtomicReference<ServiceManager> sm = new AtomicReference<>(); Provider<ServiceManager> smProvider = () -> sm.get(); binder.bind(ServiceManager.class).toProvider(smProvider); // ask Guice for services, create ServiceManager and start services env.onStart(r -> { List<Service> services = serviceTypes.stream() .map(r::require) .collect(Collectors.toList()); sm.set(new ServiceManager(services)); sm.get().startAsync().awaitHealthy(); }); // stop services env.onStop(() -> { sm.get().stopAsync().awaitStopped(); }); }
/**
 * Injection constructor; stores the collaborators this class keeps as fields.
 *
 * <p>{@code pipelineProvider} and {@code pipelineIdGenerator} are accepted but not stored by
 * this constructor.
 */
@Inject
private GroningenWorkhorse(final Provider<Pipeline> pipelineProvider,
    final PipelineIdGenerator pipelineIdGenerator,
    final ServiceManager backgroundServices,
    final SystemAdapter systemAdapter,
    final Settings settings,
    final PipelineManager pipelineManager,
    final ProtoBufConfigManagerFactory protoBufConfigManagerFactory,
    final Build build) {
  this.build = build;
  this.settings = settings;
  this.systemAdapter = systemAdapter;
  this.pipelineManager = pipelineManager;
  this.backgroundServices = backgroundServices;
  this.protoBufConfigManagerFactory = protoBufConfigManagerFactory;
}
/**
 * Creates the tracker, starts it as a managed service and subscribes it to the process registry.
 *
 * <p>Statement order is deliberate: this instance is handed to its own {@link ServiceManager}
 * and started before the constructor returns, and the registry subscription happens last.
 *
 * @param buckEventBus event bus stored for later use
 * @param invocationInfo invocation details stored for later use
 * @param processHelper helper stored for later use
 * @param processRegistry registry this tracker subscribes to
 * @param isDaemon flag stored for later use
 * @param deepEnabled flag stored for later use
 */
@VisibleForTesting
ProcessTracker(
    BuckEventBus buckEventBus,
    InvocationInfo invocationInfo,
    ProcessHelper processHelper,
    ProcessRegistry processRegistry,
    boolean isDaemon,
    boolean deepEnabled) {
  this.eventBus = buckEventBus;
  this.invocationInfo = invocationInfo;
  // The manager wraps only this tracker.
  this.serviceManager = new ServiceManager(ImmutableList.of(this));
  this.processHelper = processHelper;
  this.processRegistry = processRegistry;
  this.isDaemon = isDaemon;
  this.deepEnabled = deepEnabled;

  // Asynchronous start; nothing here waits for the service to be running.
  this.serviceManager.startAsync();

  this.processRegistry.subscribe(processRegisterCallback);
}
/**
 * Creates a new {@link ServiceManager} over the union of {@code services} and the services
 * supplied by the configuration.
 *
 * @return a newly constructed manager on every call
 */
@Override
public ServiceManager get() {
  final ImmutableSet.Builder<Service> combined = ImmutableSet.builder();
  combined.addAll(services);
  combined.addAll(configuration.getServices());
  return new ServiceManager(combined.build());
}
/**
 * Builds a {@code QueryService} over mocked collaborators and wraps it in a service manager.
 * The manager is created but not started here.
 *
 * @param settings connector settings supplied through {@code @OracleSettings}
 * @throws Exception if the fixture cannot be created
 */
@BeforeEach
public void setup(@OracleSettings Map<String, String> settings) throws Exception {
  config = new OracleSourceConnectorConfig(settings);
  offsetStorageReader = mock(OffsetStorageReader.class);
  changeWriter = mock(ChangeWriter.class);
  queryService = new QueryService(config, offsetStorageReader, changeWriter);
  serviceManager = new ServiceManager(Arrays.asList(queryService));
}
/**
 * Adapts a {@link ServiceManager} to the {@link ServiceManagerIface} interface.
 *
 * <p>Every call is forwarded to {@code delegate}; the start/stop methods return the adapter
 * itself rather than the delegate.
 *
 * @param delegate Service manager that receives all forwarded calls.
 * @return An adapter around {@code delegate}.
 */
public static ServiceManagerIface serviceManager(final ServiceManager delegate) {
  return new ServiceManagerIface() {
    @Override
    public ServiceManagerIface startAsync() {
      delegate.startAsync();
      return this;
    }

    @Override
    public ServiceManagerIface stopAsync() {
      delegate.stopAsync();
      return this;
    }

    @Override
    public void awaitHealthy() {
      delegate.awaitHealthy();
    }

    @Override
    public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
      delegate.awaitStopped(timeout, unit);
    }

    @Override
    public ImmutableMultimap<State, Service> servicesByState() {
      return delegate.servicesByState();
    }
  };
}
/**
 * Provides the singleton manager for the {@code @AppStartup} services, with the shutdown
 * listener attached, wrapped as a {@link ServiceManagerIface}.
 *
 * @param services services bound under {@code @AppStartup}
 * @param listener listener registered on the underlying manager
 * @return wrapper around the newly created manager
 */
@Provides
@Singleton
@AppStartup
ServiceManagerIface provideAppStartupServiceManager(
    @AppStartup Set<Service> services,
    LifecycleShutdownListener listener) {
  ServiceManager startupServices = new ServiceManager(services);
  startupServices.addListener(listener);
  return GuavaUtils.serviceManager(startupServices);
}
/**
 * Provides the singleton manager for the {@code @SchedulerActive} services, with the shutdown
 * listener attached, wrapped as a {@link ServiceManagerIface}.
 *
 * @param services services bound under {@code @SchedulerActive}
 * @param listener listener registered on the underlying manager
 * @return wrapper around the newly created manager
 */
@Provides
@Singleton
@SchedulerActive
ServiceManagerIface provideSchedulerActiveServiceManager(
    @SchedulerActive Set<Service> services,
    LifecycleShutdownListener listener) {
  ServiceManager activeServices = new ServiceManager(services);
  activeServices.addListener(listener);
  return GuavaUtils.serviceManager(activeServices);
}
/**
 * Declares the {@link ServiceManager} bean over the request-parsing, downloading and
 * meta-parsing services, in that order.
 *
 * <p>The bean definition names {@code startAsync} as init method and {@code stopAsync} as
 * destroy method.
 *
 * @return manager over all pooling services
 */
@Bean(initMethod = "startAsync", destroyMethod = "stopAsync")
public ServiceManager serviceManager() {
  List<PoolingThreadedService> pools = new ArrayList<>();
  pools.addAll(requestParsingServices());
  pools.addAll(downloadingServices());
  pools.addAll(metaparsingServices());

  return new ServiceManager(pools);
}
/**
 * Creates the daemon's service manager from merged configuration.
 *
 * <p>Custom properties override defaults with the same key. A {@code JobScheduler} is always
 * managed; a {@code JobExecutionInfoServer} is added only when the corresponding property
 * parses as {@code true} (it defaults to {@code false}).
 *
 * @param defaultProperties baseline properties
 * @param customProperties properties applied on top of the defaults
 * @throws Exception if a service cannot be constructed
 */
public SchedulerDaemon(Properties defaultProperties, Properties customProperties) throws Exception {
  Properties merged = new Properties();
  merged.putAll(defaultProperties);
  merged.putAll(customProperties);

  List<Service> services = Lists.<Service>newArrayList(new JobScheduler(merged));

  String execInfoServerFlag =
      merged.getProperty(ConfigurationKeys.JOB_EXECINFO_SERVER_ENABLED_KEY, Boolean.FALSE.toString());
  if (Boolean.parseBoolean(execInfoServerFlag)) {
    services.add(new JobExecutionInfoServer(merged));
  }

  this.serviceManager = new ServiceManager(services);
}
/**
 * Loads the test properties, points the scheduler at the job configuration directory and starts
 * a {@code JobScheduler} under a service manager.
 *
 * <p>The start is asynchronous; this method does not wait for the scheduler to become healthy.
 *
 * @throws Exception if the properties file cannot be read or the scheduler cannot be created
 */
@BeforeClass
public void setUp() throws Exception {
  Properties properties = new Properties();
  // try-with-resources so the file handle is released even if loading fails.
  try (FileReader propsReader = new FileReader("gobblin-test/resource/gobblin.test.properties")) {
    properties.load(propsReader);
  }
  properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, JOB_CONFIG_FILE_DIR);
  properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY, "1000");
  properties.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");

  this.jobScheduler = new JobScheduler(properties);
  this.serviceManager = new ServiceManager(Lists.newArrayList(this.jobScheduler));
  this.serviceManager.startAsync();
}
/**
 * Injection constructor storing the two lifecycle collaborators.
 *
 * @param curatorManagementService the {@code @Curator} management service
 * @param serviceManager the {@code @Cultivar} service manager
 */
@Inject
DefaultCultivarStartStopManager(
    @Curator final CuratorManagementService curatorManagementService,
    @Cultivar final ServiceManager serviceManager) {
  this.serviceManager = serviceManager;
  this.curatorManagementService = curatorManagementService;
}
/**
 * Builds the manager under test around a real {@link ServiceManager} with one logging-only
 * service, and a stubbed curator management service.
 */
@Before
public void setUp() {
  // The mock's start/stop calls return the mock itself.
  when(curatorManagementService.startAsync()).thenReturn(curatorManagementService);
  when(curatorManagementService.stopAsync()).thenReturn(curatorManagementService);

  /*
   * AbstractIdleService and ServiceManager declare several members final, so start-up and
   * shut-down can only be observed indirectly; this service just logs when each one happens.
   */
  Service loggingService = new AbstractIdleService() {
    @Override
    protected void startUp() throws Exception {
      logger.info("startUp");
    }

    @Override
    protected void shutDown() throws Exception {
      logger.info("shutDown");
    }

    @Override
    protected Executor executor() {
      return MoreExecutors.sameThreadExecutor();
    }
  };
  serviceManager = new ServiceManager(ImmutableSet.of(loggingService));

  startStopManager = new DefaultCultivarStartStopManager(curatorManagementService, serviceManager);
}
/**
 * Creates the task executor and task state tracker from the cluster configuration and places
 * both under a new service manager. Despite its name, this method returns nothing; it only
 * assigns fields.
 */
private void getServices() {
  final Properties clusterProps = ConfigUtils.configToProperties(this.clusterConfig);

  this.taskExecutor = new TaskExecutor(clusterProps);
  this.taskStateTracker = new GobblinHelixTaskStateTracker(clusterProps);

  this.serviceManager =
      new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
}
/**
 * Creates a task runner: stores its identity, prepares the file system, work directory, Helix
 * manager and metrics, then collects the services to manage.
 *
 * <p>{@code serviceManager} is {@code null} when there are no services to manage.
 *
 * @param applicationName name of the application
 * @param helixInstanceName Helix instance name of this runner
 * @param applicationId id of the application
 * @param taskRunnerId id of this task runner
 * @param config runner configuration
 * @param appWorkDirOptional optional application work directory
 * @throws Exception if any initialisation step fails
 */
public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId,
    String taskRunnerId, Config config, Optional<Path> appWorkDirOptional) throws Exception {
  this.applicationName = applicationName;
  this.applicationId = applicationId;
  this.helixInstanceName = helixInstanceName;
  this.taskRunnerId = taskRunnerId;

  Configuration hadoopConf = HadoopUtils.newConfiguration();
  this.fs = buildFileSystem(config, hadoopConf);
  this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
  this.config = saveConfigToFile(config);

  initHelixManager();

  this.containerMetrics = buildContainerMetrics();
  this.taskStateModelFactory = registerHelixTaskFactory();

  services.addAll(getServices());
  this.serviceManager = services.isEmpty() ? null : new ServiceManager(services);

  logger.debug("GobblinTaskRunner: applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
      applicationName, helixInstanceName, applicationId, taskRunnerId, config, appWorkDirOptional);
}
/**
 * Registers the blank registration service and contributes the {@code @Discovery} service
 * manager to the {@code @Cultivar} set of managers.
 */
@Override
protected void configure() {
  bind(BlankRegistrationService.class).in(Scopes.SINGLETON);

  Multibinder<RegistrationService<?>> registrations =
      Multibinder.newSetBinder(binder(), new TypeLiteral<RegistrationService<?>>() {
      });
  registrations.addBinding().to(BlankRegistrationService.class);

  // The @Discovery manager becomes one element of the @Cultivar Set<ServiceManager>.
  Multibinder<ServiceManager> cultivarManagers =
      Multibinder.newSetBinder(binder(), ServiceManager.class, Cultivar.class);
  cultivarManagers.addBinding()
      .toProvider(getProvider(Key.get(ServiceManager.class, Discovery.class)));
}
/**
 * Injection constructor storing the lifecycle collaborators.
 *
 * @param curatorManagementService the {@code @Curator} management service
 * @param serviceManager the primary {@code @Cultivar} service manager
 * @param additionalManagers further {@code @Cultivar} service managers
 */
@Inject
DefaultCultivarStartStopManager(
    @Curator final CuratorManagementService curatorManagementService,
    @Cultivar final ServiceManager serviceManager,
    @Cultivar final Set<ServiceManager> additionalManagers) {
  this.additionalManagers = additionalManagers;
  this.serviceManager = serviceManager;
  this.curatorManagementService = curatorManagementService;
}
/**
 * Returns the shared {@link DaemonSlayer}, creating and starting it on the first call.
 *
 * <p>Creation is guarded by a lock on {@code DaemonSlayer.class} with a null check before and
 * inside the lock. NOTE(review): the declaration of {@code daemonSlayerInstance} is not visible
 * here — confirm it is {@code volatile}, which this pattern needs for safe publication.
 *
 * @param context context used to construct the slayer on first use; ignored afterwards
 * @return the shared slayer
 */
public static DaemonSlayer getSlayer(NGContext context) {
  // Fast path: already created, no locking needed.
  if (daemonSlayerInstance != null) {
    return daemonSlayerInstance.daemonSlayer;
  }

  synchronized (DaemonSlayer.class) {
    // Re-check under the lock: another thread may have created it in the meantime.
    if (daemonSlayerInstance == null) {
      DaemonSlayer created = new DaemonSlayer(context);
      ServiceManager slayerManager = new ServiceManager(ImmutableList.of(created));
      slayerManager.startAsync();
      daemonSlayerInstance = new DaemonSlayerInstance(created);
    }
  }
  return daemonSlayerInstance.daemonSlayer;
}
/**
 * Creates a hang monitor and starts it immediately under its own service manager.
 *
 * @param hangReportConsumer receiver handed to the hang monitor
 * @param hangCheckTimeout timeout handed to the hang monitor
 */
public AutoStartInstance(Consumer<String> hangReportConsumer, Duration hangCheckTimeout) {
  LOG.info("HangMonitorAutoStart");

  this.hangMonitor = new HangMonitor(hangReportConsumer, hangCheckTimeout);
  this.serviceManager = new ServiceManager(ImmutableList.of(this.hangMonitor));

  // Asynchronous start; the constructor does not wait for the monitor to be running.
  this.serviceManager.startAsync();
}
/**
 * Starts the message listeners whose feature flags are enabled and waits up to one minute for
 * them to become healthy.
 *
 * <p>A listener is only created when its flag is set; the order below is the order in which the
 * listeners are handed to the service manager.
 *
 * @throws TimeoutException if the listeners are not healthy within one minute
 */
@PostConstruct
public void start() throws TimeoutException {
  ImmutableList.Builder<Service> listeners = ImmutableList.builder();

  // Content indexing: two listeners share one flag.
  if (contentIndexerEnabled) {
    listeners.add(equivalentContentIndexingMessageListener());
    listeners.add(equivalentContentIndexingGraphMessageListener());
  }

  // Equivalent content store.
  if (equivalentContentStoreEnabled) {
    listeners.add(equivalentContentStoreContentUpdateListener());
  }
  if (equivalentContentGraphEnabled) {
    listeners.add(equivalentContentStoreGraphUpdateListener());
  }

  // Equivalent schedule store.
  if (equivalentScheduleContentEnabled) {
    listeners.add(equivalentScheduleStoreContentListener());
  }
  if (equivalentScheduleScheduleEnabled) {
    listeners.add(equivalentScheduleStoreScheduleUpdateListener());
  }
  if (equivalentScheduleGraphEnabled) {
    listeners.add(equivalentScheduleStoreGraphUpdateListener());
  }

  // Topics and the equivalence graph.
  if (topicIndexerEnabled) {
    listeners.add(topicIndexerMessageListener());
  }
  if (equivalenceGraphEnabled) {
    listeners.add(equivUpdateListener());
  }

  // Neo4j content store.
  if (neo4jContentStoreContentUpdateEnabled) {
    listeners.add(neo4jContentStoreContentUpdateMessageListener());
  }
  if (neo4jContentStoreGraphUpdateEnabled) {
    listeners.add(neo4jContentStoreGraphUpdateMessageListener());
  }

  consumerManager = new ServiceManager(listeners.build());
  consumerManager.startAsync();
  consumerManager.awaitHealthy(1, TimeUnit.MINUTES);
}
/**
 * Starts the bootstrap workers whose feature flags are enabled and waits up to one minute for
 * them to become healthy.
 *
 * <p>A worker is only created when its flag is set; the order below is the order in which the
 * workers are handed to the service manager.
 *
 * @throws TimeoutException if the workers are not healthy within one minute
 */
@PostConstruct
public void start() throws TimeoutException {
  ImmutableList.Builder<Service> workers = ImmutableList.builder();

  // Content.
  if (contentBootstrapEnabled) {
    workers.add(contentBootstrapWorker());
  }
  if (cqlContentBootstrapEnabled) {
    workers.add(cqlContentBootstrapWorker());
  }

  // Schedules.
  if (scheduleBootstrapEnabled) {
    workers.add(scheduleReadWriter());
  }
  if (v2ScheduleEnabled) {
    workers.add(scheduleV2ReadWriter());
  }

  // Topics, events and organisations.
  if (topicBootstrapEnabled) {
    workers.add(topicReadWriter());
  }
  if (eventBootstrapEnabled) {
    workers.add(eventReadWriter());
  }
  if (organisationBootstrapEnabled) {
    workers.add(organisationBootstrapWorker());
  }

  consumerManager = new ServiceManager(workers.build());
  consumerManager.startAsync();
  consumerManager.awaitHealthy(1, TimeUnit.MINUTES);
}
/**
 * Injection constructor storing the service manager and the configuration registry.
 *
 * @param serviceManager manager of the collector's services
 * @param configuration configuration registry
 */
@Inject
public CollectorServiceManager(ServiceManager serviceManager, ConfigurationRegistry configuration) {
  this.configuration = configuration;
  this.serviceManager = serviceManager;
}
/**
 * Binds {@code CollectorServiceManager} and has {@link ServiceManager} instances supplied by
 * {@code ServiceManagerProvider}.
 */
@Override
protected void configure() {
  bind(CollectorServiceManager.class);

  // ServiceManager is not bound to an instance; each request goes through the provider.
  bind(ServiceManager.class)
      .toProvider(ServiceManagerProvider.class);
}
@BeforeClass public void setup() throws UnknownHostException, ExecutionException, InterruptedException, SocketException { provider = new NTPServerTimeProvider(new String[]{"localhost"}); // use localhost to avoid delays and usage caps serviceManager = new ServiceManager(Arrays.asList(provider)); serviceManager.startAsync().awaitHealthy(); }
/**
 * Creates the "download" pooling service under test and a service manager around it. The
 * manager is created but not started here.
 */
@Before
public void setUp() {
  this.poolingService = new PoolingThreadedService(toDownload, client::download, "download", store);
  this.serviceManager = new ServiceManager(Lists.newArrayList(this.poolingService));
}
@Override protected void setup(Context context) { try { this.fs = FileSystem.get(context.getConfiguration()); this.taskStateStore = new FsStateStore<TaskState>(this.fs, SequenceFileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class); Path jobStateFilePath = new Path(context.getConfiguration().get(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY)); SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState); } catch (IOException ioe) { throw new RuntimeException("Failed to setup the mapper task", ioe); } this.taskExecutor = new TaskExecutor(context.getConfiguration()); this.taskStateTracker = new MRTaskStateTracker(context); this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker)); try { this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS); } catch (TimeoutException te) { LOG.error("Timed out while waiting for the service manager to start up", te); throw new RuntimeException(te); } Configuration configuration = context.getConfiguration(); // Setup and start metrics reporting if metric reporting is enabled if (Boolean.valueOf( configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) { this.jobMetrics = Optional.of(JobMetrics.get(this.jobState)); String metricFileSuffix = configuration.get(ConfigurationKeys.METRICS_FILE_SUFFIX, ConfigurationKeys.DEFAULT_METRICS_FILE_SUFFIX); // If running in MR mode, all mappers will try to write metrics to the same file, which will fail. // Instead, append the taskAttemptId to each file name. if (Strings.isNullOrEmpty(metricFileSuffix)) { metricFileSuffix = context.getTaskAttemptID().getTaskID().toString(); } else { metricFileSuffix += "." + context.getTaskAttemptID().getTaskID().toString(); } configuration.set(ConfigurationKeys.METRICS_FILE_SUFFIX, metricFileSuffix); this.jobMetrics.get().startMetricReporting(configuration); } }
/** * Launch a new Gobblin instance on Yarn. * * @throws IOException if there's something wrong launching the application * @throws YarnException if there's something wrong launching the application */ public void launch() throws IOException, YarnException { this.eventBus.register(this); createGobblinYarnHelixCluster(); try { this.helixManager.connect(); } catch (Exception e) { LOGGER.error("HelixManager failed to connect", e); throw Throwables.propagate(e); } this.yarnClient.start(); this.applicationId = Optional.of(setupAndSubmitApplication()); this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { eventBus.post(new ApplicationReportArrivalEvent(yarnClient.getApplicationReport(applicationId.get()))); } catch (YarnException ye) { LOGGER.error("Failed to get application report for Gobblin Yarn application " + applicationId.get(), ye); } catch (IOException ioe) { LOGGER.error("Failed to get application report for Gobblin Yarn application " + applicationId.get(), ioe); } } }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES); List<Service> services = Lists.newArrayList(); if (config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) { LOGGER.info("Adding YarnAppSecurityManager since login is keytab based"); services.add(new YarnAppSecurityManager(config, this.helixManager, this.fs)); } services.add(buildLogCopier( new Path(this.sinkLogRootDir, this.applicationName + Path.SEPARATOR + this.applicationId.get().toString()), YarnHelixUtils.getAppWorkDirPath(this.fs, this.applicationName, this.applicationId.get()))); this.serviceManager = Optional.of(new ServiceManager(services)); // Start all the services running in the ApplicationMaster this.serviceManager.get().startAsync(); }
public GobblinApplicationMaster(String applicationName, Config config) throws Exception { ContainerId containerId = ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key())); ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); String zkConnectionString = config.getString(GobblinYarnConfigurationKeys.ZK_CONNECTION_STRING_KEY); LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString); String helixInstanceName = YarnHelixUtils.getHelixInstanceName(YarnHelixUtils.getHostname(), containerId); // This will create and register a Helix controller in ZooKeeper this.helixManager = HelixManagerFactory.getZKHelixManager( config.getString(GobblinYarnConfigurationKeys.HELIX_CLUSTER_NAME_KEY), helixInstanceName, InstanceType.CONTROLLER, zkConnectionString); FileSystem fs = config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), new Configuration()) : FileSystem.get(new Configuration()); Path appWorkDir = YarnHelixUtils.getAppWorkDirPath(fs, applicationName, applicationAttemptId.getApplicationId()); List<Service> services = Lists.newArrayList(); services.add(buildLogCopier(containerId, fs, appWorkDir)); services.add( new YarnService(config, applicationName, applicationAttemptId.getApplicationId(), fs, this.eventBus, Strings.nullToEmpty(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)))); services.add( new GobblinHelixJobScheduler(YarnHelixUtils.configToProperties(config), this.helixManager, this.eventBus, appWorkDir)); services.add(new JobConfigurationManager(this.eventBus, config.hasPath(GobblinYarnConfigurationKeys.JOB_CONF_PACKAGE_PATH_KEY) ? 
Optional .of(config.getString(GobblinYarnConfigurationKeys.JOB_CONF_PACKAGE_PATH_KEY)) : Optional.<String>absent())); if (UserGroupInformation.isSecurityEnabled()) { LOGGER.info("Adding YarnContainerSecurityManager since security is enabled"); services.add(new YarnContainerSecurityManager(config, fs, this.eventBus)); } this.serviceManager = new ServiceManager(services); List<Tag<?>> tags = ImmutableList.<Tag<?>>builder() .add(new Tag<String>(GobblinYarnMetricTagNames.YARN_APPLICATION_NAME, applicationName)) .add(new Tag<String>(GobblinYarnMetricTagNames.YARN_APPLICATION_ID, applicationAttemptId.getApplicationId().toString())) .add(new Tag<String>(GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString())) .add(new Tag<String>(GobblinYarnMetricTagNames.HELIX_INSTANCE_NAME, helixInstanceName)) .build(); this.metricContext = MetricContext.builder(GobblinApplicationMaster.class.getSimpleName()) .addTags(tags) .build(); this.jmxReporter = JmxReporter.forRegistry(this.metricContext) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build(); }
public GobblinWorkUnitRunner(String applicationName, Config config) throws Exception { this.containerId = ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key())); ApplicationAttemptId applicationAttemptId = this.containerId.getApplicationAttemptId(); FileSystem fs = config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), new Configuration()) : FileSystem.get(new Configuration()); String zkConnectionString = config.getString(GobblinYarnConfigurationKeys.ZK_CONNECTION_STRING_KEY); LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString); String helixInstanceName = YarnHelixUtils.getHelixInstanceName(YarnHelixUtils.getHostname(), this.containerId); this.helixManager = HelixManagerFactory.getZKHelixManager( config.getString(GobblinYarnConfigurationKeys.HELIX_CLUSTER_NAME_KEY), helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); Properties properties = YarnHelixUtils.configToProperties(config); TaskExecutor taskExecutor = new TaskExecutor(properties); TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties, this.helixManager); List<Service> services = Lists.newArrayList(); services.add(buildLogCopier(this.containerId, fs, YarnHelixUtils.getAppWorkDirPath(fs, applicationName, applicationAttemptId.getApplicationId()))); services.add(taskExecutor); services.add(taskStateTracker); if (config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) { LOGGER.info("Adding YarnContainerSecurityManager since login is keytab based"); services.add(new YarnContainerSecurityManager(config, fs, this.eventBus)); } this.serviceManager = new ServiceManager(services); // Register task factory for the Helix task state model Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); Path appWorkDir = YarnHelixUtils.getAppWorkDirPath( fs, applicationName, containerId.getApplicationAttemptId().getApplicationId()); 
taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, new GobblinHelixTaskFactory(taskExecutor, taskStateTracker, fs, appWorkDir)); this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap); this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", this.taskStateModelFactory); List<Tag<?>> tags = ImmutableList.<Tag<?>>builder() .add(new Tag<String>(GobblinYarnMetricTagNames.YARN_APPLICATION_NAME, applicationName)) .add(new Tag<String>(GobblinYarnMetricTagNames.YARN_APPLICATION_ID, containerId.getApplicationAttemptId().getApplicationId().toString())) .add(new Tag<String>(GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString())) .add(new Tag<String>(GobblinYarnMetricTagNames.HELIX_INSTANCE_NAME, helixInstanceName)) .build(); this.metricContext = MetricContext.builder(GobblinApplicationMaster.class.getSimpleName()) .addTags(tags) .build(); this.jmxReporter = JmxReporter.forRegistry(this.metricContext) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build(); }
/**
 * Provides the singleton {@code @Cultivar} service manager over all bound curator services.
 *
 * @param services the bound {@code CuratorService} instances
 * @return a manager over {@code services}
 */
@Provides
@Singleton
@Cultivar
public ServiceManager manager(final Set<CuratorService> services) {
  final ServiceManager curatorServices = new ServiceManager(services);
  return curatorServices;
}
/**
 * With the dependencies passed into {@code CuratorModule}, the {@code @Cultivar} service
 * manager can be obtained from the injector.
 */
@Test
public void configure_WithDependenciesInjected_AllowsCreatingServiceManager() {
  Injector injector = Guice.createInjector(new CuratorModule(dependencies));

  ServiceManager cultivarManager = injector.getInstance(Key.get(ServiceManager.class, Cultivar.class));

  assertNotNull(cultivarManager);
}
/**
 * With the dependencies installed alongside a no-arg {@code CuratorModule}, the
 * {@code @Cultivar} service manager can be obtained from the injector.
 */
@Test
public void configure_WithDependenciesAdjacent_AllowsCreatingServiceManager() {
  Injector injector = Guice.createInjector(dependencies, new CuratorModule());

  ServiceManager cultivarManager = injector.getInstance(Key.get(ServiceManager.class, Cultivar.class));

  assertNotNull(cultivarManager);
}