Flume – Multi-agent flow

Multi-agent flow (linking 2 agents):

Let us see how to connect 2 different agents, or 2 tiers of agents. First, why do we need to connect 2 different agents? Suppose we have an agent running on node1 that produces only a little data, and this data is written to HDFS. The HDFS data will consist only of the events produced from node1.

2-tier agent

If we have many nodes, each producing only a little data, it is better to aggregate the events from a group of nodes and store them in a single file, since this results in fewer, larger files. Also, files can be rolled more often because they are fed by a larger number of nodes, which reduces the time between when an event is created and when it is available for analysis.

From the diagram below, we can see that the first tier collects events from sources such as web servers, weblogs, etc. and sends those events to a set of agents in the second tier, which aggregate all the events from the first tier and then write them to HDFS. These events are sent over the network.

Process flow of a 2-tier agent

We generally use an Avro sink to send events over Avro RPC to an Avro source running in another Flume agent, and we can likewise use a Thrift sink to send events over Thrift RPC to a Thrift source. These Avro and Thrift sinks and sources are used to distribute events between agent tiers over RPC; they are not used to write or read Avro files. If we need to write events to Avro files, we can use the HDFS sink.
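For instance, if we preferred Thrift RPC for the hop between tiers, only the sink and source types would change. The following is just a sketch, assuming the same agent, sink, and source names and the same host and port used later in this example:

agent1.sinks.sink1.type = thrift
agent1.sinks.sink1.hostname = localhost
agent1.sinks.sink1.port = 10005

agent2.sources.source2.type = thrift
agent2.sources.source2.bind = localhost
agent2.sources.source2.port = 10005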

Let us see an example of this,

We have a data file called tuple1, whose contents can be seen as,

hdadmin@ubuntu:~$ cat tuple1

(1,2,3) (4,5,6)

(2,3,4) (6,3,7)

(5,3,7) (6,2,8)

(4,2,5) (2,4,6)

The configuration file for the first agent is,

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat flume_tier1.conf

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

agent1.sources.source1.type = spooldir

agent1.sources.source1.spoolDir = /home/hdadmin/flume1/spooldir

agent1.sinks.sink1.type = avro

agent1.sinks.sink1.hostname = localhost

agent1.sinks.sink1.port = 10005

agent1.channels.channel1.type = file

agent1.channels.channel1.checkpointDir=/home/hdadmin/flume1/agent1/checkpoint

agent1.channels.channel1.dataDirs=/home/hdadmin/flume1/agent1/data

Here, the checkpoint and data directories will be created automatically.
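Note that, unlike those directories, the spooling directory itself is not created for us; it must exist before agent1 starts, and the file to be ingested is simply dropped into it. A rough sketch, assuming the paths used above:

hdadmin@ubuntu:~$ mkdir -p /home/hdadmin/flume1/spooldir

hdadmin@ubuntu:~$ cp tuple1 /home/hdadmin/flume1/spooldir/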

The configuration file for the second agent is,

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat flume_tier2.conf

agent2.sources = source2

agent2.sinks = sink2

agent2.channels = channel2

agent2.sources.source2.channels = channel2

agent2.sinks.sink2.channel = channel2

agent2.sources.source2.type = avro

agent2.sources.source2.bind = localhost

agent2.sources.source2.port = 10005

agent2.sinks.sink2.type = hdfs

agent2.sinks.sink2.hdfs.path = hdfs://localhost:9000/flume-tier

agent2.sinks.sink2.hdfs.filePrefix = events

agent2.sinks.sink2.hdfs.fileSuffix = .log

agent2.sinks.sink2.hdfs.fileType = DataStream

agent2.channels.channel2.type = file

agent2.channels.channel2.checkpointDir=/home/hdadmin/agent2/checkpoint

agent2.channels.channel2.dataDirs=/home/hdadmin/agent2/data
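The HDFS sink in this second tier is also where the file rolling mentioned earlier is controlled. As a sketch, its roll behaviour could be tuned with properties such as the following (the values are only illustrative):

# roll a new file every 30 seconds or once it reaches 128 MB, and never roll on event count
agent2.sinks.sink2.hdfs.rollInterval = 30
agent2.sinks.sink2.hdfs.rollSize = 134217728
agent2.sinks.sink2.hdfs.rollCount = 0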

Here we have 2 agents running: agent1 and agent2. agent1 runs in the first tier and has a spooldir source and an Avro sink connected by a file channel. agent2 runs in the second tier and has an Avro source that listens on the port that agent1’s Avro sink sends events to.

We have configured 2 different data and checkpoint directories as we are using 2 file channels running on the same machine. This way, they don’t try to write their files on top of one another.

Now start the 2 agents as,

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume_tier1.conf -n agent1

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume_tier2.conf -n agent2
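While testing, it can help to run each agent with console logging so that we can watch events move between the tiers, for example:

hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume_tier2.conf -n agent2 -Dflume.root.logger=INFO,console

It is also convenient to start agent2 (the Avro source) before agent1, so that agent1’s Avro sink has something to connect to from the start, although the sink will keep retrying in any case.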

So now we have linked the 2 agents using Avro, and we can see the output in HDFS as,

hdadmin@ubuntu:~/agent2$ hdfs dfs -ls /flume-tier

Found 1 items

-rw-r--r--   1 hdadmin supergroup        64 2017-12-27 04:02 /flume-tier/events.1514376095923.log

hdadmin@ubuntu:~/agent2$ hdfs dfs -cat /flume-tier/events.1514376095923.log

 (1,2,3) (4,5,6)

(2,3,4) (6,3,7)

(5,3,7) (6,2,8)

(4,2,5) (2,4,6)

Tuple1 data in HDFS

In Flume, we use transactions to ensure that each batch of events is delivered reliably from source to channel. In the example above, the Avro sink in agent1 reading a batch of events from the file channel can be considered one transaction. This transaction is committed only when the Avro sink receives confirmation that the write to the Avro source’s RPC endpoint was successful.

That confirmation is sent only once agent2’s transaction of writing the batch of events to its file channel has been successfully committed, and it is a synchronous confirmation. After the confirmation, we can say that the batch has been successfully delivered from one Flume agent’s channel to the other’s.

Now suppose agent1 stops running: files will accumulate in the spooling directory and will be processed once agent1 starts again. Any events sitting in its file channel will also still be available when the agent is restarted, thanks to the channel’s durability guarantee. Similarly, if agent2 stops running, events will be stored in agent1’s file channel until agent2 starts again.

Remember that a channel also has a limited capacity for storing events. If agent1’s channel fills up while agent2 is still down, then by default a file channel will not store more than one million events (we can override this with the channel’s capacity property), and it will stop accepting events if the free disk space for its checkpoint directory falls below 500 MB (controlled by the minimumRequiredSpace property).
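As a sketch, both limits could be raised on agent1’s file channel if we expect agent2 to be down for long periods (the values here are illustrative only):

# allow up to 2 million events to be buffered in the channel
agent1.channels.channel1.capacity = 2000000
# keep accepting events until free space for the checkpoint directory falls below about 1 GB (value in bytes)
agent1.channels.channel1.minimumRequiredSpace = 1073741824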

If the hardware fails, however, we cannot recover the agent. If agent1 cannot be recovered, then any events in its channel that had not been delivered to agent2 before agent1 shut down will be lost. If the nodes are running load-balanced web servers, other nodes will absorb the failed web server’s traffic and generate new Flume events that are delivered to agent2, so no new events are lost.

There is also a solution for an unrecoverable agent2 failure: a sink group. We can arrange multiple redundant Avro sinks in a sink group so that if agent2 is unavailable, another sink from the group can be tried. We will look at sink groups in the upcoming sections.
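As a brief preview, a sketch of such a sink group on agent1 might look like the following, assuming a second, hypothetical Avro sink named sink2 that points at a standby second-tier agent:

agent1.sinks = sink1 sink2
agent1.sinkgroups = sinkgroup1
agent1.sinkgroups.sinkgroup1.sinks = sink1 sink2
# fall back to sink2 if sink1's target agent becomes unavailable
agent1.sinkgroups.sinkgroup1.processor.type = failover
agent1.sinkgroups.sinkgroup1.processor.priority.sink1 = 10
agent1.sinkgroups.sinkgroup1.processor.priority.sink2 = 5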