@Override protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // must call super super.doReleaseExclusiveReadLock(operations, file, exchange); FileLock lock = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), FileLock.class); RandomAccessFile rac = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), RandomAccessFile.class); String target = file.getFileName(); if (lock != null) { Channel channel = lock.acquiredBy(); try { lock.release(); } finally { // close channel as well IOHelper.close(channel, "while releasing exclusive read lock for file: " + target, LOG); IOHelper.close(rac, "while releasing exclusive read lock for file: " + target, LOG); } } }
@Override public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { // must invoke super boolean result = super.begin(operations, endpoint, exchange, file); if (!result) { return false; } // okay we got the file then execute the begin renamer if (beginRenamer != null) { GenericFile<T> newName = beginRenamer.renameFile(exchange, file); GenericFile<T> to = renameFile(operations, file, newName); if (to != null) { to.bindToExchange(exchange); } } return true; }
@Override public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { try { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); // moved the failed file if specifying the moveFailed option if (failureRenamer != null) { // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name Exchange copy = exchange.copy(); file.bindToExchange(copy); // must preserve message id copy.getIn().setMessageId(exchange.getIn().getMessageId()); copy.setExchangeId(exchange.getExchangeId()); GenericFile<T> newName = failureRenamer.renameFile(copy, file); renameFile(operations, file, newName); } } finally { // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } } }
@Override public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { try { operations.releaseRetreivedFileResources(exchange); if (failureRenamer != null) { // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name Exchange copy = exchange.copy(); file.bindToExchange(copy); // must preserve message id copy.getIn().setMessageId(exchange.getIn().getMessageId()); copy.setExchangeId(exchange.getExchangeId()); GenericFile<T> newName = failureRenamer.renameFile(copy, file); renameFile(operations, file, newName); } } finally { if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } deleteLocalWorkFile(exchange); } }
@Override public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { try { if (commitRenamer != null) { // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name Exchange copy = exchange.copy(); file.bindToExchange(copy); // must preserve message id copy.getIn().setMessageId(exchange.getIn().getMessageId()); copy.setExchangeId(exchange.getExchangeId()); GenericFile<T> newName = commitRenamer.renameFile(copy, file); renameFile(operations, file, newName); } } finally { // must invoke super super.commit(operations, endpoint, exchange, file); } }
@Override public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { if (!markerFile) { // if not using marker file then we assume acquired return true; } String lockFileName = getLockFileName(file); LOG.trace("Locking the file: {} using the lock file name: {}", file, lockFileName); // create a plain file as marker filer for locking (do not use FileLock) boolean acquired = FileUtil.createNewFile(new File(lockFileName)); // store read-lock state exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), acquired); exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), lockFileName); return acquired; }
protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { if (!markerFile) { // if not using marker file then nothing to release return; } boolean acquired = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_ACQUIRED), false, Boolean.class); // only release the file if camel get the lock before if (acquired) { String lockFileName = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_FILE_NAME), String.class); File lock = new File(lockFileName); if (lock.exists()) { LOG.trace("Unlocking file: {}", lockFileName); boolean deleted = FileUtil.deleteFile(lock); LOG.trace("Lock file: {} was deleted: {}", lockFileName, deleted); } } }
@Override public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // in clustered mode then another node may have processed the file so we must check here again if the file exists File path = file.getFile(); if (!path.exists()) { return false; } // check if we can begin on this file String key = asKey(file); boolean answer = idempotentRepository.add(key); if (!answer) { // another node is processing the file so skip CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file); } return answer; }
protected GenericFile<T> renameFile(GenericFileOperations<T> operations, GenericFile<T> from, GenericFile<T> to) throws IOException { // deleting any existing files before renaming try { operations.deleteFile(to.getAbsoluteFilePath()); } catch (GenericFileOperationFailedException e) { // ignore the file does not exists } // make parent folder if missing boolean mkdir = operations.buildDirectory(to.getParent(), to.isAbsolute()); if (!mkdir) { throw new GenericFileOperationFailedException("Cannot create directory: " + to.getParent() + " (could be because of denied permissions)"); } log.debug("Renaming file: {} to: {}", from, to); boolean renamed = operations.renameFile(from.getAbsoluteFilePath(), to.getAbsoluteFilePath()); if (!renamed) { throw new GenericFileOperationFailedException("Cannot rename file: " + from + " to: " + to); } return to; }
public Exchange createExchange() { // create the file String uri = "file://target/filelanguage?fileExist=Override"; template.sendBodyAndHeader(uri, "Hello World", Exchange.FILE_NAME, "test/hello.txt"); // get the file handle file = new File("target/filelanguage/test/hello.txt"); GenericFile<File> gf = FileConsumer.asGenericFile("target/filelanguage", file, null, false); FileEndpoint endpoint = getMandatoryEndpoint(uri, FileEndpoint.class); Exchange answer = endpoint.createExchange(gf); endpoint.configureMessage(gf, answer.getIn()); Calendar cal = Calendar.getInstance(); cal.set(1974, Calendar.APRIL, 20); answer.getIn().setHeader("birthday", cal.getTime()); cal.set(2008, Calendar.AUGUST, 8); answer.getOut().setHeader("special", cal.getTime()); return answer; }
public Exchange createExchange() { // create the file String uri = "file://target/filelanguage?fileExist=Override"; template.sendBodyAndHeader(uri, "Bye World", Exchange.FILE_NAME, "test/bye.def.txt"); // get the file handle file = new File("target/filelanguage/test/bye.def.txt"); GenericFile<File> gf = FileConsumer.asGenericFile("target/filelanguage", file, null, false); FileEndpoint endpoint = getMandatoryEndpoint(uri, FileEndpoint.class); Exchange answer = endpoint.createExchange(gf); endpoint.configureMessage(gf, answer.getIn()); Calendar cal = Calendar.getInstance(); cal.set(1974, Calendar.APRIL, 20); answer.getIn().setHeader("birthday", cal.getTime()); cal.set(2008, Calendar.AUGUST, 8); answer.getOut().setHeader("special", cal.getTime()); return answer; }
/**
 * Commits <tt>target/foo/me.txt</tt> with the delete strategy and expects two delete
 * attempts and two exists checks to have been counted.
 */
public void testTroubleDeletingFile() throws Exception {
    // start from clean counters
    existsCounter = 0;
    deleteCounter = 0;

    @SuppressWarnings("unchecked")
    GenericFileEndpoint<Object> fooEndpoint = context.getEndpoint("file://target/foo", GenericFileEndpoint.class);
    Exchange exchange = fooEndpoint.createExchange();

    GenericFile<Object> target = new GenericFile<Object>();
    target.setAbsoluteFilePath("target/foo/me.txt");

    GenericFileDeleteProcessStrategy<Object> deleteStrategy = new GenericFileDeleteProcessStrategy<Object>();
    deleteStrategy.commit(new MyGenericFileOperations(), fooEndpoint, exchange, target);

    assertEquals("Should have tried to delete file 2 times", 2, deleteCounter);
    assertEquals("Should have tried to delete file 2 times", 2, existsCounter);
}
public void testCannotDeleteFile() throws Exception { deleteCounter = 0; existsCounter = 0; @SuppressWarnings("unchecked") GenericFileEndpoint<Object> endpoint = context.getEndpoint("file://target/foo", GenericFileEndpoint.class); Exchange exchange = endpoint.createExchange(); GenericFile<Object> file = new GenericFile<Object>(); file.setAbsoluteFilePath("target/foo/boom.txt"); GenericFileDeleteProcessStrategy<Object> strategy = new GenericFileDeleteProcessStrategy<Object>(); try { strategy.commit(new MyGenericFileOperations(), endpoint, exchange, file); fail("Should have thrown an exception"); } catch (GenericFileOperationFailedException e) { // expected } assertEquals("Should have tried to delete file 3 times", 3, deleteCounter); assertEquals("Should have tried to delete file 3 times", 3, existsCounter); }
@Override protected boolean pollDirectory(String fileName, List<GenericFile<ChannelSftp.LsEntry>> fileList, int depth) { String currentDir = null; if (isStepwise()) { // must remember current dir so we stay in that directory after the poll currentDir = operations.getCurrentDirectory(); } // strip trailing slash fileName = FileUtil.stripTrailingSeparator(fileName); boolean answer = doPollDirectory(fileName, null, fileList, depth); if (currentDir != null) { operations.changeCurrentDirectory(currentDir); } return answer; }
/** * Executes doPollDirectory and on exception checks if it can be ignored by calling ignoreCannotRetrieveFile . * * @param absolutePath The path of the directory to poll * @param dirName The name of the directory to poll * @param fileList current list of files gathered * @param depth the current depth of the directory * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit * @throws GenericFileOperationFailedException if the exception during doPollDirectory can not be ignored */ protected boolean doSafePollSubDirectory(String absolutePath, String dirName, List<GenericFile<T>> fileList, int depth) { try { log.trace("Polling sub directory: {} from: {}", absolutePath, endpoint); //Try to poll the directory return doPollDirectory(absolutePath, dirName, fileList, depth); } catch (Exception e) { log.debug("Caught exception " + e.getMessage()); if (ignoreCannotRetrieveFile(absolutePath, null, e)) { log.trace("Ignoring file error " + e.getMessage() + " for " + absolutePath); //indicate no files in this directory to poll, continue with fileList return true; } else { log.trace("Not ignoring file error " + e.getMessage() + " for " + absolutePath); if (e instanceof GenericFileOperationFailedException) { throw (GenericFileOperationFailedException) e; } else { throw new GenericFileOperationFailedException("Cannot poll sub-directory: " + absolutePath + " from: " + endpoint, e); } } } }
@Override protected boolean pollDirectory(String fileName, List<GenericFile<FTPFile>> fileList, int depth) { String currentDir = null; if (isStepwise()) { // must remember current dir so we stay in that directory after the poll currentDir = operations.getCurrentDirectory(); } // strip trailing slash fileName = FileUtil.stripTrailingSeparator(fileName); boolean answer = doPollDirectory(fileName, null, fileList, depth); if (currentDir != null) { operations.changeCurrentDirectory(currentDir); } return answer; }
@Test public void testSftpSimpleConsume() throws Exception { if (!canTest()) { return; } String expected = "Hello World"; // create file using regular file template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); mock.expectedBodiesReceived(expected); context.startRoute("foo"); assertMockEndpointsSatisfied(); GenericFile<?> remoteFile = mock.getExchanges().get(0).getIn().getBody(GenericFile.class); assertTrue(remoteFile.getBody() instanceof InputStream); }
@Test public void testSftpSimpleConsume() throws Exception { if (!canTest()) { return; } String expected = "Hello World"; String expected2 = "Goodbye World"; // create file using regular file template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected2, Exchange.FILE_NAME, "goodbye.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(2); mock.expectedBodiesReceivedInAnyOrder(expected, expected2); context.startRoute("foo"); assertMockEndpointsSatisfied(); GenericFile<?> remoteFile1 = mock.getExchanges().get(0).getIn().getBody(GenericFile.class); GenericFile<?> remoteFile2 = mock.getExchanges().get(1).getIn().getBody(GenericFile.class); assertTrue(remoteFile1.getBody() instanceof InputStream); assertTrue(remoteFile2.getBody() instanceof InputStream); }
@Test public void testFtpSimpleConsumeAbsolute() throws Exception { if (!canTest()) { return; } String expected = "Hello World"; // create file using regular file // FTP Server does not support absolute path, so lets simulate it String path = FTP_ROOT_DIR + "/tmp/mytemp"; template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); context.startRoute("foo"); assertMockEndpointsSatisfied(); GenericFile<?> remoteFile = (GenericFile<?>) mock.getExchanges().get(0).getIn().getBody(); assertTrue(remoteFile.getBody() instanceof InputStream); }
@FallbackConverter public static Object convertTo(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) { // use a fallback type converter so we can convert the embedded body if the value is GenericFile if (GenericFile.class.isAssignableFrom(value.getClass())) { GenericFile<?> file = (GenericFile<?>) value; Class<?> from = file.getBody().getClass(); // maybe from is already the type we want if (from.isAssignableFrom(type)) { return file.getBody(); } // no then try to lookup a type converter TypeConverter tc = registry.lookup(type, from); if (tc != null) { Object body = file.getBody(); return tc.convertTo(type, exchange, body); } } return null; }
/**
 * Fallback converter that converts the file wrapped by a {@link GenericFile} to a payload, using
 * the converter registered for <tt>Payload</tt> and the class of the wrapped file.
 *
 * @return the converted payload, or <tt>null</tt> if the value is not a {@link GenericFile},
 *         wraps no file, or no converter is registered
 */
@FallbackConverter
@SuppressWarnings("unchecked")
public static <T extends Payload> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) throws IOException {
    Class<?> sourceType = value.getClass();
    if (!GenericFile.class.isAssignableFrom(sourceType)) {
        return null;
    }

    GenericFile<?> genericFile = (GenericFile<?>) value;
    Object wrapped = genericFile.getFile();
    if (wrapped == null) {
        return null;
    }

    // the lookup is keyed on the class of the wrapped file
    TypeConverter converter = registry.lookup(Payload.class, wrapped.getClass());
    if (converter == null) {
        return null;
    }

    return (T) converter.convertTo(Payload.class, wrapped);
}
@Converter public static BoxFileUploadRequestObject genericFileToBoxFileUploadRequestObject(GenericFile<?> file, Exchange exchange) throws Exception { String folderId = ROOT_FOLDER; if (exchange != null && exchange.getIn() != null) { folderId = exchange.getIn().getHeader(PROPERTY_FOLDER_ID_DELIMITED, folderId, String.class); // support camel case CamelBoxFolderId folderId = exchange.getIn().getHeader(PROPERTY_FOLDER_ID, folderId, String.class); } if (file.getFile() instanceof File) { // prefer to use a file input stream if its a java.io.File File f = (File) file.getFile(); return BoxFileUploadRequestObject.uploadFileRequestObject(folderId, file.getFileName(), f); } if (exchange != null) { // otherwise ensure the body is loaded as we want the input stream of the body file.getBinding().loadContent(exchange, file); InputStream is = exchange.getContext().getTypeConverter().convertTo(InputStream.class, exchange, file.getBody()); return BoxFileUploadRequestObject.uploadFileRequestObject(folderId, file.getFileName(), is); } return null; }
/**
 * Only accepts the file if the current counter value is one of the possible priorities,
 * i.e. first priority gets first file, second priority gets second file, and so on.
 * <p/>
 * The counter is incremented on every call and reset to zero once it has grown past
 * <tt>maxMessagesPerPoll</tt>.
 *
 * @param file the file offered to the filter (not inspected by this method)
 * @return <tt>true</tt> if the counter value of this call is contained in the possible priorities
 * @throws IllegalStateException if the possible priorities are <tt>null</tt> or empty
 */
@Override
public boolean accept(GenericFile<T> file) {
    if (possiblePriorities == null || possiblePriorities.size() < 1) {
        throw new IllegalStateException("Possible priorities is null or empty. Have you called init?");
    }

    // start a new round once the counter has moved past the poll limit
    if (counter.get() > maxMessagesPerPoll) {
        counter.set(0);
    }

    final int position = counter.getAndIncrement();
    final boolean matched = possiblePriorities.contains(position);
    LOG.debug("{}, isMatched: {}", toString(), matched);
    return matched;
}
/**
 * Offers a single file to a filter created with (0, 3, 1) and expects it to be accepted.
 */
@Test
public void acceptOnlyOneFileStartingAt0() {
    PriorityFileFilter<File> filter = new PriorityFileFilter<File>(0, 3, 1);
    filter.init();

    for (int i = 0; i < 1; i++) {
        GenericFile<File> candidate = new GenericFile<File>();
        candidate.setFileName("file0");

        Boolean answer = filter.accept(candidate);
        Assert.assertNotNull(answer);

        // only the call at index 0 is expected to match
        boolean expected = i == 0;
        if (expected) {
            Assert.assertTrue(answer);
        } else {
            Assert.assertFalse(answer);
        }
    }
}
/**
 * Offers a single file to a filter created with (1, 3, 1) and expects it to be rejected.
 */
@Test
public void acceptOnlyOneFileStartingAt1() {
    PriorityFileFilter<File> filter = new PriorityFileFilter<File>(1, 3, 1);
    filter.init();

    // NOTE(review): the loop runs for i == 0 only, so the accepting call at index 1
    // is never exercised - confirm whether the bound should be larger
    for (int i = 0; i < 1; i++) {
        GenericFile<File> candidate = new GenericFile<File>();
        candidate.setFileName("file1");

        Boolean answer = filter.accept(candidate);
        Assert.assertNotNull(answer);

        // only the call at index 1 is expected to match
        boolean expected = i == 1;
        if (expected) {
            Assert.assertTrue(answer);
        } else {
            Assert.assertFalse(answer);
        }
    }
}
/**
 * Offers ten files to a filter created with (0, 3, 10) and expects the calls at
 * index 0, 3, 6 and 9 to be accepted and all others rejected.
 */
@Test
public void acceptFourFilesStartingAt0() {
    PriorityFileFilter<File> filter = new PriorityFileFilter<File>(0, 3, 10);
    filter.init();

    for (int i = 0; i < 10; i++) {
        GenericFile<File> candidate = new GenericFile<File>();
        candidate.setFileName("file0");

        Boolean answer = filter.accept(candidate);
        Assert.assertNotNull(answer);

        // within 0..9 these are exactly the indexes 0, 3, 6 and 9
        boolean expected = i % 3 == 0;
        if (expected) {
            Assert.assertTrue(answer);
        } else {
            Assert.assertFalse(answer);
        }
    }
}
/**
 * Offers ten files to a filter created with (1, 3, 10) and expects the calls at
 * index 1, 4 and 7 to be accepted and all others rejected.
 */
@Test
public void accept3FilesStartingAt1() {
    PriorityFileFilter<File> filter = new PriorityFileFilter<File>(1, 3, 10);
    filter.init();

    for (int i = 0; i < 10; i++) {
        GenericFile<File> candidate = new GenericFile<File>();
        candidate.setFileName("file0");

        Boolean answer = filter.accept(candidate);
        Assert.assertNotNull(answer);

        // within 0..9 these are exactly the indexes 1, 4 and 7
        boolean expected = i % 3 == 1;
        if (expected) {
            Assert.assertTrue(answer);
        } else {
            Assert.assertFalse(answer);
        }
    }
}
/**
 * Offers twenty files to a filter created with (0, 3, 20) and expects the calls at
 * index 0, 3, 6, 9, 12, 15 and 18 to be accepted and all others rejected.
 */
@Test
public void accept7FilesStartingAt0Until20() {
    PriorityFileFilter<File> filter = new PriorityFileFilter<File>(0, 3, 20);
    filter.init();

    for (int i = 0; i < 20; i++) {
        GenericFile<File> candidate = new GenericFile<File>();
        candidate.setFileName("file0");

        Boolean answer = filter.accept(candidate);
        Assert.assertNotNull(answer);

        // within 0..19 these are exactly the indexes 0, 3, 6, 9, 12, 15 and 18
        boolean expected = i % 3 == 0;
        if (expected) {
            Assert.assertTrue(answer);
        } else {
            Assert.assertFalse(answer);
        }
    }
}
@Override public void process( Exchange exchng ) throws Exception { Status tweet = exchng.getIn().getBody( Status.class ); String text = tweet.getText(); System.out.println( text ); if ( tweet.getMediaEntities() != null && tweet.getMediaEntities().length > 0 ) { System.out.println( String.format( "\tMedias asociados: %d. %s", tweet.getMediaEntities().length, tweet.getMediaEntities()[0].getMediaURL() ) ); } // process text from tweet GenericFile gf = new GenericFile(); gf.setFileName( "tweet" + tweet.getId() + ".txt" ); gf.setCharset( "UTF-8" ); gf.setBody( text.getBytes( "UTF-8" ) ); exchng.getOut().setBody( gf ); }
/**
 * Reads the file wrapped by the {@link GenericFile} in the IN body in chunks of up to 2048
 * bytes, printing one message per chunk read, and logs the start and end of the processing.
 * The stream is closed even when reading fails.
 */
@SuppressWarnings("rawtypes")
public void process(Exchange exchange) throws Exception {
    final GenericFile genericFile = (GenericFile) exchange.getIn().getBody();
    final File file = (File) genericFile.getFile();

    log.info("***** START processing file : " + file.getName());

    final FileInputStream input = new FileInputStream(file);
    try {
        byte[] buffer = new byte[2048];
        // keep reading until the end of the stream is signalled
        int read = input.read(buffer);
        while (read != -1) {
            System.out.println("-- Le�da l�nea del fichero " + file.getName());
            read = input.read(buffer);
        }
    } finally {
        input.close();
    }

    log.info("***** END processing file : " + file.getName());
}
private static String asReadLockKey(GenericFile file, String key) { // use the copy from absolute path as that was the original path of the file when the lock was acquired // for example if the file consumer uses preMove then the file is moved and therefore has another name // that would no longer match String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath(); return path + "-" + key; }
@Override public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // must call marker first if (markerFile && !marker.acquireExclusiveReadLock(operations, file, exchange)) { return false; } return super.acquireExclusiveReadLock(operations, file, exchange); }
@Override public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // must call marker first try { if (markerFile) { marker.releaseExclusiveReadLockOnAbort(operations, file, exchange); } } finally { super.releaseExclusiveReadLockOnAbort(operations, file, exchange); } }
@Override public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // must call marker first try { if (markerFile) { marker.releaseExclusiveReadLockOnRollback(operations, file, exchange); } } finally { super.releaseExclusiveReadLockOnRollback(operations, file, exchange); } }
@Override public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // must call marker first try { if (markerFile) { marker.releaseExclusiveReadLockOnCommit(operations, file, exchange); } } finally { super.releaseExclusiveReadLockOnCommit(operations, file, exchange); } }
public GenericFile<T> renameFile(Exchange exchange, GenericFile<T> file) { ObjectHelper.notNull(expression, "expression"); String newName = expression.evaluate(exchange, String.class); // make a copy as result and change its file name GenericFile<T> result = file.copyFrom(file); result.changeFileName(newName); return result; }
@Override public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); if (removeOnRollback) { idempotentRepository.remove(key); } else { // okay we should not remove then confirm it instead idempotentRepository.confirm(key); } }