public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, long ttl) { domainLogFD = null; summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>(); entityLogFDs = new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>>(); this.flushTimer = new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", true); this.flushTimerTask = new FlushTimerTask(); this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000, flushIntervalSecs * 1000); this.cleanInActiveFDsTimer = new Timer(LogFDsCache.class.getSimpleName() + "cleanInActiveFDsTimer", true); this.cleanInActiveFDsTask = new CleanInActiveFDsTask(); this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask, cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); this.ttl = ttl * 1000; }
/**
 * Flushes every entity log file descriptor in the given two-level map,
 * holding each descriptor's own lock while it is flushed.
 *
 * @param logFDs descriptors keyed by application attempt, then by entity
 *          group id
 * @throws IOException if flushing a descriptor fails; descriptors that have
 *           not been visited yet are then left unflushed
 */
private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
    TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  for (HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
      : logFDs.values()) {
    for (EntityLogFD logFD : logFDMap.values()) {
      // Acquire the lock before entering try so that the finally block
      // never unlocks a lock that was not obtained.
      logFD.lock();
      try {
        logFD.flush();
      } finally {
        logFD.unlock();
      }
    }
  }
}
/**
 * Closes every entity log file descriptor in the given map whose last
 * modification is at least {@code ttl} older than {@code currentTimeStamp}.
 *
 * <p>Closed descriptors are not removed from the map by this method.
 *
 * @param logFDs descriptors keyed by application attempt, then by entity
 *          group id
 * @param currentTimeStamp the reference time compared against each
 *          descriptor's last-modified time (same unit as {@code ttl})
 */
private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
    long currentTimeStamp) {
  for (HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
      : logFDs.values()) {
    for (EntityLogFD logFD : logFDMap.values()) {
      // Acquire the lock before entering try so that the finally block
      // never unlocks a lock that was not obtained.
      logFD.lock();
      try {
        if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
          logFD.close();
        }
      } finally {
        logFD.unlock();
      }
    }
  }
}
/**
 * Closes every entity log file descriptor in the given map.
 *
 * <p>{@code entityTableLocker} is held for the whole pass, and each
 * descriptor's own lock is held while that descriptor is closed.
 *
 * @param logFDs descriptors keyed by application attempt, then by entity
 *          group id
 */
private void closeEntityFDs(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
  // Both locks are acquired before their try blocks so that a failed
  // acquisition is never followed by an unlock of a lock that is not held.
  entityTableLocker.lock();
  try {
    for (HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
        : logFDs.values()) {
      for (EntityLogFD logFD : logFDMap.values()) {
        logFD.lock();
        try {
          logFD.close();
        } finally {
          logFD.unlock();
        }
      }
    }
  } finally {
    entityTableLocker.unlock();
  }
}
/**
 * Checks equals, compareTo, hashCode, toString and fromString of
 * {@link TimelineEntityGroupId} using ids that differ in application id,
 * in group id string, or in neither.
 */
@Test
public void testTimelineEntityGroupId() {
  ApplicationId firstApp = ApplicationId.newInstance(1234, 1);
  ApplicationId secondApp = ApplicationId.newInstance(1234, 2);

  TimelineEntityGroupId base =
      TimelineEntityGroupId.newInstance(firstApp, "1");
  TimelineEntityGroupId sameAppOtherId =
      TimelineEntityGroupId.newInstance(firstApp, "2");
  TimelineEntityGroupId otherAppSameId =
      TimelineEntityGroupId.newInstance(secondApp, "1");
  TimelineEntityGroupId duplicate =
      TimelineEntityGroupId.newInstance(firstApp, "1");

  // equals
  Assert.assertTrue(base.equals(duplicate));
  Assert.assertFalse(base.equals(sameAppOtherId));
  Assert.assertFalse(base.equals(otherAppSameId));

  // compareTo
  Assert.assertEquals(0, base.compareTo(duplicate));
  Assert.assertTrue(base.compareTo(sameAppOtherId) < 0);
  Assert.assertTrue(base.compareTo(otherAppSameId) < 0);

  // hashCode
  int baseHash = base.hashCode();
  Assert.assertEquals(baseHash, duplicate.hashCode());
  Assert.assertFalse(baseHash == sameAppOtherId.hashCode());
  Assert.assertFalse(baseHash == otherAppSameId.hashCode());

  // String round trip
  Assert.assertEquals("timelineEntityGroupId_1234_1_1", base.toString());
  Assert.assertEquals(
      TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"),
      base);
}
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, long ttl, long timerTaskRetainTTL) { domainLogFD = null; summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>(); entityLogFDs = new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>>(); this.ttl = ttl * 1000; this.flushIntervalSecs = flushIntervalSecs; this.cleanIntervalSecs = cleanIntervalSecs; long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000; if (timerTaskRetainTTLVar > this.ttl) { this.timerTaskRetainTTL = timerTaskRetainTTLVar; } else { this.timerTaskRetainTTL = this.ttl + 2 * 60 * 1000; LOG.warn("The specific " + YarnConfiguration .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS + " : " + timerTaskRetainTTL + " is invalid, because it is less than or " + "equal to " + YarnConfiguration .TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + ". Use " + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + " + 120s instead."); } ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.timerTasksMonitorReadLock = lock.readLock(); this.timerTasksMonitorWriteLock = lock.writeLock(); }
synchronized void loadDetailLog(TimelineDataManager tdm, TimelineEntityGroupId groupId) throws IOException { List<LogInfo> removeList = new ArrayList<>(); for (LogInfo log : detailLogs) { LOG.debug("Try refresh logs for {}", log.getFilename()); // Only refresh the log that matches the cache id if (log.matchesGroupId(groupId)) { Path dirPath = getAppDirPath(); if (fs.exists(log.getPath(dirPath))) { LOG.debug("Refresh logs for cache id {}", groupId); log.parseForStore(tdm, dirPath, isDone(), jsonFactory, objMapper, fs); } else { // The log may have been removed, remove the log removeList.add(log); LOG.info( "File {} no longer exists, removing it from log list", log.getPath(dirPath)); } } } detailLogs.removeAll(removeList); }
private List<TimelineStore> getTimelineStoresFromCacheIds( Set<TimelineEntityGroupId> groupIds, String entityType, List<EntityCacheItem> cacheItems) throws IOException { List<TimelineStore> stores = new LinkedList<TimelineStore>(); // For now we just handle one store in a context. We return the first // non-null storage for the group ids. for (TimelineEntityGroupId groupId : groupIds) { TimelineStore storeForId = getCachedStore(groupId, cacheItems); if (storeForId != null) { LOG.debug("Adding {} as a store for the query", storeForId.getName()); stores.add(storeForId); metrics.incrGetEntityToDetailOps(); } } if (stores.size() == 0) { LOG.debug("Using summary store for {}", entityType); stores.add(this.summaryStore); metrics.incrGetEntityToSummaryOps(); } return stores; }
/**
 * Asks every registered plugin for the group ids of the given entity and
 * resolves the union of those ids to the stores to read from.
 *
 * <p>Debug messages use parameterized logging so that no strings are built
 * when debug logging is disabled; the rendered text is unchanged.
 *
 * @param entityId id of the entity being read
 * @param entityType type of the entity being read
 * @param cacheItems passed through to the store lookup
 * @return the stores selected by {@code getTimelineStoresFromCacheIds}
 * @throws IOException propagated from the store lookup
 */
protected List<TimelineStore> getTimelineStoresForRead(String entityId,
    String entityType, List<EntityCacheItem> cacheItems) throws IOException {
  Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
  for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
    LOG.debug("Trying plugin {} for id {} and type {}",
        cacheIdPlugin.getClass().getName(), entityId, entityType);
    Set<TimelineEntityGroupId> idsFromPlugin =
        cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType);
    if (idsFromPlugin == null) {
      LOG.debug("Plugin returned null {}",
          cacheIdPlugin.getClass().getName());
    } else {
      LOG.debug("Plugin returned ids: {}", idsFromPlugin);
      groupIds.addAll(idsFromPlugin);
      LOG.debug("plugin {} returns a non-null value on query",
          cacheIdPlugin.getClass().getName());
    }
  }
  return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
/**
 * Asks every registered plugin for the group ids matching the given
 * filters and resolves the union of those ids to the stores to read from.
 *
 * @param entityType type of the entities being queried
 * @param primaryFilter primary filter handed to each plugin
 * @param secondaryFilters secondary filters handed to each plugin
 * @param cacheItems passed through to the store lookup
 * @return the stores selected by {@code getTimelineStoresFromCacheIds}
 * @throws IOException propagated from the store lookup
 */
private List<TimelineStore> getTimelineStoresForRead(String entityType,
    NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
    List<EntityCacheItem> cacheItems) throws IOException {
  Set<TimelineEntityGroupId> collected = new HashSet<>();
  for (TimelineEntityGroupPlugin plugin : cacheIdPlugins) {
    Set<TimelineEntityGroupId> pluginIds = plugin.getTimelineEntityGroupId(
        entityType, primaryFilter, secondaryFilters);
    if (pluginIds == null) {
      // This plugin has nothing to say about the query.
      continue;
    }
    LOG.debug("plugin {} returns a non-null value on query {}",
        plugin.getClass().getName(), pluginIds);
    collected.addAll(pluginIds);
  }
  return getTimelineStoresFromCacheIds(collected, entityType, cacheItems);
}
private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) { if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) { historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); } try { TimelinePutResponse response = timelineClient.putEntities( appContext.getApplicationAttemptId(), groupId, entity); if (response != null && !response.getErrors().isEmpty()) { int count = response.getErrors().size(); for (int i = 0; i < count; ++i) { TimelinePutError err = response.getErrors().get(i); if (err.getErrorCode() != 0) { LOG.warn("Could not post history event to ATS" + ", atsPutError=" + err.getErrorCode() + ", entityId=" + err.getEntityId()); } } } // Do nothing additional, ATS client library should handle throttling // or auto-disable as needed } catch (Exception e) { LOG.warn("Could not handle history events", e); } }
/**
 * Collects the group ids for a set of entity ids of one type.
 *
 * @param entityType entity type of all the given ids
 * @param entityIds ids to convert
 * @param eventTypes not used by this implementation
 * @return the union of the group ids produced for each entity id, or
 *         {@code null} when the type is unknown, is a summary type, or no
 *         entity ids were supplied
 */
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
    SortedSet<String> entityIds, Set<String> eventTypes) {
  boolean handledType = knownEntityTypes.contains(entityType)
      && !summaryEntityTypes.contains(entityType);
  if (!handledType || entityIds == null || entityIds.isEmpty()) {
    return null;
  }

  Set<TimelineEntityGroupId> merged = new HashSet<>();
  for (String id : entityIds) {
    Set<TimelineEntityGroupId> idsForEntity =
        convertToTimelineEntityGroupIds(entityType, id);
    if (idsForEntity == null) {
      continue;
    }
    merged.addAll(idsForEntity);
  }
  return merged;
}
/**
 * Primary-filter lookup with a plugin created via
 * {@code createPlugin(100, null)}: TEZ_APPLICATION queries yield null,
 * TEZ_DAG_ID yields null, and every other type yields two group ids of
 * {@code appId1}.
 */
@Test
public void testGetTimelineEntityGroupIdByPrimaryFilter() {
  TimelineCachePluginImpl plugin = createPlugin(100, null);
  for (Entry<String, String> typeAndId : typeIdMap1.entrySet()) {
    String entityType = typeAndId.getKey();
    NameValuePair primaryFilter =
        new NameValuePair(entityType, typeAndId.getValue());

    Assert.assertNull(plugin.getTimelineEntityGroupId(
        EntityTypes.TEZ_APPLICATION.name(), primaryFilter, null));

    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(entityType, primaryFilter, null);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(2, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1, 100)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Id lookup with a plugin created via {@code createPlugin(-1, null)}:
 * TEZ_DAG_ID yields null and every other type yields exactly one group id
 * of {@code appId1}.
 */
@Test
public void testGetTimelineEntityGroupIdByIdDefaultConfig() {
  TimelineCachePluginImpl plugin = createPlugin(-1, null);
  for (Entry<String, String> typeAndId : typeIdMap1.entrySet()) {
    String entityType = typeAndId.getKey();
    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(typeAndId.getValue(), entityType);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(1, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Id lookup with a plugin created via {@code createPlugin(1, null)}:
 * TEZ_DAG_ID yields null and every other type yields exactly one group id
 * of {@code appId1}.
 */
@Test
public void testGetTimelineEntityGroupIdByIdNoGroupingConf() {
  TimelineCachePluginImpl plugin = createPlugin(1, null);
  for (Entry<String, String> typeAndId : typeIdMap1.entrySet()) {
    String entityType = typeAndId.getKey();
    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(typeAndId.getValue(), entityType);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(1, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Id lookup with a plugin created via {@code createPlugin(100, null)}:
 * TEZ_DAG_ID yields null and every other type yields two group ids of
 * {@code appId1}.
 */
@Test
public void testGetTimelineEntityGroupIdById() {
  TimelineCachePluginImpl plugin = createPlugin(100, null);
  for (Entry<String, String> typeAndId : typeIdMap1.entrySet()) {
    String entityType = typeAndId.getKey();
    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(typeAndId.getValue(), entityType);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(2, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId1, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID1, 100)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Id lookup with a plugin created via {@code createPlugin(100, "50")}:
 * TEZ_DAG_ID yields null and every other type yields three group ids of
 * {@code appId2}.
 */
@Test
public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsSingle() {
  TimelineCachePluginImpl plugin = createPlugin(100, "50");
  for (Entry<String, String> typeAndId : typeIdMap2.entrySet()) {
    String entityType = typeAndId.getKey();
    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(typeAndId.getValue(), entityType);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(3, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId2, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID2, 100, 50)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Id lookup with a plugin created via {@code createPlugin(100, "25, 50")}:
 * TEZ_DAG_ID yields null and every other type yields four group ids of
 * {@code appId2}.
 */
@Test
public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsMultiple() {
  TimelineCachePluginImpl plugin = createPlugin(100, "25, 50");
  for (Entry<String, String> typeAndId : typeIdMap2.entrySet()) {
    String entityType = typeAndId.getKey();
    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(typeAndId.getValue(), entityType);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(4, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId2, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID2, 100, 25, 50)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Id lookup with a plugin created via {@code createPlugin(100, "")}:
 * TEZ_DAG_ID yields null and every other type yields two group ids of
 * {@code appId2}.
 */
@Test
public void testGetTimelineEntityGroupIdByIdWithOldGroupIdsEmpty() {
  TimelineCachePluginImpl plugin = createPlugin(100, "");
  for (Entry<String, String> typeAndId : typeIdMap2.entrySet()) {
    String entityType = typeAndId.getKey();
    Set<TimelineEntityGroupId> groupIds =
        plugin.getTimelineEntityGroupId(typeAndId.getValue(), entityType);
    if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
      Assert.assertNull(groupIds);
      continue;
    }

    Assert.assertEquals(2, groupIds.size());
    for (TimelineEntityGroupId groupId : groupIds) {
      Assert.assertEquals(appId2, groupId.getApplicationId());
      Assert.assertTrue(getGroupIds(dagID2, 100)
          .contains(groupId.getTimelineEntityGroupId()));
    }
  }
}
/**
 * Returns a snapshot of the given descriptor table, taken while holding
 * {@code entityTableCopyLocker}.
 *
 * <p>The copy is shallow: only the outer map is duplicated, so the inner
 * per-attempt maps and the descriptors are shared with the original.
 *
 * @param entityLogFDsToCopy table to snapshot
 * @return a new map with the same attempt-to-inner-map associations
 */
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
    EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
    HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
  // Acquire the lock before entering try so that the finally block never
  // unlocks a lock that was not obtained.
  entityTableCopyLocker.lock();
  try {
    return new HashMap<ApplicationAttemptId,
        HashMap<TimelineEntityGroupId, EntityLogFD>>(entityLogFDsToCopy);
  } finally {
    entityTableCopyLocker.unlock();
  }
}
/**
 * Writes the given entities to the entity log of an application attempt
 * and group, using this cache's own entity descriptor table.
 *
 * @param fs file system the log lives on
 * @param entityLogPath path of the entity log file
 * @param objMapper mapper handed to the descriptor for serialization
 * @param appAttemptId attempt the entities belong to
 * @param groupId entity group the entities belong to
 * @param entitiesToEntity entities to write
 * @param isAppendSupported passed through to the descriptor on creation
 * @throws IOException propagated from the underlying write
 */
public void writeEntityLogs(FileSystem fs, Path entityLogPath,
    ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
    boolean isAppendSupported) throws IOException {
  // Delegate to the private overload, targeting the shared entity table.
  writeEntityLogs(
      fs,
      entityLogPath,
      objMapper,
      appAttemptId,
      groupId,
      entitiesToEntity,
      isAppendSupported,
      this.entityLogFDs);
}
/**
 * Writes entities through the cached descriptor for the given attempt and
 * group, creating and registering the descriptor first when none exists.
 *
 * <p>When a descriptor already exists, the write is done under that
 * descriptor's lock and is skipped if the service has been stopped.
 *
 * @param fs file system the log lives on
 * @param logPath path of the entity log file
 * @param objMapper mapper handed to a newly created descriptor
 * @param attemptId attempt the entities belong to
 * @param groupId entity group the entities belong to
 * @param entities entities to write
 * @param isAppendSupported passed through to a newly created descriptor
 * @param logFDs descriptor table to look up and, if needed, extend
 * @throws IOException propagated from the descriptor write or creation
 */
private void writeEntityLogs(FileSystem fs, Path logPath,
    ObjectMapper objMapper, ApplicationAttemptId attemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entities,
    boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
    TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD =
      logFDs.get(attemptId);
  EntityLogFD logFD = (logMapFD == null) ? null : logMapFD.get(groupId);
  if (logFD == null) {
    // Either the attempt or the group has no descriptor yet.
    createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
        entities, isAppendSupported, logFDs);
    return;
  }

  // Acquire the lock before entering try so that the finally block never
  // unlocks a lock that was not obtained.
  logFD.lock();
  try {
    if (serviceStopped) {
      return;
    }
    logFD.writeEntities(entities);
  } finally {
    logFD.unlock();
  }
}
/**
 * Looks up or creates the descriptor for the given attempt and group,
 * writes the entities through it, and then registers it in the table.
 *
 * <p>{@code entityTableLocker} is held for the whole operation. The write
 * happens under the descriptor's own lock, and the table is only updated
 * (under {@code entityTableCopyLocker}) after the write has returned.
 * Nothing is done if the service has been stopped.
 *
 * @param fs file system the log lives on
 * @param logPath path of the entity log file
 * @param objMapper mapper handed to a newly created descriptor
 * @param attemptId attempt the entities belong to
 * @param groupId entity group the entities belong to
 * @param entities entities to write
 * @param isAppendSupported passed through to a newly created descriptor
 * @param logFDs descriptor table to update
 * @throws IOException propagated from descriptor creation or the write
 */
private void createEntityFDandWrite(FileSystem fs, Path logPath,
    ObjectMapper objMapper, ApplicationAttemptId attemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entities,
    boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
    TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
  // Every lock below is acquired before its try block so that a finally
  // clause never unlocks a lock that was not obtained.
  entityTableLocker.lock();
  try {
    if (serviceStopped) {
      return;
    }
    HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
        logFDs.get(attemptId);
    if (logFDMap == null) {
      logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
    }
    // Another writer may have registered the descriptor since the caller
    // looked it up without holding the table lock.
    EntityLogFD logFD = logFDMap.get(groupId);
    if (logFD == null) {
      logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
    }
    logFD.lock();
    try {
      logFD.writeEntities(entities);
      entityTableCopyLocker.lock();
      try {
        logFDMap.put(groupId, logFD);
        logFDs.put(attemptId, logFDMap);
      } finally {
        entityTableCopyLocker.unlock();
      }
    } finally {
      logFD.unlock();
    }
  } finally {
    entityTableLocker.unlock();
  }
}
/**
 * Posts entities under an entity group id. Only available when the
 * configured timeline service version is exactly 1.5.
 *
 * @param appAttemptId attempt the entities belong to
 * @param groupId entity group the entities belong to
 * @param entities entities to post
 * @return the response from the timeline writer
 * @throws IOException propagated from the timeline writer
 * @throws YarnException if the timeline service version is not 1.5, or
 *           propagated from the timeline writer
 */
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, TimelineEntity... entities)
    throws IOException, YarnException {
  boolean isVersion15 =
      Float.compare(this.timelineServiceVersion, 1.5f) == 0;
  if (isVersion15) {
    return timelineWriter.putEntities(appAttemptId, groupId, entities);
  }
  throw new YarnException(
      "This API is not supported under current Timeline Service Version: "
          + timelineServiceVersion);
}
/**
 * Writes the given entities to the entity log of an application attempt
 * and group, using this cache's own entity descriptor table.
 *
 * <p>{@code checkAndStartTimeTasks()} is invoked before the write.
 *
 * @param fs file system the log lives on
 * @param entityLogPath path of the entity log file
 * @param objMapper mapper handed to the descriptor for serialization
 * @param appAttemptId attempt the entities belong to
 * @param groupId entity group the entities belong to
 * @param entitiesToEntity entities to write
 * @param isAppendSupported passed through to the descriptor on creation
 * @throws IOException propagated from the underlying write
 */
public void writeEntityLogs(FileSystem fs, Path entityLogPath,
    ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
    boolean isAppendSupported) throws IOException {
  checkAndStartTimeTasks();
  // Delegate to the private overload, targeting the shared entity table.
  writeEntityLogs(
      fs,
      entityLogPath,
      objMapper,
      appAttemptId,
      groupId,
      entitiesToEntity,
      isAppendSupported,
      this.entityLogFDs);
}
/**
 * Maps a filtered query to the standard group id of the application whose
 * id is carried as the primary filter's value.
 *
 * @param entityType not used by this implementation
 * @param primaryFilter filter whose value is parsed as an application id;
 *          must not be null
 * @param secondaryFilters not used by this implementation
 * @return a new set built from the standard timeline group id
 */
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
    NameValuePair primaryFilter,
    Collection<NameValuePair> secondaryFilters) {
  String appIdText = primaryFilter.getValue().toString();
  ApplicationId parsedAppId = ApplicationId.fromString(appIdText);
  return Sets.newHashSet(getStandardTimelineGroupId(parsedAppId));
}
/**
 * Maps an entity to the standard group id of the application whose id is
 * given as the entity id.
 *
 * @param entityId parsed as an application id
 * @param entityType not used by this implementation
 * @return a new set built from the standard timeline group id
 */
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
    String entityType) {
  return Sets.newHashSet(
      getStandardTimelineGroupId(ApplicationId.fromString(entityId)));
}
/**
 * Resolves the group id for a filtered query on container entities.
 *
 * @param entityType entity type being queried
 * @param primaryFilter filter whose value is used as the application id
 *          string; may be null
 * @param secondaryFilters not used by this implementation
 * @return the group id set for the application named by the primary
 *         filter, or {@code null} when the type is not
 *         {@code DS_CONTAINER} or no primary filter was given
 */
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
    NameValuePair primaryFilter,
    Collection<NameValuePair> secondaryFilters) {
  boolean isContainerType =
      ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityType);
  if (!isContainerType || primaryFilter == null) {
    return null;
  }
  return toEntityGroupId(primaryFilter.getValue().toString());
}
/**
 * Resolves the group id for a single container entity.
 *
 * <p>The type check is made against {@code entityType}. Checking
 * {@code entityId} against the {@code DS_CONTAINER} type name would make
 * the branch reachable only for an id that is then parsed as a container
 * id, which the filter-based overload of this method does not do either.
 *
 * @param entityId container id string of the entity
 * @param entityType entity type being queried
 * @return the group id set for the application owning the container, or
 *         {@code null} when the type is not {@code DS_CONTAINER}
 */
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
    String entityType) {
  if (ApplicationMaster.DSEntity.DS_CONTAINER.toString()
      .equals(entityType)) {
    ContainerId containerId = ContainerId.fromString(entityId);
    ApplicationId appId = containerId.getApplicationAttemptId()
        .getApplicationId();
    return toEntityGroupId(appId.toString());
  }
  return null;
}
/**
 * Builds the single-element group id set for an application's container
 * entities.
 *
 * @param strAppId application id in string form
 * @return a new mutable set holding one group id made from the parsed
 *         application id and
 *         {@code ApplicationMaster.CONTAINER_ENTITY_GROUP_ID}
 */
private Set<TimelineEntityGroupId> toEntityGroupId(String strAppId) {
  Set<TimelineEntityGroupId> groupIds = new HashSet<>();
  groupIds.add(TimelineEntityGroupId.newInstance(
      ApplicationId.fromString(strAppId),
      ApplicationMaster.CONTAINER_ENTITY_GROUP_ID));
  return groupIds;
}
/**
 * Posts a container entity, using the group-id based API when timeline
 * service v1.5 is enabled in {@code conf} and the plain API otherwise.
 *
 * @param timelineClient client used to post the entity
 * @param currAttemptId current application attempt
 * @param entity entity to post
 * @return the response from the timeline client
 * @throws YarnException propagated from the timeline client
 * @throws IOException propagated from the timeline client
 */
private TimelinePutResponse putContainerEntity(
    TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
    TimelineEntity entity) throws YarnException, IOException {
  if (!TimelineUtils.timelineServiceV1_5Enabled(conf)) {
    return timelineClient.putEntities(entity);
  }
  TimelineEntityGroupId containerGroupId = TimelineEntityGroupId.newInstance(
      currAttemptId.getApplicationId(), CONTAINER_ENTITY_GROUP_ID);
  return timelineClient.putEntities(currAttemptId, containerGroupId, entity);
}
@VisibleForTesting public TimelineEntityGroupId getGroupId(DAGHistoryEvent event) { // Changing this function will impact TimelineCachePluginImpl and should be done very // carefully to account for handling different versions of Tez switch (event.getHistoryEvent().getEventType()) { case DAG_SUBMITTED: case DAG_INITIALIZED: case DAG_STARTED: case DAG_FINISHED: case DAG_KILL_REQUEST: case VERTEX_INITIALIZED: case VERTEX_STARTED: case VERTEX_CONFIGURE_DONE: case VERTEX_FINISHED: case TASK_STARTED: case TASK_FINISHED: case TASK_ATTEMPT_STARTED: case TASK_ATTEMPT_FINISHED: case DAG_COMMIT_STARTED: case VERTEX_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_FINISHED: case DAG_RECOVERED: String entityGroupId = numDagsPerGroup > 1 ? event.getDagID().getGroupId(numDagsPerGroup) : event.getDagID().toString(); return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), entityGroupId); case APP_LAUNCHED: case AM_LAUNCHED: case AM_STARTED: case CONTAINER_LAUNCHED: case CONTAINER_STOPPED: return TimelineEntityGroupId.newInstance(appContext.getApplicationID(), appContext.getApplicationID().toString()); } return null; }