/**
 * Registers the file at {@code destPath} as a YARN local resource under the
 * given link name.
 *
 * <p>The link name is checked before any filesystem call is made, so an
 * invalid request is rejected without first stat-ing the destination.
 *
 * @param fs filesystem used to read the status of {@code destPath}
 * @param conf configuration handed to {@code getVisibility}
 * @param destPath location of the resource
 * @param localResources map that receives the new entry, keyed by {@code link}
 * @param resourceType type recorded on the resource
 * @param link name the resource is registered under; must be non-empty
 * @param statCache status cache handed to {@code getVisibility}
 * @param appMasterOnly not read by this method
 * @throws IOException if {@code link} is null or empty, or if a filesystem
 *         call fails
 */
public void addResource(FileSystem fs, Configuration conf, Path destPath,
    Map<String, LocalResource> localResources, LocalResourceType resourceType,
    String link, Map<URI, FileStatus> statCache, boolean appMasterOnly)
    throws IOException {
  // Fail fast: nothing below is useful without a usable link name.
  if (link == null || link.isEmpty()) {
    throw new IOException("You must specify a valid link name");
  }

  FileStatus destStatus = fs.getFileStatus(destPath);
  LocalResourceVisibility visibility =
      getVisibility(conf, destPath.toUri(), statCache);

  LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
  amJarRsrc.setType(resourceType);
  amJarRsrc.setVisibility(visibility);
  amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath));
  amJarRsrc.setTimestamp(destStatus.getModificationTime());
  amJarRsrc.setSize(destStatus.getLen());

  localResources.put(link, amJarRsrc);
}
@SuppressWarnings("deprecation") public static void setupDistributedCache(Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
/**
 * Writes a jar containing only a default manifest to {@code p} and returns a
 * {@code PATTERN} local resource describing it.
 *
 * @param files file context used to qualify {@code p} and to stat the result
 * @param p path of the jar to create
 * @param vis visibility recorded on the returned resource
 * @return resource pointing at {@code p}, with size and timestamp taken from
 *         the file status and the pattern {@code classes/.*}
 * @throws IOException if the jar cannot be written or its status cannot be read
 */
static LocalResource createJar(FileContext files, Path p,
    LocalResourceVisibility vis) throws IOException {
  LOG.info("Create jar file " + p);
  File jarFile = new File((files.makeQualified(p)).toUri());
  FileOutputStream stream = new FileOutputStream(jarFile);
  try {
    LOG.info("Create jar out stream ");
    JarOutputStream out = new JarOutputStream(stream, new Manifest());
    LOG.info("Done writing jar stream ");
    out.close();
  } finally {
    // A second close is harmless after out.close(); on failure this releases
    // the file handle that would otherwise leak.
    stream.close();
  }

  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
  FileStatus status = files.getFileStatus(p);
  ret.setSize(status.getLen());
  ret.setTimestamp(status.getModificationTime());
  ret.setType(LocalResourceType.PATTERN);
  ret.setVisibility(vis);
  ret.setPattern("classes/.*");
  return ret;
}
/**
 * Creates {@code p + ".jar"} holding a single entry, named after the last
 * component of {@code p}, filled with {@code len} random bytes, and returns an
 * {@code ARCHIVE} local resource describing it.
 *
 * @param files file context used to read the jar's modification time
 * @param p base path; the jar is written next to it with a ".jar" suffix
 * @param len number of random payload bytes
 * @param r source of the payload bytes
 * @param vis visibility recorded on the returned resource
 * @return resource whose size is {@code len} (the payload length, not the
 *         on-disk size of the jar) and whose timestamp is the jar's
 *         modification time
 * @throws IOException if the jar cannot be written or its status cannot be read
 * @throws URISyntaxException declared for callers; not thrown by this body
 */
static LocalResource createJarFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".jar");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    JarOutputStream out = new JarOutputStream(fileOut);
    out.putNextEntry(new JarEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // No-op after a successful out.close(); releases the handle on failure.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".jar");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}
/**
 * Creates {@code p + ".ZIP"} holding a single entry, named after the last
 * component of {@code p}, filled with {@code len} random bytes, and returns an
 * {@code ARCHIVE} local resource describing it.
 *
 * @param files file context used to read the archive's modification time
 * @param p base path; the archive is written next to it with a ".ZIP" suffix
 * @param len number of random payload bytes
 * @param r source of the payload bytes
 * @param vis visibility recorded on the returned resource
 * @return resource whose size is {@code len} (the payload length, not the
 *         on-disk size of the archive) and whose timestamp is the archive's
 *         modification time
 * @throws IOException if the archive cannot be written or its status cannot
 *         be read
 * @throws URISyntaxException declared for callers; not thrown by this body
 */
static LocalResource createZipFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".ZIP");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    ZipOutputStream out = new ZipOutputStream(fileOut);
    out.putNextEntry(new ZipEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // No-op after a successful out.close(); releases the handle on failure.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".ZIP");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}
/**
 * Builds a mocked {@link ResourceLocalizationSpec} wrapping a mocked FILE
 * resource with a random name, size and timestamp.
 *
 * @param r random source; one {@code nextLong} followed by two
 *        {@code nextInt(1024)} draws are consumed, in that order
 * @param vis visibility reported by the mocked resource and embedded in its
 *        file path
 * @param p path converted into the spec's destination directory URL
 * @return the mocked spec
 */
static ResourceLocalizationSpec getMockRsrc(Random r,
    LocalResourceVisibility vis, Path p) {
  // Draw the random values first, in the order name -> size -> timestamp.
  final String name = Long.toHexString(r.nextLong());
  final long size = r.nextInt(1024) + 1024L;
  final long timestamp = r.nextInt(1024) + 2048L;
  final String localFile = "/local/" + vis + "/" + name;

  // file://<no host>/local/<vis>/<name>
  URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
  when(uri.getScheme()).thenReturn("file");
  when(uri.getHost()).thenReturn(null);
  when(uri.getFile()).thenReturn(localFile);

  LocalResource rsrc = mock(LocalResource.class);
  when(rsrc.getResource()).thenReturn(uri);
  when(rsrc.getSize()).thenReturn(size);
  when(rsrc.getTimestamp()).thenReturn(timestamp);
  when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
  when(rsrc.getVisibility()).thenReturn(vis);

  URL destinationDir = ConverterUtils.getYarnUrlFromPath(p);
  ResourceLocalizationSpec spec = mock(ResourceLocalizationSpec.class);
  when(spec.getResource()).thenReturn(rsrc);
  when(spec.getDestinationDirectory()).thenReturn(destinationDir);
  return spec;
}
/**
 * Creates a spied {@link LocalResourcesTrackerImpl} pre-populated with
 * {@code nRsrcs} public FILE resources that report themselves as localized
 * and unreferenced.
 *
 * @param user user name given to the tracker and embedded in resource paths
 * @param rsrcSize size reported by every resource
 * @param nRsrcs number of resources to register
 * @param timestamp timestamp of the first resource
 * @param tsstep timestamp increment between consecutive resources
 * @return the spied tracker backed by the populated resource map
 */
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
    long nRsrcs, long timestamp, long tsstep) {
  Configuration trackerConf = new Configuration();
  ConcurrentMap<LocalResourceRequest, LocalizedResource> resources =
      new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
  LocalResourcesTracker tracker = spy(
      new LocalResourcesTrackerImpl(user, null, null, resources, false,
          trackerConf, new NMNullStateStoreService(), null));

  int i = 0;
  while (i < nRsrcs) {
    // Resource i carries timestamp + i * tsstep, both in the request and in
    // the localized resource.
    final long stamp = timestamp + i * tsstep;
    final Path localPath = new Path("file:///local/" + user + "/rsrc" + i);
    final LocalResourceRequest request = new LocalResourceRequest(
        new Path("file:///" + user + "/rsrc" + i), stamp,
        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);

    LocalizedResource localized = new LocalizedResource(request, null) {
      @Override
      public int getRefCount() {
        return 0;
      }

      @Override
      public long getSize() {
        return rsrcSize;
      }

      @Override
      public Path getLocalPath() {
        return localPath;
      }

      @Override
      public long getTimestamp() {
        return stamp;
      }

      @Override
      public ResourceState getState() {
        return ResourceState.LOCALIZED;
      }
    };
    resources.put(request, localized);
    ++i;
  }
  return tracker;
}
public static void setupDistributedCache( Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
@SuppressWarnings("deprecation") public void testSetupDistributedCacheConflictsFiles() throws Exception { Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); URI mockUri = URI.create("mockfs://mock/"); FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf)) .getRawFileSystem(); URI file = new URI("mockfs://mock/tmp/something.zip#something"); Path filePath = new Path(file); URI file2 = new URI("mockfs://mock/tmp/something.txt#something"); Path file2Path = new Path(file2); when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file2, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true"); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); MRApps.setupDistributedCache(conf, localResources); assertEquals(1, localResources.size()); LocalResource lr = localResources.get("something"); //First one wins assertNotNull(lr); assertEquals(10l, lr.getSize()); assertEquals(10l, lr.getTimestamp()); assertEquals(LocalResourceType.FILE, lr.getType()); }
/**
 * Registers the file named by the event as an application-visible FILE
 * resource in {@code localResources}, provided the file exists under the
 * filesystem's home directory.
 *
 * <p>Nothing is registered when the event carries a null or empty resource
 * path, or when the file does not exist. An {@link IOException} raised while
 * talking to the filesystem is reported via its stack trace and otherwise
 * ignored, so setup is best-effort.
 *
 * @param event launcher event supplying the resource file name (used as the
 *        map key) and its path relative to the home directory
 */
private void setUpLocalResources(ContainerLauncherEvent event) {
  String resourceFileName = event.getResourceFileName();
  String resourcePath = event.getResourceFilePath();
  // Compare by content: `!= ""` only tests object identity, which lets null
  // and non-interned empty strings through.
  if (resourcePath == null || resourcePath.isEmpty()) {
    return;
  }

  try {
    FileSystem fs = FileSystem.get(new YarnConfiguration());
    Path dst = new Path(fs.getHomeDirectory(), resourcePath);
    if (fs.exists(dst)) {
      FileStatus scFileStatus = fs.getFileStatus(dst);
      LocalResource scRsrc = LocalResource.newInstance(
          ConverterUtils.getYarnUrlFromURI(dst.toUri()),
          LocalResourceType.FILE,
          LocalResourceVisibility.APPLICATION,
          scFileStatus.getLen(),
          scFileStatus.getModificationTime());
      localResources.put(resourceFileName, scRsrc);
    }
  } catch (IOException e) {
    // NOTE(review): best-effort by design; route this through the class's
    // logger if one is available.
    e.printStackTrace();
  }
}
/**
 * Places a file at {@code <home>/<yacopConfig name>/<appId>/<fileDstPath>}
 * and registers it as an application-visible FILE resource keyed by
 * {@code fileDstPath}.
 *
 * <p>When {@code fileSrcPath} is null the destination is created with
 * permission 0710 and {@code resources} is written to it with
 * {@code writeUTF}; otherwise the local file {@code fileSrcPath} is copied to
 * the destination.
 *
 * @param fs target filesystem
 * @param fileSrcPath local source file, or null to write {@code resources}
 * @param fileDstPath destination file name, also used as the resource key
 * @param appId application id used as a path component
 * @param localResources map that receives the new entry
 * @param resources content written when {@code fileSrcPath} is null
 * @throws IOException if writing, copying or stat-ing the destination fails,
 *         including a failure while closing the written file
 */
private void addToLocalResources(FileSystem fs, String fileSrcPath,
    String fileDstPath, String appId,
    Map<String, LocalResource> localResources, String resources)
    throws IOException {
  String suffix = yacopConfig.getName() + "/" + appId + "/" + fileDstPath;
  Path dst = new Path(fs.getHomeDirectory(), suffix);

  if (fileSrcPath == null) {
    FSDataOutputStream ostream = null;
    try {
      ostream = FileSystem.create(fs, dst, new FsPermission((short) 0710));
      ostream.writeUTF(resources);
      // Close on the success path so a failed flush surfaces as an
      // IOException instead of being swallowed by closeQuietly below.
      ostream.close();
    } finally {
      // Cleanup for the failure path; redundant after a successful close.
      IOUtils.closeQuietly(ostream);
    }
  } else {
    fs.copyFromLocalFile(new Path(fileSrcPath), dst);
  }

  FileStatus scFileStatus = fs.getFileStatus(dst);
  LocalResource scRsrc = LocalResource.newInstance(
      ConverterUtils.getYarnUrlFromURI(dst.toUri()),
      LocalResourceType.FILE,
      LocalResourceVisibility.APPLICATION,
      scFileStatus.getLen(),
      scFileStatus.getModificationTime());
  localResources.put(fileDstPath, scRsrc);
}
/**
 * Creates a spied {@link LocalResourcesTrackerImpl} pre-populated with
 * {@code nRsrcs} public FILE resources that report themselves as localized
 * and unreferenced.
 *
 * @param user user name given to the tracker and embedded in resource paths
 * @param rsrcSize size reported by every resource
 * @param nRsrcs number of resources to register
 * @param timestamp timestamp of the first resource
 * @param tsstep timestamp increment between consecutive resources
 * @return the spied tracker backed by the populated resource map
 */
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
    long nRsrcs, long timestamp, long tsstep) {
  Configuration trackerConf = new Configuration();
  ConcurrentMap<LocalResourceRequest, LocalizedResource> resources =
      new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
  LocalResourcesTracker tracker = spy(
      new LocalResourcesTrackerImpl(user, null, null, resources, false,
          trackerConf, new NMNullStateStoreService()));

  int i = 0;
  while (i < nRsrcs) {
    // Resource i carries timestamp + i * tsstep, both in the request and in
    // the localized resource.
    final long stamp = timestamp + i * tsstep;
    final Path localPath = new Path("file:///local/" + user + "/rsrc" + i);
    final LocalResourceRequest request = new LocalResourceRequest(
        new Path("file:///" + user + "/rsrc" + i), stamp,
        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);

    LocalizedResource localized = new LocalizedResource(request, null) {
      @Override
      public int getRefCount() {
        return 0;
      }

      @Override
      public long getSize() {
        return rsrcSize;
      }

      @Override
      public Path getLocalPath() {
        return localPath;
      }

      @Override
      public long getTimestamp() {
        return stamp;
      }

      @Override
      public ResourceState getState() {
        return ResourceState.LOCALIZED;
      }
    };
    resources.put(request, localized);
    ++i;
  }
  return tracker;
}
private Map<String, LocalResource> setupEsYarnJar() { Map<String, LocalResource> resources = new LinkedHashMap<String, LocalResource>(); LocalResource esYarnJar = Records.newRecord(LocalResource.class); Path p = new Path(clientCfg.jarHdfsPath()); FileStatus fsStat; try { fsStat = FileSystem.get(client.getConfiguration()).getFileStatus(p); } catch (IOException ex) { throw new IllegalArgumentException( String.format("Cannot find jar [%s]; make sure the artifacts have been properly provisioned and the correct permissions are in place.", clientCfg.jarHdfsPath()), ex); } // use the normalized path as otherwise YARN chokes down the line esYarnJar.setResource(ConverterUtils.getYarnUrlFromPath(fsStat.getPath())); esYarnJar.setSize(fsStat.getLen()); esYarnJar.setTimestamp(fsStat.getModificationTime()); esYarnJar.setType(LocalResourceType.FILE); esYarnJar.setVisibility(LocalResourceVisibility.PUBLIC); resources.put(clientCfg.jarName(), esYarnJar); return resources; }
private Map<String, LocalResource> setupEsZipResource(Config conf) { // elasticsearch.zip Map<String, LocalResource> resources = new LinkedHashMap<String, LocalResource>(); LocalResource esZip = Records.newRecord(LocalResource.class); String esZipHdfsPath = conf.esZipHdfsPath(); Path p = new Path(esZipHdfsPath); FileStatus fsStat; try { fsStat = FileSystem.get(cfg).getFileStatus(p); } catch (IOException ex) { throw new IllegalArgumentException( String.format("Cannot find Elasticsearch zip at [%s]; make sure the artifacts have been properly provisioned and the correct permissions are in place.", esZipHdfsPath), ex); } // use the normalized path as otherwise YARN chokes down the line esZip.setResource(ConverterUtils.getYarnUrlFromPath(fsStat.getPath())); esZip.setSize(fsStat.getLen()); esZip.setTimestamp(fsStat.getModificationTime()); esZip.setType(LocalResourceType.ARCHIVE); esZip.setVisibility(LocalResourceVisibility.PUBLIC); resources.put(conf.esZipName(), esZip); return resources; }
/**
 * Writes a jar containing only a default manifest to {@code p} and returns a
 * {@code PATTERN} local resource describing it.
 *
 * @param files file context used to qualify {@code p} and to stat the result
 * @param p path of the jar to create
 * @param vis visibility recorded on the returned resource
 * @return resource pointing at {@code p}, with size and timestamp taken from
 *         the file status and the pattern {@code classes/.*}
 * @throws IOException if the jar cannot be written or its status cannot be read
 */
static LocalResource createJar(FileContext files, Path p,
    LocalResourceVisibility vis) throws IOException {
  LOG.info("Create jar file " + p);
  File jarFile = new File((files.makeQualified(p)).toUri());
  FileOutputStream stream = new FileOutputStream(jarFile);
  try {
    LOG.info("Create jar out stream ");
    JarOutputStream out = new JarOutputStream(stream, new Manifest());
    LOG.info("Done writing jar stream ");
    out.close();
  } finally {
    // A second close is harmless after out.close(); on failure this releases
    // the file handle that would otherwise leak.
    stream.close();
  }

  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(URL.fromPath(p));
  FileStatus status = files.getFileStatus(p);
  ret.setSize(status.getLen());
  ret.setTimestamp(status.getModificationTime());
  ret.setType(LocalResourceType.PATTERN);
  ret.setVisibility(vis);
  ret.setPattern("classes/.*");
  return ret;
}
@Test public void testCLCPBImplNullResourceURL() throws IOException { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); // Throw NPE if resource URL is null try { LocalResource rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class); rsrc_alpha.setResource(null); rsrc_alpha.setSize(-1); rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); rsrc_alpha.setType(LocalResourceType.FILE); rsrc_alpha.setTimestamp(System.currentTimeMillis()); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); localResources.put("null_url_resource", rsrc_alpha); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); containerLaunchContext.setLocalResources(localResources); Assert.fail("Setting an invalid local resource should be an error!"); } catch (NullPointerException e) { Assert.assertTrue(e.getMessage().contains("Null resource URL for local resource")); } }
@SuppressWarnings("deprecation") public static void setupDistributedCache( Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
/**
 * Builds a mocked {@link ResourceLocalizationSpec} wrapping a mocked FILE
 * resource with a random name, size and timestamp.
 *
 * @param r random source; one {@code nextLong} followed by two
 *        {@code nextInt(1024)} draws are consumed, in that order
 * @param vis visibility reported by the mocked resource and embedded in its
 *        file path
 * @param p path converted into the spec's destination directory URL
 * @return the mocked spec
 */
static ResourceLocalizationSpec getMockRsrc(Random r,
    LocalResourceVisibility vis, Path p) {
  // Draw the random values first, in the order name -> size -> timestamp.
  final String name = Long.toHexString(r.nextLong());
  final long size = r.nextInt(1024) + 1024L;
  final long timestamp = r.nextInt(1024) + 2048L;
  final String localFile = "/local/" + vis + "/" + name;

  // file://<no host>/local/<vis>/<name>
  URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
  when(uri.getScheme()).thenReturn("file");
  when(uri.getHost()).thenReturn(null);
  when(uri.getFile()).thenReturn(localFile);

  LocalResource rsrc = mock(LocalResource.class);
  when(rsrc.getResource()).thenReturn(uri);
  when(rsrc.getSize()).thenReturn(size);
  when(rsrc.getTimestamp()).thenReturn(timestamp);
  when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
  when(rsrc.getVisibility()).thenReturn(vis);

  URL destinationDir = URL.fromPath(p);
  ResourceLocalizationSpec spec = mock(ResourceLocalizationSpec.class);
  when(spec.getResource()).thenReturn(rsrc);
  when(spec.getDestinationDirectory()).thenReturn(destinationDir);
  return spec;
}
/**
 * Builds a {@link LocalResource} for {@code resourcePath}, taking size and
 * timestamp from the file's status on its own filesystem.
 *
 * @param resourcePath location of the resource
 * @param resourceType type recorded on the resource
 * @param resourceVisibility visibility recorded on the resource
 * @return the populated resource
 * @throws LocalizerResourceException if the file status is null or cannot be
 *         read
 */
private LocalResource createLocalResource(Path resourcePath, LocalResourceType resourceType,
    LocalResourceVisibility resourceVisibility) {
  LocalResource resource = Records.newRecord(LocalResource.class);
  URL url = ConverterUtils.getYarnUrlFromPath(resourcePath);
  try {
    FileStatus status = resourcePath.getFileSystem(yarnConfiguration).getFileStatus(resourcePath);
    if (status == null) {
      throw new LocalizerResourceException("Check getFileStatus implementation. getFileStatus gets unexpected null for resourcePath " + resourcePath);
    }

    resource.setResource(url);
    log.info("setLocalizerResource for {}", url);
    resource.setSize(status.getLen());
    resource.setTimestamp(status.getModificationTime());
    resource.setType(resourceType);
    resource.setVisibility(resourceVisibility);
    return resource;
  } catch (IOException ioe) {
    // The same text is logged (with the cause) and carried by the exception.
    String failure = "IO Exception when accessing the resource file status from the filesystem: " + resourcePath;
    log.error(failure, ioe);
    throw new LocalizerResourceException(failure);
  }
}
/**
 * Copies every entry of the local directory {@code srcLibJarDir} into
 * {@code destDir} on {@code this.fs} and, when a resource map is supplied,
 * registers each copy as a FILE local resource.
 *
 * @param srcLibJarDir local directory to list
 * @param resourceMap optional map the copied files are registered in
 * @param destDir destination directory on {@code this.fs}
 * @throws IOException if listing, copying or registering fails
 */
private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> resourceMap, Path destDir)
    throws IOException {
  FileStatus[] entries = FileSystem.getLocal(this.yarnConfiguration).listStatus(srcLibJarDir);
  // Nothing to copy.
  if (entries == null || entries.length == 0) {
    return;
  }

  for (FileStatus entry : entries) {
    Path source = entry.getPath();
    Path target = new Path(destDir, source.getName());
    this.fs.copyFromLocalFile(source, target);

    if (!resourceMap.isPresent()) {
      continue;
    }
    YarnHelixUtils.addFileAsLocalResource(this.fs, target, LocalResourceType.FILE, resourceMap.get());
  }
}
/**
 * Creates {@code p + ".jar"} holding a single entry, named after the last
 * component of {@code p}, filled with {@code len} random bytes, and returns an
 * {@code ARCHIVE} local resource describing it.
 *
 * @param files file context used to read the jar's modification time
 * @param p base path; the jar is written next to it with a ".jar" suffix
 * @param len number of random payload bytes
 * @param r source of the payload bytes
 * @param vis visibility recorded on the returned resource
 * @return resource whose size is {@code len} (the payload length, not the
 *         on-disk size of the jar) and whose timestamp is the jar's
 *         modification time
 * @throws IOException if the jar cannot be written or its status cannot be read
 * @throws URISyntaxException declared for callers; not thrown by this body
 */
static LocalResource createJarFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".jar");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    JarOutputStream out = new JarOutputStream(fileOut);
    out.putNextEntry(new JarEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // No-op after a successful out.close(); releases the handle on failure.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".jar");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(URL.fromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}
/**
 * Creates {@code p + ".ZIP"} holding a single entry, named after the last
 * component of {@code p}, filled with {@code len} random bytes, and returns an
 * {@code ARCHIVE} local resource describing it.
 *
 * @param files file context used to read the archive's modification time
 * @param p base path; the archive is written next to it with a ".ZIP" suffix
 * @param len number of random payload bytes
 * @param r source of the payload bytes
 * @param vis visibility recorded on the returned resource
 * @return resource whose size is {@code len} (the payload length, not the
 *         on-disk size of the archive) and whose timestamp is the archive's
 *         modification time
 * @throws IOException if the archive cannot be written or its status cannot
 *         be read
 * @throws URISyntaxException declared for callers; not thrown by this body
 */
static LocalResource createZipFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".ZIP");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    ZipOutputStream out = new ZipOutputStream(fileOut);
    out.putNextEntry(new ZipEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // No-op after a successful out.close(); releases the handle on failure.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".ZIP");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(URL.fromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}
/**
 * Describes a file as an application-visible YARN local resource.
 *
 * @param file Path of the file; it is qualified against {@code fs} before use.
 * @param fs File system the file lives on.
 * @param type Local resource type.
 * @return Resource pointing at the qualified path, with size and timestamp
 *      taken from the file's status.
 * @throws Exception If failed.
 */
public static LocalResource setupFile(Path file, FileSystem fs, LocalResourceType type)
    throws Exception {
    Path qualified = fs.makeQualified(file);
    FileStatus status = fs.getFileStatus(qualified);

    LocalResource res = Records.newRecord(LocalResource.class);

    res.setResource(ConverterUtils.getYarnUrlFromPath(qualified));
    res.setSize(status.getLen());
    res.setTimestamp(status.getModificationTime());
    res.setType(type);
    res.setVisibility(LocalResourceVisibility.APPLICATION);

    return res;
}
/**
 * Creates {@code p + ".zip"} holding a single entry, named after the last
 * component of {@code p}, filled with {@code len} random bytes, and returns an
 * {@code ARCHIVE} local resource describing it.
 *
 * @param files file context used to read the archive's modification time
 * @param p base path; the archive is written next to it with a ".zip" suffix
 * @param len number of random payload bytes
 * @param r source of the payload bytes
 * @param vis visibility recorded on the returned resource
 * @return resource whose size is {@code len} (the payload length, not the
 *         on-disk size of the archive) and whose timestamp is the archive's
 *         modification time
 * @throws IOException if the archive cannot be written or its status cannot
 *         be read
 * @throws URISyntaxException declared for callers; not thrown by this body
 */
static LocalResource createZipFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".zip");
  archiveFile.createNewFile();
  FileOutputStream fileOut = new FileOutputStream(archiveFile);
  try {
    ZipOutputStream out = new ZipOutputStream(fileOut);
    out.putNextEntry(new ZipEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
    out.close();
  } finally {
    // No-op after a successful out.close(); releases the handle on failure.
    fileOut.close();
  }

  Path archivePath = new Path(p.toString() + ".zip");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(archivePath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(archivePath).getModificationTime());
  return ret;
}