Java 类org.apache.hadoop.io.SequenceFile 实例源码
项目:hadoop-oss
文件:TestTFileSeqFileComparison.java
/**
 * Creates the output file and wraps it in a SequenceFile writer whose keys and
 * values are both {@code BytesWritable}.
 *
 * @param fs file system on which {@code path} is created (with overwrite enabled)
 * @param path location of the output file
 * @param osBufferSize buffer size handed to {@code FileSystem.create}
 * @param compress one of {@code "lzo"}, {@code "gz"} or {@code "none"}
 * @param minBlkSize not used by this constructor
 * @throws IOException if {@code compress} is none of the supported names, or if
 *           the file or the writer cannot be created
 */
public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
    String compress, int minBlkSize) throws IOException {
  Configuration conf = new Configuration();
  final boolean uncompressed = "none".equals(compress);

  // The codec is resolved before the file system is touched, so an unsupported
  // name fails without creating the output file.
  CompressionCodec codec = null;
  if ("lzo".equals(compress)) {
    codec = Compression.Algorithm.LZO.getCodec();
  } else if ("gz".equals(compress)) {
    codec = Compression.Algorithm.GZ.getCodec();
  } else if (!uncompressed) {
    throw new IOException("Codec not supported.");
  }

  this.fsdos = fs.create(path, true, osBufferSize);

  // codec was never assigned when compress is "none", so it is null in that case.
  SequenceFile.CompressionType compressionType = uncompressed
      ? SequenceFile.CompressionType.NONE
      : SequenceFile.CompressionType.BLOCK;
  writer = SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
      BytesWritable.class, compressionType, codec);
}
项目:circus-train
文件:SimpleCopyListing.java
/**
 * Resolves the source paths configured in {@code options} and builds the copy listing for them.
 * <p>
 * A source that is a file is used as is. Any other source is expanded with {@code FileSystem#globStatus} and every
 * match is used. The resolved paths are then passed to
 * {@link #doBuildListing(SequenceFile.Writer, S3MapReduceCpOptions, List)}.
 *
 * @param fileListWriter writer of the listing sequence file
 * @param options copy options supplying the source paths
 * @throws IOException if a source cannot be inspected; an {@code InvalidInputException} is raised when a non-file
 *           source matches nothing
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options) throws IOException {
  List<Path> resolvedPaths = new ArrayList<>(options.getSources().size());
  for (Path source : options.getSources()) {
    FileSystem sourceFs = source.getFileSystem(getConf());
    FileStatus status = sourceFs.getFileStatus(source);

    // Plain files need no glob expansion.
    if (status.isFile()) {
      LOG.debug("Adding path {}", status.getPath());
      resolvedPaths.add(status.getPath());
      continue;
    }

    FileStatus[] matches = sourceFs.globStatus(source);
    if (matches == null || matches.length == 0) {
      throw new InvalidInputException("Source path " + source + " doesn't exist");
    }
    for (FileStatus match : matches) {
      LOG.debug("Adding path {}", match.getPath());
      resolvedPaths.add(match.getPath());
    }
  }
  doBuildListing(fileListWriter, options, resolvedPaths);
}
项目:circus-train
文件:SimpleCopyListing.java
/**
 * Walks the directory tree below {@code sourceStatus} and records every file it finds in the listing.
 * <p>
 * The walk is iterative: directories still to be visited are kept on a stack, and a child directory is pushed only
 * when {@code isDirectoryAndNotEmpty} reports it as non-empty.
 *
 * @param fileListWriter writer of the listing sequence file
 * @param sourceStatus directory at which the traversal starts
 * @param sourcePathRoot root against which relative paths are computed; also selects the file system
 * @param options copy options forwarded to {@code writeToFileListing}
 * @throws IOException if listing a directory or writing an entry fails
 */
private void traverseNonEmptyDirectory(
    SequenceFile.Writer fileListWriter,
    FileStatus sourceStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
  throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  Stack<FileStatus> pathStack = new Stack<>();
  pathStack.push(sourceStatus);

  while (!pathStack.isEmpty()) {
    for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
      if (child.isFile()) {
        // Log the child being recorded, not the directory the traversal started from.
        LOG.debug("Recording source-path: {} for copy.", child.getPath());
        CopyListingFileStatus childCopyListingStatus = new CopyListingFileStatus(child);
        writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
      }
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        LOG.debug("Traversing non-empty source dir: {}", child.getPath());
        pathStack.push(child);
      }
    }
  }
}
项目:circus-train
文件:SimpleCopyListing.java
/**
 * Appends one entry to the listing, keyed by the path of {@code fileStatus} relative to {@code sourcePathRoot}, and
 * updates the running totals.
 * <p>
 * Nothing is written and no total changes when {@code shouldCopy} rejects the path.
 *
 * @param fileListWriter writer of the listing sequence file
 * @param fileStatus status of the file or directory to record
 * @param sourcePathRoot root against which the relative path is computed
 * @param options copy options consulted by {@code shouldCopy}
 * @throws IOException if appending to or syncing the writer fails
 */
private void writeToFileListing(
    SequenceFile.Writer fileListWriter,
    CopyListingFileStatus fileStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
  throws IOException {
  final Path fullPath = fileStatus.getPath();
  // Computed once and reused for both the log line and the entry key.
  final String relativePath = PathUtil.getRelativePath(sourcePathRoot, fullPath);
  LOG.debug("REL PATH: {}, FULL PATH: {}", relativePath, fullPath);

  FileStatus status = fileStatus;

  if (!shouldCopy(fullPath, options)) {
    return;
  }

  fileListWriter.append(new Text(relativePath), status);
  fileListWriter.sync();

  // Only non-directory entries contribute bytes.
  if (!fileStatus.isDirectory()) {
    totalBytesToCopy += fileStatus.getLen();
  }
  totalPaths++;
}
项目:hadoop
文件:SimpleCopyListing.java
/**
 * Records {@code fileStatus} in the listing under its path relative to
 * {@code sourcePathRoot} and bumps the byte and path counters.
 * Paths rejected by {@code shouldCopy} are skipped entirely.
 *
 * @param fileListWriter writer of the listing sequence file
 * @param fileStatus status of the file or directory to record
 * @param sourcePathRoot root against which the relative path is computed
 * @param options copy options consulted by {@code shouldCopy}
 * @throws IOException if appending to or syncing the writer fails
 */
private void writeToFileListing(SequenceFile.Writer fileListWriter,
    CopyListingFileStatus fileStatus,
    Path sourcePathRoot,
    DistCpOptions options) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
        fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
  }

  if (shouldCopy(fileStatus.getPath(), options)) {
    FileStatus entry = fileStatus;
    Text relativeKey = new Text(DistCpUtils.getRelativePath(sourcePathRoot,
        fileStatus.getPath()));
    fileListWriter.append(relativeKey, entry);
    fileListWriter.sync();

    // Directories are counted as paths but add no bytes.
    boolean carriesData = !fileStatus.isDirectory();
    if (carriesData) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }
}
项目:circus-train
文件:SimpleCopyListingTest.java
/**
 * Builds a listing for a source tree containing a {@code _SUCCESS} file and checks that
 * only the two other files appear in it.
 */
@Test(timeout = 10000)
public void skipFlagFiles() throws Exception {
  FileSystem fileSystem = cluster.getFileSystem();
  Path sourceDir = new Path("/tmp/in4");
  URI targetUri = URI.create("s3://bucket/tmp/out4/");

  createFile(fileSystem, new Path(sourceDir, "1/_SUCCESS"));
  createFile(fileSystem, new Path(sourceDir, "1/file"));
  createFile(fileSystem, new Path(sourceDir, "2"));

  Path listingPath = new Path("/tmp/list4");
  listing.buildListing(listingPath, options(sourceDir, targetUri));

  assertThat(listing.getNumberOfPaths(), is(2L));

  // Entries are expected in exactly this order, followed by end of file.
  String[] expectedRelativePaths = { "/1/file", "/2" };
  try (SequenceFile.Reader reader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listingPath))) {
    CopyListingFileStatus value = new CopyListingFileStatus();
    Text key = new Text();
    for (String expected : expectedRelativePaths) {
      assertThat(reader.next(key, value), is(true));
      assertThat(key.toString(), is(expected));
    }
    assertThat(reader.next(key, value), is(false));
  }
}
项目:circus-train
文件:SimpleCopyListingTest.java
/**
 * Verifies that an exception thrown by the writer's {@code close()} is the exception
 * that escapes {@code doBuildListing}.
 */
@Test
public void failOnCloseError() throws IOException {
  File input = File.createTempFile("TestCopyListingIn", null);
  input.deleteOnExit();
  File output = File.createTempFile("TestCopyListingOut", null);
  output.deleteOnExit();
  Path source = new Path(input.toURI());

  // A mocked writer whose close() always fails.
  Exception closeFailure = new IOException("boom");
  SequenceFile.Writer failingWriter = mock(SequenceFile.Writer.class);
  doThrow(closeFailure).when(failingWriter).close();

  SimpleCopyListing copyListing = new SimpleCopyListing(CONFIG, CREDENTIALS);
  Exception caught = null;
  try {
    copyListing.doBuildListing(failingWriter, options(source, output.toURI()));
  } catch (Exception e) {
    caught = e;
  }

  Assert.assertNotNull("close writer didn't fail", caught);
  Assert.assertEquals(closeFailure, caught);
}
项目:flume-release-1.7.0
文件:HDFSSequenceFile.java
/**
 * Opens {@code dstPath} for output and creates the SequenceFile writer on top of it.
 * <p>
 * The file is appended to when the {@code hdfs.append.support} setting is true and the
 * path is an existing file; otherwise it is created.
 *
 * @param dstPath destination file
 * @param codeC compression codec passed to the writer
 * @param compType compression type passed to the writer
 * @param conf configuration read for the append setting and passed to the writer
 * @param hdfs file system to write to; replaced by its raw file system when
 *          {@code useRawLocalFileSystem} is set and it is a {@code LocalFileSystem}
 * @throws IOException if the stream or the writer cannot be opened
 */
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
    throws IOException {
  if (useRawLocalFileSystem) {
    if (!(hdfs instanceof LocalFileSystem)) {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    } else {
      hdfs = ((LocalFileSystem) hdfs).getRaw();
    }
  }

  // isFile is only consulted when append support is switched on.
  boolean appendToExisting = conf.getBoolean("hdfs.append.support", false) && hdfs.isFile(dstPath);
  outStream = appendToExisting ? hdfs.append(dstPath) : hdfs.create(dstPath);

  writer = SequenceFile.createWriter(conf, outStream,
      serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);

  registerCurrentStream(outStream, hdfs, dstPath);
}
项目:flume-release-1.7.0
文件:TestBucketWriter.java
/**
 * Appends 1000 three-byte events to a writer configured with an event limit of 100 and
 * checks the totals reported by the mock: 1000 events, 3000 bytes, 10 files.
 */
@Test
public void testEventCountingRoller() throws IOException, InterruptedException {
  final int eventsPerFile = 100;
  final int totalEvents = 1000;
  MockHDFSWriter mockWriter = new MockHDFSWriter();
  BucketWriter writer = new BucketWriter(
      0, 0, eventsPerFile, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, mockWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
  int remaining = totalEvents;
  while (remaining > 0) {
    writer.append(event);
    remaining--;
  }

  logger.info("Number of events written: {}", mockWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", mockWriter.getBytesWritten());
  logger.info("Number of files opened: {}", mockWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, mockWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, mockWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, mockWriter.getFilesOpened());
}
项目:flume-release-1.7.0
文件:TestBucketWriter.java
/**
 * Appends 1000 three-byte events to a writer configured with a size limit of 300 bytes and
 * checks the totals reported by the mock: 1000 events, 3000 bytes, 10 files.
 */
@Test
public void testSizeRoller() throws IOException, InterruptedException {
  final int bytesPerFile = 300;
  final int totalEvents = 1000;
  MockHDFSWriter mockWriter = new MockHDFSWriter();
  BucketWriter writer = new BucketWriter(
      0, bytesPerFile, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, mockWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
  int sent = 0;
  do {
    writer.append(event);
    sent++;
  } while (sent < totalEvents);

  logger.info("Number of events written: {}", mockWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", mockWriter.getBytesWritten());
  logger.info("Number of files opened: {}", mockWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, mockWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, mockWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, mockWriter.getFilesOpened());
}
项目:hadoop
文件:TestGlobbedCopyListing.java
/**
 * Reads the listing at {@code listingPath} and checks it against {@code expectedValues},
 * which maps a full source path to its expected relative path.
 *
 * @param listingPath sequence file produced by the copy listing
 * @throws Exception if the listing cannot be read
 */
private void verifyContents(Path listingPath) throws Exception {
  Map<String, String> actualValues = new HashMap<String, String>();

  SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
      listingPath, new Configuration());
  try {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();
    while (reader.next(key, value)) {
      if (value.isDirectory() && key.toString().equals("")) {
        // ignore root with empty relPath, which is an entry to be
        // used for preserving root attributes etc.
        continue;
      }
      actualValues.put(value.getPath().toString(), key.toString());
    }
  } finally {
    // Release the reader even when reading or an assertion above fails.
    reader.close();
  }

  Assert.assertEquals(expectedValues.size(), actualValues.size());
  for (Map.Entry<String, String> entry : actualValues.entrySet()) {
    // Expected value first, so a failure message reads the right way round.
    Assert.assertEquals(expectedValues.get(entry.getKey()), entry.getValue());
  }
}
项目:flume-release-1.7.0
文件:TestBucketWriter.java
/**
 * Checks that the path opened by the writer contains the configured in-use prefix.
 */
@Test
public void testInUsePrefix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  // A single append is enough to make the writer open a file.
  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);

  Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX));
}
项目:flume-release-1.7.0
文件:TestBucketWriter.java
/**
 * Checks that the path opened by the writer contains the configured in-use suffix.
 */
@Test
public void testInUseSuffix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
  final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH";

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  // A single append is enough to make the writer open a file.
  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);

  Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX));
}
项目:flume-release-1.7.0
文件:TestBucketWriter.java
/**
 * Checks that the close callback handed to the bucket writer has run once
 * {@code close(true)} returns.
 */
@Test
public void testCallbackOnClose() throws IOException, InterruptedException {
  final int rollIntervalSeconds = 1000; // large enough not to trigger during the test
  final String inUseSuffix = "WELCOME_TO_THE_EREBOR";
  final AtomicBoolean callbackFired = new AtomicBoolean(false);

  MockHDFSWriter mockWriter = new MockHDFSWriter();
  BucketWriter writer = new BucketWriter(
      rollIntervalSeconds, 0, 0, 0, ctx, "/tmp", "file", "", inUseSuffix, null, null,
      SequenceFile.CompressionType.NONE, mockWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
      new HDFSEventSink.WriterCallback() {
        @Override
        public void run(String filePath) {
          // Only the fact of being invoked matters to the test.
          callbackFired.set(true);
        }
      }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);

  Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
  writer.append(event);
  writer.close(true);

  Assert.assertTrue(callbackFired.get());
}
项目:flume-release-1.7.0
文件:TestHDFSCompressedDataStream.java
/**
 * Writes one event body through the compressed data stream, then reads the file back
 * with a plain {@link GZIPInputStream} and checks that the body is unchanged.
 */
@Test
public void testGzipDurability() throws Exception {
  Context context = new Context();
  HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
  writer.configure(context);
  writer.open(fileURI, factory.getCodec(new Path(fileURI)),
      SequenceFile.CompressionType.BLOCK);

  String[] bodies = { "yarf!" };
  writeBodies(writer, bodies);

  byte[] buf = new byte[256];
  String result;
  GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
  try {
    int len = cmpIn.read(buf);
    // read() returns -1 at end of stream; fail clearly instead of with an index error.
    Assert.assertTrue("no data could be read back from the compressed file", len > 0);
    result = new String(buf, 0, len, Charsets.UTF_8);
  } finally {
    // Closes the underlying FileInputStream as well.
    cmpIn.close();
  }
  result = result.trim(); // BodyTextEventSerializer adds a newline

  Assert.assertEquals("input and output must match", bodies[0], result);
}
项目:hadoop
文件:DistCpV1.java
/**
 * Applies preserved attributes to the destination directories recorded in the
 * destination directory list.
 * <p>
 * Does nothing when {@code presevedAttributes} is null or requests none of
 * USER, GROUP and PERMISSION.
 *
 * @param conf configuration used to obtain the destination file system
 * @param jobconf job configuration holding the path of the directory list
 * @param destPath destination root against which listed outputs are resolved
 * @param presevedAttributes attribute string parsed by {@code FileAttribute.parse}, or null
 * @throws IOException if the list cannot be read or a status cannot be updated
 */
static private void finalize(Configuration conf, JobConf jobconf,
    final Path destPath, String presevedAttributes) throws IOException {
  if (presevedAttributes == null) {
    return;
  }

  EnumSet<FileAttribute> attributes = FileAttribute.parse(presevedAttributes);
  boolean touchesOwnershipOrMode = attributes.contains(FileAttribute.USER)
      || attributes.contains(FileAttribute.GROUP)
      || attributes.contains(FileAttribute.PERMISSION);
  if (!touchesOwnershipOrMode) {
    return;
  }

  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dirList = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  try (SequenceFile.Reader in =
      new SequenceFile.Reader(jobconf, Reader.file(dirList))) {
    Text listKey = new Text();
    FilePair pair = new FilePair();
    while (in.next(listKey, pair)) {
      Path absoluteDst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absoluteDst),
          attributes, dstfs);
    }
  }
}
项目:LDA
文件:SequenceFileIterator.java
/**
 * Opens a reader on {@code path} and captures the key and value classes it reports.
 *
 * @param path sequence file to iterate; qualified against its own file system before opening
 * @param reuseKeyValueInstances stored as given in the field of the same name
 * @param conf configuration used to resolve the file system and open the reader
 * @throws IOException if path can't be read, or its key or value class can't be instantiated
 */
public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
  this.key = null;
  this.value = null;

  FileSystem fileSystem = path.getFileSystem(conf);
  Path qualifiedPath = path.makeQualified(fileSystem);
  this.reader = new SequenceFile.Reader(fileSystem, qualifiedPath, conf);
  this.conf = conf;

  this.keyClass = (Class<K>) this.reader.getKeyClass();
  this.valueClass = (Class<V>) this.reader.getValueClass();
  // Remember whether the file's value class is NullWritable.
  this.noValue = NullWritable.class.equals(this.valueClass);
  this.reuseKeyValueInstances = reuseKeyValueInstances;
}
项目:hadoop
文件:GenerateDistCacheData.java
/**
 * Cuts the distributed-cache file list into input splits of roughly equal data size.
 * <p>
 * Each record key of the list is treated as the size of one file to generate. Records are
 * accumulated until adding the next one would exceed the target size, at which point a
 * split covering the byte range read so far is emitted. Whatever part of the list file is
 * not covered by an emitted split goes into one final split.
 *
 * @param jobCtxt job context supplying the configuration
 * @return the splits over the file-list sequence file
 * @throws IOException if the cluster status or the file list cannot be read
 * @throws RuntimeException if the file count, total size or list location is missing
 */
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient jobClient = new JobClient(jobConf);
  ClusterStatus clusterStatus = jobClient.getClusterStatus(true);
  int trackerCount = clusterStatus.getTaskTrackers();

  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);

  boolean metadataComplete = fileCount >= 0 && totalSize >= 0 && distCacheFileList != null;
  if (!metadataComplete) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path listFile = new Path(distCacheFileList);
  FileSystem fs = listFile.getFileSystem(jobConf);
  FileStatus listFileStatus = fs.getFileStatus(listFile);

  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int slotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = trackerCount * slotsPerTracker;
  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);

  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
      DistributedCacheEmulator.AVG_BYTES_PER_MAP);

  long splitStart = 0L;
  long splitEnd = 0L;
  long accumulated = 0L;
  long uncoveredBytes = listFileStatus.getLen();

  SequenceFile.Reader reader = new SequenceFile.Reader(fs, listFile, jobConf);
  try {
    while (reader.next(key, value)) {
      long fileSize = key.get();
      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (accumulated != 0 && accumulated + fileSize > targetSize) {
        long splitSize = splitEnd - splitStart;
        splits.add(new FileSplit(
            listFile, splitStart, splitSize, (String[]) null));
        uncoveredBytes -= splitSize;
        splitStart = splitEnd;
        accumulated = 0L;
      }
      accumulated += fileSize;
      splitEnd = reader.getPosition();
    }
  } finally {
    reader.close();
  }

  // The tail of the list file that no emitted split covers yet.
  if (uncoveredBytes != 0) {
    splits.add(new FileSplit(
        listFile, splitStart, uncoveredBytes, (String[]) null));
  }
  return splits;
}
项目:hadoop
文件:JHLogAnalyzer.java
/**
 * Creates one control file per index in {@code [start, end)}. Each control file holds a
 * single record: the path of the matching job-history log file and the value 0.
 * <p>
 * The first failure is logged and ends the loop; the finished-thread counter is
 * incremented either way.
 */
public void run() {
  try {
    int index = start;
    while (index < end) {
      Path controlFile = new Path(INPUT_DIR, "in_file_" + getFileName(index));
      SequenceFile.Writer controlWriter = null;
      try {
        controlWriter = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
            Text.class, LongWritable.class,
            CompressionType.NONE);
        controlWriter.append(new Text(jhLogFiles[index].getPath().toString()), new LongWritable(0));
      } catch (Exception e) {
        // Any failure is surfaced as an IOException for the outer handler.
        throw new IOException(e);
      } finally {
        if (controlWriter != null) {
          controlWriter.close();
        }
      }
      index++;
    }
  } catch (IOException ex) {
    LOG.error("FileCreateDaemon failed.", ex);
  }
  numFinishedThreads++;
}
项目:hadoop
文件:TestDatamerge.java
/**
 * Fills {@code src} with {@code srcs} paths under {@code testdir} and opens one
 * IntWritable/IntWritable writer on each of them.
 * <p>
 * File names are the base-36 rendering of {@code i + 10}, so index 0 is named "a".
 *
 * @param testdir directory that receives the files
 * @param conf configuration used to resolve the file system and create the writers
 * @param srcs number of source files
 * @param src output array populated with the created paths
 * @return the writers, in the same order as {@code src}
 * @throws IOException if a writer cannot be created
 */
private static SequenceFile.Writer[] createWriters(Path testdir,
    Configuration conf, int srcs, Path[] src) throws IOException {
  // All paths are assigned before the first writer is opened.
  int index = 0;
  while (index < srcs) {
    src[index] = new Path(testdir, Integer.toString(index + 10, 36));
    index++;
  }

  SequenceFile.Writer[] writers = new SequenceFile.Writer[srcs];
  for (index = 0; index < srcs; index++) {
    writers[index] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
        src[index], IntWritable.class, IntWritable.class);
  }
  return writers;
}
项目:hadoop
文件:GenericMRLoadGenerator.java
/**
 * Builds one {@code IndirectSplit} per record of the indirect input file.
 * Each record's value is used as the split's path and its key as the split's length argument.
 *
 * @param job job context supplying the configuration
 * @return the splits, in file order
 * @throws IOException if the indirect input file cannot be opened or read
 */
public List<InputSplit> getSplits(JobContext job)
    throws IOException {
  Configuration conf = job.getConfiguration();
  Path src = new Path(conf.get(INDIRECT_INPUT_FILE, null));
  FileSystem fs = src.getFileSystem(conf);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  LongWritable key = new LongWritable();
  Text value = new Text();

  SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, conf);
  try {
    while (sl.next(key, value)) {
      splits.add(new IndirectSplit(new Path(value.toString()), key.get()));
    }
  } finally {
    // The reader was previously never closed.
    sl.close();
  }
  return splits;
}
项目:hadoop
文件:TestTotalOrderPartitioner.java
/**
 * Writes {@code splits} as the keys of a partition file on the local file system and
 * configures {@code conf} to use it.
 * <p>
 * The file is {@code <test.build.data>/<testname>/_partition.lst} (with {@code /tmp} as the
 * default base), and the number of reduces is set to {@code splits.length + 1}.
 *
 * @param testname sub-directory name for the partition file
 * @param conf configuration updated with the partition file and reduce count
 * @param splits split points to write; the key class is taken from the first element
 * @return path of the partition file
 * @throws IOException if the file cannot be written
 */
private static <T extends WritableComparable<?>> Path writePartitionFile(
    String testname, Configuration conf, T[] splits) throws IOException {
  final FileSystem localFs = FileSystem.getLocal(conf);
  final Path baseDir = new Path(System.getProperty("test.build.data", "/tmp")
      ).makeQualified(localFs);
  Path partitionFile = new Path(baseDir, testname + "/_partition.lst");

  TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
  conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);

  SequenceFile.Writer out = null;
  try {
    out = SequenceFile.createWriter(localFs, conf, partitionFile,
        splits[0].getClass(), NullWritable.class,
        SequenceFile.CompressionType.NONE);
    for (T split : splits) {
      out.append(split, NullWritable.get());
    }
  } finally {
    if (out != null) {
      out.close();
    }
  }
  return partitionFile;
}
项目:hadoop
文件:TestJoinDatamerge.java
/**
 * Counts, for every source file, how many records have a key equal to {@code key}, and
 * multiplies the non-zero counts together.
 *
 * @param key key to look for
 * @param src sequence files to scan
 * @param conf configuration used to open the readers
 * @return the product of the non-zero per-file counts; 1 when no file contains the key
 * @throws IOException if a file cannot be read
 */
private static int countProduct(IntWritable key, Path[] src,
    Configuration conf) throws IOException {
  int product = 1;
  for (Path p : src) {
    int count = 0;
    SequenceFile.Reader r = new SequenceFile.Reader(
        cluster.getFileSystem(), p, conf);
    try {
      IntWritable k = new IntWritable();
      IntWritable v = new IntWritable();
      while (r.next(k, v)) {
        if (k.equals(key)) {
          count++;
        }
      }
    } finally {
      // Close the reader even when reading fails part-way through.
      r.close();
    }
    // Files without the key must not zero the product.
    if (count != 0) {
      product *= count;
    }
  }
  return product;
}
项目:hadoop
文件:TestCombineSequenceFileInputFormat.java
/**
 * Creates {@code numFiles} sequence files named {@code test_<i>.seq} under the work directory.
 * File {@code i} holds one record per integer of range {@code i}, keyed by that integer and
 * carrying a random payload of fewer than 10 bytes.
 *
 * @param length passed to {@code createRanges}
 * @param numFiles number of files to create
 * @param random source of the ranges and payloads
 * @param job job whose configuration is used to create the writers
 * @throws IOException if a file cannot be written
 */
private static void createFiles(int length, int numFiles, Random random,
    Job job) throws IOException {
  Range[] ranges = createRanges(length, numFiles, random);

  for (int fileIndex = 0; fileIndex < numFiles; fileIndex++) {
    Path file = new Path(workDir, "test_" + fileIndex + ".seq");
    // create a file with length entries
    @SuppressWarnings("deprecation")
    SequenceFile.Writer out =
        SequenceFile.createWriter(localFs, job.getConfiguration(), file,
            IntWritable.class, BytesWritable.class);
    Range range = ranges[fileIndex];
    try {
      int current = range.start;
      while (current < range.end) {
        byte[] payload = new byte[random.nextInt(10)];
        random.nextBytes(payload);
        out.append(new IntWritable(current), new BytesWritable(payload));
        current++;
      }
    } finally {
      out.close();
    }
  }
}
项目:hadoop
文件:NNBench.java
/**
 * Writes the control files consumed by a test run, one per configured map.
 * Each file is named {@code NNBench_Controlfile_<i>} and contains a single record:
 * its own name as key and 0 as value.
 *
 * @throws IOException on error
 */
private static void createControlFiles() throws IOException {
  FileSystem tempFS = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");

  int mapIndex = 0;
  while (mapIndex < numberOfMaps) {
    String controlFileName = "NNBench_Controlfile_" + mapIndex;
    Path controlFilePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
        controlFileName);

    SequenceFile.Writer out = null;
    try {
      out = SequenceFile.createWriter(tempFS, config, controlFilePath, Text.class,
          LongWritable.class, CompressionType.NONE);
      out.append(new Text(controlFileName), new LongWritable(0L));
    } finally {
      if (out != null) {
        out.close();
      }
    }
    mapIndex++;
  }
}
项目:hadoop
文件:UniformSizeInputFormat.java
/**
 * Opens a reader on the listing file named by {@code configuration}.
 *
 * @param configuration configuration that locates the listing file
 * @return a reader positioned at the start of the listing file
 * @throws IllegalArgumentException if the listing file does not exist or cannot be opened
 */
private SequenceFile.Reader getListingFileReader(Configuration configuration) {
  final Path listingFilePath = getListingFilePath(configuration);
  try {
    final FileSystem listingFs = listingFilePath.getFileSystem(configuration);
    final boolean present = listingFs.exists(listingFilePath);
    if (present) {
      return new SequenceFile.Reader(configuration,
          SequenceFile.Reader.file(listingFilePath));
    }
    // Not an IOException, so this propagates past the catch below.
    throw new IllegalArgumentException("Listing file doesn't exist at: "
        + listingFilePath);
  } catch (IOException exception) {
    LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
    throw new IllegalArgumentException("Couldn't find listing-file at: "
        + listingFilePath, exception);
  }
}
项目:hadoop
文件:TotalOrderPartitioner.java
/**
 * Read the cut points from the given sequence file.
 * Every key is read into a fresh instance of {@code keyClass}; values are NullWritable.
 *
 * @param fs The file system
 * @param p The path to read
 * @param keyClass The map output key class
 * @param conf The configuration used to open the file and instantiate keys
 * @return the keys of the file, in file order
 * @throws IOException if the file cannot be read
 */
                               // matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
    Configuration conf) throws IOException {
  SequenceFile.Reader in = new SequenceFile.Reader(fs, p, conf);
  ArrayList<K> cutPoints = new ArrayList<K>();
  K current = ReflectionUtils.newInstance(keyClass, conf);
  NullWritable ignored = NullWritable.get();
  try {
    // A new key instance is allocated after each stored one so that list entries stay distinct.
    for (; in.next(current, ignored); current = ReflectionUtils.newInstance(keyClass, conf)) {
      cutPoints.add(current);
    }
    // Close explicitly on the success path so a close failure is reported;
    // the reference is cleared so the cleanup below does not close it again.
    in.close();
    in = null;
  } finally {
    IOUtils.cleanup(LOG, in);
  }
  return cutPoints.toArray((K[]) Array.newInstance(keyClass, cutPoints.size()));
}
项目:hadoop
文件:TestClientDistributedCacheManager.java
/**
 * Creates a small uncompressed Text/Text sequence file at {@code p} containing one record.
 *
 * @param p path of the file to create
 * @param conf configuration used to create the writer
 * @throws IOException if the file cannot be created or written; the original failure is kept as the cause
 */
@SuppressWarnings("deprecation")
void createTempFile(Path p, Configuration conf) throws IOException {
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(fs, conf, p,
        Text.class, Text.class,
        CompressionType.NONE);
    writer.append(new Text("text"), new Text("moretext"));
  } catch (Exception e) {
    // Same message as before, but keep the cause so the stack trace is not lost.
    throw new IOException(e.getLocalizedMessage(), e);
  } finally {
    if (writer != null) {
      writer.close();
    }
  }
  LOG.info("created: " + p);
}
项目:wherehowsX
文件:SequenceFileAnalyzer.java
/**
 * Derives a two-field (Key/Value) schema record from the header of the sequence file at {@code path}.
 *
 * @param path file to analyse
 * @return the schema record, or {@code null} when the file does not exist or cannot be read as a sequence file
 * @throws IOException if the existence check fails
 */
@Override
public DatasetJsonRecord getSchema(Path path) throws IOException {
  DatasetJsonRecord record = null;
  if (!fs.exists(path))
    LOG.error("sequencefileanalyzer file : " + path.toUri().getPath() + " is not exist on hdfs");
  else {
    try {
      LOG.info("sequencefileanalyzer start parse schema for file path : {}", path.toUri().getPath());
      String keyName = "Key";
      String keyType;
      String valueName = "Value";
      String valueType;
      // Only the class names are needed, so the reader is closed as soon as they have been read.
      SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
      try {
        keyType = getWritableType(reader.getKeyClassName());
        valueType = getWritableType(reader.getValueClassName());
      } finally {
        reader.close();
      }
      FileStatus status = fs.getFileStatus(path);
      String storage = STORAGE_TYPE;
      String abstractPath = path.toUri().getPath();
      String codec = "sequence.codec";
      String schemaString = "{\"fields\": [{\"name\": \"" + keyName + "\", \"type\": \"" + keyType + "\"}, {\"name\": \"" + valueName + "\", \"type\": \"" + valueType + "\"}], \"name\": \"Result\", \"namespace\": \"com.tencent.lake\", \"type\": \"record\"}";
      record = new DatasetJsonRecord(schemaString, abstractPath, status.getModificationTime(), status.getOwner(), status.getGroup(),
          status.getPermission().toString(), codec, storage, "");
      LOG.info("sequencefileanalyzer parse path :{},schema is {}", path.toUri().getPath(), record.toCsvString());
    } catch (Exception e) {
      // Pass the exception to the logger so the real stack trace is recorded.
      LOG.error("path : {} content " + " is not Sequence File format content ", path.toUri().getPath(), e);
    }
  }
  return record;
}
项目:wherehowsX
文件:SequenceFileAnalyzer.java
/**
 * Reads up to 12 records from the sequence file at {@code path} and returns them as sample data.
 * Each sample is a JSON-like string built from the key's and value's string form.
 *
 * @param path file to sample
 * @return the sample record, or {@code null} when the file does not exist or cannot be read as a sequence file
 * @throws IOException if the existence check fails
 */
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
  SampleDataRecord dataRecord = null;
  if (!fs.exists(path))
    LOG.error("sequence file : " + path.toUri().getPath() + " is not exist on hdfs");
  else {
    try {
      LOG.info("sequencefileanalyzer start parse sampledata for file path : {}", path.toUri().getPath());
      List<Object> sampleValues = new ArrayList<Object>();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
      try {
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
        int count = 0;
        String keyName = "Key";
        String valueName = "Value";
        // The limit is checked first so that no record is read beyond the ones sampled.
        while (count < 12 && reader.next(key, value)) {
          sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
          count++;
        }
      } finally {
        reader.close();
      }
      dataRecord = new SampleDataRecord(path.toUri().getPath(), sampleValues);
      LOG.info("sequence file path : {}, sample data is {}", path.toUri().getPath(), sampleValues);
    } catch (Exception e) {
      // Pass the exception to the logger so the real stack trace is recorded.
      LOG.error("path : {} content " + " is not Sequence File format content ", path.toUri().getPath(), e);
    }
  }
  return dataRecord;
}
项目:hadoop-oss
文件:Display.java
/**
 * Opens a sequence-file reader on the file described by {@code f} and prepares key and value
 * instances of the classes the file declares, together with the two transfer buffers.
 *
 * @param f status of the sequence file to read
 * @throws IOException if the reader cannot be opened
 */
public TextRecordInputStream(FileStatus f) throws IOException {
  final Configuration lconf = getConf();
  final Path filePath = f.getPath();

  r = new SequenceFile.Reader(lconf,
      SequenceFile.Reader.file(filePath));

  // Both classes recorded in the file must be Writable; asSubclass rejects anything else.
  Class<? extends Writable> keyType = r.getKeyClass().asSubclass(Writable.class);
  key = ReflectionUtils.newInstance(keyType, lconf);
  Class<? extends Writable> valueType = r.getValueClass().asSubclass(Writable.class);
  val = ReflectionUtils.newInstance(valueType, lconf);

  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
项目:ditb
文件:HashTable.java
/**
 * Loads the partition keys stored in the sequence file at {@code path} into {@code partitions}
 * and verifies that they are in natural order.
 *
 * @param fs file system holding the partition file
 * @param conf configuration used to open the reader
 * @param path partition file to read
 * @throws IOException if the file cannot be read or its keys are not ordered
 */
private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
    throws IOException {
  @SuppressWarnings("deprecation")
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
  try {
    ImmutableBytesWritable key = new ImmutableBytesWritable();
    partitions = new ArrayList<ImmutableBytesWritable>();
    while (reader.next(key)) {
      // key is reused by the reader, so each entry stores its own copy of the bytes.
      partitions.add(new ImmutableBytesWritable(key.copyBytes()));
    }
  } finally {
    // Close the reader even when reading fails part-way through.
    reader.close();
  }

  if (!Ordering.natural().isOrdered(partitions)) {
    throw new IOException("Partitions are not ordered!");
  }
}
项目:circus-train
文件:SimpleCopyListing.java
/**
 * Writes the listing entries for the given, already resolved, source paths and closes the writer.
 * <p>
 * For every path the direct children are listed: files are recorded, and directories reported as non-empty by
 * {@code isDirectoryAndNotEmpty} are traversed. On success the writer is closed normally, so a failure to close
 * propagates; if anything fails earlier the writer is closed silently and the original exception propagates.
 *
 * @param fileListWriter writer of the listing sequence file; always closed by this method
 * @param options copy options
 * @param globbedPaths source paths to list
 * @throws IOException if a path cannot be inspected or an entry cannot be written
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options, List<Path> globbedPaths)
  throws IOException {
  try {
    for (Path path : globbedPaths) {
      FileSystem sourceFS = path.getFileSystem(getConf());
      path = makeQualified(path);

      FileStatus rootStatus = sourceFS.getFileStatus(path);
      Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
      LOG.info("Root source path is {}", sourcePathRoot);

      FileStatus[] sourceFiles = sourceFS.listStatus(path);
      // A null listing has nothing to record; iterating it for a directory root used to throw a
      // NullPointerException. An empty listing simply yields no iterations.
      if (sourceFiles == null) {
        continue;
      }
      for (FileStatus sourceStatus : sourceFiles) {
        if (sourceStatus.isFile()) {
          LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
          CopyListingFileStatus sourceCopyListingStatus = new CopyListingFileStatus(sourceStatus);
          writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);
        }
        if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
          LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
          traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
        }
      }
    }
    // Close explicitly so a close failure is reported, then clear the reference so the
    // silent close below is not applied to the already closed writer.
    fileListWriter.close();
    fileListWriter = null;
  } finally {
    IoUtil.closeSilently(LOG, fileListWriter);
  }
}
项目:circus-train
文件:SimpleCopyListing.java
/**
 * Creates an uncompressed writer of {@code Text} keys and {@code CopyListingFileStatus} values at
 * {@code pathToListFile}, deleting any file that already exists there.
 *
 * @param pathToListFile location of the listing file
 * @return a writer for the listing file
 * @throws IOException if the old file cannot be removed or the writer cannot be created
 */
private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
  FileSystem listFs = pathToListFile.getFileSystem(getConf());
  boolean alreadyThere = listFs.exists(pathToListFile);
  if (alreadyThere) {
    listFs.delete(pathToListFile, false);
  }

  return SequenceFile.createWriter(
      getConf(),
      SequenceFile.Writer.file(pathToListFile),
      SequenceFile.Writer.keyClass(Text.class),
      SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
      SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
}
项目:circus-train
文件:UniformSizeInputFormat.java
/**
 * Opens a reader on the listing file named by {@code configuration}.
 *
 * @param configuration configuration that locates the listing file
 * @return a reader on the listing file
 * @throws IllegalArgumentException if the listing file does not exist or cannot be opened
 */
private SequenceFile.Reader getListingFileReader(Configuration configuration) {
  final Path listingFilePath = getListingFilePath(configuration);
  try {
    final FileSystem listingFs = listingFilePath.getFileSystem(configuration);
    boolean missing = !listingFs.exists(listingFilePath);
    if (missing) {
      // Not an IOException, so this propagates past the catch below.
      throw new IllegalArgumentException("Listing file doesn't exist at: " + listingFilePath);
    }
    SequenceFile.Reader listingReader =
        new SequenceFile.Reader(configuration, SequenceFile.Reader.file(listingFilePath));
    return listingReader;
  } catch (IOException exception) {
    LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
    throw new IllegalArgumentException("Couldn't find listing-file at: " + listingFilePath, exception);
  }
}
项目:circus-train
文件:CopyListing.java
/**
 * Validates the final path listing by checking it for duplicate relative paths.
 * <p>
 * The listing is sorted with {@code sortListing} and scanned once; two consecutive entries with the same key are
 * reported as a duplicate.
 *
 * @param pathToListFile path listing built by doBuildListing
 * @param options Input options to S3MapReduceCp
 * @throws IOException Any issues while sorting or reading the listing
 * @throws DuplicateFileException if there are duplicates
 */
private void validateFinalListing(Path pathToListFile, S3MapReduceCpOptions options)
  throws DuplicateFileException, IOException {
  Configuration config = getConf();
  FileSystem fs = pathToListFile.getFileSystem(config);

  Path sortedList = sortListing(fs, config, pathToListFile);
  SequenceFile.Reader sortedReader = new SequenceFile.Reader(config, SequenceFile.Reader.file(sortedList));
  try {
    Text previousKey = new Text("*"); // source relative path can never hold *
    CopyListingFileStatus previousStatus = new CopyListingFileStatus();

    Text key = new Text();
    while (sortedReader.next(key)) {
      if (!key.equals(previousKey)) {
        // New key: remember it and its value for comparison with the next entry.
        sortedReader.getCurrentValue(previousStatus);
        previousKey.set(key);
        continue;
      }
      CopyListingFileStatus duplicateStatus = new CopyListingFileStatus();
      sortedReader.getCurrentValue(duplicateStatus);
      throw new DuplicateFileException("File "
          + previousStatus.getPath()
          + " and "
          + duplicateStatus.getPath()
          + " would cause duplicates. Aborting");
    }
  } finally {
    IOUtils.closeStream(sortedReader);
  }
}
项目:circus-train
文件:CopyListing.java
/**
 * Sorts a sequence file whose keys are {@link Text} and whose values are {@link CopyListingFileStatus}.
 *
 * @param fs file system holding the listing
 * @param conf configuration handed to the sorter
 * @param sourceListing listing file to sort
 * @return path of the sorted file: the source path with {@code _sorted} appended
 * @throws IOException if removing a previous sorted file or the sort itself fails
 */
private static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing) throws IOException {
  SequenceFile.Sorter listingSorter = new SequenceFile.Sorter(fs, Text.class, CopyListingFileStatus.class, conf);

  Path sortedListing = new Path(sourceListing.toString().concat("_sorted"));
  boolean previousOutputPresent = fs.exists(sortedListing);
  if (previousOutputPresent) {
    // Non-recursive delete of an existing file at the output path.
    fs.delete(sortedListing, false);
  }

  listingSorter.sort(sourceListing, sortedListing);
  return sortedListing;
}
项目:circus-train
文件:SimpleCopyListingTest.java
/**
 * Builds a listing for a single source file and checks that the first entry's key is {@code /source.txt}.
 * A second file is created under the same root but is not passed as a source.
 */
@Test(timeout = 10000)
public void buildListingForSingleFile() throws Exception {
  final String rootName = "/tmp/source";
  final Path root = new Path(rootName);
  final FileSystem clusterFs = cluster.getFileSystem();
  if (clusterFs.exists(root)) {
    delete(clusterFs, rootName);
  }

  final Path source = new Path(root, "foo/bar/source.txt");
  final Path decoy = new Path(root, "target/mooc");
  final URI target = URI.create("s3://bucket/target/moo/target.txt");
  createFile(clusterFs, source.toString());
  createFile(clusterFs, decoy.toString());

  // NOTE(review): the child path is absolute; presumably it resolves independently of the root - confirm.
  final Path listFile = new Path(root, "/tmp/fileList.seq");
  listing.buildListing(listFile, options(source, target));

  try (SequenceFile.Reader listingReader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFile))) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();
    boolean hasEntry = listingReader.next(key, value);
    assertThat(hasEntry, is(true));
    assertThat(key.toString(), is("/source.txt"));
  }
}
项目:circus-train
文件:SimpleCopyListingTest.java
/**
 * Builds a listing from one source file and two source directories, then checks that the listing yields the
 * expected relative-path keys in order: the file first, followed by the contents of each directory.
 */
@Test(timeout = 10000)
public void buildListingForMultipleSources() throws Exception {
  final String rootName = "/tmp/source";
  final Path root = new Path(rootName);
  final FileSystem clusterFs = cluster.getFileSystem();
  if (clusterFs.exists(root)) {
    delete(clusterFs, rootName);
  }

  final Path bazDir = new Path(root, "foo/baz/");
  final Path bangDir = new Path(root, "foo/bang/");
  final Path singleFile = new Path(root, "foo/bar/source.txt");
  final URI target = URI.create("s3://bucket/target/moo/");

  clusterFs.mkdirs(bazDir);
  clusterFs.mkdirs(bangDir);
  createFile(clusterFs, new Path(bazDir, "baz_1.dat"));
  createFile(clusterFs, new Path(bazDir, "baz_2.dat"));
  createFile(clusterFs, new Path(bangDir, "bang_0.dat"));
  createFile(clusterFs, singleFile.toString());

  // NOTE(review): the child path is absolute; presumably it resolves independently of the root - confirm.
  final Path listFile = new Path(root, "/tmp/fileList.seq");
  listing.buildListing(listFile, options(Arrays.asList(singleFile, bazDir, bangDir), target));

  final String[] expectedKeys = { "/source.txt", "/baz_1.dat", "/baz_2.dat", "/bang_0.dat" };
  try (SequenceFile.Reader listingReader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFile))) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();
    for (String expectedKey : expectedKeys) {
      assertThat(listingReader.next(key, value), is(true));
      assertThat(key.toString(), is(expectedKey));
    }
  }
}
项目:ditb
文件:HashTable.java
/**
 * Writes every element of {@code partitions} to a sequence file at {@code path}, each as a key paired with a
 * {@link NullWritable} value.
 *
 * <p>The writer is closed even when an append fails, so the underlying stream is not leaked on error.
 *
 * @param conf configuration used to resolve the file system and create the writer
 * @param path destination of the partition file
 * @throws IOException if the file cannot be created, written or closed
 */
void writePartitionFile(Configuration conf, Path path) throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  // try-with-resources guarantees close() on both the success and the failure path; a failure in close()
  // after a failed append is attached as a suppressed exception rather than masking the original error.
  try (@SuppressWarnings("deprecation")
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, path, ImmutableBytesWritable.class, NullWritable.class)) {
    for (int i = 0; i < partitions.size(); i++) {
      writer.append(partitions.get(i), NullWritable.get());
    }
  }
}