Flume – Basic examples:

Let us take an example and see how Flume works:

  1. Watch a local directory for new text files.
  2. As files are added, send each line of each file to the console.

Imagine that new files are continuously ingested into Flume; here we will be adding the files ourselves.

Here we are using a single source-channel-sink chain. We configure the Flume agent using a Java properties file. The configuration controls the types of sources, sinks, and channels that are used, as well as how they are connected together.

First we need to list the sources, sinks, and channels for the given agent, and then point the source and sink to a channel.

Note: a source instance can specify multiple channels, but a sink instance can only specify one channel (see the fan-out sketch after the template below).

<Agent>.sources = <Source>

<Agent>.sinks = <Sink>

<Agent>.channels = <Channel1> <Channel2>



# To point the source and sink to the channel

<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

<Agent>.sinks.<Sink>.channel = <Channel1>
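For instance, a hypothetical agent “a1” could replicate events from one source into two channels, each drained by its own sink. This is only a sketch with made-up component names; the netcat source and logger sink are standard Flume types used here for illustration:

a1.sources = src1
a1.channels = ch1 ch2
a1.sinks = snk1 snk2

# hypothetical netcat source listening on a local TCP port
a1.sources.src1.type = netcat
a1.sources.src1.bind = localhost
a1.sources.src1.port = 44444
# one source feeding two channels
a1.sources.src1.channels = ch1 ch2

a1.channels.ch1.type = memory
a1.channels.ch2.type = memory

# each sink drains exactly one channel
a1.sinks.snk1.type = logger
a1.sinks.snk1.channel = ch1
a1.sinks.snk2.type = logger
a1.sinks.snk2.channel = ch2

By default Flume uses a replicating channel selector, so every event from src1 is copied to both ch1 and ch2.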

Then we need to set the properties of each source, sink and channel.

# properties for sources

<Agent>.sources.<Source>.<someProperty> = <someValue>



# properties for channels

<Agent>.channels.<Channel>.<someProperty> = <someValue>



# properties for sinks

<Agent>.sinks.<Sink>.<someProperty> = <someValue>

Each component (source, channel, sink) has its own set of properties, and we need to set the “type” property for every component in Flume.
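Putting these pieces together, the directory-to-console scenario described at the beginning could be configured as follows. This is a minimal sketch with hypothetical component names and a hypothetical spool directory; the logger sink prints each event to the console:

a1.sources = src1
a1.channels = ch1
a1.sinks = snk1

# watch a local directory for new files
a1.sources.src1.type = spooldir
a1.sources.src1.spoolDir = /tmp/spooldir
a1.sources.src1.channels = ch1

# buffer events in memory
a1.channels.ch1.type = memory

# write each event to the console/log
a1.sinks.snk1.type = logger
a1.sinks.snk1.channel = ch1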

Now let us take a concrete example.

First we need to write the Java properties file:

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ cd conf/

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat flume.conf

agent1.sources = source1

agent1.channels = Channel1

agent1.sinks = Sink1



agent1.sources.source1.type = exec

agent1.sources.source1.command = cat /home/hdadmin/tuple1

agent1.sources.source1.channels = Channel1



agent1.sinks.Sink1.type = hdfs

agent1.sinks.Sink1.channel = Channel1

agent1.sinks.Sink1.hdfs.path = hdfs://localhost:9000/flume-00001

agent1.sinks.Sink1.hdfs.fileType = DataStream



agent1.channels.Channel1.type = memory

Here “agent1” is the name of the agent and we are using the ‘exec’ source. The sink is the HDFS sink, which means we are writing the data into HDFS.

DataStream means Flume will not write any metadata; only the actual event data is written.
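For reference, DataStream is only one of the values the HDFS sink accepts for hdfs.fileType; the alternatives below are shown as a commented sketch:

# agent1.sinks.Sink1.hdfs.fileType = SequenceFile     # the default; writes Hadoop SequenceFiles
# agent1.sinks.Sink1.hdfs.fileType = DataStream       # raw event data, no metadata
# agent1.sinks.Sink1.hdfs.fileType = CompressedStream # compressed stream; also requires hdfs.codeC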

Now start the agent with this configuration file:

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -n agent1 -Dflume.root.logger=DEBUG,console



SOURCES: {source1={ parameters:{channels=Channel1, type=exec, command=cat /home/hdadmin/tuple1} }}

CHANNELS: {Channel1={ parameters:{type=memory} }}

SINKS: {Sink1={ parameters:{hdfs.fileType=DataStream, hdfs.path=hdfs://localhost:9000/flume-00001, channel=Channel1, type=hdfs} }}



…...

…...

2017-12-13 05:06:17,265 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes

2017-12-13 05:06:47,268 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes

Here,

flume-ng: the Flume executable

--conf: the location of the configuration directory

-f: the location of the configuration file

-n: the name of the agent

We pass -Dflume.root.logger=DEBUG,console so that any problems are written to the console.

Now open another terminal and check hdfs://localhost:9000/flume-00001:

hdadmin@ubuntu:~$ hdfs dfs -ls /flume-00001

Found 1 items

-rw-r--r-- 1 hdadmin supergroup 200 2017-12-13 05:05 /flume-00001/FlumeData.1513170317306

Now the data produced by the source command “cat /home/hdadmin/tuple1” has arrived in HDFS.
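To inspect what was written, you can cat the file from the listing above; it should contain whatever “cat /home/hdadmin/tuple1” produced:

hdadmin@ubuntu:~$ hdfs dfs -cat /flume-00001/FlumeData.1513170317306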

Let us see one more Flume example, this time using the “spooling directory” source.

First, create the Flume configuration file:

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat Flume1.conf

agent1.sinks =  hdfs-sink1

agent1.sources = source1

agent1.channels = fileChannel1



agent1.channels.fileChannel1.type = file

agent1.channels.fileChannel1.capacity = 2000

agent1.channels.fileChannel1.transactionCapacity = 100



agent1.sources.source1.type = spooldir

# spoolDir in my case is /home/hdadmin/Desktop/flume_sink



agent1.sources.source1.spoolDir = /home/hdadmin/Desktop/flume_sink

agent1.sources.source1.fileHeader = false

agent1.sources.source1.fileSuffix = .COMPLETED

agent1.sinks.hdfs-sink1.type = hdfs



agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost.localdomain:9000/flume_sink

agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000

agent1.sinks.hdfs-sink1.hdfs.rollSize = 2684

agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0

agent1.sinks.hdfs-sink1.hdfs.rollCount = 5000

agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text



agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream

agent1.sources.source1.channels = fileChannel1

agent1.sinks.hdfs-sink1.channel = fileChannel1
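The roll settings above control when the sink closes the current HDFS file and starts a new one; whichever threshold is hit first wins. As an annotated sketch of the same lines:

agent1.sinks.hdfs-sink1.hdfs.rollSize = 2684   # roll once the file reaches ~2684 bytes (0 disables size-based rolling)
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0  # 0 disables time-based rolling; otherwise a number of seconds
agent1.sinks.hdfs-sink1.hdfs.rollCount = 5000  # roll after 5000 events (0 disables count-based rolling)
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000  # events written to the file before flushing to HDFS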

Now start the agent:

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ flume-ng agent -n agent1 -f /home/hdadmin/apache-flume-1.5.0-cdh5.3.2-bin/conf/Flume1.conf
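This invocation omits the --conf directory and the console logger used in the first example; if you want the same debug output, you can start it like this instead (a sketch assuming the same installation layout):

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/Flume1.conf -n agent1 -Dflume.root.logger=DEBUG,console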

Now copy some files into the spool directory; they will automatically be stored in HDFS.

hdadmin@ubuntu:~$ cat data2.txt

(1,2)

(5,3)

hdadmin@ubuntu:~$ cp data2.txt Desktop/flume_sink

hdadmin@ubuntu:~$ cd Desktop/flume_sink/

hdadmin@ubuntu:~/Desktop/flume_sink$ ls

data2.txt.COMPLETED

We have copied ‘data2.txt’ into our spool directory, and after processing its name has been given the ‘.COMPLETED’ suffix. Now check HDFS.

hdadmin@ubuntu:~/Desktop/flume_sink$ hdfs dfs -ls /flume_sink

Found 1 items

-rw-r--r--   1 hdadmin supergroup        17 2017-12-26 04:39 /flume_sink/FlumeData.1514291975722.tmp



hdadmin@ubuntu:~/Desktop/flume_sink$ hdfs dfs -cat /flume_sink/FlumeData.1514291975722.tmp

(1,2)

(5,3)
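The .tmp suffix marks a file the HDFS sink is still writing to; once a roll condition is met or the agent is stopped, the file is renamed without the suffix. The suffix itself is configurable (a sketch; .tmp is the default):

agent1.sinks.hdfs-sink1.hdfs.inUseSuffix = .tmp   # suffix for files currently being written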