In this post, I try to provide a simple example to demonstrate how Wait/Notify works in NiFi. We're going to look at a scenario where there is a list of files to be processed, but we want NiFi to process the files one at a time (or, say, at most n at a time) due to downstream performance restrictions. We're going to use a 'GenerateFlowFile' processor to simulate the situation where there are a lot of files waiting to be processed.
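As a concrete starting point, here is a minimal sketch of how 'GenerateFlowFile' could be configured to build up that backlog. The property names are from the standard processor; the values are just what this example assumes, not requirements:

    GenerateFlowFile
      File Size    : 0B       (we only care about the flowfile itself, not its content)
      Batch Size   : 1
      Run Schedule : 1 sec    (Scheduling tab: emit roughly one flowfile per second)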
We then connect this directly to a 'Wait' processor. The Wait processor holds each flowfile until it receives a signal from a corresponding 'Notify' processor; when it receives the signal, the flowfile proceeds via the 'success' relationship. We first route the 'expired' and 'failure' relationships to a simple 'LogMessage' processor that just logs a message. The 'wait' relationship is terminated.
The 'success' relationship leads to the next step of actually processing the file. For illustration purposes, we just use a 'LogAttribute' processor to simulate the processing; in our case, we log the flowfile name. This completes the first part of the flow.
The Wait processor needs a 'Distributed Cache Service' (we can set this up easily). I use 'testSignal' as the signal identifier. The same signal name will have to be used in the 'Notify' processor too.
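For reference, a rough sketch of the 'Wait' configuration described above. The property names come from the standard Wait processor; the cache service and expiration value are the assumptions this example uses:

    Wait
      Release Signal Identifier : testSignal
      Distributed Cache Service : DistributedMapCacheClientService
      Expiration Duration       : 10 min    (flowfiles that never get a signal route to 'expired')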
We can actually run this part of the flow now. 'GenerateFlowFile' will crank out flowfiles, and we can see all of them waiting: the queue between 'GenerateFlowFile' and 'Wait' fills up.
Now, let's come to the notify part. The scenario I am trying to simulate is this: we get a hit/response from an external source, and for every hit we can process one file. To demonstrate this, I am going to use a 'HandleHttpRequest' processor, which starts an HTTP server listening for incoming requests. On getting a request, we route the 'success' relationship to a 'Notify' processor that uses the same signal identifier ('testSignal'). The 'success' of 'Notify' leads to a 'HandleHttpResponse' processor, which completes the HTTP request with a response code of 200.
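A rough sketch of the notify side, with the values this example assumes (the port matches the curl call below, and the context map is a shared StandardHttpContextMap controller service used by both the request and response processors):

    HandleHttpRequest
      Listening Port   : 5050
      HTTP Context Map : StandardHttpContextMap

    Notify
      Release Signal Identifier : testSignal
      Distributed Cache Service : DistributedMapCacheClientService

    HandleHttpResponse
      HTTP Status Code : 200
      HTTP Context Map : StandardHttpContextMap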
We can now run the entire flow. Since this is a simple GET request, we can hit the endpoint with curl at http://localhost:5050 (alternatively, paste the URL in the browser and hit refresh). If we tail the nifi-app.log file, we can see that for every hit we make via curl or the browser, 'Wait' releases one flowfile and its filename gets printed in the log.
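For example, assuming a default NiFi install (the log location may vary on your setup), the test loop looks something like this:

    # release one waiting flowfile per request
    curl http://localhost:5050

    # watch the LogAttribute output appear, one filename per hit
    tail -f $NIFI_HOME/logs/nifi-app.log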
The template for this example is available in my GitHub repo.