/**
 * Registers {@code destPath} as a local resource under the name {@code link}.
 *
 * <p>The resource record takes its size and timestamp from the file status of
 * {@code destPath} and its visibility from {@code getVisibility}.
 *
 * @param fs file system used to stat {@code destPath}
 * @param conf configuration handed to the visibility lookup
 * @param destPath location of the resource
 * @param localResources map that receives the new entry, keyed by {@code link}
 * @param resourceType type recorded on the resource
 * @param link name the resource is registered under; must be non-empty
 * @param statCache file-status cache handed to the visibility lookup
 * @param appMasterOnly not read by this method
 * @throws IOException if {@code link} is null or empty, or if an exception
 *         propagates from the file-status or visibility lookup
 */
public void addResource(FileSystem fs, Configuration conf, Path destPath,
    Map<String, LocalResource> localResources, LocalResourceType resourceType,
    String link, Map<URI, FileStatus> statCache, boolean appMasterOnly)
    throws IOException {
  // Reject an unusable link name before any file-system call is made, so a
  // request that cannot succeed does no remote work.
  if (link == null || link.isEmpty()) {
    throw new IOException("You must specify a valid link name");
  }

  FileStatus destStatus = fs.getFileStatus(destPath);
  LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
  amJarRsrc.setType(resourceType);
  LocalResourceVisibility visibility =
      getVisibility(conf, destPath.toUri(), statCache);
  amJarRsrc.setVisibility(visibility);
  amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath));
  amJarRsrc.setTimestamp(destStatus.getModificationTime());
  amJarRsrc.setSize(destStatus.getLen());
  localResources.put(link, amJarRsrc);
}
/**
 * Copies {@code sCopy} into {@code dstdir} on the local file system under the
 * name {@code "tmp_" + sCopy.getName()}.
 *
 * <p>Before copying, the source's modification time must equal the timestamp
 * recorded on {@code resource}, and a resource marked PUBLIC must pass the
 * {@code isPublic} check.
 *
 * @param sCopy source path
 * @param dstdir directory that receives the copy
 * @return the path of the copy
 * @throws IOException if either check fails or the copy fails
 */
private Path copy(Path sCopy, Path dstdir) throws IOException {
  FileSystem sourceFs = sCopy.getFileSystem(conf);
  Path target = new Path(dstdir, "tmp_" + sCopy.getName());
  FileStatus sourceStatus = sourceFs.getFileStatus(sCopy);

  // The source must not have changed since the resource was described.
  long expectedTime = resource.getTimestamp();
  long actualTime = sourceStatus.getModificationTime();
  if (actualTime != expectedTime) {
    throw new IOException("Resource " + sCopy
        + " changed on src filesystem (expected " + expectedTime
        + ", was " + actualTime);
  }

  // Only resources that really are public may be localized as PUBLIC.
  boolean declaredPublic =
      resource.getVisibility() == LocalResourceVisibility.PUBLIC;
  if (declaredPublic && !isPublic(sourceFs, sCopy, sourceStatus, statCache)) {
    throw new IOException("Resource " + sCopy
        + " is not publicly accessable and as such cannot be part of the public cache.");
  }

  FileUtil.copy(sourceFs, sourceStatus, FileSystem.getLocal(conf), target,
      false, true, conf);
  return target;
}
/**
 * Writes a jar containing only a default manifest at {@code p} and returns a
 * PATTERN-type resource record describing it.
 *
 * <p>The record's size and timestamp are read back from the file status of
 * {@code p}; its pattern is {@code "classes/.*"}.
 *
 * @param files file context used to qualify and stat {@code p}
 * @param p location of the jar to create
 * @param vis visibility recorded on the resource
 * @return the populated resource record
 * @throws IOException if the jar cannot be written or stat'ed
 */
static LocalResource createJar(FileContext files, Path p,
    LocalResourceVisibility vis) throws IOException {
  LOG.info("Create jar file " + p);
  File jarFile = new File((files.makeQualified(p)).toUri());
  FileOutputStream stream = new FileOutputStream(jarFile);
  try {
    LOG.info("Create jar out stream ");
    JarOutputStream out = new JarOutputStream(stream, new Manifest());
    LOG.info("Done writing jar stream ");
    out.close();
  } finally {
    // Releases the file handle when the JarOutputStream constructor or its
    // close() throws; closing an already-closed stream is harmless.
    stream.close();
  }

  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
  FileStatus status = files.getFileStatus(p);
  ret.setSize(status.getLen());
  ret.setTimestamp(status.getModificationTime());
  ret.setType(LocalResourceType.PATTERN);
  ret.setVisibility(vis);
  ret.setPattern("classes/.*");
  return ret;
}
/**
 * Creates {@code p + ".jar"} holding a single entry named {@code p.getName()}
 * filled with {@code len} random bytes, and returns an ARCHIVE-type resource
 * record describing it.
 *
 * <p>The record's size is set to {@code len} (the payload length, not the
 * length of the archive file); its timestamp is read from the archive's file
 * status.
 *
 * @param files file context used to stat the archive
 * @param p base path; the archive is written next to it with a ".jar" suffix
 * @param len number of random bytes stored in the entry
 * @param r source of the random bytes
 * @param vis visibility recorded on the resource
 * @return the populated resource record
 * @throws IOException if the archive cannot be written or stat'ed
 * @throws URISyntaxException declared for callers; not thrown by the visible code
 */
static LocalResource createJarFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".jar");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    JarOutputStream out = new JarOutputStream(fileOut);
    out.putNextEntry(new JarEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // Releases the file handle when writing fails part-way; closing an
    // already-closed stream is harmless.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".jar");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}
/**
 * Creates {@code p + ".ZIP"} holding a single entry named {@code p.getName()}
 * filled with {@code len} random bytes, and returns an ARCHIVE-type resource
 * record describing it.
 *
 * <p>The record's size is set to {@code len} (the payload length, not the
 * length of the archive file); its timestamp is read from the archive's file
 * status.
 *
 * @param files file context used to stat the archive
 * @param p base path; the archive is written next to it with a ".ZIP" suffix
 * @param len number of random bytes stored in the entry
 * @param r source of the random bytes
 * @param vis visibility recorded on the resource
 * @return the populated resource record
 * @throws IOException if the archive cannot be written or stat'ed
 * @throws URISyntaxException declared for callers; not thrown by the visible code
 */
static LocalResource createZipFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".ZIP");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    ZipOutputStream out = new ZipOutputStream(fileOut);
    out.putNextEntry(new ZipEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // Releases the file handle when writing fails part-way; closing an
    // already-closed stream is harmless.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".ZIP");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}
/** * For each of the requested resources for a container, determines the * appropriate {@link LocalResourcesTracker} and forwards a * {@link LocalResourceRequest} to that tracker. */ private void handleInitContainerResources( ContainerLocalizationRequestEvent rsrcReqs) { Container c = rsrcReqs.getContainer(); // create a loading cache for the file statuses LoadingCache<Path,Future<FileStatus>> statCache = CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig())); LocalizerContext ctxt = new LocalizerContext( c.getUser(), c.getContainerId(), c.getCredentials(), statCache); Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = rsrcReqs.getRequestedResources(); for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e : rsrcs.entrySet()) { LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), c.getContainerId().getApplicationAttemptId() .getApplicationId()); for (LocalResourceRequest req : e.getValue()) { tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt)); } } }
private Path getPathForLocalization(LocalResource rsrc) throws IOException, URISyntaxException { String user = context.getUser(); ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId(); LocalResourceVisibility vis = rsrc.getVisibility(); LocalResourcesTracker tracker = getLocalResourcesTracker(vis, user, appId); String cacheDirectory = null; if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only cacheDirectory = getUserFileCachePath(user); } else {// APPLICATION ONLY cacheDirectory = getAppFileCachePath(user, appId.toString()); } Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), dirPath, delService); }
@SuppressWarnings("unchecked") // dispatcher not typed public void cleanup() { Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc = new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>(); if (!publicRsrcs.isEmpty()) { rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs); } if (!privateRsrcs.isEmpty()) { rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs); } if (!appRsrcs.isEmpty()) { rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs); } dispatcher.getEventHandler().handle( new ContainerLocalizationCleanupEvent(this, rsrc)); }
/**
 * Builds a mocked {@link ResourceLocalizationSpec} wrapping a mocked FILE
 * resource at {@code file:/local/<vis>/<random hex name>}.
 *
 * <p>Values are drawn from {@code r} in a fixed order (name, then size, then
 * timestamp) so that a seeded {@link Random} yields the same resource.
 *
 * @param r source of the random name, size and timestamp
 * @param vis visibility reported by the mocked resource
 * @param p destination directory reported by the spec
 * @return the mocked spec
 */
static ResourceLocalizationSpec getMockRsrc(Random r,
    LocalResourceVisibility vis, Path p) {
  final String name = Long.toHexString(r.nextLong());
  final long size = r.nextInt(1024) + 1024L;
  final long timestamp = r.nextInt(1024) + 2048L;

  URL location = mock(org.apache.hadoop.yarn.api.records.URL.class);
  when(location.getScheme()).thenReturn("file");
  when(location.getHost()).thenReturn(null);
  when(location.getFile()).thenReturn("/local/" + vis + "/" + name);

  LocalResource mockedResource = mock(LocalResource.class);
  when(mockedResource.getResource()).thenReturn(location);
  when(mockedResource.getSize()).thenReturn(size);
  when(mockedResource.getTimestamp()).thenReturn(timestamp);
  when(mockedResource.getType()).thenReturn(LocalResourceType.FILE);
  when(mockedResource.getVisibility()).thenReturn(vis);

  ResourceLocalizationSpec spec = mock(ResourceLocalizationSpec.class);
  when(spec.getResource()).thenReturn(mockedResource);
  when(spec.getDestinationDirectory())
      .thenReturn(ConverterUtils.getYarnUrlFromPath(p));
  return spec;
}
/** * If resource is public, verifyAccess should succeed */ @Test public void testVerifyAccessPublicResource() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); LocalResource resource = mock(LocalResource.class); // give public visibility when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); Path localPath = mock(Path.class); when(localPath.getName()).thenReturn("foo.jar"); String user = "joe"; SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); FileSystem fs = mock(FileSystem.class); FileSystem localFs = FileSystem.getLocal(conf); SharedCacheUploader spied = createSpiedUploader(resource, localPath, user, conf, scmClient, fs, localFs); assertTrue(spied.verifyAccess()); }
/** * If the localPath does not exists, getActualPath should get to one level * down */ @Test public void testGetActualPath() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); LocalResource resource = mock(LocalResource.class); // give public visibility when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); Path localPath = new Path("foo.jar"); String user = "joe"; SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); FileSystem fs = mock(FileSystem.class); FileSystem localFs = mock(FileSystem.class); // stub it to return a status that indicates a directory FileStatus status = mock(FileStatus.class); when(status.isDirectory()).thenReturn(true); when(localFs.getFileStatus(localPath)).thenReturn(status); SharedCacheUploader spied = createSpiedUploader(resource, localPath, user, conf, scmClient, fs, localFs); Path actualPath = spied.getActualPath(); assertEquals(actualPath.getName(), localPath.getName()); assertEquals(actualPath.getParent().getName(), localPath.getName()); }
/**
 * Builds a spied {@link LocalResourcesTrackerImpl} pre-populated with
 * {@code nRsrcs} public FILE resources.
 *
 * <p>Resource {@code i} is requested from {@code file:///<user>/rsrc<i>} and
 * localized at {@code file:///local/<user>/rsrc<i>}; it reports a reference
 * count of 0, size {@code rsrcSize}, timestamp {@code timestamp + i * tsstep}
 * and state LOCALIZED.
 *
 * @param user owner used in the resource paths and handed to the tracker
 * @param rsrcSize size reported by every resource
 * @param nRsrcs number of resources to create
 * @param timestamp timestamp of the first resource
 * @param tsstep timestamp increment between consecutive resources
 * @return the spied tracker
 */
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
    long nRsrcs, long timestamp, long tsstep) {
  Configuration conf = new Configuration();
  ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
      new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
  LocalResourcesTracker tracker = spy(
      new LocalResourcesTrackerImpl(user, null, null, trackerResources, false,
          conf, new NMNullStateStoreService(),null));

  for (int i = 0; i < nRsrcs; ++i) {
    final long ts = timestamp + i * tsstep;
    final Path remotePath = new Path("file:///" + user + "/rsrc" + i);
    final Path localPath = new Path("file:///local/" + user + "/rsrc" + i);
    final LocalResourceRequest request = new LocalResourceRequest(
        remotePath, ts, LocalResourceType.FILE,
        LocalResourceVisibility.PUBLIC, null);

    // Fixed-answer resource: already localized and not referenced.
    LocalizedResource localized = new LocalizedResource(request, null) {
      @Override
      public int getRefCount() {
        return 0;
      }

      @Override
      public long getSize() {
        return rsrcSize;
      }

      @Override
      public Path getLocalPath() {
        return localPath;
      }

      @Override
      public long getTimestamp() {
        return ts;
      }

      @Override
      public ResourceState getState() {
        return ResourceState.LOCALIZED;
      }
    };
    trackerResources.put(request, localized);
  }
  return tracker;
}
/** * Verify correct container request events sent to localizer. */ @Test public void testLocalizationRequest() throws Exception { WrappedContainer wc = null; try { wc = new WrappedContainer(7, 314159265358979L, 4344, "yak"); assertEquals(ContainerState.NEW, wc.c.getContainerState()); wc.initContainer(); // Verify request for public/private resources to localizer ResourcesRequestedMatcher matchesReq = new ResourcesRequestedMatcher(wc.localResources, EnumSet.of( LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE, LocalResourceVisibility.APPLICATION)); verify(wc.localizerBus).handle(argThat(matchesReq)); assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState()); } finally { if (wc != null) { wc.finished(); } } }
private void changePermissions(FileSystem fs, final Path path) throws IOException, InterruptedException { File f = new File(path.toUri()); if (FileUtils.isSymlink(f)) { // avoid following symlinks when changing permissions return; } boolean isDir = f.isDirectory(); FsPermission perm = cachePerms; // set public perms as 755 or 555 based on dir or file if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { perm = isDir ? PUBLIC_DIR_PERMS : PUBLIC_FILE_PERMS; } // set private perms as 700 or 500 else { // PRIVATE: // APPLICATION: perm = isDir ? PRIVATE_DIR_PERMS : PRIVATE_FILE_PERMS; } LOG.debug("Changing permissions for path " + path + " to perm " + perm); final FsPermission fPerm = perm; if (null == userUgi) { files.setPermission(path, perm); } else { userUgi.doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { files.setPermission(path, fPerm); return null; } }); } if (isDir) { FileStatus[] statuses = fs.listStatus(path); for (FileStatus status : statuses) { changePermissions(fs, status.getPath()); } } }
private Map<String, LocalResource> setupEsZipResource(Config conf) { // elasticsearch.zip Map<String, LocalResource> resources = new LinkedHashMap<String, LocalResource>(); LocalResource esZip = Records.newRecord(LocalResource.class); String esZipHdfsPath = conf.esZipHdfsPath(); Path p = new Path(esZipHdfsPath); FileStatus fsStat; try { fsStat = FileSystem.get(cfg).getFileStatus(p); } catch (IOException ex) { throw new IllegalArgumentException( String.format("Cannot find Elasticsearch zip at [%s]; make sure the artifacts have been properly provisioned and the correct permissions are in place.", esZipHdfsPath), ex); } // use the normalized path as otherwise YARN chokes down the line esZip.setResource(ConverterUtils.getYarnUrlFromPath(fsStat.getPath())); esZip.setSize(fsStat.getLen()); esZip.setTimestamp(fsStat.getModificationTime()); esZip.setType(LocalResourceType.ARCHIVE); esZip.setVisibility(LocalResourceVisibility.PUBLIC); resources.put(conf.esZipName(), esZip); return resources; }
private Map<String, LocalResource> setupEsYarnJar() { Map<String, LocalResource> resources = new LinkedHashMap<String, LocalResource>(); LocalResource esYarnJar = Records.newRecord(LocalResource.class); Path p = new Path(clientCfg.jarHdfsPath()); FileStatus fsStat; try { fsStat = FileSystem.get(client.getConfiguration()).getFileStatus(p); } catch (IOException ex) { throw new IllegalArgumentException( String.format("Cannot find jar [%s]; make sure the artifacts have been properly provisioned and the correct permissions are in place.", clientCfg.jarHdfsPath()), ex); } // use the normalized path as otherwise YARN chokes down the line esYarnJar.setResource(ConverterUtils.getYarnUrlFromPath(fsStat.getPath())); esYarnJar.setSize(fsStat.getLen()); esYarnJar.setTimestamp(fsStat.getModificationTime()); esYarnJar.setType(LocalResourceType.FILE); esYarnJar.setVisibility(LocalResourceVisibility.PUBLIC); resources.put(clientCfg.jarName(), esYarnJar); return resources; }
/**
 * Registers the file named by the event as an APPLICATION-visibility FILE
 * local resource, if the event names one and it exists.
 *
 * <p>The resource path is resolved against the file system's home directory.
 * Nothing is registered when the path is null or empty, or when the file does
 * not exist.
 *
 * @param event launcher event carrying the resource file name and path
 */
private void setUpLocalResources(ContainerLauncherEvent event) {
  String resourceFileName = event.getResourceFileName();
  String resourcePath = event.getResourceFilePath();

  // Compare by content: `resourcePath != ""` tested object identity, so an
  // empty string built at runtime was treated as a real path.
  if (resourcePath == null || resourcePath.isEmpty()) {
    return;
  }

  try {
    FileSystem fs = FileSystem.get(new YarnConfiguration());
    Path dst = new Path(fs.getHomeDirectory(), resourcePath);
    if (!fs.exists(dst)) {
      return;
    }
    FileStatus scFileStatus = fs.getFileStatus(dst);
    LocalResource scRsrc = LocalResource.newInstance(
        ConverterUtils.getYarnUrlFromURI(dst.toUri()),
        LocalResourceType.FILE,
        LocalResourceVisibility.APPLICATION,
        scFileStatus.getLen(),
        scFileStatus.getModificationTime());
    localResources.put(resourceFileName, scRsrc);
  } catch (IOException e) {
    // Best-effort by design: a failure leaves the resource unregistered.
    // NOTE(review): no logger is visible from this method; switch to the
    // class logger if one exists.
    e.printStackTrace();
  }
}
private Path getPathForLocalization(LocalResource rsrc) throws IOException, URISyntaxException { String user = context.getUser(); ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId(); LocalResourceVisibility vis = rsrc.getVisibility(); LocalResourcesTracker tracker = getLocalResourcesTracker(vis, user, appId); String cacheDirectory = null; if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only cacheDirectory = getUserFileCachePath(user); } else {// APPLICATION ONLY cacheDirectory = getAppFileCachePath(user, appId.toString()); } Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), dirPath); }
/**
 * Maps the result of {@code isPublic} to a visibility.
 *
 * @param conf configuration handed to {@code isPublic}
 * @param uri location of the resource
 * @param statCache file-status cache handed to {@code isPublic}
 * @return PUBLIC when {@code isPublic} returns true, PRIVATE otherwise
 * @throws IOException if {@code isPublic} throws it
 */
public LocalResourceVisibility getVisibility(Configuration conf, URI uri,
    Map<URI, FileStatus> statCache) throws IOException {
  return isPublic(conf, uri, statCache)
      ? LocalResourceVisibility.PUBLIC
      : LocalResourceVisibility.PRIVATE;
}
private static void parseDistributedCacheArtifacts(Configuration conf, Map<String, LocalResource> localResources, LocalResourceType type, URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[]) throws IOException { if (uris != null) { // Sanity check if ((uris.length != timestamps.length) || (uris.length != sizes.length) || (uris.length != visibilities.length)) { throw new IllegalArgumentException("Invalid specification for " + "distributed-cache artifacts of type " + type + " :" + " #uris=" + uris.length + " #timestamps=" + timestamps.length + " #visibilities=" + visibilities.length); } for (int i = 0; i < uris.length; ++i) { URI u = uris[i]; Path p = new Path(u); FileSystem remoteFS = p.getFileSystem(conf); p = remoteFS .resolvePath(p.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory())); // Add URI fragment or just the filename Path name = new Path((null == u.getFragment()) ? p.getName() : u.getFragment()); if (name.isAbsolute()) { throw new IllegalArgumentException("Resource name must be relative"); } String linkName = name.toUri().getPath(); LocalResource orig = localResources.get(linkName); org.apache.hadoop.yarn.api.records.URL url = ConverterUtils.getYarnUrlFromURI(p.toUri()); if (orig != null && !orig.getResource().equals(url)) { LOG.warn(getResourceDescription(orig.getType()) + toString(orig.getResource()) + " conflicts with " + getResourceDescription(type) + toString(url) + " This will be an error in Hadoop 2.0"); continue; } localResources.put(linkName, LocalResource.newInstance(ConverterUtils.getYarnUrlFromURI(p .toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE, sizes[i], timestamps[i])); } } }
/**
 * Describes {@code path} as a FILE local resource with the given visibility,
 * taking size and timestamp from the file's status.
 *
 * @param path location of the file
 * @param visibility visibility recorded on the resource
 * @return the new resource record
 * @throws IOException if the file system or file status lookup fails
 */
private LocalResource toLocalResource(Path path,
    LocalResourceVisibility visibility) throws IOException {
  FileSystem fileSystem = path.getFileSystem(clusterConf.conf());
  FileStatus status = fileSystem.getFileStatus(path);
  long length = status.getLen();
  long modified = status.getModificationTime();
  return LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(path),
      LocalResourceType.FILE, visibility, length, modified);
}
/**
 * Describes {@code p} as an APPLICATION-visibility local resource of the
 * given type. The resource URL is built from the stat'ed path after it is
 * resolved by the context's default file system.
 *
 * @param fs file context used to stat and resolve {@code p}
 * @param p location of the resource
 * @param type type recorded on the resource
 * @return the populated resource record
 * @throws IOException if the stat or path resolution fails
 */
private LocalResource createApplicationResource(FileContext fs, Path p,
    LocalResourceType type) throws IOException {
  LocalResource appResource =
      recordFactory.newRecordInstance(LocalResource.class);
  FileStatus status = fs.getFileStatus(p);
  Path resolved = fs.getDefaultFileSystem().resolvePath(status.getPath());

  appResource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
  appResource.setSize(status.getLen());
  appResource.setTimestamp(status.getModificationTime());
  appResource.setType(type);
  appResource.setVisibility(LocalResourceVisibility.APPLICATION);
  return appResource;
}
/**
 * Registers {@code dst} under {@code key} as an APPLICATION-visibility FILE
 * local resource, taking size and timestamp from the file's status.
 *
 * @param fs file system used to stat {@code dst}
 * @param key name the resource is registered under
 * @param dst location of the file
 * @param localResources map that receives the new entry
 * @throws IOException if the file status lookup fails
 */
private static void addToLocalResources(FileSystem fs, String key, Path dst,
    Map<String, LocalResource> localResources) throws IOException {
  FileStatus status = fs.getFileStatus(dst);
  URL location = URL.fromURI(dst.toUri());
  long length = status.getLen();
  long modified = status.getModificationTime();
  localResources.put(key, LocalResource.newInstance(location,
      LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
      length, modified));
}
/**
 * Returns the visibility stored in the current proto or builder, or
 * {@code null} when none is set.
 */
@Override
public synchronized LocalResourceVisibility getVisibility() {
  // Read from the immutable proto when it is current, else from the builder.
  LocalResourceProtoOrBuilder source = viaProto ? proto : builder;
  if (source.hasVisibility()) {
    return convertFromProtoFormat(source.getVisibility());
  }
  return null;
}
/**
 * Stores the visibility in the builder; {@code null} clears the field.
 *
 * @param visibility the new visibility, or {@code null} to clear it
 */
@Override
public synchronized void setVisibility(LocalResourceVisibility visibility) {
  maybeInitBuilder();
  if (visibility != null) {
    builder.setVisibility(convertToProtoFormat(visibility));
  } else {
    builder.clearVisibility();
  }
}
/** * Recursively change permissions of all files/dirs on path based * on resource visibility. * Change to 755 or 700 for dirs, 555 or 500 for files. * @param fs FileSystem * @param path Path to modify perms for * @throws IOException * @throws InterruptedException */ private void changePermissions(FileSystem fs, final Path path) throws IOException, InterruptedException { File f = new File(path.toUri()); if (FileUtils.isSymlink(f)) { // avoid following symlinks when changing permissions return; } boolean isDir = f.isDirectory(); FsPermission perm = cachePerms; // set public perms as 755 or 555 based on dir or file if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { perm = isDir ? PUBLIC_DIR_PERMS : PUBLIC_FILE_PERMS; } // set private perms as 700 or 500 else { // PRIVATE: // APPLICATION: perm = isDir ? PRIVATE_DIR_PERMS : PRIVATE_FILE_PERMS; } LOG.debug("Changing permissions for path " + path + " to perm " + perm); final FsPermission fPerm = perm; if (null == userUgi) { files.setPermission(path, perm); } else { userUgi.doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { files.setPermission(path, fPerm); return null; } }); } if (isDir) { FileStatus[] statuses = fs.listStatus(path); for (FileStatus status : statuses) { changePermissions(fs, status.getPath()); } } }
/**
 * Creates a file at {@code p} through the four-argument {@code createFile}
 * overload and returns a FILE-type resource record describing it.
 *
 * <p>The record's size is set to {@code len}; its timestamp is read from the
 * file's status.
 *
 * @param files file context used to create and stat the file
 * @param p location of the file
 * @param len length handed to the overload and recorded as the size
 * @param r random source handed to the overload
 * @param vis visibility recorded on the resource
 * @return the populated resource record
 * @throws IOException if creating or stat'ing the file fails
 */
static LocalResource createFile(FileContext files, Path p, int len, Random r,
    LocalResourceVisibility vis) throws IOException {
  createFile(files, p, len, r);

  LocalResource fileResource =
      recordFactory.newRecordInstance(LocalResource.class);
  fileResource.setResource(ConverterUtils.getYarnUrlFromPath(p));
  fileResource.setSize(len);
  fileResource.setType(LocalResourceType.FILE);
  fileResource.setVisibility(vis);

  long modified = files.getFileStatus(p).getModificationTime();
  fileResource.setTimestamp(modified);
  return fileResource;
}