@Override public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { // files are copied so no need to move them back return; } Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); LOG.debug("Moving " + stageP + " back to " + p); if(!fs.rename(stageP, p)) throw new IOException("Failed to move HFile: " + stageP + " to " + p); // restore original permission if (origPermissions.containsKey(srcPath)) { fs.setPermission(p, origPermissions.get(srcPath)); } else { LOG.warn("Can't find previous permission for path=" + srcPath); } }
/** * Bulk load: Add a specified store file to the specified family. If the source file is on the * same different file-system is moved from the source location to the destination location, * otherwise is copied over. * * @param familyName Family that will gain the file * @param srcPath {@link Path} to the file to import * @param seqNum Bulk Load sequence number * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem) fs).getBackingFs() : fs; // We can't compare FileSystem instances as equals() includes UGI instance // as part of the comparison and won't work when doing SecureBulkLoad // TODO deal with viewFS if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) { LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination store. Copying file over to destination filesystem."); Path tmpPath = createTempName(); FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath); srcPath = tmpPath; } return commitStoreFile(familyName, srcPath, seqNum, true); }
/** * Bulk load: Add a specified store file to the specified family. * If the source file is on the same different file-system is moved from the * source location to the destination location, otherwise is copied over. * * @param familyName Family that will gain the file * @param srcPath {@link Path} to the file to import * @param seqNum Bulk Load sequence number * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs; // We can't compare FileSystem instances as equals() includes UGI instance // as part of the comparison and won't work when doing SecureBulkLoad // TODO deal with viewFS if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) { LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination store. Copying file over to destination filesystem."); Path tmpPath = createTempName(); FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath); srcPath = tmpPath; } return commitStoreFile(familyName, srcPath, seqNum, true); }
@Override public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); if (srcFs == null) { srcFs = FileSystem.get(p.toUri(), conf); } if(!isFile(p)) { throw new IOException("Path does not reference a file: " + p); } // Check to see if the source and target filesystems are the same if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination filesystem. Copying file over to destination staging dir."); FileUtil.copy(srcFs, p, fs, stageP, false, conf); } else { LOG.debug("Moving " + p + " to " + stageP); if(!fs.rename(p, stageP)) { throw new IOException("Failed to move HFile: " + p + " to " + stageP); } } return stageP.toString(); }
/** * Bulk load: Add a specified store file to the specified family. * If the source file is on the same different file-system is moved from the * source location to the destination location, otherwise is copied over. * * @param familyName Family that will gain the file * @param srcPath {@link Path} to the file to import * @param seqNum Bulk Load sequence number * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); srcPath = srcFs.resolvePath(srcPath); FileSystem realSrcFs = srcPath.getFileSystem(conf); FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs; // We can't compare FileSystem instances as equals() includes UGI instance // as part of the comparison and won't work when doing SecureBulkLoad // TODO deal with viewFS if (!FSHDFSUtils.isSameHdfs(conf, realSrcFs, desFs)) { LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination store. Copying file over to destination filesystem."); Path tmpPath = createTempName(); FileUtil.copy(realSrcFs, srcPath, fs, tmpPath, false, conf); LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath); srcPath = tmpPath; } return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true)); }
@Override public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); if (srcFs == null) { srcFs = FileSystem.get(p.toUri(), conf); } if(!isFile(p)) { throw new IOException("Path does not reference a file: " + p); } // Check to see if the source and target filesystems are the same if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination filesystem. Copying file over to destination staging dir."); FileUtil.copy(srcFs, p, fs, stageP, false, conf); } else { LOG.debug("Moving " + p + " to " + stageP); FileStatus origFileStatus = fs.getFileStatus(p); origPermissions.put(srcPath, origFileStatus.getPermission()); if(!fs.rename(p, stageP)) { throw new IOException("Failed to move HFile: " + p + " to " + stageP); } } fs.setPermission(stageP, PERM_ALL_ACCESS); return stageP.toString(); }
@Override public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile) throws IOException { Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); // In case of Replication for bulk load files, hfiles are already copied in staging directory if (p.equals(stageP)) { LOG.debug(p.getName() + " is already available in staging directory. Skipping copy or rename."); return stageP.toString(); } if (srcFs == null) { srcFs = FileSystem.get(p.toUri(), conf); } if(!isFile(p)) { throw new IOException("Path does not reference a file: " + p); } // Check to see if the source and target filesystems are the same if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination filesystem. Copying file over to destination staging dir."); FileUtil.copy(srcFs, p, fs, stageP, false, conf); } else if (copyFile) { LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir."); FileUtil.copy(srcFs, p, fs, stageP, false, conf); } else { LOG.debug("Moving " + p + " to " + stageP); FileStatus origFileStatus = fs.getFileStatus(p); origPermissions.put(srcPath, origFileStatus.getPermission()); if(!fs.rename(p, stageP)) { throw new IOException("Failed to move HFile: " + p + " to " + stageP); } } fs.setPermission(stageP, PERM_ALL_ACCESS); return stageP.toString(); }
@Override public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { // files are copied so no need to move them back return; } Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); // In case of Replication for bulk load files, hfiles are not renamed by end point during // prepare stage, so no need of rename here again if (p.equals(stageP)) { LOG.debug(p.getName() + " is already available in source directory. Skipping rename."); return; } LOG.debug("Moving " + stageP + " back to " + p); if(!fs.rename(stageP, p)) throw new IOException("Failed to move HFile: " + stageP + " to " + p); // restore original permission if (origPermissions.containsKey(srcPath)) { fs.setPermission(p, origPermissions.get(srcPath)); } else { LOG.warn("Can't find previous permission for path=" + srcPath); } }
@VisibleForTesting protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { List<Pair<byte[], String>> famPaths = lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString())) .collect(Collectors.toList()); return new ClientServiceCallable<byte[]>(conn, tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected byte[] rpcCall() throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; try { if (LOG.isDebugEnabled()) { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow()) + " with hfile group " + LoadIncrementalHFiles.this.toString(famPaths)); } byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { secureClient = new SecureBulkLoadClient(getConf(), table); success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile); } return success ? regionName : null; } finally { // Best effort copying of files that might not have been imported // from the staging directory back to original location // in user directory if (secureClient != null && !success) { FileSystem targetFs = FileSystem.get(getConf()); FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf()); // Check to see if the source and target filesystems are the same // If they are the same filesystem, we will try move the files back // because previously we moved them to the staging directory. if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) { for (Pair<byte[], String> el : famPaths) { Path hfileStagingPath = null; Path hfileOrigPath = new Path(el.getSecond()); try { hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())), hfileOrigPath.getName()); if (targetFs.rename(hfileStagingPath, hfileOrigPath)) { LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath); } else if (targetFs.exists(hfileStagingPath)) { LOG.debug( "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath); } } catch (Exception ex) { LOG.debug( "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex); } } } } } } }; }