/    /  Flume – Sink group

Sink group:

A sink group allows multiple sinks to be treated as one used for failover or load-balancing purposes. If a tier2 agent is not available, then the events will be sent to another tier2 agent and then to HDFS destination without any problem.

Sink group

We need to configure a sink group using “sinkgroups” property which defines the sinkgroup’s name, then the sink group lists the sinks in the group, and also the type of the sink processor, which sets the policy for choosing a sink.

Below configuration shows the load balancing between two Avro endpoints.

agent1.sources = source1

agent1.sinks = sink1a sink1b

agent1.sinkgroups = sinkgroup1

agent1.channels = channel1



agent1.sources.source1.channels = channel1

agent1.sinks.sink1a.channel = channel1

agent1.sinks.sink1b.channel = channel1



agent1.sinkgroups.sinkgroup1.sinks = sink1a sink1b

agent1.sinkgroups.sinkgroup1.processor.type = load_balance

agent1.sinkgroups.sinkgroup1.processor.backoff = true



agent1.sources.source1.type = spooldir

agent1.sources.source1.spoolDir = /tmp/spooldir



agent1.sinks.sink1a.type = avro

agent1.sinks.sink1a.hostname = localhost

agent1.sinks.sink1a.port = 10000



agent1.sinks.sink1b.type = avro

agent1.sinks.sink1b.hostname = localhost

agent1.sinks.sink1b.port = 10001



agent1.channels.channel1.type = file

Here we have defined 2 Avro sinks they are sink1a and sink1b, which differ only in the Avro endpoint they are connected to (As we are running on localhost the port is different but in distributed environment the hosts will be different and ports would be same).

We have also defined sinkgroup1 and set its sinks to sink1a and sink1b. The processor type is “load_balance” which attempts to distribute the event flow on both the sinks in the sink group by using a round-robin selection mechanism (we can change this by using “processor.selector” property).

Suppose if a sink is not there then the next sink will be tried, if they are all not available then the event is not removed from the channel as we have seen in the single sink case. Here by default, sink processor does not remember the sink unavailability, the failing sinks will be retried for every batch of events which are sent.

This can be inefficient, so we have set the processor.backoff property to change the behavior so that failing sinks are blacklisted for an exponentially increasing timeout period (up to a maximum period of 30 seconds, controlled by processor.selector.maxTimeOut).

NOTE:

There is another type of processor, failover, that instead of load balancing events across sinks uses a preferred sink if it is available, and fails over to another sink in the case that the preferred sink is down. The failover sink processor maintains a priority order for sinks in the group, and attempts delivery in order of priority.

If the sink with the highest priority is unavailable the one with the next highest priority is tried, and so on. Failed sinks are blacklisted for an increasing timeout period (up to a maximum period of 30 seconds, controlled by processor.maxpenalty).

Flume configuration for second-tier agent in a load balancing scenario can be seen as,

agent2a.sources = source2a

agent2a.sinks = sink2a

agent2a.channels = channel2a



agent2a.sources.source2a.channels = channel2a

agent2a.sinks.sink2a.channel = channel2a



agent2a.sources.source2a.type = avro

agent2a.sources.source2a.bind = localhost

agent2a.sources.source2a.port = 10000



agent2a.sinks.sink2a.type = hdfs

agent2a.sinks.sink2a.hdfs.path = /tmp/flume

agent2a.sinks.sink2a.hdfs.filePrefix = events-a

agent2a.sinks.sink2a.hdfs.fileSuffix = .log

agent2a.sinks.sink2a.hdfs.fileType = DataStream



agent2a.channels.channel2a.type = file

The configuration for agent2b is the same, except for the Avro source port (since we are running the examples on localhost) and the file prefix for the files created by the HDFS sink. The file prefix is used to ensure that HDFS files created by second-tier agents at the same time don’t collide.

In the more usual case of agents running on different machines, the hostname can be used to make the filename unique by configuring a host interceptor and

including the %{host} escape sequence in the file path, or prefix:

agent2.sinks.sink2.hdfs.filePrefix = events-%{host}.

We can see the whole configuration diagram at below.