NiFi: Monitoring Data Flows

Before moving a data pipeline into production, a key step is deciding how it will be monitored. Fortunately, NiFi comes bundled
with many built-in monitoring utilities that help you track disk usage, memory usage, back pressure, and so on. In this
article, let’s walk through monitoring data flows via NiFi bulletins, which a processor emits when it raises a warning or an error.

NiFi can deliver every bulletin as a flow file over the Site-to-Site (S2S) protocol, and those flow files can in turn be used for data flow monitoring.

From the official documentation:

When sending data from one instance of NiFi to another, there are many different protocols that can be used. The preferred protocol, though, is the NiFi Site-to-Site Protocol. Site-to-Site makes it easy to securely and efficiently transfer data to/from nodes in one NiFi instance or data producing application to nodes in another NiFi instance or other consuming application.

To collect all the reported bulletins continuously, we can run the NiFi S2S bulletin reporting task. With the help of an input port
(placed on the canvas), the reported data can be received and used in a NiFi workflow.

For a better understanding, use this NiFi monitoring template while following the article.

For demonstration, let’s run an “UpdateAttribute” processor at the debug bulletin level. Then, via the S2S reporting task, fetch the bulletins as flow files, process them, and send an alert by mail to the appropriate teams.

Use an UpdateAttribute processor with its bulletin level set to Debug to generate dummy bulletins, with a GenerateFlowFile processor as the upstream connection. Make sure the processor names follow a unique convention, which makes the bulletins easier to monitor. For example, in the following UpdateAttribute processor configuration the naming convention is “<team>.<source>.<useCase>.<Purpose>.<processTask>.<Severity>”.

Then, create a new Site-to-Site reporting task.

Drag and drop an input port onto the canvas and give it a name, which must be the same as the Input Port Name configured in the reporting task. Once this setup is done, NiFi routes the bulletins to this input port.

NiFi emits the bulletins as a JSON array, so the first step is to extract the individual elements of the array using the SplitJson processor.
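To make the split step concrete outside NiFi, here is a minimal Python sketch of what splitting the bulletin array amounts to. It is only an illustration, not NiFi code. The sample payload and its field names (`bulletinSourceName`, `bulletinLevel`, `bulletinMessage`) are assumptions made for the example, not output captured from a real instance.

```python
import json
from typing import List


def split_bulletins(payload: str) -> List[str]:
    """Split a JSON array of bulletins into one JSON document per bulletin."""
    bulletins = json.loads(payload)
    if not isinstance(bulletins, list):
        raise ValueError("expected a JSON array of bulletins")
    # Each element becomes its own document, like one flow file per split.
    return [json.dumps(bulletin) for bulletin in bulletins]


# Hypothetical payload with two bulletins (field names are assumptions).
SAMPLE_PAYLOAD = json.dumps([
    {
        "bulletinSourceName": "teamA.kafka.orders.ingest.update.critical",
        "bulletinLevel": "DEBUG",
        "bulletinMessage": "first dummy bulletin",
    },
    {
        "bulletinSourceName": "teamB.sftp.billing.load.update.audit",
        "bulletinLevel": "DEBUG",
        "bulletinMessage": "second dummy bulletin",
    },
])

if __name__ == "__main__":
    for document in split_bulletins(SAMPLE_PAYLOAD):
        print(document)
```

Every later step then works on a single bulletin at a time, which keeps the downstream processors simple.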

Once the individual JSON elements have been split, extract the source name, i.e. the component that emitted the bulletin, using the EvaluateJsonPath processor.
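The extraction step can be pictured as copying selected JSON fields into flow file attributes. The Python sketch below mimics that idea with plain dictionary access. The attribute names (`source.name`, `bulletin.level`, `bulletin.message`) and the JSON field names are hypothetical choices for this example, and returning an empty string for a missing field is a simplification of the sketch rather than a statement about how the processor behaves.

```python
import json
from typing import Dict

# Hypothetical mapping: attribute name -> top-level JSON field of one bulletin.
ATTRIBUTE_FIELDS = {
    "source.name": "bulletinSourceName",
    "bulletin.level": "bulletinLevel",
    "bulletin.message": "bulletinMessage",
}


def extract_attributes(content: str,
                       fields: Dict[str, str] = ATTRIBUTE_FIELDS) -> Dict[str, str]:
    """Copy selected fields of a single bulletin into a flat attribute map."""
    bulletin = json.loads(content)
    # Missing fields become empty strings (a simplification for this sketch).
    return {attr: str(bulletin.get(field, "")) for attr, field in fields.items()}


if __name__ == "__main__":
    one_bulletin = json.dumps({
        "bulletinSourceName": "teamA.kafka.orders.ingest.update.critical",
        "bulletinLevel": "DEBUG",
    })
    print(extract_attributes(one_bulletin))
```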

The bulletin source name is the actual name of the processor, which we assigned in the format “<team>.<source>.<useCase>.<Purpose>.<processTask>.<Severity>”. Let’s extract the individual attributes from the source name using the UpdateAttribute processor.
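Breaking the source name into attributes is a simple split on the dot separator. The following Python sketch shows the idea under the assumption that every processor name has exactly six dot-separated parts, as in the convention above. The attribute keys and the sample name are illustrative.

```python
from typing import Dict, Optional

# Field order follows the naming convention used in this article.
NAME_FIELDS = ("team", "source", "useCase", "purpose", "processTask", "severity")


def parse_source_name(name: str) -> Optional[Dict[str, str]]:
    """Split a processor name into named attributes.

    Returns None when the name does not have exactly six parts, so that
    bulletins from processors outside the convention can be handled separately.
    """
    parts = name.split(".")
    if len(parts) != len(NAME_FIELDS) or any(not part for part in parts):
        return None
    return dict(zip(NAME_FIELDS, parts))


if __name__ == "__main__":
    print(parse_source_name("teamA.kafka.orders.ingest.update.critical"))
    print(parse_source_name("UpdateAttribute"))
```

Inside NiFi the same split would be written in Expression Language in the UpdateAttribute properties, possibly with a function such as `getDelimitedField`. Names that do not match the convention are worth routing to a separate path instead of being dropped silently.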

Use the team attribute to route each flow file to the process group of the corresponding team.

Remove duplicate bulletins using the DetectDuplicate processor, so that recipients are not bombarded with repeated alert messages.
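The idea behind duplicate suppression is to remember a key for each bulletin for a limited time and to drop anything whose key has already been seen within that window. Here is a minimal in-memory Python sketch of that technique. The class name, the choice of key, and the five-minute window are assumptions for illustration. This simple dictionary is not how NiFi stores its cache.

```python
import time
from typing import Callable, Dict


class DuplicateDetector:
    """Remember keys for a limited time and flag repeats inside that window."""

    def __init__(self, age_off_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._age_off = age_off_seconds
        self._clock = clock
        self._first_seen: Dict[str, float] = {}

    def is_duplicate(self, key: str) -> bool:
        now = self._clock()
        # Forget keys whose window has expired.
        self._first_seen = {
            k: seen for k, seen in self._first_seen.items()
            if now - seen < self._age_off
        }
        if key in self._first_seen:
            return True
        self._first_seen[key] = now
        return False


if __name__ == "__main__":
    detector = DuplicateDetector(age_off_seconds=300.0)
    # Hypothetical key: source name plus message text.
    key = "teamA.kafka.orders.ingest.update.critical|connection refused"
    print(detector.is_duplicate(key))
    print(detector.is_duplicate(key))
```

Choosing the key matters: a key built from the source name and message suppresses repeats of the same problem, while including the timestamp would make every bulletin unique and defeat the purpose.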

Use a lookup service (for example, through the LookupAttribute processor) to load the e-mail address that corresponds to each severity level.

If the severity level is Audit, it makes more sense to store the bulletin in HDFS than to send an e-mail alert. Hence, use RouteOnAttribute to route the flow files to the respective flows: the PutMail processor for alerting, or a flow that merges the contents and writes them to HDFS.
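The lookup and routing decisions can be summarized in a few lines of logic. The Python sketch below is an illustration only: the severity names other than Audit, the recipient addresses, and the route labels (`hdfs`, `mail`, `unmatched`) are hypothetical and would be replaced by whatever the lookup table and flow actually define.

```python
from typing import Dict, Optional, Tuple

# Hypothetical lookup table: severity -> alert recipient.
RECIPIENTS: Dict[str, str] = {
    "critical": "oncall@example.com",
    "warning": "team@example.com",
}


def route_bulletin(attributes: Dict[str, str],
                   recipients: Dict[str, str] = RECIPIENTS
                   ) -> Tuple[str, Optional[str]]:
    """Decide where a bulletin goes, based on its severity attribute."""
    severity = attributes.get("severity", "").strip().lower()
    if severity == "audit":
        # Audit bulletins are archived rather than mailed.
        return ("hdfs", None)
    recipient = recipients.get(severity)
    if recipient is None:
        # No address configured for this severity.
        return ("unmatched", None)
    return ("mail", recipient)


if __name__ == "__main__":
    print(route_bulletin({"severity": "Audit"}))
    print(route_bulletin({"severity": "critical"}))
    print(route_bulletin({"severity": "info"}))
```

Keeping an explicit unmatched route makes it obvious when a processor uses a severity that has no recipient configured, instead of losing those bulletins.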

Conclusion

I hope this walk-through shows how easy it is to monitor a NiFi data flow with the help of bulletins and built-in processors. The generated bulletins also help in monitoring disk usage, memory utilization, and back pressure, a topic that deserves a separate article of its own.
