Apache Flume获取Twitter数据 Apache Flume配置 Apache Flume序列发生器源 使用Flume,我们可以从各种服务中获取数据并将其传输到集中存储(HDFS和HBase)。本章介绍如何从Twitter服务获取数据并使用Apache Flume将其存储在HDFS中。 正如Flume Architecture中所讨论的,Web服务器生成日志数据,这些数据由Flume中的代理收集。通道将此数据缓冲到接收器,最终将其推送到集中存储。 在本章提供的示例中,我们将使用Apache Flume提供的实验性Twitter源创建一个应用程序并从中获取推文。我们将使用内存通道缓冲这些推文和HDFS接收器,将这些推文推送到HDFS。 要获取Twitter数据,我们必须遵循以下步骤 创建一个Twitter应用程序 安装/启动HDFS 配置Flume 创建Twitter应用程序 为了从Twitter获取推文,需要创建一个Twitter应用程序。按照以下步骤创建Twitter应用程序。 步骤1 要创建Twitter应用程序,请单击以下链接https://apps.twitter.com/。登录您的Twitter帐户。您将拥有一个Twitter应用程序管理窗口,您可以在其中创建,删除和管理Twitter应用程序。 第2步 单击 创建新应用程序 按钮。您将被重定向到一个窗口,在该窗口中您将获得一个申请表,您必须填写您的详细信息才能创建应用程序。填写网站地址时,请提供完整的网址格式,例如http://example.com 第3步 填写详细信息,完成后接受 开发者协议 ,单击页面底部的 创建您的Twitter应用程序按钮。如果一切顺利,将使用给定的详细信息创建应用程序,如下所示。 步骤4 在页面底部的 键和访问标记 选项卡下,您可以观察名为 创建访问令牌 的按钮。单击它以生成访问令牌。 第5步 最后,单击页面右侧顶部的“ 测试OAuth” 按钮。这将导致显示您的 使用者密钥,使用者密钥,访问令牌 和 访问令牌密钥的页面。复制这些细节。这些对于在Flume中配置代理很有用。 启动HDFS 由于我们将数据存储在HDFS中,因此我们需要安装/验证Hadoop。启动Hadoop并在其中创建一个文件夹来存储Flume数据。在配置Flume之前,请按照以下步骤进行操作。 第1步:安装/验证Hadoop 安装Hadoop环境安装。如果系统中已安装Hadoop,请使用Hadoop version命令验证安装,如下所示。 $ hadoop version 如果您的系统包含Hadoop,并且您已设置路径变量,那么您将获得以下输出 Hadoop 2.6.0 Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 Compiled by jenkins on 2014-11-13T21:10Z Compiled with protoc 2.5.0 From source with checksum 18e43357c8f927c0695f1e9522859d6a This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar 第2步:启动Hadoop 浏览Hadoop 的 sbin 目录并启动yarn和Hadoop dfs(分布式文件系统),如下所示。 cd /$Hadoop_Home/sbin/ $ start-dfs.sh localhost: starting namenode, logging to /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out localhost: starting datanode, logging to /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out Starting secondary namenodes [0.0.0.0] starting secondarynamenode, logging to /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out $ start-yarn.sh starting yarn daemons starting resourcemanager, logging to /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out localhost: starting nodemanager, logging to /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out 第3步:在HDFS中创建目录 在Hadoop DFS中,您可以使用命令 mkdir 创建目录。浏览它并在所需路径中创建名为 twitter_data 的目录,如下所示。 $cd /$Hadoop_Home/bin/ $ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data 配置Flume 我们必须使用 conf 文件夹中的配置文件配置源,通道和接收器。本章给出的示例使用Apache Flume提供的名为 Twitter 1%Firehose Memory channel和HDFS sink 的实验源。 Twitter 1%Firehose来源 这个来源是高度实验性的。它使用流API连接到1%的示例Twitter Firehose,并不断下载推文,将它们转换为Avro格式,并将Avro事件发送到下游的Flume接收器。 我们将默认获得此源以及Flume的安装。与此源对应的 jar 文件可以位于 lib 文件夹中,如下所示。 设置类路径 将 classpath 变量设置为 Flume-env.sh 文件中Flume 的 lib 文件夹,如下所示。 ** export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/* 此源需要Twitter应用程序的 Consumer key,Consumer secret,Access token 和 Access token secret 等详细信息。配置此源时,您必须为以下属性提供值 - 通道 来源类型:org.apache.flume.source.twitter.TwitterSource consumerKey - OAuth使用者密钥 consumerSecret - OAuth消费者秘密 accessToken - OAuth访问令牌 accessTokenSecret - OAuth令牌密钥 maxBatchSize - 应该在Twitter批处理中的最大Twitter消息数。 默认值为1000(可选)。 maxBatchDurationMillis - 关闭批处理之前等待的最大毫秒数。 默认值为1000(可选)。 渠道 我们正在使用内存通道。要配置内存通道, 必须 为通道类型提供值。 type - 它保存通道的类型。 在我们的示例中,类型是 MemChannel 。 容量 - 它是通道中存储的最大事件数。 其默认值为100(可选)。 TransactionCapacity - 它是通道接受或发送的最大事件数。 其默认值为100(可选)。 HDFS接收器 该接收器将数据写入HDFS。要配置此接收器,您 必须 提供以下详细信息。 渠道 type - hdfs hdfs.path - HDFS中要存储数据的目录的路径。 我们可以根据场景提供一些可选值。下面给出了我们在应用程序中配置的HDFS接收器的可选属性。 fileType - 这是我们的HDFS文件所需的文件格式。 SequenceFile,DataStream 和 CompressedStream 是此流可用的三种类型。在我们的示例中,我们使用的是 DataStream 。 writeFormat - 可以是文本也可以是可写的。 batchSize - 在将文件刷新到HDFS之前写入文件的事件数。 其默认值为100。 rollsize - 触发滚动的文件大小。 它的默认值是100。 rollCount - 它是在滚动之前写入文件的事件数。 其默认值为10。 示例 - 配置文件 下面给出了配置文件的示例。复制此内容并将其保存为Flume的conf文件夹中的 twitter.conf 。 # Naming the components on the current agent. TwitterAgent.sources = Twitter TwitterAgent.channels = MemChannel TwitterAgent.sinks = HDFS # Describing/Configuring the source TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql # Describing/Configuring the sink TwitterAgent.sinks.HDFS.type = hdfs TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/ TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000 TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 # Describing/Configuring the channel TwitterAgent.channels.MemChannel.type = memory TwitterAgent.channels.MemChannel.capacity = 10000 TwitterAgent.channels.MemChannel.transactionCapacity = 100 # Binding the source and sink to the channel TwitterAgent.sources.Twitter.channels = MemChannel TwitterAgent.sinks.HDFS.channel = MemChannel 执行 浏览Flume主目录并执行应用程序,如下所示。 $ cd $FLUME_HOME $ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf Dflume.root.logger=DEBUG,console -n TwitterAgent 如果一切顺利,将开始将推文流式传输到HDFS。下面给出了获取推文时命令提示符窗口的快照。 验证HDFS 您可以使用下面给出的URL访问Hadoop Administration Web UI。 http://localhost:50070/ 单击页面右侧名为 Utilities 的下拉列表。您可以看到两个选项,如下面给出的快照中所示。 单击 浏览文件系统 然后输入存储推文的HDFS目录的路径。在我们的示例中,路径将是 / user / Hadoop / twitter_data / 。然后,您可以看到存储在HDFS中的twitter日志文件列表,如下所示。 Apache Flume配置 Apache Flume序列发生器源