/**
 * Pushes the locally held record fields into {@code builder}.
 *
 * <p>Each of {@code resource}, {@code localPath} and {@code exception} is
 * written to the builder only when the local field is non-null and its proto
 * is not equal to the value the builder currently returns for that field.
 */
private void mergeLocalToBuilder() {
  if (this.resource != null) {
    boolean resourceDiffers = !((LocalResourcePBImpl) this.resource).getProto()
        .equals(builder.getResource());
    if (resourceDiffers) {
      builder.setResource(convertToProtoFormat(this.resource));
    }
  }

  if (this.localPath != null) {
    boolean localPathDiffers = !((URLPBImpl) this.localPath).getProto()
        .equals(builder.getLocalPath());
    if (localPathDiffers) {
      builder.setLocalPath(convertToProtoFormat(this.localPath));
    }
  }

  if (this.exception != null) {
    boolean exceptionDiffers = !((SerializedExceptionPBImpl) this.exception).getProto()
        .equals(builder.getException());
    if (exceptionDiffers) {
      builder.setException(convertToProtoFormat(this.exception));
    }
  }
}
/**
 * Pushes the locally held {@code resource} and {@code destinationDirectory}
 * into {@code builder}.
 *
 * <p>A field is written only when the local value is non-null and its proto
 * differs from the value reported by the object selected through
 * {@code viaProto} on entry. {@code maybeInitBuilder()} is invoked right
 * before each write.
 */
private void mergeLocalToBuilder() {
  // Chosen once on entry; it is not re-read after maybeInitBuilder() runs.
  ResourceLocalizationSpecProtoOrBuilder current = viaProto ? proto : builder;

  if (this.resource != null) {
    boolean resourceDiffers = !(current.getResource()
        .equals(((LocalResourcePBImpl) resource).getProto()));
    if (resourceDiffers) {
      maybeInitBuilder();
      builder.setResource(((LocalResourcePBImpl) resource).getProto());
    }
  }

  if (this.destinationDirectory != null) {
    boolean directoryDiffers = !(current.getDestinationDirectory()
        .equals(((URLPBImpl) destinationDirectory).getProto()));
    if (directoryDiffers) {
      maybeInitBuilder();
      builder.setDestinationDirectory(
          ((URLPBImpl) destinationDirectory).getProto());
    }
  }
}
public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan( List<PlanLocalResource> localResourcesList) { Map<String, LocalResource> map = new HashMap<String, LocalResource>(); for(PlanLocalResource res : localResourcesList){ LocalResource r = new LocalResourcePBImpl(); //NOTE: have to check every optional field in protobuf generated classes for existence before accessing //else we will receive a default value back, eg "" if(res.hasPattern()){ r.setPattern(res.getPattern()); } r.setResource(ConverterUtils.getYarnUrlFromPath(new Path(res.getUri()))); // see above notes on HDFS URL handling r.setSize(res.getSize()); r.setTimestamp(res.getTimeStamp()); r.setType(DagTypeConverters.convertFromDAGPlan(res.getType())); r.setVisibility(DagTypeConverters.convertFromDAGPlan(res.getVisibility())); map.put(res.getName(), r); } return map; }
public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan( List<PlanLocalResource> localResourcesList) { Map<String, LocalResource> map = new HashMap<String, LocalResource>(); for(PlanLocalResource res : localResourcesList){ LocalResource r = new LocalResourcePBImpl(); //NOTE: have to check every optional field in protobuf generated classes for existence before accessing //else we will receive a default value back, eg "" if(res.hasPattern()){ r.setPattern(res.getPattern()); } r.setResource(convertToYarnURL(res.getUri())); r.setSize(res.getSize()); r.setTimestamp(res.getTimeStamp()); r.setType(DagTypeConverters.convertFromDAGPlan(res.getType())); r.setVisibility(DagTypeConverters.convertFromDAGPlan(res.getVisibility())); map.put(res.getName(), r); } return map; }
/**
 * Creates a fixture {@link LocalResource} through {@code recordFactory},
 * asserting that the factory produced a {@code LocalResourcePBImpl}.
 *
 * @return a record with a fixed HDFS URL, size, timestamp and PUBLIC visibility
 */
static LocalResource createResource() {
  LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
  assertTrue(resource instanceof LocalResourcePBImpl);

  Path remotePath = new Path("hdfs://y.ak:8020/foo/bar");
  resource.setResource(ConverterUtils.getYarnUrlFromPath(remotePath));
  resource.setSize(4344L);
  resource.setTimestamp(3141592653589793L);
  resource.setVisibility(LocalResourceVisibility.PUBLIC);
  return resource;
}
/** * @return {@link Path} absolute path for localization which includes local * directory path and the relative hierarchical path (if use local * cache directory manager is enabled) * * @param {@link LocalResourceRequest} Resource localization request to * localize the resource. * @param {@link Path} local directory path */ @Override public Path getPathForLocalization(LocalResourceRequest req, Path localDirPath) { Path rPath = localDirPath; if (useLocalCacheDirectoryManager && localDirPath != null) { if (!directoryManagers.containsKey(localDirPath)) { directoryManagers.putIfAbsent(localDirPath, new LocalCacheDirectoryManager(conf)); } LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath); rPath = localDirPath; String hierarchicalPath = dir.getRelativePathForLocalization(); // For most of the scenarios we will get root path only which // is an empty string if (!hierarchicalPath.isEmpty()) { rPath = new Path(localDirPath, hierarchicalPath); } inProgressLocalResourcesMap.put(req, rPath); } rPath = new Path(rPath, Long.toString(uniqueNumberGenerator.incrementAndGet())); Path localPath = new Path(rPath, req.getPath().getName()); LocalizedResource rsrc = localrsrc.get(req); rsrc.setLocalPath(localPath); LocalResource lr = LocalResource.newInstance(req.getResource(), req.getType(), req.getVisibility(), req.getSize(), req.getTimestamp()); try { stateStore.startResourceLocalization(user, appId, ((LocalResourcePBImpl) lr).getProto(), localPath); } catch (IOException e) { LOG.error("Unable to record localization start for " + rsrc, e); } return rPath; }
/**
 * Creates a fixture {@link LocalResource} through {@code recordFactory},
 * asserting that the factory produced a {@code LocalResourcePBImpl}.
 *
 * @return a record with a fixed HDFS URL, size, timestamp and PUBLIC visibility
 */
static LocalResource createResource() {
  LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
  assertTrue(resource instanceof LocalResourcePBImpl);

  Path remotePath = new Path("hdfs://y.ak:8020/foo/bar");
  resource.setResource(URL.fromPath(remotePath));
  resource.setSize(4344L);
  resource.setTimestamp(3141592653589793L);
  resource.setVisibility(LocalResourceVisibility.PUBLIC);
  return resource;
}
/**
 * Returns the proto backing the given record.
 *
 * @param rsrc record to convert; it is cast to {@code LocalResourcePBImpl}
 * @return the value of {@code getProto()} on that implementation
 */
private LocalResourceProto convertToProtoFormat(LocalResource rsrc) {
  LocalResourcePBImpl pbImpl = (LocalResourcePBImpl) rsrc;
  return pbImpl.getProto();
}
/**
 * Wraps the given proto in a new record implementation.
 *
 * @param rsrc proto to wrap
 * @return a new {@code LocalResourcePBImpl} constructed from {@code rsrc}
 */
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto rsrc) {
  LocalResourcePBImpl wrapped = new LocalResourcePBImpl(rsrc);
  return wrapped;
}
@Test @SuppressWarnings("unchecked") public void testStateStoreSuccessfulLocalization() throws Exception { final String user = "someuser"; final ApplicationId appId = ApplicationId.newInstance(1, 1); // This is a random path. NO File creation will take place at this place. final Path localDir = new Path("/tmp"); Configuration conf = new YarnConfiguration(); DrainDispatcher dispatcher = null; dispatcher = createDispatcher(conf); EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class); EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class); dispatcher.register(LocalizerEventType.class, localizerEventHandler); dispatcher.register(ContainerEventType.class, containerEventHandler); DeletionService mockDelService = mock(DeletionService.class); NMStateStoreService stateStore = mock(NMStateStoreService.class); try { LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, appId, dispatcher, false, conf, stateStore); // Container 1 needs lr1 resource ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.APPLICATION); LocalizerContext lc1 = new LocalizerContext(user, cId1, null); // Container 1 requests lr1 to be localized ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, LocalResourceVisibility.APPLICATION, lc1); tracker.handle(reqEvent1); dispatcher.await(); // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, null); ArgumentCaptor<LocalResourceProto> localResourceCaptor = ArgumentCaptor.forClass(LocalResourceProto.class); ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class); verify(stateStore).startResourceLocalization(eq(user), eq(appId), localResourceCaptor.capture(), pathCaptor.capture()); LocalResourceProto lrProto = localResourceCaptor.getValue(); Path localizedPath1 = pathCaptor.getValue(); Assert.assertEquals(lr1, new 
LocalResourceRequest(new LocalResourcePBImpl(lrProto))); Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent()); // Simulate lr1 getting localized ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120); tracker.handle(rle1); dispatcher.await(); ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor = ArgumentCaptor.forClass(LocalizedResourceProto.class); verify(stateStore).finishResourceLocalization(eq(user), eq(appId), localizedProtoCaptor.capture()); LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue(); Assert.assertEquals(lr1, new LocalResourceRequest( new LocalResourcePBImpl(localizedProto.getResource()))); Assert.assertEquals(localizedPath1.toString(), localizedProto.getLocalPath()); LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1); Assert.assertNotNull(localizedRsrc1); // simulate release and retention processing tracker.handle(new ResourceReleaseEvent(lr1, cId1)); dispatcher.await(); boolean removeResult = tracker.remove(localizedRsrc1, mockDelService); Assert.assertTrue(removeResult); verify(stateStore).removeLocalizedResource(eq(user), eq(appId), eq(localizedPath1)); } finally { if (dispatcher != null) { dispatcher.stop(); } } }
@Test @SuppressWarnings("unchecked") public void testStateStoreFailedLocalization() throws Exception { final String user = "someuser"; final ApplicationId appId = ApplicationId.newInstance(1, 1); // This is a random path. NO File creation will take place at this place. final Path localDir = new Path("/tmp"); Configuration conf = new YarnConfiguration(); DrainDispatcher dispatcher = null; dispatcher = createDispatcher(conf); EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class); EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class); dispatcher.register(LocalizerEventType.class, localizerEventHandler); dispatcher.register(ContainerEventType.class, containerEventHandler); NMStateStoreService stateStore = mock(NMStateStoreService.class); try { LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, appId, dispatcher, false, conf, stateStore); // Container 1 needs lr1 resource ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.APPLICATION); LocalizerContext lc1 = new LocalizerContext(user, cId1, null); // Container 1 requests lr1 to be localized ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, LocalResourceVisibility.APPLICATION, lc1); tracker.handle(reqEvent1); dispatcher.await(); // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, null); ArgumentCaptor<LocalResourceProto> localResourceCaptor = ArgumentCaptor.forClass(LocalResourceProto.class); ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class); verify(stateStore).startResourceLocalization(eq(user), eq(appId), localResourceCaptor.capture(), pathCaptor.capture()); LocalResourceProto lrProto = localResourceCaptor.getValue(); Path localizedPath1 = pathCaptor.getValue(); Assert.assertEquals(lr1, new LocalResourceRequest(new LocalResourcePBImpl(lrProto))); 
Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent()); ResourceFailedLocalizationEvent rfe1 = new ResourceFailedLocalizationEvent( lr1, new Exception("Test").toString()); tracker.handle(rfe1); dispatcher.await(); verify(stateStore).removeLocalizedResource(eq(user), eq(appId), eq(localizedPath1)); } finally { if (dispatcher != null) { dispatcher.stop(); } } }
/**
 * Runs {@code validatePBImplRecord} for the {@code LocalResourcePBImpl}
 * record class paired with the {@code LocalResourceProto} proto class.
 */
@Test
public void testLocalResourcePBImpl() throws Exception {
  final Class<LocalResourcePBImpl> recordClass = LocalResourcePBImpl.class;
  final Class<LocalResourceProto> protoClass = LocalResourceProto.class;
  validatePBImplRecord(recordClass, protoClass);
}
/**
 * Chooses a not-yet-existing directory into which a resource will be
 * localized and records the start of that localization.
 *
 * <p>The base directory is {@code localDirPath}, optionally extended by the
 * relative hierarchical path handed out by the local cache directory manager
 * (when that manager is enabled and {@code localDirPath} is non-null).
 * Numbered sub-directories are then tried, taking a fresh value from the
 * unique number generator each time, until one is found that does not exist
 * as a local file. Each candidate that already exists is logged and, when
 * {@code delService} is non-null, handed to it for deletion. The resource's
 * file path (the chosen directory plus the request path's name) is set on
 * the tracked {@link LocalizedResource} and passed to the state store.
 *
 * @param req resource localization request to localize
 * @param localDirPath local directory path
 * @param delService deletion service asked to delete candidate directories
 *        that already exist; may be {@code null}, in which case nothing is
 *        deleted
 * @return the uniquely numbered directory chosen for this localization
 */
@Override
public Path getPathForLocalization(LocalResourceRequest req,
    Path localDirPath, DeletionService delService) {
  Path baseDir = localDirPath;
  if (useLocalCacheDirectoryManager && localDirPath != null) {
    if (!directoryManagers.containsKey(localDirPath)) {
      directoryManagers.putIfAbsent(localDirPath,
          new LocalCacheDirectoryManager(conf));
    }
    LocalCacheDirectoryManager manager = directoryManagers.get(localDirPath);
    String relativePath = manager.getRelativePathForLocalization();
    // For most of the scenarios we will get root path only, which
    // is an empty string.
    if (!relativePath.isEmpty()) {
      baseDir = new Path(localDirPath, relativePath);
    }
    inProgressLocalResourcesMap.put(req, baseDir);
  }

  Path uniqueDir;
  boolean alreadyExists;
  do {
    uniqueDir = new Path(baseDir,
        Long.toString(uniqueNumberGenerator.incrementAndGet()));
    File candidate = new File(uniqueDir.toUri().getRawPath());
    alreadyExists = candidate.exists();
    if (alreadyExists) {
      // The directory is already there: request its deletion (if a deletion
      // service was supplied) and move on to the next number.
      LOG.warn("Directory " + uniquePathText(uniqueDir) + " already exists, "
          + "try next one.");
      if (delService != null) {
        delService.delete(getUser(), uniqueDir);
      }
    }
  } while (alreadyExists);

  Path localPath = new Path(uniqueDir, req.getPath().getName());
  LocalizedResource rsrc = localrsrc.get(req);
  rsrc.setLocalPath(localPath);

  LocalResource lr = LocalResource.newInstance(req.getResource(),
      req.getType(), req.getVisibility(), req.getSize(), req.getTimestamp());
  try {
    stateStore.startResourceLocalization(user, appId,
        ((LocalResourcePBImpl) lr).getProto(), localPath);
  } catch (IOException e) {
    // A failed state-store write is logged; the chosen directory is still returned.
    LOG.error("Unable to record localization start for " + rsrc, e);
  }
  return uniqueDir;
}

/** Renders a path exactly as string concatenation would, for log messages. */
private static String uniquePathText(Path path) {
  return String.valueOf(path);
}