
Volga: Distributed event streaming

Introduction

Volga is a multi-tenant, high-performance, lightweight event streaming implementation tailor-made for Avassa Supd. Terminology and functionality in Volga are similar to other event streaming implementations such as Pulsar and Kafka.

In an Avassa environment, Volga runs on all hosts, on edge sites as well as on the Control Tower hosts. Each site automatically forms a Volga cluster in which all hosts at the site participate. Volga uses the well-known publish/subscribe pattern: producers produce messages to a topic, and consumers subscribe to a topic and consume messages from it. Volga runs inside Avassa Supd and requires no configuration from the end user. It is used internally by Supd, and can easily be used by tenant code.

Volga can be used for:

  • Event sourcing. An application wants to propagate a configuration change to all sites. This is best done by:

    • Posting the change to a Volga topic at the Control Tower.
    • Consuming the change at all edge sites.

    This is a good and proven way to organize loosely coupled distributed applications.

  • Aggregation. An application has a large number of remote sites where large amounts of data are produced. Due to either cost or bandwidth, data must be aggregated and refined before it is pushed further up in the network. Such an application would publish data on one or several Volga topics locally at each site. Data would then typically be consumed and refined locally, and either re-produced to a local aggregation topic or produced remotely to an aggregation topic at the Control Tower.

  • Distributed metrics. Metrics generated at edge sites are best produced to local Volga topics at each site. To query and search such metrics, a search request can be posted to Volga at the Control Tower. This keeps all metrics data distributed without ever accumulating it centrally. If the number of edge sites is very large and the amount of metrics data is large, this is a good way to avoid central bottlenecks.

Topics

A topic is a named stream of data in which data items are ordered by the time they were published to the topic. The data may be replicated over multiple hosts in the Volga cluster. A topic is automatically created the first time it is opened, regardless of whether a consumer or a producer opens it. Topic data may be persisted on disk or kept in RAM only.

A Volga topic has certain characteristics. One is the replication-factor, which is given the first time the topic is created. With replication factor 3, all data produced to the topic is replicated on 3 nodes. The nodes are chosen by Volga when the topic is created. As long as a majority of those nodes is working, the topic continues to operate.
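
The majority rule can be made concrete with a little arithmetic. The sketch below is plain Python, not part of any Avassa API: it computes how many replicas form a strict majority and how many replica failures a topic with a given replication factor can survive.

```python
def majority(replication_factor: int) -> int:
    """Smallest number of replicas that forms a strict majority."""
    return replication_factor // 2 + 1


def tolerated_failures(replication_factor: int) -> int:
    """How many replica hosts can fail while a majority is still up."""
    return replication_factor - majority(replication_factor)


for rf in range(1, 6):
    print(f"replication-factor {rf}: majority {majority(rf)}, "
          f"survives {tolerated_failures(rf)} failure(s)")
```

A replication factor of 4 tolerates no more failures than 3, which is why an odd replication factor is the sensible choice.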

A topic has an arbitrary number of named producers and an arbitrary number of named consumers. A producer publishes data to the topic, and a consumer consumes data from the topic. Data in a topic is persistent, so a consumer can go down, come back up, and continue consuming from where it left off.

Topic location

Regardless of the actual host you make the connection to, there are three different ways of connecting to a topic, specified by the location parameter:

  1. local - used when you have connected directly to one of the hosts at the site where the topic resides.

  2. child-site - used to connect to a topic at an edge site via the Control Tower.

  3. parent - used to connect from an edge site to a topic at the Control Tower.

Topic properties

When a topic is created, i.e. the first time it is opened, a number of properties are attached to it. Most properties are permanent and cannot be changed. The topic properties are:

  • persistence Either disk or ram. Default is disk. Disk-based topics may be replicated, whereas RAM-based topics are not. If the host with a RAM-based topic fails, the topic is immediately moved to another host and all clients automatically reconnect to the new host, but messages on RAM topics can be lost. Volga ensures that disk topics are evenly distributed between the hosts in the site. If manual control over topic placement is needed, host labels can be used to restrict specific topics to certain subsets of hosts.

  • match-host-labels A label expression that determines which hosts are eligible to replicate this topic. The label expression is evaluated when the topic is created. To re-evaluate it (and potentially move the topic), run the redistribute action on the topic.

  • replication-factor A topic can be replicated on multiple hosts in a Volga cluster. Choose an odd number, since a majority is required for the topic to be operational.

  • local-placement If set to true, the host executing the request to create the topic is guaranteed to be included in the replication set.

  • num-chunks Data for a topic is kept on the local disk of the host(s) running the topic. Data is divided into chunks on disk; each chunk is 1 MB, and the default value for num-chunks is 1000, giving 1 GB per topic and per replica by default. Data aging is automatic and based on size, not time. Thus, when chunk 1001 is eventually needed, chunk 1 is discarded.

  • max-size Rather than setting a fixed number of chunks for a topic, you can specify its maximum allowed size, e.g. 10MB or 1GB. Internally, Volga creates as many chunks as needed and removes old chunks to keep the topic size below max-size.

  • max-days Sets the maximum number of days a message is allowed to exist on the topic. Older messages are deleted. In practice, a new chunk is started every midnight and any chunk that is max-days old is deleted. So if max-days is 3, any message that is more than 48 hours old is deleted at midnight; just before midnight, the oldest messages can be almost 72 hours old. Note that even if max-days is set, the topic still has an upper size limit set by either max-size or num-chunks, so messages can also be deleted because the topic is full.

  • format Set to either string (the default) or json. If the messages produced on a topic are JSON objects, setting format to json makes the messages look nicer on the receiving end, since the payload is embedded as a JSON object rather than as a quoted string. An example of this is provided in the Messages chapter.

  • ephemeral If set to true, the topic will be automatically deleted when it no longer has any consumers or producers. Only available in the websocket API.
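
The size- and age-based retention rules above can be checked with a small toy model. This is an illustration only, assuming a chunk of 10^6 bytes (which matches the 976.56 KiB size that supctl reports for a single chunk later in this document); ChunkRing and the helper functions are hypothetical names, not Volga internals.

```python
from collections import deque

CHUNK_BYTES = 1_000_000  # assumption: one "1 MB" chunk = 10**6 bytes


def chunks_for_max_size(max_size_bytes: int) -> int:
    """Whole chunks that fit in max-size (at least one chunk)."""
    return max(1, max_size_bytes // CHUNK_BYTES)


class ChunkRing:
    """Toy model of size-based aging: at most num_chunks chunks are kept,
    and starting chunk num_chunks + 1 discards the oldest chunk."""

    def __init__(self, num_chunks: int = 1000):
        self.num_chunks = num_chunks
        self.chunks = deque()       # chunk numbers, oldest first
        self.next_chunkno = 1
        self.dropped_chunks = 0

    def start_new_chunk(self) -> int:
        chunkno = self.next_chunkno
        self.next_chunkno += 1
        self.chunks.append(chunkno)
        if len(self.chunks) > self.num_chunks:
            self.chunks.popleft()   # the oldest chunk is discarded
            self.dropped_chunks += 1
        return chunkno


def oldest_age_after_midnight_hours(max_days: int) -> int:
    """With one new chunk per day, chunks max_days old are deleted at
    midnight, so right afterwards the oldest messages are at most this old."""
    return (max_days - 1) * 24


ring = ChunkRing()  # default num-chunks = 1000
for _ in range(1001):
    ring.start_new_chunk()
print(ring.chunks[0], ring.dropped_chunks)   # 2 1
print(ring.num_chunks * CHUNK_BYTES)         # 1000000000 (1 GB per replica)
print(oldest_age_after_midnight_hours(3))    # 48
```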

Getting started

The easiest way to get started with Volga topics is to use the supctl tool to create a few topics and play around with them.

REST API

When scripting, it is easier to access Volga through a regular REST API than through the Websocket API. The REST API is not as versatile as the Websocket API, but it is still useful. There are three paths you can POST to:

  • /api/v1/state/volga/topics/{topic-name}/consume
  • /api/v1/state/volga/topics/{topic-name}/produce
  • /api/v1/state/volga/topics/{topic-name}/create-topic

In the REST API to Volga, topics are not automatically created as they are in the Websocket API. Each method takes input parameters that are described in the Avassa reference documentation for the REST API.
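
For scripting without supctl, the produce path can be called from any HTTP client. The sketch below uses Python's standard urllib and only builds the request. The host name, the bearer-token authentication and the JSON body with a payload field are assumptions for illustration; check the REST reference documentation for the actual input parameters.

```python
import json
import urllib.parse
import urllib.request


def produce_request(api_host: str, token: str, topic: str,
                    payload: str) -> urllib.request.Request:
    """Build (but do not send) a POST to the Volga REST produce path."""
    # Topic names such as system:audit-trail-log contain ':', so quote the rest.
    path = ("/api/v1/state/volga/topics/"
            f"{urllib.parse.quote(topic, safe=':')}/produce")
    body = json.dumps({"payload": payload}).encode()  # assumed body shape
    return urllib.request.Request(
        f"https://{api_host}{path}",
        data=body,
        method="POST",
        headers={"Authorization": f"Bearer {token}",   # assumed auth scheme
                 "Content-Type": "application/json"},
    )


req = produce_request("api.example.com", "my-token", "foo", "Hello Avassa")
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```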

The REST API to Volga is nicely accessible from supctl, for example:

supctl do volga create-topic foo string
supctl do volga topics foo produce "Hello Avassa"

These commands create a new topic foo with payload format string and publish one message to it. Producing through the REST API is fairly inefficient, since each call opens the topic, publishes one message and then closes the topic. In the Websocket API, data is streamed from the client to the topic.

To check that the message is in the topic, run:

supctl -d .acme do volga topics foo consume
{
"time": "2022-03-29T15:05:06.801Z",
"seqno": 1,
"remain": 24,
"payload": "Hello Avassa",
"name": "REST-api",
"mtime": 1648566306801,
"format": "string"
}

Several useful flags can be passed to the REST API through supctl. For example, the above command is equivalent to:

supctl do volga topics foo consume --position end

Pressing TAB for completion in supctl lists the available flags:

supctl do volga topics foo consume --end-marker TAB
-h --position
--follow --position-sequence-number
--help --position-since
--ignore-all-filters --position-timestamp
--input --re-match
--payload-only --topic-tag

We won't describe every flag in detail here; all parameters are described in the reference documentation.

Let's walk through a short session and comment on it.

supctl -d .acme do volga topics foo consume --position since  --position-since 0s

Nothing is printed and the command returns, since consumption starts at the current time and no new message has been produced.

supctl -d .acme do volga topics foo consume --position since  --position-since 0s --follow

The command blocks, waiting for data to be produced to the topic.

supctl -d .acme do volga topics foo consume --payload-only
Hello Avassa

When viewing a topic manually, it is often easier on the eye to look only at the payload, without the Volga metadata. For example:

supctl do volga topics system:audit-trail-log consume \
--payload-only --follow

This follows the audit log at a site and shows only the actual audit log entries, without the extra Volga data.

supctl do volga create-topic bar json

This creates a new topic intended to contain entries in JSON format. One of the reasons for requiring the user to specify the format of the topic, i.e. string or json, is to allow for prettier output when consuming. For example, the audit log is a "format json" topic. Try viewing the audit log without the --payload-only flag, and note that the payload is embedded in the surrounding JSON object rather than included as a quoted JSON string.

supctl do volga topics bar produce '{"abc": "aa"}'
supctl do volga topics bar consume
{
"time": "2022-03-30T09:49:56.707Z",
"seqno": 1,
"remain": 24,
"payload": {
"abc": "aa"
},
"name": "REST-api",
"mtime": 1648633796707,
"format": "json"
}
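
On the consuming side, the format field tells a client whether payload is already a JSON object or a plain string. A minimal Python sketch, assuming messages shaped like the output above; payload_as_object is a hypothetical helper name:

```python
import json
from typing import Any


def payload_as_object(msg: dict) -> Any:
    """Return the payload as a Python object, whatever the topic format."""
    if msg.get("format") == "json":
        return msg["payload"]          # already embedded as a JSON object
    # string topic: the payload is a string, which may itself hold JSON text
    try:
        return json.loads(msg["payload"])
    except json.JSONDecodeError:
        return msg["payload"]


json_msg = {"seqno": 1, "payload": {"abc": "aa"}, "format": "json"}
string_msg = {"seqno": 1, "payload": "Hello Avassa", "format": "string"}
print(payload_as_object(json_msg))    # {'abc': 'aa'}
print(payload_as_object(string_msg))  # Hello Avassa
```

With a json topic, the consumer never has to parse JSON text out of a string, which is one practical benefit of declaring the format up front.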

As a final note on using supctl with Volga: all of the above commands were run against the Control Tower, creating the topics at the Control Tower. Even though supctl talks to the Control Tower, it can reach topics on an edge site using the --site parameter.

supctl do --site udc volga create-topic bar json
supctl do --site udc volga topics bar produce '{"abc": "aa"}'
supctl do --site udc volga topics bar consume
{
"time": "2022-03-30T09:56:14.266Z",
"seqno": 1,
"remain": 24,
"payload": {
"abc": "aa"
},
"name": "REST-api",
"mtime": 1648634174266,
"format": "json"
}

We can view the stats for the newly created topic with:

supctl show --site udc volga topics bar
name: bar
tenant: acme
labels: {}
format: json
seqno: 1
chunkno: 1
number-of-chunks: 1000
creation-time: 2022-03-30T09:55:54.526Z
assigned-hosts:
- udc-002
leader-host: udc-002
worker-hosts: []
requested-replication-factor: 1
current-replication-factor: 1
persistence: disk
size: 976.56 KiB
oldest-entry: 2022-03-30T09:55:54.525Z
dropped-chunks: 0
consumers: []
producers: []

Websocket

To access Volga over a websocket, an authenticated websocket to one of the hosts at a site is required. Once authenticated, you receive a Strongbox token that must be used in the websocket Upgrade procedure.

To log in, open a TLS connection to Avassa Supd at the API address for the site and POST the login data to /v1/login:

{
"username": "joe@popcorn-systems.com",
"tenant": "popcorn-systems",
"password": "verysecret"
}

On success, this returns something like:

{
"token": "0d908c6b-f900-4187-bb00-394cd7275fed",
"expires-in": 1209600,
"accessor": "41182e6e-7027-4bdd-8b35-e44b4e0d8204",
"creation-time": "2021-06-07T15:23:54.629168Z"
}

You need to extract the token from the above JSON structure and provide it when you upgrade the socket to a websocket. The upgrade procedure entails:

  • GET /v1/ws/volga
  • Provide the Upgrade header
  • Provide the Authorization header with the value Bearer 0d908c6b-f900-4187-bb00-394cd7275fed

If successful, you can use the websocket to access the Volga API.
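
The login and upgrade steps can be sketched with the Python standard library. The sketch extracts the token from a login response like the one above and builds the raw HTTP upgrade request. The Upgrade, Connection and Sec-WebSocket-* headers are the standard RFC 6455 handshake headers, and the host name is a placeholder; a production client would normally use a WebSocket library instead of hand-building the handshake.

```python
import base64
import hashlib
import json
import os

WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"  # fixed value from RFC 6455


def extract_token(login_response_body: str) -> str:
    """Pull the Strongbox token out of the /v1/login response."""
    return json.loads(login_response_body)["token"]


def upgrade_request(host: str, token: str, key: str) -> str:
    """Raw HTTP/1.1 request that upgrades the TLS connection to a websocket."""
    return (
        "GET /v1/ws/volga HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        f"Sec-WebSocket-Key: {key}\r\n"
        "Sec-WebSocket-Version: 13\r\n"
        f"Authorization: Bearer {token}\r\n"
        "\r\n"
    )


def expected_accept(key: str) -> str:
    """Value the server must return in Sec-WebSocket-Accept for this key."""
    digest = hashlib.sha1((key + WS_GUID).encode()).digest()
    return base64.b64encode(digest).decode()


login_body = '{"token": "0d908c6b-f900-4187-bb00-394cd7275fed", "expires-in": 1209600}'
token = extract_token(login_body)
key = base64.b64encode(os.urandom(16)).decode()  # fresh random key per handshake
print(upgrade_request("api.example.com", token, key))
```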

Messages

The Volga message is what is consumed by a subscribing consumer. It is a JSON structure with the following components.

  • mtime The timestamp when the message was created. The timestamp is set by the Volga application. It has millisecond granularity and is the current time retrieved from the underlying OS. Note that the timestamp is generated immediately by the server where the message is produced, but due to network latency or connectivity issues, the message might arrive at its topic somewhat later. This means that a consumer is not guaranteed to receive messages in timestamp order.

  • time The RFC 3339 string representation of mtime.

  • seqno The sequence number of the message in the topic. This is an ever-increasing number.

  • remain An integer denoting how many messages remain until you have to invoke more() to ask for more data.

  • name The name of the producer that produced this message.

  • end-marker An optional field, set to true if the consumer asked for end-marker and there is no more data to be consumed, that is, the consumer would otherwise hang waiting for more data.

  • payload The data, either as a string or a json object, depending on the format of the topic.

A message is received over the Websocket interface as a JSON structure.
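
A client usually needs two things from each message: the timestamp as a proper date-time value, and the remain counter. The Python sketch below is an illustration, not an Avassa library; it converts mtime (milliseconds since the Unix epoch) with exact integer arithmetic and checks it against the time string, using the sample values from the string topic example below.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def mtime_to_datetime(mtime: int) -> datetime:
    """mtime is milliseconds since the epoch; timedelta keeps it exact."""
    return EPOCH + timedelta(milliseconds=mtime)


def parse_time(time_str: str) -> datetime:
    """Parse the RFC 3339 'time' field (fromisoformat rejects 'Z' before 3.11)."""
    return datetime.fromisoformat(time_str.replace("Z", "+00:00"))


def must_call_more(msg: dict) -> bool:
    """remain == 0 means nothing more arrives until more is sent again."""
    return msg["remain"] == 0


msg = {"time": "2021-06-07T16:21:11.204+02:00", "seqno": 3,
       "remain": 1, "payload": "Some Data", "mtime": 1623075671204}
print(mtime_to_datetime(msg["mtime"]) == parse_time(msg["time"]))  # True
print(must_call_more(msg))                                          # False
```

Because mtime is stamped where the message is produced, order and deduplicate messages by seqno rather than by mtime.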

String topic example

The produced message:

{
"op": "produce",
"sync": "sync",
"payload": "Some Data"
}

The received message:

{
"time": "2021-06-07T16:21:11.204+02:00",
"seqno": 3,
"remain": 1,
"producer-name": "myprod",
"payload": "Some Data",
"mtime": 1623075671204,
"host": "myhost"
}

JSON topic example

The topic format is json. The produced message:

{
"op": "produce",
"sync": "sync",
"payload": {
"a-list": [1, 2, 3]
}
}

The received message:

{
"time": "2021-06-07T16:21:11.204+02:00",
"seqno": 4,
"remain": 1,
"producer-name": "myprod",
"payload": {
"a-list": [
1,
2,
3
]
},
"mtime": 1623075671204,
"host": "myhost"
}

Connection procedure

It is important to understand the relationship between the client websocket connection and the Volga topic. Let's illustrate with a few examples. Assume an Avassa environment consisting of a Control Tower with three hosts and a single edge site, also with three hosts. Thus you have the following six hosts:

  • e1.edgesite0.movie-theater-owner.com
  • e2.edgesite0.movie-theater-owner.com
  • e3.edgesite0.movie-theater-owner.com
  • ct1.control-tower.movie-theater-owner.com
  • ct2.control-tower.movie-theater-owner.com
  • ct3.control-tower.movie-theater-owner.com

With the above set of hosts and sites, there will be two separate Volga clusters, one at each site. The topics in these clusters can be accessed in the following ways:

  • An application running on e1 can connect to e1 and request a topic with location=local and topic=mytopic. This is the simplest case of them all, since no additional connections are made.

  • An application running somewhere on the internet can connect to e1.edgesite0.movie-theater-owner.com and request a topic with location=local and topic=mytopic. This also does not result in any additional connections, and it connects to the same topic as in the example above, but connecting to an edge site from the outside is not always possible.

  • If there are NAT constraints or firewall rules preventing direct access to the hosts at edgesite0, an application running somewhere on the internet can instead connect to ct1.control-tower.movie-theater-owner.com and request topic=mytopic with location=child-site and child-site=edgesite0 to connect to the topic mytopic at edgesite0.

  • An application that runs on e1 can connect to Volga at e1 and request to open a topic with topic=foo and location=parent. This instructs Volga on e1 to connect to the foo topic on one of the hosts at the Control Tower.

Thus the client connection to a host in a Volga cluster is one thing, and the location of the topic another.
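
The four cases differ only in which host the websocket is connected to and in the location-related fields of the open request. The sketch below builds the open-consumer request for each case in Python. The op, location, topic, name, position, mode and child-site fields are taken from this document; open_consumer_request is a hypothetical helper.

```python
import json
from typing import Optional


def open_consumer_request(topic: str, name: str, location: str,
                          child_site: Optional[str] = None,
                          position: str = "unread",
                          mode: str = "exclusive") -> str:
    """JSON for an open-consumer op; child-site is only used with location=child-site."""
    if location not in ("local", "child-site", "parent"):
        raise ValueError(f"unknown location: {location}")
    if (location == "child-site") != (child_site is not None):
        raise ValueError("child-site must be given exactly when location=child-site")
    req = {"op": "open-consumer", "location": location, "topic": topic,
           "position": position, "name": name, "mode": mode}
    if child_site is not None:
        req["child-site"] = child_site
    return json.dumps(req)


# Websocket to e1 (from e1 itself or from the internet): topic at the same site.
print(open_consumer_request("mytopic", "mycons", "local"))
# Websocket to ct1: reach mytopic at edgesite0 through the Control Tower.
print(open_consumer_request("mytopic", "mycons", "child-site", child_site="edgesite0"))
# Websocket to e1: reach topic foo at the Control Tower.
print(open_consumer_request("foo", "mycons", "parent"))
```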

An example producer/consumer session

In this section we show the JSON messages required to open a topic, produce your first Volga message, and then consume it. Not all concepts are explained in this section.

  1. First login to one of the hosts at a site according to the login/Websocket-Upgrade procedure described above.

  2. Open a producer by sending on the websocket:

    {
    "op": "open-producer",
    "location": "local",
    "topic": "mytopic",
    "name": "myprod",
    "create-options": {
    "replication-factor": 1,
    "persistence": "disk"
    }
    }

    This will return JSON

    {
    "result": "ok"
    }

    At this point a Volga producer is attached to the websocket. There can be only one producer per websocket.

  3. Produce data on the websocket by sending:

    {
    "op": "produce",
    "mode": "sync",
    "payload": "Hello Avassa"
    }

    If mode is sync, a response is returned; if mode is async, no response is sent. On success, you receive:

    {
    "result": "ok"
    }
  4. Now open an additional socket to a host at the same site, and log in on that socket as well.

  5. Open a consumer by sending:

    {
    "op": "open-consumer",
    "location": "local",
    "topic": "mytopic",
    "position": "unread",
    "name": "mycons",
    "mode": "exclusive",
    "create-options": {
    "replication-factor": 1,
    "persistence": "disk"
    }
    }

    Now you have an active consumer on the websocket.

  6. A basic Volga concept is that you have to request data by invoking the more operation. Send:

    {
    "op": "more",
    "n": 10
    }

    This means that you will receive at most 10 messages; after that, you have to issue another more call to get more data. A common user error with Volga is forgetting to issue more. If you are not receiving any messages from a topic, you are either at the end of it or have not issued more. The latter shows up as more-n: 0 in the output of supctl show volga topics ... for the topic.

  7. At this point you'll start receiving messages.

    {
    "time": "2021-06-07T16:21:11.204+02:00",
    "mtime": 1623075671204,
    "seqno": 1,
    "remain": 9,
    "name": "myprod",
    "payload": "Hello Avassa"
    }

    You can use supctl to inspect the state of your topic. Assuming the topic resides at an edge site called e0, you can issue the following towards the Control Tower:

    supctl show --site e0 volga topics mytopic
    topic: mytopic
    tenant: popcorn-systems
    format: string
    seqno: 1
    chunkno: 1
    number-of-chunks: 1000
    created: 2021-06-08T13:13:24.675Z
    assigned-hosts:
    - e0-002
    leader-host: e0-002
    worker-hosts: []
    requested-replication-factor: 1
    current-replication-factor: 1
    persistence: disk
    size: 976.56 KiB
    oldest-entry: 2021-06-08T13:13:24.674Z
    dropped-chunks: 0
    consumers:
    - consumer-name: mycons
    more-n: 9
    last-ack: 0
    buffered: 0
    mode: exclusive
    consuming-host: e0-001
    producers:
    - producer-name: myprod
    producing-host: e0-001
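
The more-n: 9 in this output follows from the flow-control rule in step 6: the consumer granted 10 messages and one has been delivered. The toy Python model below reproduces that bookkeeping. It is an illustration under one assumption not stated in this document, namely that a new more call adds to any credit still outstanding; MoreWindow is a hypothetical name.

```python
class MoreWindow:
    """Toy model of Volga's more/remain flow control, as seen by a consumer."""

    def __init__(self) -> None:
        self.more_n = 0  # messages the server may still send

    def more(self, n: int) -> None:
        self.more_n += n  # assumption: credit is additive

    def deliver(self) -> int:
        """Server side: send one message and return its 'remain' value."""
        if self.more_n == 0:
            raise RuntimeError("no credit: the client must send more first")
        self.more_n -= 1
        return self.more_n


w = MoreWindow()
w.more(10)
print(w.deliver(), w.more_n)  # 9 9  (remain in the message, more-n in supctl)
```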

Additional opts

As you saw in the previous section, a create-options object is passed as a field in the JSON when you open a producer or a consumer.

These options are the topic properties used when the topic is initially created: persistence, replication-factor, local-placement, num-chunks/max-size and format.

You have a set of additional fields:

  • mode which is either of exclusive, standby or shared. This will be described in the Consumer mode section below. This option is only valid for consumers.

  • control-message If set to true, the client is notified of the connection status of the topic. The client receives JSON messages in the form:

    {
    "connected" : "true"
    }
    {
    "connected" : "false"
    }

    This is useful if the client truly needs to know the actual connection status. Usually, client-side programming is simpler if the client does not have to track the connection status.

    When a client opens a topic, there are a number of transient errors the client is not exposed to by default. For example, if replication-factor is 3 and there is currently no majority for the topic, Volga returns an opened topic to the client and continues in the background to establish a majority. If control-message is set to true, the client is notified once a majority for the topic has been established.

  • on-noexists When opening topics at remote sites, or opening topics you do not "own", i.e. some other application's topic, it is highly recommended that you avoid accidentally creating the topic with the wrong options. If on-noexists is wait and the topic doesn't exist, this is treated as a transient error: the open operation succeeds, and once the topic is actually created, it starts to function. The caller receives a handle which appears to be a working topic.

  • topic-tag Sometimes a topic is shared by multiple applications; this is often the case for Volga infra, which is described in its own chapter below.

  • end-marker If this option is used by a consumer, a message with no payload and the field end-marker: true is delivered to the client when there is no more data to be read from the stream, i.e. when the client would otherwise hang. Note that the end marker is only sent once per session. If the client keeps the connection open after receiving the end marker, it receives new messages produced on the topic, but no new end marker.

  • local-placement If this option is used, it is guaranteed that the host creating the topic will be part of the replication set.
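
With control-message and end-marker enabled, a consumer receives three kinds of JSON objects on the same websocket. A minimal Python dispatcher, assuming the connected and end-marker fields look as described above; classify is a hypothetical helper, and placing the additional fields at the top level of the open request (next to create-options) is an assumption.

```python
import json


def classify(raw: str) -> str:
    """Sort an incoming websocket frame into control, end or data."""
    msg = json.loads(raw)
    if "connected" in msg:
        return "control"             # {"connected": "true"} / {"connected": "false"}
    if msg.get("end-marker") is True:
        return "end"                 # no payload: the consumer has caught up
    return "data"


open_req = {
    "op": "open-consumer", "location": "local", "topic": "mytopic",
    "position": "beginning", "name": "mycons", "mode": "exclusive",
    "control-message": True,   # report majority/connection changes
    "end-marker": True,        # tell us once when we have caught up
    "on-noexists": "wait",     # don't create someone else's topic
}
print(json.dumps(open_req))

for frame in ['{"connected": "false"}',
              '{"seqno": 7, "remain": 3, "payload": "x"}',
              '{"end-marker": true}']:
    print(classify(frame))
```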

Producing

A producer is attached to an authenticated websocket. Each produced message is either asynchronous or synchronous. In general, throughput is considerably higher for asynchronous messages. In asynchronous mode, messages are simply streamed, and if the producer is faster than the receiving Volga cluster, the producer eventually blocks on TCP write.

If you produce synchronously, the produce call does not return until the data has been written to disk. Note that data is written with write() but without an accompanying fsync(); to ensure the data reaches stable storage, use the Volga sync operation.

You can produce multiple messages asynchronously in sequence and then produce one synchronous message; this ensures that all data produced so far has been written to disk.

Even with an asynchronous producer, it is possible to issue the sync command over the websocket. This guarantees that all produced data is written to disk. This is a fairly costly and slow command.

{
"op": "sync"
}

If the websocket client produces asynchronous data faster than Volga is able to deliver the data, eventually the client will block on TCP write.

If the websocket is closed, the producer is implicitly closed.
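
The async-then-sync pattern can be expressed as a sequence of frames. A minimal Python sketch that only builds the JSON frames (sending them over the websocket is left out): every message except the last is produced with mode async, and the last one with mode sync, so its single reply confirms the whole batch. The mode field follows step 3 of the producer/consumer session above; batch_frames is a hypothetical helper.

```python
import json
from typing import List


def batch_frames(payloads: List[str], fsync: bool = False) -> List[str]:
    """Async produce frames, closed by one sync produce (and optionally op sync)."""
    if not payloads:
        return []
    frames = [{"op": "produce", "mode": "async", "payload": p} for p in payloads[:-1]]
    frames.append({"op": "produce", "mode": "sync", "payload": payloads[-1]})
    if fsync:
        frames.append({"op": "sync"})  # costly: forces data to stable storage
    return [json.dumps(f) for f in frames]


for frame in batch_frames(["m1", "m2", "m3"], fsync=True):
    print(frame)
```

Using the sync op only after a batch, rather than per message, keeps its cost proportional to the number of batches.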

Consuming

Similar to a producer, a consumer needs an authenticated websocket. To attach a consumer to the websocket, send a JSON structure with op open-consumer, a location and a topic name. The typical consumption pattern, in pseudocode, is:

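topic = "mytopic";
sock = websockets_connect(host, user, password);
mode = "exclusive";
create_opts = {replication_factor = 3};   // topic properties, used only if the topic is created
on_noexists = "create";                   // an additional field, not a create option
position = "unread";
name = "myconsumer";
c = volga.open_consumer(sock, "local", topic, position,
                        name, mode, create_opts, on_noexists);
volga.more(c, 100);                       // ask for the first 100 messages
while (true) {
    msg = read_volga_msg(c);              // blocks until a message arrives
    if (msg.remain == 0) {
        volga.more(c, 100);               // credit used up: ask for 100 more
    }
    do_process_msg(msg.payload);
    volga.ack(c, msg.seqno);              // position unread resumes after this seqno
}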
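
The loop above can be exercised without a live Volga cluster. The Python sketch below replaces the websocket with a hypothetical in-memory FakeTopic that keeps a last-ack per consumer name and honours the more credit. It demonstrates the pattern and the resume-after-ack behaviour of position unread described next; it is not how Volga is implemented.

```python
from typing import Dict, List, Optional


class FakeTopic:
    """In-memory stand-in for a topic: ordered messages and per-name last ack."""

    def __init__(self, payloads: List[str]) -> None:
        self.messages = [{"seqno": i + 1, "payload": p} for i, p in enumerate(payloads)]
        self.last_ack: Dict[str, int] = {}

    def open_consumer(self, name: str) -> "FakeConsumer":
        return FakeConsumer(self, name)  # models position "unread" only


class FakeConsumer:
    def __init__(self, topic: FakeTopic, name: str) -> None:
        self.topic, self.name = topic, name
        self.next_seqno = topic.last_ack.get(name, 0) + 1  # resume after last ack
        self.credit = 0

    def more(self, n: int) -> None:
        self.credit += n

    def read(self) -> Optional[dict]:
        """Next message with its remain counter, or None if nothing can be sent."""
        if self.credit == 0 or self.next_seqno > len(self.topic.messages):
            return None  # a real consumer would block here
        msg = dict(self.topic.messages[self.next_seqno - 1])
        self.next_seqno += 1
        self.credit -= 1
        msg["remain"] = self.credit
        return msg

    def ack(self, seqno: int) -> None:
        self.topic.last_ack[self.name] = seqno


def consume(consumer: FakeConsumer, batch: int, limit: int) -> List[str]:
    """The loop from the pseudocode, stopping after `limit` messages."""
    seen: List[str] = []
    consumer.more(batch)
    while len(seen) < limit:
        msg = consumer.read()
        if msg is None:
            break
        if msg["remain"] == 0:
            consumer.more(batch)      # ask for the next batch
        seen.append(msg["payload"])   # stands in for do_process_msg
        consumer.ack(msg["seqno"])
    return seen


topic = FakeTopic(["a", "b", "c", "d", "e"])
print(consume(topic.open_consumer("myconsumer"), batch=2, limit=3))   # ['a', 'b', 'c']
print(consume(topic.open_consumer("myconsumer"), batch=2, limit=10))  # ['d', 'e']
```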

Position is required; choose from the following positions:

  • unread This is the position to choose when you subscribe to a topic and want to consume each message once, and once only. The unread position is typically combined with the ack operation. Acknowledging a message, using the sequence number from the Volga message, indicates that you are done with that message. So, if the system restarts and you re-issue the open-consumer call with position unread, you get messages from where you last acked. Each consumer has a unique name, so you can have arbitrarily many consumers on a topic, and the acknowledgement is kept on disk for each named consumer.

  • end This starts consumption at the end of the topic stream.

  • beginning This starts consumption at the beginning of the topic stream.

  • seqno It is possible to start consuming from a specific sequence number.

  • since Start reading messages from a point relative to now. The format of the parameter is [Num]s, [Num]m or [Num]h. For example, '10s' starts reading 10 seconds back in time from now.

  • timestamp Volga messages contain a timestamp. You can start consuming from an arbitrary millisecond timestamp. This can be useful for tail-type applications, for example if you wish to start consuming a stream from a few minutes ago onwards.
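
The since and timestamp positions are related: a since value is an offset back from now, while timestamp is an absolute millisecond time like mtime. A small Python sketch of the arithmetic; since_to_timestamp is a hypothetical helper, and how Volga itself resolves since internally is not specified here.

```python
import re
import time
from typing import Optional

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def since_to_timestamp(since: str, now_ms: Optional[int] = None) -> int:
    """Convert a since value such as '10s', '5m' or '2h' to a millisecond timestamp."""
    m = re.fullmatch(r"(\d+)([smh])", since)
    if m is None:
        raise ValueError(f"expected [Num]s, [Num]m or [Num]h, got {since!r}")
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - int(m.group(1)) * _UNIT_SECONDS[m.group(2)] * 1000


now = 1623075671204  # mtime from the message examples
print(since_to_timestamp("10s", now))  # 1623075661204
print(since_to_timestamp("5m", now))   # 1623075371204
```

Note that 0s maps to the current time, which is why the supctl session earlier printed nothing without --follow.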

Consumer modes

The most commonly used consumer mode is probably exclusive. In this mode, within a single Volga cluster, there can only be one consumer with a given name for a specific topic. If a second consumer opens the same topic using the same name, the first consumer is closed and taken over by the second one.

On the other hand, if your application runs on multiple hosts at the Avassa site and you want only one active consumer, but don't care on which host it runs, the standby mode is useful. Here you can have multiple consumers, running on multiple hosts in the cluster or on the internet, all consuming from the same topic using the same name. Only one of them receives Volga messages; the remaining consumers stand by. If the active consumer closes, one of the standby consumers is chosen and starts to receive messages.

The new consumer, the one that was standing by, will receive messages from where the original consumer last acknowledged a message. Thus it is possible for the following scenario to occur:

  1. Consumer A receives the message with seqno 13.
  2. Consumer A processes that message.
  3. Consumer A dies before being able to acknowledge message 13.
  4. Consumer B gets activated.
  5. Consumer B gets message 13 one more time.
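
Because of this redelivery, processing should be idempotent. One common approach, sketched in Python for the standby scenario above, is to remember the highest seqno already processed and skip anything at or below it; since seqno increases within a topic and a single active consumer receives messages in order, one integer is enough. The sketch assumes the consumers can persist this state in storage shared by all standby consumers (an in-memory variable stands in for it here); IdempotentProcessor is a hypothetical name.

```python
from typing import Callable, List


class IdempotentProcessor:
    """Skip messages whose seqno was already processed (at-least-once delivery)."""

    def __init__(self, handle: Callable[[str], None]) -> None:
        self.handle = handle
        # Assumption: in a real deployment this lives in storage shared by
        # the standby consumers and is updated together with the side effect.
        self.last_processed = 0

    def process(self, msg: dict) -> bool:
        if msg["seqno"] <= self.last_processed:
            return False               # duplicate, e.g. redelivered after failover
        self.handle(msg["payload"])
        self.last_processed = msg["seqno"]
        return True


done: List[str] = []
p = IdempotentProcessor(done.append)
p.process({"seqno": 13, "payload": "frame-13"})  # A processes 13, dies before ack
p.process({"seqno": 13, "payload": "frame-13"})  # B gets 13 again: skipped
p.process({"seqno": 14, "payload": "frame-14"})
print(done)  # ['frame-13', 'frame-14']
```

In shared mode, messages are spread across consumers, so a single high-water mark is not sufficient; a set of processed seqnos would be needed instead.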

If processing messages is CPU intensive, you may choose the shared mode for the consumer. Here too you can have multiple consumers on the same topic using the same name. In this mode, messages are distributed among the consumers in a random fashion. A use case for this is a producer producing image data, with a consumer attached to that stream for image processing. If the producer produces data faster than a single consumer can process it, you need consumers on multiple hosts to keep up.

A consumer stays open forever. If it loses majority, the consumer tries to re-establish majority forever. Naturally, you will not receive any messages while the majority is gone. If it is important for the application to act when majority is lost, the consumer must use the control-message flag when opening the topic.

Open errors

There are quite a few things that can go wrong when opening a topic, but many of them are masked internally by Volga and treated as transient errors. These include, e.g., not being able to gather a majority for the topic. By default, the client gets back what appears to be a functioning consumer or producer, while in reality Volga tries to remedy the situation by retrying the open in the background. If this behavior is not desired, use the control-message option.

Encryption

The traffic from the websocket client to the REST server in Supd is encrypted using regular HTTPS. It's possible to extract the certificates from Strongbox. When you connect to the REST server and request to open a topic on another site, i.e. by setting location to child-site or parent, Volga at one site connects to the other site using mutual TLS. The certificates for this are generated, maintained, and distributed internally by Strongbox. Thus, site-to-site Volga traffic is always encrypted. Volga uses an internal protocol between sites that runs over port 443.

The child-site parameter

It is not unusual for edge sites to reside behind NAT firewalls, making them inaccessible from the internet. However, since all edge sites connect to the Control Tower, you can still access them via a host at the Control Tower.

To connect to a topic at an edge site, connect to a host at the Control Tower and open a producer or consumer with location=child-site and child-site=sitename, where sitename is the name of the edge site.

Apart from the location and child-site parameters, the API functions exactly the same as when making direct connections.

Volga Status

To troubleshoot Volga issues, looking at the Volga status at the different sites is recommended. The detailed meaning of all the fields in the status output is documented in the Volga Reference Documentation. You can look at clients and at topics at a site, and these are not quite the same thing. For example, if you connect with a websocket to a host in the Control Tower and request a topic at an edge site using location=child-site, the client resides at the Control Tower, whereas the topic resides at the edge site.

supctl show --site edge volga clients foobar
id: 1
name: mycons2
location: local
topic: mytopic2
tenant: popcorn-systems
host: edge-001
kind: consumer
num-messages: 1
status: up
status-info: consuming-locally
reconnects: 0
time-connected: 42m13s

If you look at a topic at a site, you get

supctl show --site edge volga topics mytopic2
topic: mytopic2
tenant: popcorn-systems
format: string
seqno: 1
chunkno: 1
number-of-chunks: 1000
created: 2021-06-18T08:11:49.916Z
assigned-hosts:
- edge-002
leader-host: edge-002
worker-hosts: []
replication-factor: 1
persistence: disk
size-megabyte: 1
oldest-entry: 2021-06-18T08:11:49.916Z
dropped-chunks: 0
consumers:
- consumer-name: mycons2
more-n: 9
last-ack: 0
buffered: 0
mode: exclusive
consuming-host: edge-001
producers:
- producer-name: myprod2
producing-host: edge-001

Volga Infrastructure

On top of the Volga topics, there is an Avassa infrastructure, Volga Infra, that can be used to pass data between Avassa sites. Infra is nothing more than an application built on top of Volga topics. All Avassa sites are part of the same infrastructure. Code can publish a message to the infra, and the message is automatically propagated to all Avassa sites. The Volga topics needed by an infra are set up automatically by Volga. By default, an infra covers all sites.

A tenant doesn't have Volga Infra enabled by default. To use a Volga Infra, it must first be created:

supctl create volga infras << EOF
name: myinfra
format: json
EOF
supctl show volga infras myinfra
name: myinfra
chunks: 1000
persistence: disk
format: json

You can then produce messages:

supctl do volga infra myinfra produce down "Hello Edge"

And consume messages:

supctl do volga infra myinfra consume

A tenant can have arbitrarily many Volga Infras.

Once you create an Infra, a number of normal Volga topics are set up behind the scenes by Volga. These topics are used internally by the Volga infra. There will be:

  • An up topic, which is used by Volga infra to pass data upstream.

  • A down topic which is used by Volga infra to pass data downstream.

  • A result topic which is used by Volga infra to write whatever comes from upstream or downstream. This is the topic that is consumed by the infra consume call.

You can view those topics using supctl:

supctl show volga topics infra:down:myinfra
name: infra:down:myinfra
tenant: popcorn-systems
format: json
seqno: 0
chunkno: 1
number-of-chunks: 1000
created: 2021-06-18T11:30:32.101Z
assigned-hosts:
- control-tower-001
leader-host: control-tower-001
worker-hosts: []
replication-factor: 1
persistence: disk
size-megabyte: 1
oldest-entry: 2021-06-18T11:30:32.100Z
dropped-chunks: 0
consumers:
- consumer-name: edge-001-popcorn-systems-myinfra-result-disk
more-n: 100
last-ack: 0
buffered: 0
mode: exclusive
consuming-host: edge-001
producers:
- producer-name: control-tower-001-popcorn-systems-myinfra-down-disk
producing-host: control-tower-001

One interesting aspect of these statistics is the consumer: it is a remote consumer, initiated at the edge site. Data produced at the Control Tower goes into the topic and is then consumed remotely by code running at the edge site. This matters because it allows Volga to apply filters to the Infra, so data published on the Infra is sent only to the edge sites that need it rather than to all of them.

When debugging your infra code, it can sometimes be useful to look at the individual topics that make up the infra, and even consume from them using the REST consumer as in:

supctl do volga topics infra:down:myinfra consume --ignore-all-filters

in order to see what has been posted on the Infra.

Producing data on a Volga Infra

In Volga Infra, all production is local and consumption is remote. Let's walk through what happens when tenant code connected to the Control Tower publishes a message, by showing the JSON that is passed over the websocket.

The first requirement is to create the infra itself; here we assume it was created with its format set to string. Next, user code connected to the Control Tower acquires an Infra handle to "myinfra" for publishing, by sending the following JSON structure on an authenticated websocket:

{
"op": "open-infra-producer",
"infra": "myinfra"
}

The reply is either

{
"result": "ok"
}

or

{
"result": "error",
"info" : "Some error description ..."
}
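The request/reply exchange above can be sketched in Python as follows. This is a minimal sketch: the helper names (open_infra_producer_request, check_reply, VolgaError) are hypothetical, and the websocket transport and authentication are deliberately left out. Only the JSON shapes come from the examples above.

```python
import json


class VolgaError(Exception):
    """Raised when a reply has the form {"result": "error", "info": ...}."""


def open_infra_producer_request(infra: str) -> str:
    """Serialize the request that acquires a producer handle for an infra."""
    return json.dumps({"op": "open-infra-producer", "infra": infra})


def check_reply(raw: str) -> None:
    """Return quietly on {"result": "ok"}; raise VolgaError otherwise."""
    reply = json.loads(raw)
    if reply.get("result") == "ok":
        return
    raise VolgaError(reply.get("info", "no error description"))


if __name__ == "__main__":
    # The text below would be sent on an authenticated websocket.
    print(open_infra_producer_request("myinfra"))
    check_reply('{"result": "ok"}')
```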

At this point you can start producing data on the Volga infra. A JSON example could look like this:

{
"op": "infra-produce",
"payload": "zzzzzzzzzzz",
"sync" : "async",
"direction": "down",
"destination-sites": [{"site": "edge2"}, {"site": "edge13"}]
}

This sends the payload downwards to the sites edge2 and edge13. Implementation-wise, the following happens:

  • User code produces into the Infra per above.
  • Supd produces the data into a normal stream topic at the Control Tower called infra:down:myinfra. This topic was created by Supd when the Infra was created.
  • An internal Supd consumer running at each edge site consumes the message and, at the edge site, produces the data into a topic called infra:result:myinfra. There are thus as many consumers of the Control Tower topic as there are edge sites.
  • Your own consumer can consume from infra:result:myinfra at the edge site. This is done by using a Volga infra consumer and opening the myinfra infra.
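To make the data flow concrete, here is a toy, in-memory model of the steps above. It is not Supd's implementation, and the class and method names are hypothetical. Production goes into a single down topic. One consumer per edge site keeps only the messages addressed to that site and appends them to a per-site result topic. Because each site's consumer tracks its own offset, a site that was down simply catches up when its consumer runs again.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Toy in-memory model of the downward Infra path (NOT Supd's implementation).
# All class and method names are hypothetical.


@dataclass
class Message:
    payload: str
    destination_sites: Optional[List[str]] = None  # None: every site consumes


@dataclass
class ToyInfra:
    name: str
    edge_sites: List[str]
    # The Control Tower topic, e.g. infra:down:<name>.
    down_topic: List[Message] = field(default_factory=list)
    # How far each edge site's internal consumer has read in down_topic.
    offsets: Dict[str, int] = field(default_factory=dict)
    # One result topic per edge site, e.g. infra:result:<name>.
    result_topics: Dict[str, List[str]] = field(
        default_factory=lambda: defaultdict(list))

    def produce_down(self, msg: Message) -> None:
        """Production is local: append to the Control Tower down topic."""
        self.down_topic.append(msg)

    def run_edge_consumer(self, site: str) -> None:
        """Let one site's consumer catch up on everything it has not read.

        Messages addressed to other sites are filtered out, so they never
        reach this site's result topic.
        """
        if site not in self.edge_sites:
            raise KeyError(f"unknown edge site {site!r}")
        start = self.offsets.get(site, 0)
        for msg in self.down_topic[start:]:
            if msg.destination_sites is None or site in msg.destination_sites:
                self.result_topics[site].append(msg.payload)
        self.offsets[site] = len(self.down_topic)


if __name__ == "__main__":
    infra = ToyInfra("myinfra", ["edge1", "edge2", "edge13"])
    infra.produce_down(Message("zzzzzzzzzzz", ["edge2", "edge13"]))
    for site in ("edge1", "edge2"):  # edge13 is down for now
        infra.run_edge_consumer(site)
    infra.run_edge_consumer("edge13")  # edge13 comes back and catches up
    print(dict(infra.result_topics))
```

In this model, producing with destination sites edge2 and edge13 leaves edge1's result topic untouched. edge13 receives the message whenever its consumer next runs.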

Volga Infra is thus just an application built on top of regular Volga topics, and it can be used for a variety of purposes. One common use case is a coordination application that needs to send control instructions, such as configuration changes or arbitrary requests, to other edge applications. If such "commands" are posted downwards on Volga Infra, the coordination application can rely on its sub-applications eventually receiving each command or request, regardless of whether the edge application is currently up or down.

The infra-produce operation accepts a number of options:

  • direction - must be up|down|stitch.

    • down - used to send messages from the Control Tower to edge sites
    • up - used to send messages to the Control Tower from an edge site
    • stitch - used to send messages from an edge site to other edge sites by passing them through the Control Tower
  • local-site-deliver - if set to true, the produced data is delivered not only up or down but also at the local site.

  • topic-tag - sometimes a Volga Infra is created and shared by multiple applications. To avoid seeing the other applications' messages on the Infra at all, set a topic-tag in the Infra producer, and use the same topic-tag in the consumer that consumes from the final result topic.

  • destination-sites - a list of sites that are to consume the message. If omitted, all sites consume the message.
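The options above can be combined into a single infra-produce request. The sketch below is hypothetical. The keys op, payload, sync, direction and destination-sites come from the earlier example. Sending local-site-deliver and topic-tag as JSON keys with exactly those names is an assumption based on the option list, as is the helper name infra_produce_request.

```python
import json
from typing import Iterable, Optional

# Hypothetical request builder; "local-site-deliver" and "topic-tag" as JSON
# keys are assumptions, the remaining keys appear in the example above.
DIRECTIONS = {"up", "down", "stitch"}


def infra_produce_request(
    payload: str,
    direction: str,
    destination_sites: Optional[Iterable[str]] = None,
    local_site_deliver: bool = False,
    topic_tag: Optional[str] = None,
    sync: str = "async",
) -> str:
    """Serialize one infra-produce operation, validating the direction."""
    if direction not in DIRECTIONS:
        raise ValueError(f"direction must be one of {sorted(DIRECTIONS)}")
    msg = {"op": "infra-produce", "payload": payload,
           "sync": sync, "direction": direction}
    if destination_sites is not None:
        # Omitting the key means every site consumes the message.
        msg["destination-sites"] = [{"site": s} for s in destination_sites]
    if local_site_deliver:
        msg["local-site-deliver"] = True
    if topic_tag is not None:
        msg["topic-tag"] = topic_tag
    return json.dumps(msg)


if __name__ == "__main__":
    print(infra_produce_request("zzzzzzzzzzz", "down", ["edge2", "edge13"]))
```

Validating the direction on the client side keeps a typo such as "dwon" from reaching the websocket at all.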

Topic queries

Volga query-topics is a tool for issuing queries that search your Volga topics. The full API is documented in the reference documentation.

A tenant has several searchable topics. For example, all data that deployed containers write to standard output and standard error is stored in a per-container topic. These topics are created by the system when the container is deployed, but each topic is owned by the tenant that deployed the container. The topic can, of course, be read as usual using either supctl or the websocket API.

A topic query is issued at the Control Tower. It is distributed over the internal Volga Infra to the edge sites where the calling tenant resides and evaluated at each of those sites. Matching messages are passed back up to the Control Tower, where the message streams are merged and finally delivered to the caller. Query-topics is thus a tool for parallel, distributed log search across a large number of sites. It is especially useful when you have multiple containers, possibly at multiple sites, interacting with each other or with some external entity.

By default, all query results are returned to the Control Tower as quickly as possible. Even though the Control Tower does a rudimentary sort of messages based on timestamps, the end result may therefore not be entirely in timestamp order. If the sort option is set to true, query results are returned in a more controlled fashion that allows the Control Tower to sort them properly. Sorting is off by default because it reduces query performance. Note, however, the caveat about out-of-order timestamps in the Messages section. When sort is true, query-topics usually manages to get the order right even when this occurs, but a message with an unusually large timestamp discrepancy can still end up out of order in the merged message stream.
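The ordering caveat can be illustrated with a k-way merge. The sketch below is not Volga's implementation. It uses Python's heapq.merge, which produces correctly ordered output only when each input stream is already ordered, so a single entry with a badly skewed timestamp is enough to break the merged order. The entry tuple shape, the function names and the site names udc2 and udc3 are hypothetical.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

# Illustration only, not Volga's implementation: merge per-site result
# streams by timestamp with a k-way merge (heapq.merge).
Entry = Tuple[float, str, str]  # (timestamp, site, payload); hypothetical shape


def merge_by_timestamp(*streams: Iterable[Entry]) -> Iterator[Entry]:
    """Lazily merge streams; correct only if every stream is time-ordered."""
    return heapq.merge(*streams, key=lambda entry: entry[0])


def is_ordered(entries: List[Entry]) -> bool:
    """True if timestamps never decrease."""
    return all(a[0] <= b[0] for a, b in zip(entries, entries[1:]))


if __name__ == "__main__":
    udc1 = [(1.0, "udc1", "a1"), (4.0, "udc1", "a2")]
    udc2 = [(2.0, "udc2", "b1"), (3.0, "udc2", "b2")]
    print(is_ordered(list(merge_by_timestamp(udc1, udc2))))  # True

    # One entry with a skewed clock (0.5 after 5.0) breaks the merge.
    skewed = [(5.0, "udc3", "c1"), (0.5, "udc3", "c2")]
    print(is_ordered(list(merge_by_timestamp(udc1, skewed))))  # False
```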

In this example we have deployed a container called cowboy. Once the container is deployed, we can inspect the two topics used for log collection.

The edge site:

supctl show --site udc1 volga topics \
system:container-logs:cowboy-app.cowboy-srv-1.cowboy
name: system:container-logs:cowboy-app.cowboy-srv-1.cowboy
tenant: acme
labels:
service-name: cowboy-srv
service-instance: cowboy-srv-1
container-name: cowboy
application: cowboy-app
format: string
seqno: 4
chunkno: 1
number-of-chunks: 100
creation-time: 2022-09-01T08:14:22.984Z
assigned-hosts:
- udc1-001
leader-host: udc1-001
worker-hosts: []
requested-replication-factor: 1
current-replication-factor: 1
persistence: disk
size: 976.56 KiB
oldest-entry: 2022-09-01T08:14:22.984Z
dropped-chunks: 0
consumers: []
producers:
- producer-name: system:container-logs:cowboy-app.cowboy-srv-1.cowboy-udc1-001
producing-host: udc1-001
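As a rough check of the capacity figure for this topic, the arithmetic below assumes that each chunk holds about 1,000,000 bytes. That assumption is inferred from the size: 976.56 KiB reported while the topic is still on its first chunk (chunkno: 1). It is not a documented constant.

```python
# Back-of-the-envelope capacity of the container log topic shown above.
# Assumption: one chunk holds about 1,000,000 bytes, inferred from the
# "size: 976.56 KiB" reported while the topic is on its first chunk.
size_kib = 976.56
chunk_bytes = round(size_kib * 1024, -3)  # about 1,000,000 bytes
number_of_chunks = 100                    # from "number-of-chunks: 100"

max_bytes = chunk_bytes * number_of_chunks
print(f"max about {max_bytes / 1e6:.0f} MB ({max_bytes / 2**20:.1f} MiB)")
```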

The log topic is created with number-of-chunks set to 100 and will thus contain at most about 100 megabytes of data per container. The search tool is accessible through supctl and can, for example, be invoked as:

supctl do volga query-topics --topics \
match-topic-labels=application=cowboy-app output-payload-only=true

or

supctl do volga query-topics --all-sites --topics \
topic-names=system:container-logs:cowboy-app.cowboy-srv-1.cowboy \
output-payload-only=true output-format=%t:%h:%p

Using TAB completion in supctl is very useful here. The query-topics API is available over both the REST API (which supctl uses) and the websocket API. If you provide the -v flag to supctl, you see the JSON payload that supctl sends. The above example sends:

{"follow": true,
"all-sites": true,
"topics":
[{"topic-names": ["system:container-logs:cowboy-app.cowboy-srv-1.cowboy"],
"output": {"payload-only": true, "format": "%t:%h:%p"}}]}

The commands above collect the specified topics from all sites covered by the query. Output from all topics is merged based on timestamp. Data is raw by default, but JSON messages can also be requested.

A wide variety of searches are possible, and the following arguments can be combined in various ways. In principle, we:

  • Specify which sites we wish the query to cover, by default all are chosen. Sites can be chosen based on names, regular expression match, site labels, etc.
  • Specify a list of topic/filter/output selectors.
  • Topics can be chosen in a wide variety of ways, e.g regular expressions, label matching, explicit names, etc.
  • Specify filters. Several variants of regular expression matches exist. One especially interesting filter is the JSON match, where a sparse JSON pattern is supplied.
  • Specify an optional time span to apply to the query.
  • Decide whether to follow the logs, or to terminate when all specified topics are exhausted.

This functionality is also available over the websocket API. Connect a websocket to any host at the Control Tower and send a JSON structure, for example:

{
"op": "query-topics",
"follow": true,
"all-sites": true,
"topics":
[{"topic-names":
["system:container-logs:cowboy-app.cowboy-srv-1.cowboy"],
"output": {"payload-only": false},
"filter": {"drop-until-last-re-match": "ERROR"}}]}

This produces a stream of JSON structures from the output of the cowboy container.

The equivalent command using supctl and the REST API is:

supctl -v do volga query-topics --all-sites \
--topics topic-names=system:container-logs:cowboy-app.cowboy-srv-1.cowboy \
output-payload-only=false \
filter-drop-until-last-re-match=ERROR --follow
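The supctl arguments and the JSON they produce follow a visible pattern. The hypothetical converter below mimics that pattern for the keys used in this section only. topic-names becomes a list, and the output-* and filter-* prefixes become nested output and filter objects. Placing filter next to output, rather than inside it, is an assumption suggested by the argument names; supctl -v shows what supctl really sends. The function names are illustrative.

```python
import json
from typing import Dict, List, Union

# Hypothetical converter from supctl "--topics key=value" arguments to one
# entry of the "topics" list; covers only the keys used in this section.
NESTED_PREFIXES = ("output-", "filter-")
LIST_KEYS = {"topic-names"}


def parse_value(raw: str) -> Union[bool, str]:
    """Turn "true"/"false" into booleans; keep everything else as text."""
    return {"true": True, "false": False}.get(raw, raw)


def topic_selector(args: List[str]) -> Dict[str, object]:
    selector: Dict[str, object] = {}
    for arg in args:
        key, _, raw = arg.partition("=")  # split on the first "=" only
        value = parse_value(raw)
        for prefix in NESTED_PREFIXES:
            if key.startswith(prefix):
                group = selector.setdefault(prefix.rstrip("-"), {})
                group[key[len(prefix):]] = value
                break
        else:
            selector[key] = [value] if key in LIST_KEYS else value
    return selector


def query_topics_request(topic_args: List[str], follow: bool,
                         all_sites: bool) -> str:
    """Wrap one topic selector in a websocket query-topics operation."""
    return json.dumps({"op": "query-topics", "follow": follow,
                       "all-sites": all_sites,
                       "topics": [topic_selector(topic_args)]})


if __name__ == "__main__":
    print(query_topics_request(
        ["topic-names=system:container-logs:cowboy-app.cowboy-srv-1.cowboy",
         "output-payload-only=false",
         "filter-drop-until-last-re-match=ERROR"],
        follow=True, all_sites=True))
```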

Language bindings

JavaScript

npm

Rust

crates.io

Python

PyPI