Implementing the Pipes and Filters Pattern using Azure Integration Services

In my first blog post, I'd like to discuss implementing the pipes and filters pattern using Azure Integration Services (AIS).

The pipes and filters pattern uses multiple event queues and topics to streamline the flow of events across cloud services. You can implement it with queues or topics in a Service Bus namespace combined with Logic Apps or Azure Functions. Implementing the pattern with these services lets you process messages in multiple stages: receiving, validating, processing, and delivering. Alternatively, you can opt for an event-driven approach using Logic Apps, Event Grid, and Functions.

The Microsoft docs describe the pipes and filters pattern as a way to decompose complex processing into a series of separate elements that can be reused, providing improved performance, scalability, and reusability. In the context of Azure Integration Services (AIS), each element can be either a Logic App or an Azure Function, connected through a topic – either a Service Bus topic or an Event Grid topic. The latter enables an event-driven approach to the pattern.
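The core idea can be shown without any Azure services at all. Below is a minimal sketch in plain Python, where each filter is a small function and the "pipe" is simply the hand-off of one filter's output to the next. The filter names (`validate`, `enrich`) and the order shape are illustrative, not part of the scenario.

```python
# Minimal sketch of the pipes-and-filters idea in plain Python.
# Each filter is a small, self-contained processing step; the "pipe"
# passes one filter's output to the next filter's input.
# The filter names and message shape here are hypothetical.

def validate(order: dict) -> dict:
    # Reject orders missing required fields.
    if "id" not in order or "items" not in order:
        raise ValueError("invalid order")
    return order

def enrich(order: dict) -> dict:
    # Add instance properties (context), e.g. an item count.
    return {**order, "item_count": len(order["items"])}

def pipeline(*filters):
    # Compose filters into a pipeline; each one can be reused,
    # omitted, or rearranged without changing the others.
    def run(message):
        for f in filters:
            message = f(message)
        return message
    return run

process = pipeline(validate, enrich)
print(process({"id": 1, "items": ["a", "b"]}))
# → {'id': 1, 'items': ['a', 'b'], 'item_count': 2}
```

Because each filter only depends on its input message, swapping `pipeline(validate, enrich)` for a different composition requires no change to the filters themselves.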

For timely message processing, Event Grid is more efficient than a Service Bus queue. You can certainly use a queue or topic as the pipe in the pipes and filters pattern, with each filter subscribing and publishing to it; however, this is less efficient because you need to poll the queue or topic quite frequently, whereas Event Grid pushes events to subscribers.

Assume you receive multiple order files as a batch during the day, and these need to be processed as soon as possible. You need a way to receive or pick up the files, validate them, process them, and subsequently deliver them to one or more sub-systems or locations. The diagram below outlines the scenario.

Pipes and filters pattern implementation diagram

In this scenario, Logic Apps function as the on- and off-ramps, receiving and sending data. The benefit of leveraging Logic Apps for data transport is the versatility of their connectors. With the out-of-the-box connectors, you can consistently use a standard connector – in this case, the SSH File Transfer Protocol (SFTP) connector.

Next, the functions act as single processing units: one stores the data, adding instance properties (context) along the way. Another is then triggered because the file is stored as a blob in a storage container (a BlobCreated event). The chaining of the functions happens through this event mechanism – each stored blob raises a BlobCreated event that another function can subscribe to. Lastly, a Logic App is triggered to deliver the processed file; multiple Logic Apps could be activated to deliver the file to various locations using different connectors if necessary.
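The event-driven chaining can be sketched with a tiny in-memory stand-in for an Event Grid topic – this is not the Azure SDK, and the handler and topic names are hypothetical; only the `Microsoft.Storage.BlobCreated` event type string matches what Azure actually emits.

```python
# Illustrative sketch of event-driven chaining, using an in-memory
# stand-in for an Event Grid topic (not the Azure SDK).
# Handler/topic names are hypothetical.

class Topic:
    def __init__(self):
        self.subscribers = []

    def subscribe(self, handler):
        self.subscribers.append(handler)

    def publish(self, event: dict):
        # Event Grid pushes events to subscribers; no polling needed.
        for handler in self.subscribers:
            handler(event)

blob_created = Topic()
processed = []

def store_with_context(event):
    # First processing unit: store the data and record context.
    processed.append({"blob": event["subject"], "stage": "stored"})

blob_created.subscribe(store_with_context)
blob_created.publish({"eventType": "Microsoft.Storage.BlobCreated",
                      "subject": "/orders/order-001.xml"})
print(processed)
# → [{'blob': '/orders/order-001.xml', 'stage': 'stored'}]
```

Subscribing a second handler to `blob_created` – say, a delivery step – would chain another stage without touching the first one, mirroring how multiple Azure Functions or Logic Apps can react to the same BlobCreated event.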

The benefit of using functions is that they scale automatically (providing elasticity) with the serverless Consumption hosting plan. If you need more compute, you can choose a Premium or Dedicated plan, and by adding more compute, the throughput of file processing can be increased.

With the implementation of the pipes and filters pattern described above, you can easily reuse components (functions) and add, remove, or rearrange them if necessary – hence you also gain flexibility (agility). As described in the enterprise integration pipes and filters pattern, mapped onto this implementation:

Each filter (Function) exposes a straightforward interface (Binding): it receives messages on the inbound pipe (Event Grid topic), processes the message, and publishes the results to the outbound pipe (storage container). The pipe (Event Grid topic) connects one filter to the next, passing output messages from one filter to the next. Because all components use the same external interface, they can be composed into different solutions by connecting the components to other pipes. We can add new filters, omit existing ones, or rearrange them into a new sequence – all without changing the filters themselves. The connection between filter and pipe is sometimes called a port. In its basic form, each filter component has one input port (Event Grid topic) and one output port (storage account).
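The "port" idea – one input pipe and one output pipe per filter – can be sketched with plain queues standing in for the pipes. Everything here is illustrative (no Azure API); the point is that re-sequencing filters only means re-wiring pipes.

```python
# Sketch of the "port" idea: each filter has one input port and one
# output port, so filters compose by connecting ports to pipes.
# Queues stand in for pipes; names are illustrative, not an Azure API.
from queue import Queue

def filter_component(inbound: Queue, outbound: Queue, transform):
    # Drain the inbound pipe, apply the filter, publish the results.
    while not inbound.empty():
        outbound.put(transform(inbound.get()))

pipe_a, pipe_b, pipe_c = Queue(), Queue(), Queue()
pipe_a.put("order-001")

# Two filters connected in sequence via pipe_b; adding, omitting, or
# swapping filters only changes the wiring, not the filters themselves.
filter_component(pipe_a, pipe_b, str.upper)
filter_component(pipe_b, pipe_c, lambda s: s + ".xml")
print(pipe_c.get())
# → ORDER-001.xml
```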

To summarize, the pipe and filter pattern brings the following benefits:

  • loose and flexible coupling of components (filters)
  • flexibility as it allows filters to be changed without modifications to other filters
  • parallel processing
  • reusability as each component (filter) can be called and used repeatedly

Note that the pattern is less suitable, performance-wise, when there are too many components (filters). Individually, the components – functions in our example – can be tuned using the available hosting options; however, the sum of all the components determines the overall performance. Furthermore, the pattern is not suitable for interactive systems or long-running processes.

And lastly, the Microsoft documentation mentions some challenges and considerations when implementing the pipes and filters pattern. For example, it notes the added complexity when filters are distributed across servers. In our example, however, every component presumably runs in the same Azure region, and messages are persisted in storage, benefiting from its built-in redundancy and reliability.

Finally, again in our example, when a message fails to process, one option is to reprocess it (retry); alternatively, it can be dropped into a different container in the storage account (a dead-letter location). When that happens, how monitoring and notifications are set up determines how quickly you can troubleshoot and correct the error.
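The retry-or-dead-letter choice can be sketched as follows – a hedged illustration only, where `dead_letter` stands in for moving the blob to a different container in the storage account, and all function names are hypothetical.

```python
# Illustrative sketch of the retry-or-dead-letter decision.
# dead_letter stands in for dropping the message into a different
# storage container; names are hypothetical.

def process_with_retry(message, handler, dead_letter, retries=3):
    for _ in range(retries):
        try:
            return handler(message)
        except Exception:
            continue  # transient failure: try again
    # Retries exhausted: move the message to the dead-letter
    # location and rely on monitoring/notifications to surface it.
    dead_letter(message)

def always_fails(message):
    raise ValueError("cannot parse " + message)

failed = []
process_with_retry("order-001.xml", always_fails, failed.append)
print(failed)
# → ['order-001.xml']
```

A healthy handler simply returns its result on the first attempt; only messages that exhaust their retries end up in the dead-letter container, which is where alerting should be attached.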