@Override public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { // must invoke super boolean result = super.begin(operations, endpoint, exchange, file); if (!result) { return false; } // okay we got the file then execute the begin renamer if (beginRenamer != null) { GenericFile<T> newName = beginRenamer.renameFile(exchange, file); GenericFile<T> to = renameFile(operations, file, newName); if (to != null) { to.bindToExchange(exchange); } } return true; }
@Override public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { try { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); // moved the failed file if specifying the moveFailed option if (failureRenamer != null) { // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name Exchange copy = exchange.copy(); file.bindToExchange(copy); // must preserve message id copy.getIn().setMessageId(exchange.getIn().getMessageId()); copy.setExchangeId(exchange.getExchangeId()); GenericFile<T> newName = failureRenamer.renameFile(copy, file); renameFile(operations, file, newName); } } finally { // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } } }
@Override public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { try { operations.releaseRetreivedFileResources(exchange); if (failureRenamer != null) { // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name Exchange copy = exchange.copy(); file.bindToExchange(copy); // must preserve message id copy.getIn().setMessageId(exchange.getIn().getMessageId()); copy.setExchangeId(exchange.getExchangeId()); GenericFile<T> newName = failureRenamer.renameFile(copy, file); renameFile(operations, file, newName); } } finally { if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } deleteLocalWorkFile(exchange); } }
@Override public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { try { if (commitRenamer != null) { // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name Exchange copy = exchange.copy(); file.bindToExchange(copy); // must preserve message id copy.getIn().setMessageId(exchange.getIn().getMessageId()); copy.setExchangeId(exchange.getExchangeId()); GenericFile<T> newName = commitRenamer.renameFile(copy, file); renameFile(operations, file, newName); } } finally { // must invoke super super.commit(operations, endpoint, exchange, file); } }
@Override public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) { if (deleteOrphanLockFiles) { String dir = endpoint.getConfiguration().getDirectory(); File file = new File(dir); LOG.debug("Prepare on startup by deleting orphaned lock files from: {}", dir); Pattern excludePattern = endpoint.getExclude() != null ? Pattern.compile(endpoint.getExclude()) : null; Pattern includePattern = endpoint.getInclude() != null ? Pattern.compile(endpoint.getInclude()) : null; String endpointPath = endpoint.getConfiguration().getDirectory(); StopWatch watch = new StopWatch(); deleteLockFiles(file, endpoint.isRecursive(), endpointPath, endpoint.getFilter(), endpoint.getAntFilter(), excludePattern, includePattern); // log anything that takes more than a second if (watch.taken() > 1000) { LOG.info("Prepared on startup by deleting orphaned lock files from: {} took {} millis to complete.", dir, watch.taken()); } } }
/**
 * Commits {@code target/foo/me.txt} through the delete strategy and expects exactly
 * two delete attempts and two existence checks to have been counted.
 */
public void testTroubleDeletingFile() throws Exception {
    // reset the counters the assertions below inspect
    deleteCounter = 0;
    existsCounter = 0;

    @SuppressWarnings("unchecked")
    GenericFileEndpoint<Object> fileEndpoint = context.getEndpoint("file://target/foo", GenericFileEndpoint.class);
    Exchange exchange = fileEndpoint.createExchange();

    GenericFile<Object> target = new GenericFile<Object>();
    target.setAbsoluteFilePath("target/foo/me.txt");

    GenericFileDeleteProcessStrategy<Object> deleteStrategy = new GenericFileDeleteProcessStrategy<Object>();
    MyGenericFileOperations fileOperations = new MyGenericFileOperations();
    deleteStrategy.commit(fileOperations, fileEndpoint, exchange, target);

    assertEquals("Should have tried to delete file 2 times", 2, deleteCounter);
    assertEquals("Should have tried to delete file 2 times", 2, existsCounter);
}
public void testCannotDeleteFile() throws Exception { deleteCounter = 0; existsCounter = 0; @SuppressWarnings("unchecked") GenericFileEndpoint<Object> endpoint = context.getEndpoint("file://target/foo", GenericFileEndpoint.class); Exchange exchange = endpoint.createExchange(); GenericFile<Object> file = new GenericFile<Object>(); file.setAbsoluteFilePath("target/foo/boom.txt"); GenericFileDeleteProcessStrategy<Object> strategy = new GenericFileDeleteProcessStrategy<Object>(); try { strategy.commit(new MyGenericFileOperations(), endpoint, exchange, file); fail("Should have thrown an exception"); } catch (GenericFileOperationFailedException e) { // expected } assertEquals("Should have tried to delete file 3 times", 3, deleteCounter); assertEquals("Should have tried to delete file 3 times", 3, existsCounter); }
@Override protected GenericFileEndpoint<ChannelSftp.LsEntry> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { // get the base uri part before the options as they can be non URI valid such as the expression using $ chars // and the URI constructor will regard $ as an illegal character and we dont want to enforce end users to // to escape the $ for the expression (file language) String baseUri = uri; if (uri.indexOf("?") != -1) { baseUri = uri.substring(0, uri.indexOf("?")); } // lets make sure we create a new configuration as each endpoint can // customize its own version SftpConfiguration config = new SftpConfiguration(new URI(baseUri)); FtpUtils.ensureRelativeFtpDirectory(this, config); return new SftpEndpoint(uri, this, config); }
@Override protected GenericFileEndpoint<FTPFile> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { String baseUri = getBaseUri(uri); // lets make sure we create a new configuration as each endpoint can customize its own version // must pass on baseUri to the configuration (see above) FtpConfiguration config = new FtpConfiguration(new URI(baseUri)); FtpUtils.ensureRelativeFtpDirectory(this, config); FtpEndpoint<FTPFile> answer = new FtpEndpoint<FTPFile>(uri, this, config); extractAndSetFtpClientConfigParameters(parameters, answer); extractAndSetFtpClientParameters(parameters, answer); return answer; }
@Override protected GenericFileEndpoint<FTPFile> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { String baseUri = getBaseUri(uri); // lets make sure we create a new configuration as each endpoint can customize its own version // must pass on baseUri to the configuration (see above) FtpsConfiguration config = new FtpsConfiguration(new URI(baseUri)); FtpUtils.ensureRelativeFtpDirectory(this, config); FtpsEndpoint endpoint = new FtpsEndpoint(uri, this, config); extractAndSetFtpClientKeyStoreParameters(parameters, endpoint); extractAndSetFtpClientTrustStoreParameters(parameters, endpoint); extractAndSetFtpClientConfigParameters(parameters, endpoint); extractAndSetFtpClientParameters(parameters, endpoint); return endpoint; }
/**
 * After the super implementation has run, applies the load balancing specific
 * settings (filter, max messages per poll, move) to endpoints that are
 * {@code LoadBalancedFileEndpoint} instances. Other endpoints are left untouched.
 *
 * @throws ResolveEndpointFailedException when a load balanced endpoint has no
 *                                        {@code PriorityFileFilterFactory}
 * @throws Exception propagated from the super implementation or the update steps
 */
@Override
protected void afterPropertiesSet(GenericFileEndpoint<File> endpoint) throws Exception {
    super.afterPropertiesSet(endpoint);

    if (!(endpoint instanceof LoadBalancedFileEndpoint)) {
        return;
    }
    LoadBalancedFileEndpoint balanced = (LoadBalancedFileEndpoint) endpoint;

    PriorityFileFilterFactory filterFactory = balanced.getPriorityFileFilterFactory();
    if (filterFactory == null) {
        throw new ResolveEndpointFailedException(balanced.getEndpointUri(), "PriorityFileFilterFactory is null");
    }

    updateFilter(balanced, filterFactory);
    updateMaxMessagesPerPoll(balanced, filterFactory);
    updateMove(balanced);
}
public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { // if we use exclusive read then acquire the exclusive read (waiting until we got it) if (exclusiveReadLockStrategy != null) { boolean lock = exclusiveReadLockStrategy.acquireExclusiveReadLock(operations, file, exchange); if (!lock) { // do not begin since we could not get the exclusive read lock return false; } } return true; }
public void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnAbort(operations, file, exchange); } }
public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnCommit(operations, file, exchange); } }
public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } }
@Override protected GenericFileEndpoint<File> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { //Use the base camel code to create the endpoint config, and then dump it into our endpoint GenericFileEndpoint<File> camelFileEndpoint = super.buildFileEndpoint(uri, remaining, parameters); LoadBalancedFileEndpoint answer = new LoadBalancedFileEndpoint(uri, this); answer.setFile(new File(remaining)); answer.setConfiguration(camelFileEndpoint.getConfiguration()); return answer; }
/**
 * Startup hook that is intentionally left empty: this implementation performs no
 * preparation and ignores both arguments.
 */
@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
    // noop
}
@Override public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { // special for file lock strategy as we must release that lock first before we can delete the file boolean releaseEager = exclusiveReadLockStrategy instanceof FileLockExclusiveReadLockStrategy; if (releaseEager) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnCommit(operations, file, exchange); } try { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); int retries = 3; boolean deleted = false; while (retries > 0 && !deleted) { retries--; if (operations.deleteFile(file.getAbsoluteFilePath())) { // file is deleted deleted = true; break; } // some OS can report false when deleting but the file is still deleted // use exists to check instead boolean exits = operations.existsFile(file.getAbsoluteFilePath()); if (!exits) { deleted = true; } else { log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file); // sleep a bit and try again Thread.sleep(1000); } } if (!deleted) { throw new GenericFileOperationFailedException("Cannot delete file: " + file); } } finally { // must release lock last if (!releaseEager && exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLockOnCommit(operations, file, exchange); } } }
/**
 * Startup hook that is intentionally left empty: this implementation performs no
 * preparation and ignores both arguments.
 *
 * @throws Exception declared by the signature; never thrown by this empty body
 */
@Override
public void prepareOnStartup(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint) throws Exception {
    // noop
}
/**
 * Startup hook that remembers the endpoint this strategy is used on and logs, at
 * INFO level, the idempotent repository together with that endpoint.
 *
 * @param operations not used by this implementation
 * @param endpoint   the endpoint stored in the {@code endpoint} field
 * @throws Exception declared by the signature
 */
@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
    // keep a reference to the endpoint for later use by this strategy
    this.endpoint = endpoint;
    LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint);
}
/**
 * Startup hook that forwards to the exclusive read lock strategy when one is
 * configured, and does nothing otherwise.
 *
 * @throws Exception propagated from the lock strategy
 */
public void prepareOnStartup(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint) throws Exception {
    if (exclusiveReadLockStrategy == null) {
        // no lock strategy, nothing to prepare
        return;
    }
    exclusiveReadLockStrategy.prepareOnStartup(operations, endpoint);
}
/**
 * Accepts an endpoint but deliberately does nothing with it; the body is empty.
 *
 * @param endpoint ignored
 */
public void setEndpoint(GenericFileEndpoint<Object> endpoint) {
}
/**
 * Stores the given endpoint, which must be an {@code SftpEndpoint}.
 *
 * @param endpoint the endpoint to remember
 * @throws ClassCastException when the endpoint is not an {@code SftpEndpoint}
 */
public void setEndpoint(GenericFileEndpoint<ChannelSftp.LsEntry> endpoint) {
    // narrow to the concrete endpoint type this class works with
    SftpEndpoint sftpEndpoint = (SftpEndpoint) endpoint;
    this.endpoint = sftpEndpoint;
}
/**
 * Stores the given endpoint, which must be a {@code RemoteFileEndpoint}.
 *
 * @param endpoint the endpoint to remember
 * @throws ClassCastException when the endpoint is not a {@code RemoteFileEndpoint}
 */
public void setEndpoint(GenericFileEndpoint<FTPFile> endpoint) {
    // narrow to the remote endpoint type this class works with
    RemoteFileEndpoint<FTPFile> remoteEndpoint = (RemoteFileEndpoint<FTPFile>) endpoint;
    this.endpoint = remoteEndpoint;
}
/**
 * Post-configuration hook that is intentionally left empty: the endpoint is not
 * inspected or modified.
 *
 * @param endpoint ignored
 * @throws Exception declared by the signature; never thrown by this empty body
 */
protected void afterPropertiesSet(GenericFileEndpoint<ChannelSftp.LsEntry> endpoint) throws Exception {
    // noop
}
/**
 * Post-configuration hook that is intentionally left empty: the endpoint is not
 * inspected or modified.
 *
 * @param endpoint ignored
 * @throws Exception declared by the signature; never thrown by this empty body
 */
protected void afterPropertiesSet(GenericFileEndpoint<FTPFile> endpoint) throws Exception {
    // noop
}
/**
 * Startup hook that is intentionally left empty: this implementation performs no
 * preparation and ignores both arguments.
 *
 * @throws Exception declared by the signature; never thrown by this empty body
 */
@Override
public void prepareOnStartup(GenericFileOperations<FTPFile> tGenericFileOperations, GenericFileEndpoint<FTPFile> tGenericFileEndpoint) throws Exception {
    // noop
}
/**
 * Startup hook that is intentionally left empty: this implementation performs no
 * preparation and ignores both arguments.
 *
 * @throws Exception declared by the signature; never thrown by this empty body
 */
@Override
public void prepareOnStartup(GenericFileOperations<ChannelSftp.LsEntry> tGenericFileOperations, GenericFileEndpoint<ChannelSftp.LsEntry> tGenericFileEndpoint) throws Exception {
    // noop
}
/**
 * Stores the given endpoint, which must be an {@code ScpEndpoint}.
 *
 * @param endpoint the endpoint to remember
 * @throws ClassCastException when the endpoint is not an {@code ScpEndpoint}
 */
@Override
public void setEndpoint(GenericFileEndpoint<ScpFile> endpoint) {
    // narrow to the concrete endpoint type this class works with
    ScpEndpoint scpEndpoint = (ScpEndpoint) endpoint;
    this.endpoint = scpEndpoint;
}
/**
 * Builds an SCP endpoint whose configuration is created from the uri with the query
 * part (everything from the first {@code ?}) removed.
 *
 * @param uri        the full endpoint uri, handed unchanged to the endpoint
 * @param remaining  not used by this implementation
 * @param parameters not used by this implementation
 * @return the new endpoint
 * @throws Exception for example when the base uri cannot be parsed as a {@link URI}
 */
@Override
protected GenericFileEndpoint<ScpFile> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
    // drop the query part, if any, before parsing the uri
    String baseUri = uri;
    int queryStart = uri.indexOf("?");
    if (queryStart >= 0) {
        baseUri = uri.substring(0, queryStart);
    }

    ScpConfiguration config = new ScpConfiguration(new URI(baseUri));
    return new ScpEndpoint(uri, this, config);
}
/**
 * Post-configuration hook that is intentionally left empty: the endpoint is not
 * inspected or modified.
 *
 * @param endpoint ignored
 * @throws Exception declared by the signature; never thrown by this empty body
 */
protected void afterPropertiesSet(GenericFileEndpoint<ScpFile> endpoint) throws Exception {
    // noop
}
@Override protected GenericFileEndpoint<ScpFile> buildFileEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { // TODO: revisit stripping the query part; should not be needed with valid uris int query = uri.indexOf("?"); return new ScpEndpoint(uri, this, new ScpConfiguration(new URI(query >= 0 ? uri.substring(0, query) : uri))); }
/**
 * Stores the given endpoint, which must be an {@code ScpEndpoint}.
 *
 * @param endpoint the endpoint to remember
 * @throws ClassCastException when the endpoint is not an {@code ScpEndpoint}
 */
@Override
public void setEndpoint(GenericFileEndpoint<ScpFile> endpoint) {
    // narrow to the concrete endpoint type this class works with
    ScpEndpoint scpEndpoint = (ScpEndpoint) endpoint;
    this.endpoint = scpEndpoint;
}