Sink group:
A sink group allows multiple sinks to be treated as a single sink, for failover or load-balancing purposes. If one second-tier agent is unavailable, events are routed to another second-tier agent and on to the HDFS destination without interruption.
A sink group is configured with the “sinkgroups” property, which names the sink group; the group’s configuration then lists the sinks in the group, along with the type of sink processor, which sets the policy for choosing a sink.
The following configuration shows load balancing between two Avro endpoints:
agent1.sources = source1
agent1.sinks = sink1a sink1b
agent1.sinkgroups = sinkgroup1
agent1.channels = channel1
agent1.sources.source1.channels = channel1
agent1.sinks.sink1a.channel = channel1
agent1.sinks.sink1b.channel = channel1
agent1.sinkgroups.sinkgroup1.sinks = sink1a sink1b
agent1.sinkgroups.sinkgroup1.processor.type = load_balance
agent1.sinkgroups.sinkgroup1.processor.backoff = true
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
agent1.sinks.sink1a.type = avro
agent1.sinks.sink1a.hostname = localhost
agent1.sinks.sink1a.port = 10000
agent1.sinks.sink1b.type = avro
agent1.sinks.sink1b.hostname = localhost
agent1.sinks.sink1b.port = 10001
agent1.channels.channel1.type = file
Here we have defined two Avro sinks, sink1a and sink1b, which differ only in the Avro endpoint they connect to. (Since we are running on localhost, the ports differ; in a distributed environment the hosts would differ and the ports would be the same.)
We have also defined sinkgroup1 and set its sinks to sink1a and sink1b. The processor type is “load_balance”, which attempts to distribute the event flow over both sinks in the group using a round-robin selection mechanism (this can be changed with the “processor.selector” property).
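As a minimal sketch of the “processor.selector” property, the default round-robin policy can be replaced with random selection by adding one line to the configuration above (random is the other selection mechanism the load-balancing processor supports):

agent1.sinkgroups.sinkgroup1.processor.selector = random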
If a sink is unavailable, the next sink is tried; if all sinks are unavailable, the event is not removed from the channel, just as in the single-sink case. By default, the sink processor does not remember sink unavailability, so failing sinks are retried for every batch of events sent.
This can be inefficient, so we have set the processor.backoff property to change this behavior so that failing sinks are blacklisted for an exponentially increasing timeout period (up to a maximum of 30 seconds, controlled by processor.selector.maxTimeOut).
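For example, to cap the backoff period at 10 seconds instead of the default 30, the limit can be overridden as follows (the value is illustrative; the property takes milliseconds):

agent1.sinkgroups.sinkgroup1.processor.selector.maxTimeOut = 10000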
NOTE:
There is another type of processor, failover, that instead of load balancing events across sinks uses a preferred sink if it is available, and fails over to another sink in the case that the preferred sink is down. The failover sink processor maintains a priority order for sinks in the group, and attempts delivery in order of priority.
If the sink with the highest priority is unavailable, the one with the next highest priority is tried, and so on. Failed sinks are blacklisted for an increasing timeout period (up to a maximum period of 30 seconds, controlled by processor.maxpenalty).
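A sketch of a failover configuration for the same sink group might look like the following (the priority and penalty values are illustrative; a higher number means a higher priority, and processor.maxpenalty is in milliseconds):

agent1.sinkgroups.sinkgroup1.sinks = sink1a sink1b
agent1.sinkgroups.sinkgroup1.processor.type = failover
agent1.sinkgroups.sinkgroup1.processor.priority.sink1a = 10
agent1.sinkgroups.sinkgroup1.processor.priority.sink1b = 5
agent1.sinkgroups.sinkgroup1.processor.maxpenalty = 10000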
The Flume configuration for a second-tier agent in this load-balancing scenario looks like this:
agent2a.sources = source2a
agent2a.sinks = sink2a
agent2a.channels = channel2a
agent2a.sources.source2a.channels = channel2a
agent2a.sinks.sink2a.channel = channel2a
agent2a.sources.source2a.type = avro
agent2a.sources.source2a.bind = localhost
agent2a.sources.source2a.port = 10000
agent2a.sinks.sink2a.type = hdfs
agent2a.sinks.sink2a.hdfs.path = /tmp/flume
agent2a.sinks.sink2a.hdfs.filePrefix = events-a
agent2a.sinks.sink2a.hdfs.fileSuffix = .log
agent2a.sinks.sink2a.hdfs.fileType = DataStream
agent2a.channels.channel2a.type = file
The configuration for agent2b is the same, except for the Avro source port (since we are running the examples on localhost) and the file prefix for the files created by the HDFS sink. The file prefix is used to ensure that HDFS files created by second-tier agents at the same time don’t collide.
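Following that description, the lines in agent2b’s configuration that differ from agent2a’s would be these (the component names source2b and sink2b, and the events-b prefix, are assumptions mirroring the naming scheme above; the port matches sink1b in the first-tier configuration):

agent2b.sources.source2b.port = 10001
agent2b.sinks.sink2b.hdfs.filePrefix = events-b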
In the more usual case of agents running on different machines, the hostname can be used to make the filename unique, by configuring a host interceptor and including the %{host} escape sequence in the file path or prefix:
agent2.sinks.sink2.hdfs.filePrefix = events-%{host}
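The host interceptor mentioned above could be enabled on the source with something like the following sketch (the interceptor name “host” before the dot is an arbitrary label; the type value host selects Flume’s built-in host interceptor, and useIP = false puts the hostname, rather than the IP address, in the host header):

agent2.sources.source2.interceptors = host
agent2.sources.source2.interceptors.host.type = host
agent2.sources.source2.interceptors.host.useIP = false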
The complete configuration is shown in the diagram below.