Multi-agent flow (linking 2 agents):
Let us see how to connect two different agents, or two tiers of agents. First, why would we need to connect two agents at all? Suppose we have an agent running on node1 that produces only a small amount of data, and this data is written to HDFS. The resulting HDFS files will consist only of the events produced from node1.
2-tier agent
If we have many nodes, each producing only a small amount of data, it is better to aggregate the events from a group of nodes and store them in a single file, since this results in fewer, larger files. Also, files can be rolled more often if required, since they are being fed by a larger number of nodes, which reduces the time between when an event is created and when it is available for analysis.
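How often the HDFS sink rolls files can be controlled through its hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount properties. The snippet below is only a sketch with assumed values (the agent2/sink2 names match the second-tier configuration shown later in this example); tune the numbers for your own workload.

# roll a new file every 30 seconds
agent2.sinks.sink2.hdfs.rollInterval = 30
# or once the file reaches 128 MB (value in bytes)
agent2.sinks.sink2.hdfs.rollSize = 134217728
# do not roll based on the number of events
agent2.sinks.sink2.hdfs.rollCount = 0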
From the diagram below we can see that the first tier collects events from sources such as web servers, weblogs, etc. and sends those events over the network to a set of agents in the second tier, which aggregate all the events from the first tier and then write them to HDFS.
Process flow of a 2-tier agent
We generally use an Avro sink to send events over Avro RPC to an Avro source running in another Flume agent; alternatively, a Thrift sink can send events over Thrift RPC to a Thrift source. These Avro and Thrift sinks and sources are used only to distribute events between agent tiers using RPC; they do not write or read Avro files. If we need to write events to Avro files, we can use the HDFS sink.
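If we wanted to use Thrift RPC instead of Avro in this example, only the sink and source types would change; the hostname/port pairing works the same way. This is just a sketch, reusing the agent, sink and source names from the Avro configurations below:

# first-tier sink sends events over Thrift RPC
agent1.sinks.sink1.type = thrift
agent1.sinks.sink1.hostname = localhost
agent1.sinks.sink1.port = 10005

# second-tier source listens for Thrift RPC on the same port
agent2.sources.source2.type = thrift
agent2.sources.source2.bind = localhost
agent2.sources.source2.port = 10005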
Let us see an example of this. We have a file called tuple1 whose contents are:
hdadmin@ubuntu:~$ cat tuple1
(1,2,3)
(4,5,6)
(2,3,4)
(6,3,7)
(5,3,7)
(6,2,8)
(4,2,5)
(2,4,6)
The configuration file for the first agent is:
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat flume_tier1.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hdadmin/flume1/spooldir

agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = localhost
agent1.sinks.sink1.port = 10005

agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir=/home/hdadmin/flume1/agent1/checkpoint
agent1.channels.channel1.dataDirs=/home/hdadmin/flume1/agent1/data
Here the checkpoint and data directories will be created automatically.
The configuration file for the second agent is:
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin/conf$ cat flume_tier2.conf
agent2.sources = source2
agent2.sinks = sink2
agent2.channels = channel2

agent2.sources.source2.channels = channel2
agent2.sinks.sink2.channel = channel2

agent2.sources.source2.type = avro
agent2.sources.source2.bind = localhost
agent2.sources.source2.port = 10005

agent2.sinks.sink2.type = hdfs
agent2.sinks.sink2.hdfs.path = hdfs://localhost:9000/flume-tier
agent2.sinks.sink2.hdfs.filePrefix = events
agent2.sinks.sink2.hdfs.fileSuffix = .log
agent2.sinks.sink2.hdfs.fileType = DataStream

agent2.channels.channel2.type = file
agent2.channels.channel2.checkpointDir=/home/hdadmin/agent2/checkpoint
agent2.channels.channel2.dataDirs=/home/hdadmin/agent2/data
Here we have two agents running, agent1 and agent2. agent1 runs in the first tier and has a spooldir source and an Avro sink connected by a file channel. agent2 runs in the second tier and has an Avro source that listens on the port to which agent1's Avro sink sends events.
We have configured different data and checkpoint directories for the two file channels because both are running on the same machine; this way, they don't try to write their files on top of one another.
Now start the two agents:
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume_tier1.conf -n agent1
hdadmin@ubuntu:~/apache-flume-1.5.0-cdh5.3.2-bin$ bin/flume-ng agent --conf ./conf/ -f conf/flume_tier2.conf -n agent2
Now that we have linked the two agents using Avro, we can see the output in HDFS:
hdadmin@ubuntu:~/agent2$ hdfs dfs -ls /flume-tier
Found 1 items
-rw-r--r--   1 hdadmin supergroup         64 2017-12-27 04:02 /flume-tier/events.1514376095923.log
hdadmin@ubuntu:~/agent2$ hdfs dfs -cat /flume-tier/events.1514376095923.log
(1,2,3)
(4,5,6)
(2,3,4)
(6,3,7)
(5,3,7)
(6,2,8)
(4,2,5)
(2,4,6)
tuple1 data in HDFS
Flume uses transactions to ensure that each batch of events is delivered reliably from a source to a channel and from a channel to a sink. In the example above, the Avro sink in agent1 reads a batch of events from its file channel in one transaction. This transaction is committed only once the Avro sink receives confirmation that the RPC write to agent2's Avro source was successful.
That confirmation is sent only after agent2's own transaction, which writes the batch of events to its file channel, has been successfully committed. Since the confirmation is synchronous, once it is received we know that the batch has been delivered from one Flume agent's channel to the other's.
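The number of events in each batch (and hence in each transaction) is controlled by the Avro sink's batch-size property. The line below is only a sketch with an assumed value; the default batch size is 100 events.

# send up to 1000 events per RPC call (one transaction per batch)
agent1.sinks.sink1.batch-size = 1000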
Now suppose agent1 stops running. Files will accumulate in the spooling directory and will be processed once agent1 starts again. Any events already in its file channel will still be available after the restart, thanks to the channel's durability guarantee. If instead agent2 stops running, events will be stored in agent1's file channel until agent2 starts again.
Remember that a channel has a limited capacity for storing events. If agent1's channel fills up while agent2 is still down, events will start being lost: by default a file channel will not store more than one million events (this can be overridden with the channel's capacity property), and it will stop accepting events if the free disk space for its checkpoint directory falls below 500 MB (controlled by the minimumRequiredSpace property).
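As an illustration, both limits could be raised on agent1's file channel as follows. This is only a sketch with assumed values; minimumRequiredSpace is specified in bytes.

# allow up to two million events in the channel
agent1.channels.channel1.capacity = 2000000
# stop accepting events when less than 1 GB of free disk space remains
agent1.channels.channel1.minimumRequiredSpace = 1073741824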
If the hardware fails, however, the agent may not be recoverable at all. If agent1 cannot be recovered, then any events in its channel that had not been delivered to agent2 before agent1 shut down will be lost. If the nodes are running load-balanced web servers, other nodes will absorb the failed web server's traffic and will generate new Flume events that are delivered to agent2, so no new events are lost.
For an unrecoverable agent2 failure, the remedy is a sink group: multiple redundant Avro sinks are arranged in a sink group, so that if agent2 is unavailable, another sink from the group can be tried. We will look at sink groups in the upcoming sections.