Wolfgang Loder

Elixir Broadway Review

May 23, 2019

Elixir Broadway is based on GenStage and gives a template for processing data from different sources in a unified way.

Broadway follows the tradition of the Elixir ecosystem of going beyond the existing behaviors in Erlang/OTP and turning specific requirements into a more generic form. As pointed out in the announcement, Broadway’s goal is to significantly cut down the development time needed to assemble data processing pipelines, while providing many features and avoiding common pitfalls.

Let’s dive into an example. The version released at the time of writing this post (May 23, 2019) is 0.3.0. Broadway is based on GenStage and is related to Flow. GenStage is a behavior that defines the roles of data producers and data consumers.

Between consumers and producers, a communication channel is established. The producer is listening to a data source and emits events to a consumer.

The examples that come with this Broadway release are queues. At the moment, Broadway provides two producers, one for Amazon SQS and one for RabbitMQ. There is also a guide on implementing a custom producer for other data sources.

Architecture of the example

The example in this post uses a local RabbitMQ server. The following diagram shows the pipeline we want to set up:

In RabbitMQ we have a queue images that is monitored by a BroadwayRabbitMQ.Producer. We will describe the data that is received below in detail. For now, the data describes image metadata.

We set up a module BroadwayTest that uses Broadway and handles incoming events. In the handle_message function, that consumer passes the data to a GenStage pipeline, where the data is processed in three steps:

  • Resize the image to a default size - in our case we resize it proportionally to 50% size of the original.
  • Add a watermark to the image.
  • Save the image.

I have taken the GenStage pipeline from a project I am working on.

Note

You can download the source code for this post from this repository. To set up the requirements, you need a local RabbitMQ server with a queue named “images”. We use ImageMagick for processing the images, so the ImageMagick tools should be accessible on your path.

There is a script for macOS named resize.sh that is called from the GenServer. Make sure it has execute permission.

The data in the queue message describes the image metadata:

  • customer_id: Unique id to separate different image processing requests
  • file_name: Name of the file to be processed
  • image_type: Type of the image file to be processed
  • path_from: Path where to find the image file
  • path_to: Path where to write the intermediate processed files
  • file_name_destination: File name of the processed file
  • destination_type: Type of the processed image file

The format of the data in the queue is a simple comma-separated line with all fields provided, for example:

S3R569,warthog.jpg,jpg,assets/,assets/,processed-01,png

In this example, the image warthog.jpg will be read from the folder assets and the intermediate files will be written to the same folder. The file name of the processed image will be processed-01, and we want it in PNG format.

The processed image file will automatically get the prefix "S3R569_", and all files for the customer id S3R569 will be written to a folder named S3R569 in the path defined by path_to.
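The naming rule just described can be sketched as a small function. This is a minimal illustration, not code from the repository: the module name DestinationPath and the function build are hypothetical, and the sketch assumes the customer folder is created directly below path_to.

```elixir
defmodule DestinationPath do
  # Hypothetical helper, not taken from the repository: builds the path of the
  # processed file from the metadata fields of the queue message.
  def build(customer_id, path_to, file_name_destination, destination_type) do
    # The file name gets the customer id as a prefix.
    file = "#{customer_id}_#{file_name_destination}.#{destination_type}"
    # The file is placed in a folder named after the customer id.
    Path.join([path_to, customer_id, file])
  end
end

IO.inspect(DestinationPath.build("S3R569", "assets/", "processed-01", "png"))
# => "assets/S3R569/S3R569_processed-01.png"
```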

Running the application

In the folder where you downloaded the code from the repository mentioned above, run mix test --only dummy. This command runs a single simple test to check that everything is working and leaves out all the real tests for now.

If successful, you will get an output similar to this:

$ mix test --only dummy

Including tags: [:dummy]
.
Finished in 0.03 seconds
9 tests, 0 failures, 8 excluded
Randomized with seed 496576

To get Broadway, you first need to add a dependency in mix.exs and run mix deps.get. When you compile for the first time, several dependency sources will be compiled, including Erlang files.

defp deps do
  [
    {:broadway_rabbitmq, "~> 0.1.0"}
  ]
end

If you see the following line, then the RabbitMQ server was not started or is not accessible:

14:01:06.505 [error] Cannot connect to RabbitMQ broker

Let’s run the application with iex -S mix. In the code are lots of IO.inspect statements and Broadway outputs several log messages as well. You will get an output like this:

$ iex -S mix
Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

15:01:42.335 [info] Application lager started on node nonode@nohost
15:01:42.341 [info] Application xmerl started on node nonode@nohost
15:01:42.343 [info] Application jsx started on node nonode@nohost
15:01:42.347 [info] Application ranch started on node nonode@nohost
15:01:42.348 [info] Application ranch_proxy_protocol started on node nonode@nohost
15:01:42.349 [info] Application recon started on node nonode@nohost
15:01:42.349 [info] Application rabbit_common started on node nonode@nohost
15:01:42.355 [info] Application amqp_client started on node nonode@nohost
15:01:42.355 [info] Application amqp started on node nonode@nohost
15:01:42.358 [info] Application gen_stage started on node nonode@nohost
15:01:42.360 [info] Application broadway started on node nonode@nohost
15:01:42.360 [info] Application broadway_rabbitmq started on node nonode@nohost

The server is now waiting for messages to come in. In the RabbitMQ web interface on localhost:15672 (if you have not changed the port), create a message like this and publish it:

S3R569,warthog.jpg,jpg,assets/,assets/,processed-01,png

The Broadway server will process the message and, after all other log messages, output the following:

ack - successful: [
  %Broadway.Message{
    acknowledger: {BroadwayTest, :ack_imageprocessor, :ack_data},
    batch_key: :default,
    batcher: :special,
    data: %ImageProcessingModel{
      customer_id: "S3R569",
      destination_type: "png",
      file_name: "warthog.jpg",
      file_name_destination: "processed-01",
      image_type: "jpg",
      path_from: "assets/",
      path_to: "assets/",
      status: :special
    },
    status: :ok
  }
]
ack - failed: []

If everything worked, the message we sent appears in the list successful. A folder assets/S3R569 was created, and in it we find the processed image with the name S3R569_processed-01.png.

The size of the original image is 1040 * 493 pixels. It was resized to 520 * 247 pixels, and a label was added as a watermark.
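The resized dimensions follow from scaling both sides by 50% and rounding to whole pixels. The following sketch only reproduces that arithmetic; the module Resize is hypothetical, and the actual resizing in the example is done by ImageMagick, whose rounding is assumed here to match.

```elixir
defmodule Resize do
  # Scales both dimensions by the same factor and rounds to whole pixels.
  def scale({width, height}, factor) do
    {round(width * factor), round(height * factor)}
  end
end

IO.inspect(Resize.scale({1040, 493}, 0.5))
# => {520, 247}
```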

How does it work? In our Broadway module, we have to implement the callback handle_message:

def handle_message(_processor, message, _context) do
  # build a Broadway message from the CSV input, then run the image processing
  updatedmessage =
    message.data
    |> Transformer.transform_queue_message_to_model([])
    |> Message.update_data(&process_data/1)

  # check the returned message status to decide which batcher to use
  case updatedmessage.data.status do
    # this will use the batcher with the name default
    :ok -> updatedmessage

    # this will use the batcher with the name special
    :special -> Message.put_batcher(updatedmessage, :special)

    # any other status says the message processing failed
    _ -> Message.failed(updatedmessage, "image processing failed")
  end
end

defp process_data(data) do
  # pass the data to the GenStage server
  Imageresizer.process(data)
end

Broadway calls this function when the producer has received a message. First, we have to transform our message from the CSV format into a Broadway message. In the ack log output, we see the structure of this message. It has several keys, one of them data, which holds the output of the Transformer module. ImageProcessingModel is a representation of the original CSV line. The message also defines an acknowledger, which we will discuss later.

The function transform_queue_message_to_model:

def transform_queue_message_to_model(message, _opts) do
  model = create_model(message)
  %Message{
    data: validate_model(model),
    acknowledger: {BroadwayTest, :ack_imageprocessor, :ack_data}
  }
end

defp validate_model(model) do
  model
end

defp create_model(message) do
  list = String.split(message, ",")
  model = %ImageProcessingModel{
    customer_id: Enum.at(list, 0),
    file_name: Enum.at(list, 1),
    image_type: Enum.at(list, 2),
    path_from: Enum.at(list, 3),
    path_to: Enum.at(list, 4),
    file_name_destination: Enum.at(list, 5),
    destination_type: Enum.at(list, 6),
    status: :ok
  }
  model = if String.starts_with?(model.customer_id, "S") do
    %{model | status: :special}
  else
    model
  end
  model
end

The private function create_model does all the work. It takes the string from the queue message and creates the internal ImageProcessingModel, which is then passed as data to the Broadway.Message. This function also sets the status to indicate which batcher should be used. In the example, we check if the first letter of the customer_id is an “S”.

I have not implemented validation in this example.
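As a rough idea of what such a validation could look like, here is a minimal sketch. It is not part of the example code: the struct below is a stand-in with the same field names as ImageProcessingModel, the module ModelValidation and the status :failed are assumptions, and the only rule checked is that every field is a non-blank string.

```elixir
defmodule ImageProcessingModel do
  # Stand-in for the struct used in the post, with the same field names.
  defstruct [:customer_id, :file_name, :image_type, :path_from, :path_to,
             :file_name_destination, :destination_type, status: :ok]
end

defmodule ModelValidation do
  @required [:customer_id, :file_name, :image_type, :path_from, :path_to,
             :file_name_destination, :destination_type]

  # Marks the model as :failed (an assumed status) when a required field is
  # missing or blank; otherwise returns the model unchanged.
  def validate_model(%ImageProcessingModel{} = model) do
    if Enum.all?(@required, &present?(Map.get(model, &1))) do
      model
    else
      %{model | status: :failed}
    end
  end

  defp present?(value) when is_binary(value), do: String.trim(value) != ""
  defp present?(_value), do: false
end
```

With a status other than :ok or :special, the catch-all branch in handle_message would mark the message as failed.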

In transform_queue_message_to_model we also have to pass the key acknowledger to Broadway.Message. This is an enforced key, so it must be defined.

acknowledger: {BroadwayTest, :ack_imageprocessor, :ack_data}

We define that our Broadway module BroadwayTest is the acknowledger and give it the name ack_imageprocessor. The third element could carry user-defined data, but we use the atom ack_data. Since we define the acknowledger, we also have to implement the callback ack in BroadwayTest.

def ack(ack_ref, successful, failed) do
  IO.inspect(ack_ref, label: "ack - ack_ref")
  IO.inspect(successful, label: "ack - successful")
  IO.inspect(failed, label: "ack - failed")
end

The callback in the example does not do anything except logging the arguments, but it could be used to do an ack, or nack or anything special depending on the used data source.

Regarding acknowledgment of the queue message: I saw that the RabbitMQ message is consumed, but its status in RabbitMQ is still unacked. I am not sure if this is due to my implementation or if it is not fully implemented in the RabbitMQ producer.
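One possible explanation, offered as an assumption rather than a verified diagnosis: handle_message returns a newly built message whose acknowledger is BroadwayTest, so the acknowledger that the producer attached to the incoming message is never called. The sketch below shows the alternative of updating only the data of the incoming message. Msg is a stand-in struct with just two of the keys of Broadway.Message, and KeepAck is a hypothetical module that mirrors the idea behind Message.update_data.

```elixir
defmodule Msg do
  # Stand-in for Broadway.Message, reduced to the two keys that matter here.
  defstruct [:data, :acknowledger]
end

defmodule KeepAck do
  # Replaces only the data and leaves the acknowledger untouched.
  def update_data(%Msg{} = message, fun) when is_function(fun, 1) do
    %{message | data: fun.(message.data)}
  end
end

# The acknowledger tuple is made up for illustration.
incoming = %Msg{data: "S3R569,warthog.jpg", acknowledger: {:producer, :channel, :tag}}
updated = KeepAck.update_data(incoming, &String.split(&1, ","))
IO.inspect(updated.acknowledger == incoming.acknowledger)
# => true
```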

Also, there is a key transformer in the producer configuration in start_link (not used in this example) that should do the transformation automatically. I could not get it working either. If later versions behave differently or I find out how to implement it correctly, I will update this post.

Implementation and Features

Without the IO.inspect statements, the code for implementing the Broadway server is about 60 lines. Of course, the actual processing of the data is only a one-liner, because the actual work is outsourced to a GenStage server in this example.

As mentioned before, Broadway itself is based on GenStage. All the configuration of the pipeline is done in the function start_link. The following code shows this function with comments for some configuration options.

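def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    # name of the pipeline, also used as the first argument of test_messages
    name: RabbitBroadway,
    # user-defined data that is passed to the callbacks
    context: %{},
    producers: [
      default: [
        module: {BroadwayRabbitMQ.Producer,
          queue: "images",
          qos: [
            # back pressure, see below
            prefetch_count: 10
          ]
        },
        # number of producer stages
        stages: 2
      ]
    ],
    processors: [
      default: [
        # number of processor stages
        stages: 50
      ]
    ],
    batchers: [
      default: [
        batch_size: 10,
        # in milliseconds
        batch_timeout: 1500,
        stages: 5
      ],
      special: [
        stages: 1,
        batch_size: 5,
        batch_timeout: 1500
      ]
    ]
  )
end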

When we look at the example GenServer that does the image processing work, we see that there is only one producer, one producer_consumer and one consumer. The example implementation is very basic and “primitive”. In a production environment, we would need to implement a supervisor for the pipeline, implement dynamically starting pipeline stages, and more.

Broadway gives us exactly this for free. Stages are the steps in the Broadway pipeline and are configured in start_link with the following lines:

# producers
stages: 2

# processors
stages: 50

# batchers
stages: 5

We define the concurrency level for each step in the Broadway pipeline and thus define the concurrency level of the solution. If many messages come in, the work is spread over the configured number of producers, processors, and batchers. The configuration with 50 processors is probably overkill for our example, but we can easily adjust it during testing.

Back Pressure

Back pressure is a built-in feature of Broadway and is essential to avoid flooding the pipeline with messages. In the RabbitMQ producer, the option prefetch_count configures the back pressure.

prefetch_count: 10

We set it to 10; the default value is 50. That means that no more than 10 messages will be sent from RabbitMQ at one time. Other producers will have to implement other measures according to the data source to implement back pressure.
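The effect of such a limit can be illustrated with a small sketch. It only models the idea that the number of unacknowledged messages is capped; it is not how RabbitMQ or the producer is implemented, and the module PrefetchWindow is hypothetical.

```elixir
defmodule PrefetchWindow do
  # Returns {deliverable, still_queued}, given the queued messages, the number
  # of messages that are currently unacknowledged, and the prefetch limit.
  def deliver(queued, unacked, prefetch_count) do
    free = max(prefetch_count - unacked, 0)
    Enum.split(queued, free)
  end
end

# With 7 of 10 slots taken, only 3 further messages are handed out.
IO.inspect(PrefetchWindow.deliver([11, 12, 13, 14, 15], 7, 10))
# => {[11, 12, 13], [14, 15]}
```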

Partitioning with Batches

In the example, we group messages by the first letter of customer_id. Again, this is a simple implementation to show the feature. In a production system, any criteria could be used. Partitioning is useful to distribute the processing of incoming messages. The processors are still the same, but could internally branch into different processing algorithms.

Each batch can define its concurrency level with the stages key.
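The routing rule and the effect of batch_size can be sketched without Broadway. The module BatcherRouting and its functions are hypothetical, and the sketch ignores batch_timeout, which in the real pipeline can flush a batch before it is full.

```elixir
defmodule BatcherRouting do
  # Picks the batcher name from the first letter of the customer id.
  def batcher_for("S" <> _rest), do: :special
  def batcher_for(_customer_id), do: :default

  # Groups customer ids by batcher and cuts each group into batches of at
  # most batch_size entries.
  def batches(customer_ids, batch_size) do
    customer_ids
    |> Enum.group_by(&batcher_for/1)
    |> Map.new(fn {batcher, ids} -> {batcher, Enum.chunk_every(ids, batch_size)} end)
  end
end

IO.inspect(BatcherRouting.batches(["S1", "A1", "S2", "S3"], 2))
# => %{default: [["A1"]], special: [["S1", "S2"], ["S3"]]}
```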

Error Handling

Errors in the example can happen on two levels:

  • The message is not well-formed, for example, the metadata does not contain a UID, or the image name is not formed according to the specification.
  • The processing returns an error. This could be for various reasons; for example, the image can't be found in the specified path or the image data is corrupt.

I have implemented only the second case. The GenStage processing spawns two tasks, one to resize the image and one to add a watermark. The return value of Task.await indicates success or failure, and this value is then propagated to the other stages in the GenServer and eventually to the Broadway server.
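The pattern of awaiting a task and passing its result on can be sketched as follows. The functions resize and watermark are stand-ins that only mark a map, since the real steps call ImageMagick, and the module name ImageSteps and the {:ok, _} / {:error, _} return shapes are assumptions for illustration.

```elixir
defmodule ImageSteps do
  # Stand-ins for the real steps, which call ImageMagick in the example.
  def resize(%{file_name: ""}), do: {:error, :missing_file}
  def resize(image), do: {:ok, Map.put(image, :resized, true)}

  def watermark(image), do: {:ok, Map.put(image, :watermarked, true)}

  # Runs each step in its own task and stops at the first failure; the
  # failing value is returned as-is to the caller.
  def process(image) do
    with {:ok, resized} <- run(fn -> resize(image) end),
         {:ok, marked} <- run(fn -> watermark(resized) end) do
      {:ok, marked}
    end
  end

  # Spawns a task and waits up to five seconds for its return value.
  defp run(fun), do: fun |> Task.async() |> Task.await(5_000)
end
```

A caller can then map {:ok, _} to a successful status and anything else to a failed one.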

In handle_message we can then set the message as failed:

Message.failed(updatedmessage, "image processing failed")

Broadway.Message provides the function failed. In the ack callback, we can then see that the failed message is in the list failed. If we want to push the message into another queue on RabbitMQ, we have to implement this ourselves in this callback in the current release of Broadway. I plan a future post about this, which involves a different low-level module, BroadwayRabbitMQ.AmqpClient.

Testing

Sometimes it may be necessary to send messages directly to the Broadway pipeline. One example is testing, but other situations like using the pipeline without a queue may be possible.

We can send direct messages with the function test_messages from the Broadway module.

@tag :broadway
test "broadway pipeline integration test" do
  ref = Broadway.test_messages(RabbitBroadway, ["S3R556,warthog.jpg,jpg,assets/,assets/,processed-01,png"])
  # assert_receive {:ack, ^ref, successful, failed}
  Process.sleep(1000)
  IO.inspect(ref, label: "Reference")
end

We pass the message to the function together with a reference to our Broadway server implementation and get a ref returned. The commented line after the call should assert the received message from the ack callback, but I could not get it to work. Most probably, there is something I did not yet understand, and I will update the post and code as soon as I figure it out with the help of the community.

The sleep statement is only necessary to let the log messages display before the test process closes. It is not needed for production code, because the server process will run until aborted manually.

This post does not describe the tests for the GenServer code and the other modules implemented, but you can find them in the GitHub repository.

Conclusion

I like Broadway a lot. It is a step in the right direction to simplify data processing pipelines and to minimize the code we have to write. Future releases will add more features, as already proposed.

I hope this more complex and real-world example gives you an overview of Broadway and its abilities.