Flow Queue Bus - Use this app for synchronizing messages via queues

The application is live! :star_struck:

Intro

This app makes it possible to synchronize messages via named queues. It is ideal for triggers that interfere with one another.

The queuing system always works in a very straightforward way:
QUEUE → PROCESS → MARK AS HANDLED/FAILED
A queue is based on the principle “First In, First Out” (FIFO).

As soon as you queue a new message, processing starts right away, unless there are other messages ahead of it in the queue.
CAUTION: you are responsible for marking messages as handled or failed manually via an action card. Otherwise the queue will become blocked and no new triggers will be fired, because the message that is being processed will remain in the first position of the queue. You can retrieve the current message that is being processed at any time via an action card, if you missed the processing trigger.

As long as a message is not being processed, it can be dequeued. As soon as a message pops from a queue (the processing trigger fires), it is considered to be processing and can only be removed from its queue by marking it as handled or as failed (with the exception of the Clear Queue action card). Messages are NOT persisted: if the app is restarted, all queues will be empty.
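As a rough illustration of the lifecycle described above (not the app's actual code), here is a minimal Python sketch of one named queue. The names `NamedQueue`, `place`, `mark` and the `events` list are invented for this sketch; the `events` list simply stands in for the flow triggers that would fire.

```python
from collections import deque
from dataclasses import dataclass
from itertools import count


@dataclass
class Message:
    id: str
    body: str
    attempts: int = 0


class NamedQueue:
    """Hypothetical in-memory FIFO queue with one message processing at a time."""

    def __init__(self, name):
        self.name = name
        self.waiting = deque()   # messages not yet processed, oldest first
        self.processing = None   # the message currently being processed, if any
        self.events = []         # (trigger, message id) pairs, stand-in for flow triggers
        self._ids = count(1)

    def place(self, body):
        msg = Message(id=f"{self.name}-{next(self._ids)}", body=body)
        self.waiting.append(msg)
        self._process_next()
        return msg.id

    def _process_next(self):
        # Nothing pops while a message is still processing: an unmarked
        # message blocks the queue.
        if self.processing is None and self.waiting:
            self.processing = self.waiting.popleft()
            self.processing.attempts = 1
            self.events.append(("processing", self.processing.id))

    def mark(self, message_id, outcome):
        """Mark the processing message as 'handled' or 'failed'."""
        if self.processing is None or self.processing.id != message_id:
            return False
        self.events.append((outcome, message_id))
        self.processing = None
        self._process_next()
        return True


q = NamedQueue("lights")
first = q.place("turn on")    # starts processing right away
second = q.place("turn off")  # waits, because "turn on" has not been marked yet
q.mark(first, "handled")      # releases the queue, so "turn off" starts processing
print(q.events)
```

If the `mark` call is left out, the second message never reaches the processing state, which is the blocking behaviour the CAUTION above warns about.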

Cards

Triggers (When)

Message dequeued from queue Queue Name

A message has been dequeued via the action card Dequeue message Message ID from queue Queue Name

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to monitor

This card will provide information about that message via tokens:

  • Queue Name (Text): The name of the queue the message was in
  • Message ID (Text): The unique identifier of the message
  • Message (Text): The contents of the message
  • Attempts (Number): The number of times this message was attempted.
    Always 0 because the message is never attempted

Message failed from queue Queue Name

A message has been marked as failed via the action card Mark message Message ID as failed in queue Queue Name and process next (if any)

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to monitor

This card will provide information about that message via tokens:

  • Queue Name (Text): The name of the queue the message was in
  • Message ID (Text): The unique identifier of the message
  • Message (Text): The contents of the message
  • Attempts (Number): The number of times this message was attempted

Message handled from queue Queue Name

A message has been marked as handled via the action card Mark message Message ID as handled in queue Queue Name and process next (if any)

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to monitor

This card will provide information about that message via tokens:

  • Queue Name (Text): The name of the queue the message was in
  • Message ID (Text): The unique identifier of the message
  • Message (Text): The contents of the message
  • Attempts (Number): The number of times this message was attempted

Message processing in queue Queue Name

A message has popped from the queue and is being handled. This is either a new message that has been added to the queue via the action cards Place Message in queue Queue Name or Place Message in queue Queue Name as only message, or a message that is being retried via the action cards Retry message Message ID maximum Maximum Attempts times in queue Queue Name or Retry Message ID maximum Maximum Attempts times, each time with a delay of Delay milliseconds in queue Queue Name.

CAUTION: you are responsible for marking messages as handled or failed manually via the action cards Mark message Message ID as handled in queue Queue Name and process next (if any) or Mark message Message ID as failed in queue Queue Name and process next (if any).

Retrying a message will keep it in the queue and in the first position.

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to monitor

This card will provide information about that message via tokens:

  • Queue Name (Text): The name of the queue the message is in
  • Message ID (Text): The unique identifier of the message
  • Message (Text): The contents of the message
  • Attempts (Number): The number of times this message was attempted
    Always 1 for a new message and a higher number for messages that are being retried

Conditions (And)

Conditions will be added in future versions.

Actions (Then)

Clear queue Queue Name

Clears the entire named queue (including the message that is being processed). Separate triggers will fire for each message that was in the queue:

  • Message dequeued from queue Queue Name (Trigger): for all messages that were dequeued
  • Message failed from queue Queue Name (Trigger): for the message that was being processed

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to clear

Dequeue message Message ID from queue Queue Name

Dequeues a message that was not being processed from a named queue. If the message is already being processed, it will not be found by this card. This card will trigger:

  • Message dequeued from queue Queue Name (Trigger): for the message that was dequeued (if found)

This card takes the following arguments:

  • Queue Name (Text): The name of the queue
  • Message ID (Text): The unique identifier of the message

Dequeue all non-processed messages from queue Queue Name

Dequeues all messages that have not yet been processed. Separate triggers will fire for each message that was in the queue:

  • Message dequeued from queue Queue Name (Trigger): for all messages that were queued

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to clear
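To make the difference between the three removal cards above concrete, here is a small Python sketch. It is only an illustration under assumptions: `QueueState` and the function names are invented, messages are reduced to their IDs, and the order in which the triggers fire is a guess rather than documented behaviour.

```python
from collections import deque


class QueueState:
    """Hypothetical stand-in for one named queue (IDs only, no message contents)."""

    def __init__(self, processing=None, waiting=()):
        self.processing = processing   # ID of the message being processed, or None
        self.waiting = deque(waiting)  # IDs not yet processed, oldest first


def dequeue_message(state, message_id):
    """Dequeue one waiting message; the processing message is never found."""
    if message_id in state.waiting:
        state.waiting.remove(message_id)
        return [("dequeued", message_id)]
    return []


def dequeue_all(state):
    """Dequeue every waiting message; the processing message is left alone."""
    fired = [("dequeued", message_id) for message_id in state.waiting]
    state.waiting.clear()
    return fired


def clear_queue(state):
    """Remove everything: the processing message fails, the rest are dequeued."""
    fired = []
    if state.processing is not None:
        fired.append(("failed", state.processing))
        state.processing = None
    return fired + dequeue_all(state)


state = QueueState(processing="m1", waiting=["m2", "m3", "m4"])
print(dequeue_message(state, "m1"))  # [] because m1 is being processed
print(dequeue_message(state, "m3"))  # [('dequeued', 'm3')]
print(clear_queue(state))            # [('failed', 'm1'), ('dequeued', 'm2'), ('dequeued', 'm4')]
```

Each returned pair represents one trigger firing, so clearing a queue with one processing and two waiting messages fires three triggers in total.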

Mark message Message ID as failed in queue Queue Name and process next (if any)

Marks the message that was being processed as failed, removes it from the named queue and starts processing the next message (if any). This card will trigger:

  • Message failed from queue Queue Name (Trigger): for the message that was marked as failed

This card takes the following arguments:

  • Queue Name (Text): The name of the queue
  • Message ID (Text): The unique identifier of the message that failed

Get the processing message in queue Queue Name

Gets the message that is currently being processed (if any).

This card takes the following arguments:

  • Queue Name (Text): The name of the queue to inspect

This card will provide information about that message via tokens:

  • Queue Name (Text): The name of the queue
  • Message ID (Text): The unique identifier of the message
    This will be "-" if no processing message was found, otherwise the actual identifier of the message
  • Message (Text): The contents of the message
    This will be "-" if no processing message was found, otherwise the actual contents of the message
  • Attempts (Number): The number of times this message was attempted.
    This will be 0 if no processing message was found, otherwise a number higher than or equal to 1

Get the processing message in queue Queue Name and with default value Default Value when no message is being processed

Gets the message that is currently being processed (if any) and returns a default value (for token message) when no message is being processed.

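This card takes the following arguments:

  • Queue Name (Text): The name of the queue to inspect
  • Default Value (Text): The value to return in the Message token when no message is being processed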

This card will provide information about that message via tokens:

  • Queue Name (Text): The name of the queue
  • Message ID (Text): The unique identifier of the message
    This will be "-" if no processing message was found, otherwise the actual identifier of the message
  • Message (Text): The contents of the message
    This will be Default Value if no processing message was found, otherwise the actual contents of the message
  • Attempts (Number): The number of times this message was attempted.
    This will be 0 if no processing message was found, otherwise a number higher than or equal to 1
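The placeholder rules of the two "Get the processing message" cards can be summarised in a few lines of Python. This is a sketch only: `Tokens` and `processing_tokens` are invented names, and the processing message is modelled as a plain tuple.

```python
from typing import NamedTuple, Optional, Tuple


class Tokens(NamedTuple):
    queue_name: str
    message_id: str
    message: str
    attempts: int


def processing_tokens(
    queue_name: str,
    processing: Optional[Tuple[str, str, int]],
    default_value: str = "-",
) -> Tokens:
    """Build the tokens for the message being processed.

    processing is a hypothetical (message_id, message, attempts) tuple,
    or None when nothing is being processed.
    """
    if processing is None:
        # Placeholders from the card descriptions: "-" for the ID,
        # "-" or the Default Value for the contents, 0 for the attempts.
        return Tokens(queue_name, "-", default_value, 0)
    message_id, message, attempts = processing
    return Tokens(queue_name, message_id, message, attempts)


print(processing_tokens("lights", None))                        # first card, nothing processing
print(processing_tokens("lights", None, default_value="idle"))  # second card, nothing processing
print(processing_tokens("lights", ("42", "turn on", 2)))        # a message on its second attempt
```

Because Attempts is 0 only when nothing is being processed, a flow could test that token instead of comparing the Message ID against "-".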

Mark message Message ID as handled in queue Queue Name and process next (if any)

Marks the message that was being processed as handled, removes it from the named queue and starts processing the next message (if any). This card will trigger:

  • Message handled from queue Queue Name (Trigger): for the message that was marked as handled

This card takes the following arguments:

  • Queue Name (Text): The name of the queue
  • Message ID (Text): The unique identifier of the message that finished

Place Message in queue Queue Name

Queues a message onto a named queue. This card will trigger:

  • Message processing in queue Queue Name (Trigger): when that message reaches the first position of the queue and pops

This card takes the following arguments:

  • Message (Text): The contents of the message
  • Queue Name (Text): The name of the queue in which to place the message

Place Message in queue Queue Name as only message

Queues a message onto a named queue and dequeues all messages that have not yet been processed. Separate triggers will fire for each message that was in the queue:

  • Message dequeued from queue Queue Name (Trigger): for all messages that were dequeued
  • Message processing in queue Queue Name (Trigger): when that message reaches the first position of the queue and pops

This card takes the following arguments:

  • Message (Text): The contents of the message
  • Queue Name (Text): The name of the queue in which to place the message
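A short Python sketch of what "as only message" means in practice, assuming (as the description above suggests) that a message already being processed is left untouched and the new message waits behind it. The function name and the ID-only model are made up for this illustration.

```python
from collections import deque


def place_as_only_message(processing, waiting, new_id):
    """Replace all waiting messages with new_id.

    processing: ID of the message being processed, or None
    waiting:    deque of IDs not yet processed (modified in place)
    Returns the (possibly new) processing ID and the triggers that fired.
    """
    # Every message that has not been processed yet is dequeued.
    fired = [("dequeued", message_id) for message_id in waiting]
    waiting.clear()
    waiting.append(new_id)
    # The new message only starts processing if the queue is not busy.
    if processing is None:
        processing = waiting.popleft()
        fired.append(("processing", processing))
    return processing, fired


waiting = deque(["m2", "m3"])
processing, fired = place_as_only_message("m1", waiting, "m4")
print(processing, list(waiting), fired)
```

In this run "m1" keeps processing, "m2" and "m3" are dequeued, and "m4" is the only message left waiting. It will pop once "m1" is marked as handled or failed.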

Retry message Message ID maximum Maximum Attempts times in queue Queue Name

Retries the message that was being processed immediately with a specified maximum number of attempts. The message remains in the queue. When the maximum number of attempts is reached, the message will be marked as failed and a separate trigger will be fired:

  • Message failed from queue Queue Name (Trigger): for the message that reached the maximum number of attempts

This card takes the following arguments:

  • Message ID (Text): The unique identifier of the message to retry
  • Maximum Attempts (Number): The number of times this message can be attempted
  • Queue Name (Text): The name of the queue
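The retry rule can be sketched in Python as follows. This rests on an assumption that is not spelled out above: the first processing attempt counts towards Maximum Attempts, so a message fails once its Attempts token has reached that number. The function name `retry` is invented for the sketch.

```python
def retry(attempts, maximum_attempts):
    """Return (trigger, attempts) for a retry request on the processing message.

    Assumption: the first attempt counts, so a retry is refused (and the
    message fails) once attempts has reached maximum_attempts.
    """
    if attempts >= maximum_attempts:
        return ("failed", attempts)
    return ("processing", attempts + 1)


# Simulate a flow that asks for a retry every time the message is processed.
attempts = 1  # a new message starts at attempt 1
history = [("processing", attempts)]
while True:
    trigger, attempts = retry(attempts, maximum_attempts=3)
    history.append((trigger, attempts))
    if trigger == "failed":
        break

print(history)
# [('processing', 1), ('processing', 2), ('processing', 3), ('failed', 3)]
```

Under this assumption a maximum of 3 means the processing trigger fires three times in total before the failed trigger fires.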

Retry Message ID maximum Maximum Attempts times, each time with a delay of Delay milliseconds in queue Queue Name

Retries the message that was being processed with a specified maximum number of attempts after the specified delay (in milliseconds). The message remains in the queue. When the maximum number of attempts is reached, the message will be marked as failed and a separate trigger will be fired:

  • Message failed from queue Queue Name (Trigger): for the message that reached the maximum number of attempts

This card takes the following arguments:

  • Message ID (Text): The unique identifier of the message to retry
  • Maximum Attempts (Number): The number of times this message can be attempted
  • Delay (Number): The delay (in milliseconds) between retries
  • Queue Name (Text): The name of the queue

CAUTION: Using delayed retries will block the queue for the duration of each delay, because the message remains in the first position of the queue.
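To get a feeling for how long a delayed retry can hold up a queue, here is a back-of-the-envelope calculation in Python. It assumes that the first attempt counts towards Maximum Attempts; `worst_case_block_ms` and `handling_ms` are names invented for this sketch.

```python
def worst_case_block_ms(maximum_attempts: int, delay_ms: int, handling_ms: int = 0) -> int:
    """Upper bound on how long one message can hold the first position.

    Assumption: the first attempt counts towards maximum_attempts, so there
    are at most (maximum_attempts - 1) delayed retries. handling_ms is the
    time the flow itself needs per attempt.
    """
    retries = max(maximum_attempts - 1, 0)
    return retries * delay_ms + maximum_attempts * handling_ms


print(worst_case_block_ms(maximum_attempts=5, delay_ms=2000))                   # 8000
print(worst_case_block_ms(maximum_attempts=5, delay_ms=2000, handling_ms=100))  # 8500
```

With 5 attempts and a 2000 ms delay, every message behind the retried one can wait 8 seconds or more before its processing trigger fires.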

Example

This flow is a possible application of the Flow Message Queue.

In this flow a number of triggers are synchronized via the queue to prevent race conditions.

From Wikipedia: Race condition
A race condition or race hazard is the condition of an electronics, software, or other system where the system’s substantive behavior is dependent on the sequence or timing of other uncontrollable events, leading to unexpected or inconsistent results. It becomes a bug when one or more of the possible behaviors is undesirable.

This flow handles a number of devices (lights and switches) to act like a group:

  • When one or more lights go on (via the app, via a mood or via another automation), the switch (which has an LED indicator) goes on as well.
  • When the switch turns on it switches on all lights (or activates the specified mood).
  • When all lights turn off (via the app, via a mood or via another automation), the switch goes off too.

All of this would be impossible without a queue that synchronizes the triggers (events), because by setting the group into another state, other (new) triggers will fire.
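The idea can be sketched outside of Homey in a few lines of Python. This is not the actual flow, only an illustration of the principle: every device change is queued as an event, events are handled strictly one at a time, and a handler that changes another device simply queues a new event behind the current one. The device names and helper functions are made up.

```python
from collections import deque

lights = {"lamp": False, "spot": False}  # hypothetical lights in the group
switch = {"on": False}                   # hypothetical switch with LED indicator
queue = deque()                          # pending events, oldest first
log = []                                 # events in the order they were handled


def set_light(name, on):
    if lights[name] != on:               # only a real state change fires a trigger
        lights[name] = on
        queue.append(("light", name, on))


def set_switch(on):
    if switch["on"] != on:
        switch["on"] = on
        queue.append(("switch", "wall", on))


def handle(event):
    kind, _name, on = event
    if kind == "light":
        if on:
            set_switch(True)             # a light went on: the switch follows
        elif not any(lights.values()):
            set_switch(False)            # all lights are off: the switch goes off too
    elif kind == "switch" and on:
        for light in lights:
            set_light(light, True)       # the switch went on: all lights go on


def drain():
    # One event at a time, first in, first out.
    while queue:
        event = queue.popleft()
        log.append(event)
        handle(event)


set_light("lamp", True)  # e.g. someone turns on one lamp in the app
drain()
print(lights, switch, log)
```

Turning on one lamp results in three events handled in a fixed order (lamp on, switch on, spot on), after which the group is consistent and the queue is empty.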

Got some samples how you use MQ with flows. Not everyone might be familiar with MQ systems. Hopefully you can implement some reporting functions to list the existing queues and the containing messages with their details. Of course persistence would be great as homey restarts and thus app restarts do happen.

Got some samples how you use MQ with flows. Not everyone might be familiar with MQ systems.

I’ve updated the initial post with better documentation and an example of how I currently use it in my home(y).

Of course persistence would be great as homey restarts and thus app restarts do happen.

This was not my intention for this app. I wanted the app to be as fast as I can make it (so only in memory). But I can also see an application for persisted queues, so that would be a feature for a future version, where the Homey user can decide. In other words, I would create separate cards for persisted queues with the clear disclaimer: “Slower but safer”.
Thanks for making the suggestion.

Hopefully you can implement some reporting functions to list the existing queues and the containing messages with their details.

That would also be a nice feature, yes. Thanks again for suggesting it.
The way it is implemented in Chronograph is absolutely top notch. But I’m not very eager to do frontend development… I will inspect the source code of Chronograph and see if I can get my head around it. Definitely something for a future version.

Nice that I found this app. I have been missing the possibility of having message queues. Unfortunately I have a lot of flows written as standard flows and not advanced ones. Currently that makes it impossible to queue new messages. Is there a reason for not having the ‘Place message in queue Queue Name as’ flow card available for standard flows?

Thanks in advance!

Hi @Persan,

This is something that Homey does automatically. My guess is because the cards return tokens to the flow (as you can see in the documentation above). I’ve done a quick comparison between the list of “advanced” cards in the app store and the token documentation above, and they match.

Maybe someone in the community here could help me with this: how do I forcibly mark cards as “simple”?

I don’t understand why Homey would shield these cards from use in the simple flows….?
If you don’t want to use tokens (advanced flows) or can’t use tokens (simple flows), you can always ignore them…

Any thoughts anyone?

Thanks for your answer. Yes, what you say makes sense. But would it then be possible to have a card that places a message in a queue without giving return values?