/**
 * Submits an asynchronous fetch of {@code fetchState.getUri()} to {@code mExecutorService} and
 * reports the outcome through {@code callback}.
 *
 * <p>The worker opens a connection via {@code createConnection} and inspects the response's
 * {@code Location} header. When the header is absent, has no scheme (relative reference), or has
 * the same scheme as the URI just requested, the response stream is handed to
 * {@code callback.onResponse}. When the header names a different scheme, the worker re-requests
 * that location itself, up to a fixed number of hops; exceeding the limit is reported via
 * {@code callback.onFailure}. Any exception raised while connecting, reading, or inside
 * {@code onResponse} is likewise passed to {@code callback.onFailure}.
 *
 * <p>A cancellation callback is registered on the fetch context: if the submitted task can still
 * be cancelled, {@code callback.onCancellation()} is invoked.
 *
 * @param fetchState supplies the URI to fetch and the context on which cancellation is observed
 * @param callback receives exactly one of response, failure or cancellation in the normal case
 */
@Override
public void fetch(final FetchState fetchState, final Callback callback) {
  final Future<?> future = mExecutorService.submit(
      new Runnable() {
        @Override
        public void run() {
          // Upper bound on redirects followed by this loop, so that a server bouncing between
          // schemes (e.g. http -> https -> http ...) cannot keep the worker thread busy forever.
          final int maxRedirects = 5;

          String scheme = fetchState.getUri().getScheme();
          String uriString = fetchState.getUri().toString();

          for (int redirects = 0; ; redirects++) {
            // Scoped per iteration so that a failure in createConnection never causes the
            // previous hop's (already disconnected) connection to be disconnected again.
            HttpURLConnection connection = null;
            try {
              connection = createConnection(uriString);

              String nextUriString = connection.getHeaderField("Location");
              String nextScheme =
                  (nextUriString == null) ? null : Uri.parse(nextUriString).getScheme();

              // NOTE(review): presumably the connection already follows same-protocol redirects
              // on its own, so only a cross-scheme Location needs a manual hop - confirm against
              // createConnection. A Location without a scheme is a relative reference and is
              // therefore treated as same-scheme instead of dereferencing a null scheme.
              boolean isFinalResponse =
                  nextUriString == null || nextScheme == null || nextScheme.equals(scheme);

              if (isFinalResponse) {
                InputStream is = connection.getInputStream();
                // Second argument is passed through unchanged from the original code
                // (presumably "length unknown" - confirm against Callback).
                callback.onResponse(is, -1);
                break;
              }

              if (redirects >= maxRedirects) {
                // Reported to the callback by the catch block below.
                throw new java.net.ProtocolException(
                    "Too many redirects (" + maxRedirects + ") while fetching, last URI: "
                        + uriString);
              }

              uriString = nextUriString;
              scheme = nextScheme;
            } catch (Exception e) {
              callback.onFailure(e);
              break;
            } finally {
              // Runs after every hop, including the final one once onResponse has returned.
              if (connection != null) {
                connection.disconnect();
              }
            }
          }
        }
      });

  fetchState.getContext().addCallbacks(
      new BaseProducerContextCallbacks() {
        @Override
        public void onCancellationRequested() {
          // cancel(false) does not interrupt a task that is already running; the callback is
          // only notified when the future reports that cancellation succeeded.
          if (future.cancel(false)) {
            callback.onCancellation();
          }
        }
      });
}