Flume – Basic examples:
Let us take an example and see how Flume works:
- First, watch a local directory for new text files.
- As files are added, send each line of each file to the console.
Let us imagine that new files are continuously ingested into Flume; here, however, we will add the files ourselves.
Here we are using a single source-channel-sink flow. We configure the Flume agent using a Java properties file. The configuration controls the types of sources, sinks, and channels that are used, as well as how they are wired together.
First we need to list the sources, sinks, and channels for the given agent, and then point the source and sink to a channel.
Note: A source instance can specify multiple channels, but a sink instance can only specify one channel.
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# To point the source and sink to the channel
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
<Agent>.sinks.<Sink>.channel = <Channel1>
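As a quick illustration of the note above, here is a minimal fan-out sketch (the names agent1, src1, ch1, ch2, sink1, and sink2 are purely illustrative): the source replicates each event to both channels, and each sink drains its own channel.

agent1.sources = src1
agent1.sinks = sink1 sink2
agent1.channels = ch1 ch2

# one source feeding two channels, each drained by its own sink
agent1.sources.src1.channels = ch1 ch2
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink2.channel = ch2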
Then we need to set the properties of each source, sink and channel.
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
Each component (source, channel, sink) has its own set of properties, and the property “type” must be set for every component in Flume.
Now let us work through an example.
First, we write the Java properties file:
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ cd conf/
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat flume.conf
agent1.sources = source1
agent1.channels = Channel1
agent1.sinks = Sink1

agent1.sources.source1.type = exec
agent1.sources.source1.command = cat /home/hdadmin/tuple1
agent1.sources.source1.channels = Channel1

agent1.sinks.Sink1.type = hdfs
agent1.sinks.Sink1.channel = Channel1
agent1.sinks.Sink1.hdfs.path = hdfs://localhost:9000/flume-00001
agent1.sinks.Sink1.hdfs.fileType = DataStream

agent1.channels.Channel1.type = memory
Here “agent1” is the name of the agent, and we are using the ‘exec’ source. The sink is the HDFS sink, which means we are writing the data into HDFS.
DataStream means that Flume writes no metadata; only the actual data is collected.
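Before starting the agent, make sure the input file exists, otherwise the exec source has nothing to read. The contents of /home/hdadmin/tuple1 shown below are only an assumed example:

hdadmin@ubuntu:~$ printf '(1,2)\n(3,4)\n' > /home/hdadmin/tuple1
hdadmin@ubuntu:~$ cat /home/hdadmin/tuple1
(1,2)
(3,4)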
Now start the agent with this configuration file:
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -n agent1 -Dflume.root.logger=DEBUG,console
SOURCES: {source1={ parameters:{channels=Channel1, type=exec, command=cat /home/hdadmin/tuple1} }}
CHANNELS: {Channel1={ parameters:{type=memory} }}
SINKS: {Sink1={ parameters:{hdfs.fileType=DataStream, hdfs.path=hdfs://localhost:9000/flume-00001, channel=Channel1, type=hdfs} }}
……
2017-12-13 05:06:17,265 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes
2017-12-13 05:06:47,268 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes
Here,
- flume-ng: the Flume executable
- --conf: location of the configuration directory
- -f: location of the configuration file
- -n: name of the agent
We use -Dflume.root.logger=DEBUG,console so that if any problem occurs, it is written to the console.
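Once the flow is working, the DEBUG output becomes noisy; lowering the level follows the usual log4j convention, for example:

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console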
Now open another terminal and check hdfs://localhost:9000/flume-00001.
hdadmin@ubuntu:~$ hdfs dfs -ls /flume-00001
Found 1 items
-rw-r--r--   1 hdadmin supergroup        200 2017-12-13 05:05 /flume-00001/FlumeData.1513170317306
Now we have in HDFS the data that the source produced with “cat /home/hdadmin/tuple1”.
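To inspect the data itself, cat the file; the file name is generated from a timestamp, so it will differ on your system:

hdadmin@ubuntu:~$ hdfs dfs -cat /flume-00001/FlumeData.1513170317306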
Let us see one more example for Flume, this time using the “spooling directory” source.
First, create the Flume configuration file:
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat Flume1.conf
agent1.sources = source1
agent1.channels = fileChannel1
agent1.sinks = hdfs-sink1

agent1.channels.fileChannel1.type = file
agent1.channels.fileChannel1.capacity = 2000
agent1.channels.fileChannel1.transactionCapacity = 100

agent1.sources.source1.type = spooldir
# spoolDir in my case is /home/hdadmin/Desktop/flume_sink
agent1.sources.source1.spoolDir = /home/hdadmin/Desktop/flume_sink
agent1.sources.source1.fileHeader = false
agent1.sources.source1.fileSuffix = .COMPLETED

agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost.localdomain:9000/flume_sink
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1.hdfs.rollSize = 2684
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 5000
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream

agent1.sources.source1.channels = fileChannel1
agent1.sinks.hdfs-sink1.channel = fileChannel1
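A few of these properties are worth a note: hdfs.rollSize, hdfs.rollCount, and hdfs.rollInterval control when the sink closes the current HDFS file and starts a new one (here after 2684 bytes or 5000 events; an interval of 0 disables time-based rolling), hdfs.batchSize is the number of events flushed to HDFS per write, and fileSuffix is the suffix the spooling directory source appends to a file once it has been fully ingested.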
Now start the agent as,
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent -n agent1 -f /home/hdadmin/apache-flume-1.5.0-cdh5.3.2-bin/conf/Flume1.conf
Now copy some files into the spoolDir; they will automatically be stored in HDFS.
hdadmin@ubuntu:~$ cat data2.txt
(1,2)
(5,3)
hdadmin@ubuntu:~$ cp data2.txt Desktop/flume_sink
hdadmin@ubuntu:~$ cd Desktop/flume_sink/
hdadmin@ubuntu:~/Desktop/flume_sink$ ls
data2.txt.COMPLETED
We have copied ‘data2.txt’ into our spool directory, and once it was ingested its status changed to ‘completed’ (the .COMPLETED suffix was appended). Now check it in HDFS.
hdadmin@ubuntu:~/Desktop/flume_sink$ hdfs dfs -ls /flume_sink
Found 1 items
-rw-r--r--   1 hdadmin supergroup         17 2017-12-26 04:39 /flume_sink/FlumeData.1514291975722.tmp
hdadmin@ubuntu:~/Desktop/flume_sink$ hdfs dfs -cat /flume_sink/FlumeData.1514291975722.tmp
(1,2)
(5,3)
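Note the ‘.tmp’ suffix on the file in HDFS: the HDFS sink writes to a temporary file name and renames the file once it is rolled (per the rollSize/rollCount settings above) or the agent is shut down cleanly.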