private void initWebHdfs(Configuration conf) throws IOException { if (WebHdfsFileSystem.isEnabled(conf, HttpServer2.LOG)) { // set user pattern based on configuration file UserParam.setUserPattern(conf.get( DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT)); // add authentication filter for webhdfs final String className = conf.get( DFSConfigKeys.DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY, DFSConfigKeys.DFS_WEBHDFS_AUTHENTICATION_FILTER_DEFAULT); final String name = className; final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*"; Map<String, String> params = getAuthFilterParams(conf); HttpServer2.defineFilter(httpServer.getWebAppContext(), name, className, params, new String[] { pathSpec }); HttpServer2.LOG.info("Added filter '" + name + "' (class=" + className + ")"); // add webhdfs packages httpServer.addJerseyResourcePackage(NamenodeWebHdfsMethods.class .getPackage().getName() + ";" + Param.class.getPackage().getName(), pathSpec); } }
private void initWebHdfs(Configuration conf) throws IOException { // set user pattern based on configuration file UserParam.setUserPattern(conf.get( HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT)); // add authentication filter for webhdfs final String className = conf.get( DFSConfigKeys.DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY, DFSConfigKeys.DFS_WEBHDFS_AUTHENTICATION_FILTER_DEFAULT); final String name = className; final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*"; Map<String, String> params = getAuthFilterParams(conf); HttpServer2.defineFilter(httpServer.getWebAppContext(), name, className, params, new String[] { pathSpec }); HttpServer2.LOG.info("Added filter '" + name + "' (class=" + className + ")"); // add webhdfs packages httpServer.addJerseyResourcePackage(NamenodeWebHdfsMethods.class .getPackage().getName() + ";" + Param.class.getPackage().getName(), pathSpec); }
private void init(final UserGroupInformation ugi, final DelegationParam delegation, final String nnId, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } if (nnId == null) { throw new IllegalArgumentException(NamenodeAddressParam.NAME + " is not specified."); } //clear content type response.setContentType(null); if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. final Token<DelegationTokenIdentifier> token = deserializeToken (delegation.getValue(), nnId); ugi.addToken(token); } }
Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException { List<Param<?,?>> authParams = Lists.newArrayList(); // Skip adding delegation token for token operations because these // operations require authentication. Token<?> token = null; if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) { token = getDelegationToken(); } if (token != null) { authParams.add(new DelegationParam(token.encodeToUrlString())); } else { UserGroupInformation userUgi = ugi; UserGroupInformation realUgi = userUgi.getRealUser(); if (realUgi != null) { // proxy user authParams.add(new DoAsParam(userUgi.getShortUserName())); userUgi = realUgi; } authParams.add(new UserParam(userUgi.getShortUserName())); } return authParams.toArray(new Param<?,?>[0]); }
private void init(final UserGroupInformation ugi, final DelegationParam delegation, final InetSocketAddress nnRpcAddr, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } if (nnRpcAddr == null) { throw new IllegalArgumentException(NamenodeRpcAddressParam.NAME + " is not specified."); } //clear content type response.setContentType(null); if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(); token.decodeFromUrlString(delegation.getValue()); SecurityUtil.setTokenService(token, nnRpcAddr); token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); ugi.addToken(token); } }
private void init(final UserGroupInformation ugi, final DelegationParam delegation, final InetSocketAddress nnRpcAddr, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } if (nnRpcAddr == null) { throw new IllegalArgumentException( NamenodeRpcAddressParam.NAME + " is not specified."); } //clear content type response.setContentType(null); if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. final Token<DelegationTokenIdentifier> token = new Token<>(); token.decodeFromUrlString(delegation.getValue()); SecurityUtil.setTokenService(token, nnRpcAddr); token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); ugi.addToken(token); } }
URL toUrl(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) throws IOException { //initialize URI path and query final String path = PATH_PREFIX + (fspath == null? "/": makeQualified(fspath).toUri().getPath()); final String query = op.toQueryString() + '&' + new UserParam(ugi) + Param.toSortedString("&", parameters); final URL url; if (op.equals(PutOpParam.Op.RENEWDELEGATIONTOKEN) || op.equals(GetOpParam.Op.GETDELEGATIONTOKEN)) { // Skip adding delegation token for getting or renewing delegation token, // because these operations require kerberos authentication. url = getNamenodeURL(path, query); } else { url = getNamenodeURL(path, addDt2Query(query)); } if (LOG.isTraceEnabled()) { LOG.trace("url=" + url); } return url; }
private void init(final UserGroupInformation ugi, final DelegationParam delegation, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } //clear content type response.setContentType(null); if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. final DataNode datanode = (DataNode)context.getAttribute("datanode"); final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf()); final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(); token.decodeFromUrlString(delegation.getValue()); SecurityUtil.setTokenService(token, nnRpcAddr); token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); ugi.addToken(token); } }
/**
 * Test helper that asks the given file system to build a URL and logs it.
 *
 * @param webhdfs file system whose {@code toUrl} is invoked
 * @param op operation to encode
 * @param fspath target path
 * @param parameters extra request parameters
 * @return the URL produced by {@code webhdfs.toUrl}
 * @throws IOException if {@code webhdfs.toUrl} throws it
 */
public static URL toUrl(final WebHdfsFileSystem webhdfs,
    final HttpOpParam.Op op, final Path fspath,
    final Param<?,?>... parameters) throws IOException {
  final URL built = webhdfs.toUrl(op, fspath, parameters);
  final String logLine = "url=" + built;
  WebHdfsTestUtil.LOG.info(logLine);
  return built;
}
/**
 * Checks that an OPEN request carrying offset and length parameters returns
 * exactly the requested slice of the file.
 *
 * <p>Writes 1024 random bytes to "/foo" on a single-datanode mini cluster,
 * issues the HTTP request against the namenode's HTTP address with
 * redirects followed, and compares both the reported content length and the
 * returned bytes with the expected sub-range.
 *
 * <p>The response stream is closed and the connection disconnected before
 * the cluster is shut down, so the test does not leave an open HTTP
 * connection behind.
 *
 * @throws Exception on any cluster, I/O or assertion failure
 */
@Test
public void testWebHdfsOffsetAndLength() throws Exception {
  MiniDFSCluster cluster = null;
  final Configuration conf = WebHdfsTestUtil.createConf();
  final int OFFSET = 42;
  final int LENGTH = 512;
  final String PATH = "/foo";
  byte[] CONTENTS = new byte[1024];
  RANDOM.nextBytes(CONTENTS);
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    final WebHdfsFileSystem fs =
        WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
    try (OutputStream os = fs.create(new Path(PATH))) {
      os.write(CONTENTS);
    }

    InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
    URL url = new URL("http", addr.getHostString(), addr.getPort(),
        WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN"
        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
            new LengthParam((long) LENGTH)));

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setInstanceFollowRedirects(true);
      Assert.assertEquals(LENGTH, conn.getContentLength());

      byte[] subContents = new byte[LENGTH];
      byte[] realContents = new byte[LENGTH];
      System.arraycopy(CONTENTS, OFFSET, subContents, 0, LENGTH);

      // Close the response stream even if reading fails.
      try (java.io.InputStream in = conn.getInputStream()) {
        IOUtils.readFully(in, realContents);
      }
      Assert.assertArrayEquals(subContents, realContents);
    } finally {
      // Release the HTTP connection before the cluster goes away.
      conn.disconnect();
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Checks that an OPEN request carrying offset and length parameters returns
 * exactly the requested slice of the file.
 *
 * <p>Writes 1024 random bytes to "/foo" on a single-datanode mini cluster,
 * issues the HTTP request against the namenode's HTTP address with
 * redirects followed, and compares both the reported content length and the
 * returned bytes with the expected sub-range.
 *
 * <p>The response stream is closed and the connection disconnected before
 * the cluster is shut down, so the test does not leave an open HTTP
 * connection behind.
 *
 * @throws Exception on any cluster, I/O or assertion failure
 */
@Test
public void testWebHdfsOffsetAndLength() throws Exception {
  MiniDFSCluster cluster = null;
  final Configuration conf = WebHdfsTestUtil.createConf();
  final int OFFSET = 42;
  final int LENGTH = 512;
  final String PATH = "/foo";
  byte[] CONTENTS = new byte[1024];
  RANDOM.nextBytes(CONTENTS);
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(
        conf, WebHdfsConstants.WEBHDFS_SCHEME);
    try (OutputStream os = fs.create(new Path(PATH))) {
      os.write(CONTENTS);
    }

    InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
    URL url = new URL("http", addr.getHostString(), addr.getPort(),
        WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN"
        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
            new LengthParam((long) LENGTH)));

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setInstanceFollowRedirects(true);
      Assert.assertEquals(LENGTH, conn.getContentLength());

      byte[] subContents = new byte[LENGTH];
      byte[] realContents = new byte[LENGTH];
      System.arraycopy(CONTENTS, OFFSET, subContents, 0, LENGTH);

      // Close the response stream even if reading fails.
      try (java.io.InputStream in = conn.getInputStream()) {
        IOUtils.readFully(in, realContents);
      }
      Assert.assertArrayEquals(subContents, realContents);
    } finally {
      // Release the HTTP connection before the cluster goes away.
      conn.disconnect();
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
URL toUrl(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) throws IOException { //initialize URI path and query final String path = PATH_PREFIX + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath()); final String query = op.toQueryString() + Param.toSortedString("&", getAuthParameters(op)) + Param.toSortedString("&", parameters); final URL url = getNamenodeURL(path, query); if (LOG.isTraceEnabled()) { LOG.trace("url=" + url); } return url; }
private void init(final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + ", " + username + ", " + doAsUser + Param.toSortedString(", ", parameters)); } //clear content type response.setContentType(null); }
private URI redirectURI(final NameNode namenode, final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, final long openOffset, final long blocksize, final Param<?, ?>... parameters) throws URISyntaxException, IOException { final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF); final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, blocksize, conf); final String delegationQuery; if (!UserGroupInformation.isSecurityEnabled()) { //security disabled delegationQuery = Param.toSortedString("&", doAsUser, username); } else if (delegation.getValue() != null) { //client has provided a token delegationQuery = "&" + delegation; } else { //generate a token final Token<? extends TokenIdentifier> t = generateDelegationToken( namenode, ugi, request.getUserPrincipal().getName()); delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } final String query = op.toQueryString() + delegationQuery + "&" + new NamenodeRpcAddressParam(namenode) + Param.toSortedString("&", parameters); final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; final URI uri = new URI("http", null, dn.getHostName(), dn.getInfoPort(), uripath, query, null); if (LOG.isTraceEnabled()) { LOG.trace("redirectURI=" + uri); } return uri; }
Param<?, ?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException { List<Param<?, ?>> authParams = Lists.newArrayList(); // Skip adding delegation token for token operations because these // operations require authentication. boolean hasToken = false; if (UserGroupInformation.isSecurityEnabled() && op != GetOpParam.Op.GETDELEGATIONTOKEN && op != PutOpParam.Op.RENEWDELEGATIONTOKEN) { synchronized (this) { hasToken = (delegationToken != null); if (hasToken) { final String encoded = delegationToken.encodeToUrlString(); authParams.add(new DelegationParam(encoded)); } // else we are talking to an insecure cluster } } UserGroupInformation userUgi = ugi; if (!hasToken) { UserGroupInformation realUgi = userUgi.getRealUser(); if (realUgi != null) { // proxy user authParams.add(new DoAsParam(userUgi.getShortUserName())); userUgi = realUgi; } } authParams.add(new UserParam(userUgi.getShortUserName())); return authParams.toArray(new Param<?, ?>[0]); }
URL toUrl(final HttpOpParam.Op op, final Path fspath, final Param<?, ?>... parameters) throws IOException { //initialize URI path and query final String path = PATH_PREFIX + (fspath == null ? "/" : makeQualified(fspath).toUri().getPath()); final String query = op.toQueryString() + Param.toSortedString("&", getAuthParameters(op)) + Param.toSortedString("&", parameters); final URL url = getNamenodeURL(path, query); if (LOG.isTraceEnabled()) { LOG.trace("url=" + url); } return url; }
private URI redirectURI(final NameNode namenode, final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, final long openOffset, final long blocksize, final Param<?, ?>... parameters) throws URISyntaxException, IOException { final Configuration conf = (Configuration) context.getAttribute(JspHelper.CURRENT_CONF); final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, blocksize, conf); final String delegationQuery; if (!UserGroupInformation.isSecurityEnabled()) { //security disabled delegationQuery = Param.toSortedString("&", doAsUser, username); } else if (delegation.getValue() != null) { //client has provided a token delegationQuery = "&" + delegation; } else { //generate a token final Token<? extends TokenIdentifier> t = generateDelegationToken(namenode, ugi, request.getUserPrincipal().getName()); delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } final String query = op.toQueryString() + delegationQuery + "&" + new NamenodeRpcAddressParam(namenode) + Param.toSortedString("&", parameters); final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; final URI uri = new URI("http", null, dn.getHostName(), dn.getInfoPort(), uripath, query, null); if (LOG.isTraceEnabled()) { LOG.trace("redirectURI=" + uri); } return uri; }
/**
 * Test helper: delegates to {@code webhdfs.toUrl} and logs the result at
 * info level before returning it.
 *
 * @param webhdfs file system whose {@code toUrl} is invoked
 * @param op operation to encode
 * @param fspath target path
 * @param parameters extra request parameters
 * @return the URL produced by {@code webhdfs.toUrl}
 * @throws IOException if {@code webhdfs.toUrl} throws it
 */
public static URL toUrl(final WebHdfsFileSystem webhdfs,
    final HttpOpParam.Op op, final Path fspath,
    final Param<?, ?>... parameters) throws IOException {
  final URL produced = webhdfs.toUrl(op, fspath, parameters);
  final String logLine = "url=" + produced;
  WebHdfsTestUtil.LOG.info(logLine);
  return produced;
}