In the previous part of this series, I discussed the concept of streaming Twitter data into Hadoop. In this part, I will focus on Apache Flume and its configuration.
Flume is an Apache project capable of collecting data from many different sources into HDFS. Using connectors, Flume works with social media data, log files, email messages and pretty much anything else one may think of.
Flume's architecture is built on a producer-consumer model. The Source is the event producer and the Sink is the consumer. In this post, the Source will be the Twitter data feed, but Flume is equally capable of capturing web server logs or network data.

Data are sent from a Source, received by a Channel and drained into a Sink. The Channel drains into the Sink asynchronously, so the Source doesn't have to wait until the data are stored by the Sink.
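To make this wiring concrete before we get to Twitter, here is a minimal sketch in Flume's properties format. The agent and component names (a1, r1, c1, k1) and the netcat source paired with a logger sink are placeholders for illustration only; they are not part of the Twitter setup, which follows the same pattern with a Twitter Source and an HDFS Sink.
# a1 is the agent; it owns one source, one channel and one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source: the event producer, listening on a local port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
# Channel: the buffer sitting between producer and consumer
a1.channels.c1.type = memory
# Sink: the event consumer draining the channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1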
Getting your keys from Twitter
Go to Twitter Apps and click Create New App. Fill in the required information to create a new app. Then go to the Keys and Access Tokens tab and securely save the following keys (they plug directly into flume.conf, as shown below):
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
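These four values map one-to-one onto the Twitter Source properties of the flume.conf file we are about to configure, with KEY standing in for the real values:
TwitterAgent.sources.Twitter.consumerKey = KEY
TwitterAgent.sources.Twitter.consumerSecret = KEY
TwitterAgent.sources.Twitter.accessToken = KEY
TwitterAgent.sources.Twitter.accessTokenSecret = KEY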
Flume configuration
Flume comes preinstalled in the HDP sandbox. To install the Twitter source, execute
wget -P /usr/hdp/2.2.0.0-1084/flume/lib http://files.cloudera.com/samples/flume-sources-1.0-SNAPSHOT.jar
Download a preconfigured Twitter conf file
rm /etc/flume/conf/flume.conf
wget -P /etc/flume/conf http://github.com/cloudera/cdh-twitter-example/raw/master/flume-sources/flume.conf
Now it's time to configure Flume. Open the downloaded flume.conf file in your favourite editor. Every entry is in the format [agent_name].[object_type].[object_name].[parameter_name], where the agent_name is specified at the beginning of the file and object_type identifies the Source, the Channel or the Sink. My configured flume.conf file is included below.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = KEY
TwitterAgent.sources.Twitter.consumerSecret = KEY
TwitterAgent.sources.Twitter.accessToken = KEY
TwitterAgent.sources.Twitter.accessTokenSecret = KEY
TwitterAgent.sources.Twitter.keywords = theinterview, 17YearsOfNash, Warnock, RioCompetition, cpfc, Palace, London, Christmas, New Years

################## SINK #################################
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/root/flume/test
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 67108864
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 3600
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0

#################### CHANNEL #########################
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 1000
# default: TwitterAgent.channels.MemChannel.capacity = 100
TwitterAgent.channels.MemChannel.transactionCapacity = 1000
I've made a couple of changes
sources:
- set my own hashtags and keywords I want to capture
sinks:
- changed the path to point to my NameNode (since we are using the HDP sandbox, this is its local DNS name) hdfs://sandbox.hortonworks.com:8020/user/root/flume/test
- increased batchSize to 1000 (number of events written to HDFS in one batch)
- set rollSize to 67108864 (start a new file when the current one reaches 64 MB)
- set rollInterval to 3600 (start a new file after 1 hour)
- set rollCount to 0 (never roll over based on the count of events)
channels:
- increased capacity to 1000 (maximum number of events the Channel can buffer between the Source and the Sink; the default is 100)
- increased transactionCapacity to 1000 to match
If I didn't increase MemChannel.capacity, I would get the error below because the memory buffer couldn't keep pace with the incoming data volume:
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
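For reference, these are the two Channel lines in the configuration above that raise the buffer from its default of 100 events:
TwitterAgent.channels.MemChannel.capacity = 1000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000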
A note on my configuration
I set up lots of hashtags to stress-test my sandbox. I reached the 64 MB rollSize in about 7 minutes.
Imagine a situation where you aren't capturing trending topics. In that case, it would be better to lower rollSize or rollInterval so files are written into HDFS more frequently, as sketched below. With a memory-resident Channel, any disruption within the roll period could result in data loss.
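As a rough sketch only, assuming a low-volume feed, the Sink's roll settings could be tightened like this; the 8 MB and 10 minute values are purely illustrative:
# roll a new file every 8 MB (8388608 bytes)...
TwitterAgent.sinks.HDFS.hdfs.rollSize = 8388608
# ...or every 10 minutes, whichever comes first
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600
# still never roll based on the event count
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0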
Create destination folder for Twitter data
hdfs dfs -mkdir flume
hdfs dfs -mkdir flume/tweets
Launching the Flume agent
Take a deep breath and start the Flume agent. nohup launches Flume in the background so we can keep using the same console to view the generated log file.
nohup flume-ng agent --conf-file /etc/flume/conf/flume.conf --name TwitterAgent >flume_twitteragent.log &
Execute the tail command to view the generated log file.
tail -f flume_twitteragent.log
If everything was set up correctly, the log file will contain the following lines, and the BucketWriter entries will repeat for each file the Sink creates.
14/12/19 22:01:13 INFO twitter4j.TwitterStreamImpl: Establishing connection.
14/12/19 22:01:16 INFO twitter4j.TwitterStreamImpl: Connection established.
14/12/19 22:01:16 INFO twitter4j.TwitterStreamImpl: Receiving status stream.
14/12/19 22:01:16 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
14/12/19 22:01:16 INFO hdfs.BucketWriter: Creating hdfs://sandbox.hortonworks.com:8020/user/root/flume/test/FlumeData.1419026476113.tmp
14/12/19 22:11:19 INFO hdfs.BucketWriter: Closing hdfs://sandbox.hortonworks.com:8020/user/root/flume/test/FlumeData.1419026476113.tmp
14/12/19 22:11:19 INFO hdfs.BucketWriter: Close tries incremented
14/12/19 22:11:19 INFO hdfs.BucketWriter: Renaming hdfs://sandbox.hortonworks.com:8020/user/root/flume/test/FlumeData.1419026476113.tmp to hdfs://sandbox.hortonworks.com:8020/user/root/flume/tweets/FlumeData.1419026476113
14/12/19 22:11:19 INFO hdfs.HDFSEventSink: Writer callback called.
Checking on created files
To list the destination folder in HDFS, execute the following shell command.
hdfs dfs -ls flume/test
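To also see how much data has been collected so far, the standard du command works; the -h switch prints human-readable sizes.
hdfs dfs -du -h flume/test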
You might also check on the files from the Hue web console at http://YOURSANDBOXIP:8000/filebrowser/#/user/root/flume/test
To terminate the Flume agent, simply issue the kill command with the process number.
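One way to do that, assuming the agent was started with the flume-ng command above:
# find the process number of the running agent
ps -ef | grep flume-ng | grep -v grep
# terminate it (replace 12345 with the PID reported by ps)
kill 12345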
Next time, I’m going to process all the raw files using Hive.
Your posts were really helpful… eagerly waiting for the 3rd part of the post… please upload it soon…
Hey Lija, thanks. Unfortunately, I ran into a problem with Hortonworks’ ODBC driver that defied my attempts to connect to 2.2 sandbox. It worked with 2.1 and 2.0, though. I will revisit this topic and let you know. Cheers!
Hi there.
Firstly, I will be ever grateful to you for such an easy and to-the-point tutorial. Secondly, by any chance, did you perform the last step, i.e. processing all the raw tweets using Hive?
If you didn't, can you do it using HDP 2.0? It is a humble request.
Thanks.
I know it took some time but finally the next part is available. 🙂
Hi,
Can you process the data directly with PIG without using Hive? Could you provide an example?
Thanks,
Fernando
Hi Fernando, yes, one ought to be able to do the transformations with PIG as well. I'm sorry, but I don't have a PIG version of this example.
What I like about Hive is that it’s quite readable for people with SQL experience.
Hi, please help me. At the end I am getting this error; how can I resolve it?
lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } – Exception follows.
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
Hi, when does this happen? During flume-ng startup?