December 27, 2014 By Jiří Hubáček

Streaming Tweets into Hadoop (Part II)

In the previous part of this series, I discussed the concept of streaming Twitter data into Hadoop. In this part, I will focus on Apache Flume and its configuration.

Flume is an Apache project capable of collecting data from many different sources into HDFS. Using connectors, Flume works with social media data, log files, email messages and pretty much anything else one may think of.

Flume's architecture is built on a producer-consumer model: the Source is the event producer and the Sink is the consumer. In this post, the Source will be the Twitter data feed, but Flume can just as well capture web server logs or network data.

Flume Architecture (Source: http://flume.apache.org)

Data are sent from a Source, buffered in a Channel, and drained into a Sink. The Channel drains into the Sink asynchronously, so the Source doesn't have to wait until the data are stored by the Sink.
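
Before we get to the Twitter-specific file, a minimal generic agent definition may help to see how the three pieces are wired together. The names agent, src1, ch1 and sink1 are placeholders, and the netcat/logger types come from the Flume user guide's introductory example:

# generic skeleton: one source, one channel, one sink (placeholder names)
agent.sources = src1
agent.channels = ch1
agent.sinks = sink1

# a simple netcat source feeding the channel
agent.sources.src1.type = netcat
agent.sources.src1.bind = localhost
agent.sources.src1.port = 44444
agent.sources.src1.channels = ch1

# in-memory buffer between source and sink
agent.channels.ch1.type = memory

# logger sink draining the same channel
agent.sinks.sink1.type = logger
agent.sinks.sink1.channel = ch1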

Getting your keys from Twitter

Go to Twitter Apps and click Create New App. Fill in the required information to create a new app. Then go to the Keys and Access Tokens tab and securely save the following keys:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

Flume configuration

Flume comes preinstalled in the HDP sandbox. To install the Twitter source, execute

wget -P /usr/hdp/2.2.0.0-1084/flume/lib http://files.cloudera.com/samples/flume-sources-1.0-SNAPSHOT.jar
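
If you want to verify that the jar really landed in Flume's lib directory, a quick listing of the same path should show it:

ls -l /usr/hdp/2.2.0.0-1084/flume/lib/flume-sources-1.0-SNAPSHOT.jar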

Download a preconfigured Twitter conf file

rm /etc/flume/conf/flume.conf
wget -P /etc/flume/conf http://github.com/cloudera/cdh-twitter-example/raw/master/flume-sources/flume.conf

Now it's time to configure Flume. Open the downloaded flume.conf file in your favourite editor. Every entry is in the format [agent_name].[object_type].[object_name].[parameter_name], where agent_name is specified at the beginning of the file and object_type is a source, channel or sink. My configured flume.conf file is included below.

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = KEY
TwitterAgent.sources.Twitter.consumerSecret = KEY
TwitterAgent.sources.Twitter.accessToken = KEY
TwitterAgent.sources.Twitter.accessTokenSecret = KEY
TwitterAgent.sources.Twitter.keywords = theinterview, 17YearsOfNash, Warnock, RioCompetition, cpfc, Palace, London, Christmas, New Years

################## SINK #################################
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/root/flume/test
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text

TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 67108864
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 3600
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0

#################### CHANNEL #########################
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 1000
#default - TwitterAgent.channels.MemChannel.capacity = 100
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

I've made a couple of changes.

sources:

  • set my own keywords (hashtags) I want to capture

sinks:

  • changed the path to point to my NameNode (since we are using the HDP sandbox, it is the local DNS name): hdfs://sandbox.hortonworks.com:8020/user/root/flume/test
  • increased batchSize to 1000 (number of events written to HDFS in one batch)
  • set rollSize to 67108864 (start a new file when the current one reaches 64 MB; see the quick check after this list)
  • set rollInterval to 3600 (start a new file after 1 hour)
  • set rollCount to 0 (never roll over based on the count of events)

channels:

  • increased capacity to 1000 (maximum number of events the channel holds in memory)
  • increased transactionCapacity to 1000 (maximum number of events taken from the channel in a single transaction)
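
As a quick sanity check of the rollSize value (plain shell arithmetic, nothing Flume-specific):

# 64 MB expressed in bytes
echo $((64 * 1024 * 1024))
# 67108864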

If I didn't increase MemChannel.capacity, I would get the error below, because the memory buffer couldn't keep pace with the incoming data volume:

org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count

A note on my configuration

I set up lots of keywords to stress-test my sandbox and reached the 64 MB rollSize in about 7 minutes.

Imagine a situation where you don't capture trending topics. In that case, it would be better to lower rollSize or rollInterval so that files are written into HDFS more frequently, because with an in-memory channel any disruption within the roll period could result in data loss.
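
In such a low-volume scenario, the relevant sink lines might look like this; the values are only illustrative, I haven't benchmarked them:

# roll smaller files more often when keyword volume is low (illustrative values)
TwitterAgent.sinks.HDFS.hdfs.rollSize = 10485760
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600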

Create the destination folders for the Twitter data

hdfs dfs -mkdir -p flume/test
hdfs dfs -mkdir -p flume/tweets

Launching the Flume agent

Take a deep breath and start the Flume agent. nohup launches Flume in the background, so we can keep using the same console to view the generated log file.

nohup flume-ng agent --conf-file /etc/flume/conf/flume.conf --name TwitterAgent >flume_twitteragent.log &

Execute the tail command to view the generated log file.

tail -f flume_twitteragent.log

If everything was set up correctly, the log file will contain the following lines, and the BucketWriter entries will repeat for each file the Sink creates.

14/12/19 22:01:13 INFO twitter4j.TwitterStreamImpl: Establishing connection.
14/12/19 22:01:16 INFO twitter4j.TwitterStreamImpl: Connection established.
14/12/19 22:01:16 INFO twitter4j.TwitterStreamImpl: Receiving status stream.
14/12/19 22:01:16 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
14/12/19 22:01:16 INFO hdfs.BucketWriter: Creating hdfs://sandbox.hortonworks.com:8020/user/root/flume/test/FlumeData.1419026476113.tmp
14/12/19 22:11:19 INFO hdfs.BucketWriter: Closing hdfs://sandbox.hortonworks.com:8020/user/root/flume/test/FlumeData.1419026476113.tmp
14/12/19 22:11:19 INFO hdfs.BucketWriter: Close tries incremented
14/12/19 22:11:19 INFO hdfs.BucketWriter: Renaming hdfs://sandbox.hortonworks.com:8020/user/root/flume/test/FlumeData.1419026476113.tmp to hdfs://sandbox.hortonworks.com:8020/user/root/flume/tweets/FlumeData.1419026476113
14/12/19 22:11:19 INFO hdfs.HDFSEventSink: Writer callback called.

Checking on created files

To list the destination folder in HDFS, execute the following shell command.

hdfs dfs -ls flume/test

You might also check on files from Hue web console at http://YOURSANDBOXIP:8000/filebrowser/#/user/root/flume/test
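
The same check, including file sizes, can be done from the shell:

hdfs dfs -du -h flume/test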

Hue, Flume, File explorer

To terminate the Flume agent, simply issue the kill command with its process number.
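
One way to find the process number, assuming pgrep is available in the sandbox:

# locate the Flume agent started above
pgrep -f flume-ng
# then terminate it (replace 12345 with the PID reported by pgrep)
kill 12345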

Next time, I’m going to process all the raw files using Hive.


Filed Under: Big Data, Hadoop

Comments

  1. Lija Mohan says

    February 23, 2015 at 10:41 am

    Your posts were really helpful…eagerly waiting for your 3rd part of the post…pls upload it soon…

    • Jiří Hubáček says

      February 23, 2015 at 6:47 pm

      Hey Lija, thanks. Unfortunately, I ran into a problem with Hortonworks’ ODBC driver that defied my attempts to connect to 2.2 sandbox. It worked with 2.1 and 2.0, though. I will revisit this topic and let you know. Cheers!

      • Ismael says

        June 26, 2015 at 10:41 pm

        Hi there.
        Firstly, I will be ever grateful to you for such an easy and to-the-point tutorial. Secondly, by any chance did you perform the last step, i.e. processing all the raw tweets using Hive?
        If you didn't, can you do it using HDP 2.0? It is a humble request.
        Thanks.

        • Jiří Hubáček says

          February 1, 2016 at 8:57 pm

          I know it took some time but finally the next part is available. 🙂

  2. Fernando says

    October 5, 2016 at 4:18 pm

    Hi,

    Can you process the data directly with PIG without using Hive? Could you provide an example?

    Thanks,
    Fernando

    • Jiří Hubáček says

      October 14, 2016 at 11:10 am

      Hi Fernando, yes, one ought to be able to do the transformations with PIG as well. I'm sorry, but I don't have a PIG version for this example.

      What I like about Hive is that it’s quite readable for people with SQL experience.

  3. sapthagiri says

    October 18, 2016 at 1:35 pm

    Hi, please help me. I am finally getting this error, how can I resolve it?

    lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } – Exception follows.
    java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V

    • Jiří Hubáček says

      November 23, 2016 at 11:04 am

      Hi, when does this happen? During flume-ng startup?

