Volga: Distributed event streaming
Introduction
Volga is a multi-tenant, high-performance, lightweight event streaming implementation tailor-made for Avassa Supd. Terminology and functionality in Volga are similar to other event streaming implementations such as Pulsar and Kafka.
In an Avassa environment, Volga runs on all hosts, on edge sites as well as on the Control Tower hosts. Each site automatically forms a Volga cluster in which all hosts at the site participate. Volga utilizes the well-known publish/subscribe pattern, whereby producers publish messages to a topic, and consumers subscribe to the topic and consume its messages. Volga runs inside Avassa Supd and requires no configuration from the end user. It is used internally by Supd, and can be easily used by tenant code.
Volga can be used for:
- Event sourcing. An application wants to propagate a configuration change to all sites. This is best done by:
  - posting the change into a Volga topic at the Control Tower, and
  - consuming the change at all edge sites.

  This is a good and proven way to organize loosely coupled distributed applications.
- Aggregation. An application with a large number of remote sites, where large amounts of data are produced at the remote sites. Due to either cost or bandwidth constraints, data must be aggregated and refined before it is pushed further up in the network. Such an application would publish data on one or several Volga topics locally at each site. Data would typically then be consumed and refined locally, and either re-produced to an aggregation topic locally or produced remotely to an aggregation topic at the Control Tower.
- Distributed metrics. Metrics generated at edge sites are best produced to local site Volga topics. To query and search such metrics, a search request can be posted to Volga at the Control Tower. This is a means to keep all metrics data distributed without ever accumulating data centrally. If the number of edge sites is very large and the amount of metrics data is large, this is a good way to avoid central bottlenecks.
Topics
A topic is a named stream of data where data items are ordered by the time they were published to the topic. The data may be replicated over multiple hosts in the Volga cluster. A topic is automatically created the first time it is opened, regardless of whether it is a consumer or a producer that opens it. Topic data may be persisted on disk or kept in RAM only.
A Volga topic has certain characteristics. One is `replication-factor`, which is given the first time the topic is created. If you have replication factor 3, all data produced to the topic will be replicated at 3 nodes. The nodes are chosen by Volga when the topic is created. As long as a majority of them are working, the topic will continue to operate.
A topic has an arbitrary number of named producers and an arbitrary number of named consumers. A producer publishes data to the topic, and a consumer consumes data from the topic. Data in a topic is persistent: a consumer can go down, come up again, and continue to consume from where it left off.
Topic location
Regardless of the actual host you make the connection to, there are three different ways of connecting to a topic, specified by the `location` parameter:
- `local` - used when you have connected directly to one of the hosts at the site where the topic resides.
- `child-site` - used to connect to a topic at an edge site via the Control Tower.
- `parent` - used to connect from an edge site to a topic at the Control Tower.
Topic properties
When a topic is created, i.e. the first time it is opened, a number of properties are attached to the topic. Most options are permanent and cannot be changed. You have the following topic properties:
- `persistence`: Either of `disk` or `ram`. Default is `disk`. Persistent topics may be replicated, whereas RAM-based topics are not. However, if the host with a RAM-based topic fails, the topic will be immediately moved to another host, and all clients will automatically reconnect to the new host; thus, on RAM topics, messages can be lost. Volga ensures that disk topics are evenly distributed between the hosts at the site. If manual control over topic placement is needed, host labels can be used to restrict specific topics to certain subsets of hosts.
- `match-host-labels`: A label expression that determines which hosts are eligible to replicate this topic. The label expression is evaluated when the topic is created. To re-evaluate it (and potentially move the topic), run the `redistribute` action on the topic.
- `replication-factor`: A topic can be replicated on multiple hosts in a Volga cluster. Choose an odd number, since a majority is required for the topic to be operational.
- `local-placement`: If set to true, the host executing the request to create the topic is guaranteed to be included in the replication set.
- `num-chunks`: Data for a topic is kept on local disk on the host(s) running the topic. Data is divided into chunks on disk; the size of each chunk is 1 MiB, and the default value for `num-chunks` is 1000, giving roughly 1 gigabyte by default per topic and per replica. Data aging is automatic, and based not on time but on size. Thus when chunk 1001 is eventually needed, chunk 1 is discarded.
- `max-size`: Rather than setting a fixed number of chunks for a topic, you can specify its maximum allowed size, e.g. `10MB` or `1GB`. Internally, Volga will create as many chunks as needed and remove old chunks to keep the topic size below `max-size`.
- `max-days`: Sets the maximum number of days a message is allowed to exist on the topic. Older messages will be deleted. In practice, every midnight a new chunk is started, and any chunk that is `max-days` old is deleted. So if `max-days` is 3, any message that is more than 48 hours old will be deleted at midnight. Note that even if `max-days` is set, the topic still has an upper size limit set by either `max-size` or `num-chunks`, meaning messages can also be deleted because the topic is full.
- `format`: Set to either `string` (the default) or `json`. If the messages produced on a topic are JSON objects, setting the format to `json` will make for nicer-looking JSON messages on the receiver end. An example of this is provided in the Messages chapter.
- `ephemeral`: If set to true, the topic will be automatically deleted when it no longer has any consumers or producers. Only available in the websocket API.
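The sizing rules above can be sketched numerically. This is only an illustration of the documented defaults (1 MiB chunks, 1000 chunks by default); the constant and function names are illustrative and not part of any Volga API:

```python
# Sketch of Volga's size-based retention as described above.
CHUNK_SIZE = 1024 * 1024  # each chunk on disk is 1 MiB

def max_topic_bytes(num_chunks=1000, max_size=None):
    """Upper bound on per-replica disk usage for one topic."""
    if max_size is not None:
        # max-size: Volga creates as many chunks as needed under this cap
        return max_size
    # num-chunks: fixed ring of chunks; chunk 1 is dropped when chunk 1001 is needed
    return num_chunks * CHUNK_SIZE

# Defaults: 1000 chunks of 1 MiB, roughly 1 GB per topic and per replica.
assert max_topic_bytes() == 1000 * CHUNK_SIZE
```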
Getting started
The easiest way to get started with Volga topics is to use the `supctl` tool to create a few topics and play around with them.
REST API
When scripting, it is easier to use a regular REST API to access Volga than to use the Websocket API. The REST API is not as versatile as the Websocket API, but can still be useful. There are three different paths that can be POSTed to:
/api/v1/state/volga/topics/{topic-name}/consume
/api/v1/state/volga/topics/{topic-name}/produce
/api/v1/state/volga/topics/{topic-name}/create-topic
In the REST API to Volga, topics are not automatically created as they are in the Websocket API. Each method takes input parameters that are described in the Avassa reference documentation for the REST API.
The REST API to Volga is nicely accessible from `supctl`, for example:
supctl do volga create-topic foo string
supctl do volga topics foo produce "Hello Avassa"
This will create a new topic `foo` with payload format `string` and publish one message to the topic.
The REST API to produce is fairly inefficient, since the code will open the
topic, publish one message and then close the topic. In the Websocket API
data is streamed from the client to the topic.
To check that the message is there in the topic, we can do
supctl -d .acme do volga topics foo consume
{
"time": "2022-03-29T15:05:06.801Z",
"seqno": 1,
"remain": 24,
"payload": "Hello Avassa",
"name": "REST-api",
"mtime": 1648566306801,
"format": "string"
}
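The same REST paths can be called from any HTTP client. The following is a hedged sketch using Python's standard library; the paths come from the list above, but the request-body field name and the bearer-token auth are assumptions - consult the Avassa reference documentation for the authoritative parameters:

```python
import json
import urllib.request

API = "https://api.example-site.avassa.net"      # hypothetical API address
TOKEN = "0d908c6b-f900-4187-bb00-394cd7275fed"   # token obtained from /v1/login

def volga_request(path, body):
    """Build a POST request against the Volga REST API."""
    return urllib.request.Request(
        API + path,
        data=json.dumps(body).encode(),
        headers={"Authorization": "Bearer " + TOKEN,
                 "Content-Type": "application/json"},
        method="POST")

# Produce one message to topic "foo" (the body field name is an assumption):
req = volga_request("/api/v1/state/volga/topics/foo/produce",
                    {"payload": "Hello Avassa"})
# urllib.request.urlopen(req) would send it.
```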
There are several useful flags that can be passed to the REST API via `supctl`. For example, the above command is equivalent to:
supctl do volga topics foo consume --position end
If we hit TAB in `supctl`, we get:
supctl do volga topics foo consume --end-marker TAB
-h --position
--follow --position-sequence-number
--help --position-since
--ignore-all-filters --position-timestamp
--input --re-match
--payload-only --topic-tag
We'll not describe everything in detail here; all parameters are described in detail in the reference documentation. Let's just run through a session and comment on it.
supctl -d .acme do volga topics foo consume --position since --position-since 0s
Nothing was printed, and the command returned.
supctl -d .acme do volga topics foo consume --position since --position-since 0s --follow
The command hangs, waiting for data to be produced to the topic.
supctl -d .acme do volga topics foo consume --payload-only
Hello Avassa
When manually viewing a topic, it's often more pleasing to the eye to look only at the payload, without the Volga metadata. For example:
supctl do volga topics system:audit-trail-log consume \
--payload-only --follow
This will follow the audit log at a site and show only the actual audit log entries, without the extra Volga data.
supctl do volga create-topic bar json
This creates a new topic which should contain entries in JSON format. One of the reasons for forcing the user to specify the format of the topic, i.e. `string` or `json`, is to allow for prettier data when consuming. For example, the audit log is a "format json" topic. Try to view the audit log without the `--payload-only` flag, and see that the payload is embedded in the surrounding JSON object as opposed to being a quoted JSON string.
supctl do volga topics bar produce '{"abc": "aa"}'
supctl do volga topics bar consume
{
"time": "2022-03-30T09:49:56.707Z",
"seqno": 1,
"remain": 24,
"payload": {
"abc": "aa"
},
"name": "REST-api",
"mtime": 1648633796707,
"format": "json"
}
As a final note on the `supctl` API towards Volga, all of the above commands were run towards the Control Tower, creating the topics at the Control Tower. Even though `supctl` talks to the Control Tower, it can connect to topics on an edge site using the `--site` parameter.
supctl do --site udc volga create-topic bar json
supctl do --site udc volga topics bar produce '{"abc": "aa"}'
supctl do --site udc volga topics bar consume
{
"time": "2022-03-30T09:56:14.266Z",
"seqno": 1,
"remain": 24,
"payload": {
"abc": "aa"
},
"name": "REST-api",
"mtime": 1648634174266,
"format": "json"
}
We can view the stats for the newly created topic with:
supctl show --site udc volga topics bar
name: bar
tenant: acme
labels: {}
format: json
seqno: 1
chunkno: 1
number-of-chunks: 1000
creation-time: 2022-03-30T09:55:54.526Z
assigned-hosts:
- udc-002
leader-host: udc-002
worker-hosts: []
requested-replication-factor: 1
current-replication-factor: 1
persistence: disk
size: 976.56 KiB
oldest-entry: 2022-03-30T09:55:54.525Z
dropped-chunks: 0
consumers: []
producers: []
Websocket
To access Volga over a websocket, you need an authenticated websocket connection to one of the hosts at a site. When you log in, you receive a Strongbox token that must be used in the websocket Upgrade procedure.
You log in by connecting over TLS to Avassa Supd at the `api` address for the site and posting the login data to `/v1/login`:
{
"username": "joe@popcorn-systems.com",
"tenant": "popcorn-systems",
"password": "verysecret"
}
Upon success, this could return:
{
"token": "0d908c6b-f900-4187-bb00-394cd7275fed",
"expires-in": 1209600,
"accessor": "41182e6e-7027-4bdd-8b35-e44b4e0d8204",
"creation-time": "2021-06-07T15:23:54.629168Z"
}
It is the token in the above JSON structure you need to extract and provide when you upgrade the socket to a websocket. The upgrade procedure entails:
- GET `/v1/ws/volga`
- Provide the `Upgrade` header
- Provide the `Authorization` header with value `Bearer 0d908c6b-f900-4187-bb00-394cd7275fed`
If successful, you can use the websocket to access the Volga API.
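The login-then-upgrade handshake can be sketched in a few lines. This uses only the standard library to show the header shapes; a real client would use a websocket library for the actual upgrade, and the helper name is illustrative:

```python
import json

def upgrade_headers(login_response: str) -> dict:
    """Extract the Strongbox token and build the websocket Upgrade headers."""
    token = json.loads(login_response)["token"]
    return {
        "Upgrade": "websocket",
        "Connection": "Upgrade",
        "Authorization": "Bearer " + token,
    }

# Using the sample /v1/login response from above:
sample = '{"token": "0d908c6b-f900-4187-bb00-394cd7275fed", "expires-in": 1209600}'
hdrs = upgrade_headers(sample)
assert hdrs["Authorization"] == "Bearer 0d908c6b-f900-4187-bb00-394cd7275fed"
```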
Messages
The Volga message is what is consumed by a subscribing consumer. It is a JSON structure with the following components:
- `mtime`: The timestamp when the message was created. The timestamp is set by the Volga application. It has millisecond granularity, and the time is the current time retrieved from the underlying OS. Note that the timestamp is generated immediately by the server where the message is produced, but due to things like network latency or connectivity issues, the message might arrive at its topic at a somewhat later time. This means that a consumer is not guaranteed to receive messages in timestamp order.
- `time`: The RFC 3339 representation of `mtime`, as a string.
- `seqno`: The sequence number of the message in the topic. This is an ever-increasing number.
- `remain`: An integer denoting how many messages remain until you have to invoke `more()` to ask for more data.
- `name`: The name of the producer that produced this message.
- `end-marker`: An optional field which is set to true if the consumer asked for `end-marker` and there is no more data to be consumed, i.e. the consumer would otherwise hang waiting for more data.
- `payload`: The data, either as a string or a JSON object, depending on the format of the topic.
A message is received over the Websocket interface as a JSON structure.
String topic example
The produced message:
{
"op": "produce",
"sync": "sync",
"payload": "Some Data"
}
The received message:
{
"time": "2021-06-07T16:21:11.204+02:00",
"seqno": 3,
"remain": 1,
"producer-name": "myprod",
"payload": "Some Data",
"mtime": 1623075671204,
"host": "myhost"
}
JSON topic example
The topic format is json. The produced message:
{
"op": "produce",
"sync": "sync",
"payload": {
"a-list": [1, 2, 3]
}
}
The received message:
{
"time": "2021-06-07T16:21:11.204+02:00",
"seqno": 4,
"remain": 1,
"producer-name": "myprod",
"payload": {
"a-list": [
1,
2,
3
]
},
"mtime": 1623075671204,
"host": "myhost"
}
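The `remain` and `end-marker` fields drive the consumer loop. A minimal dispatch sketch (the `handle` helper is hypothetical, the send callback stands in for a real websocket write, and `more` is the operation shown in the session example later in this chapter):

```python
import json

def handle(msg, send, batch=10):
    """Process one received Volga message; ask for more when the batch runs out."""
    if msg.get("end-marker"):
        return None                                   # caught up: no payload here
    if msg["remain"] == 0:
        send(json.dumps({"op": "more", "n": batch}))  # request the next batch
    return msg["payload"]

sent = []
payload = handle({"seqno": 4, "remain": 0, "payload": "Some Data"}, sent.append)
# payload is "Some Data", and a {"op": "more", "n": 10} request was queued.
```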
Connection procedure
It is important to understand the relationship between the client websocket connection and the Volga topic. Let's explain with a few examples. Assume an Avassa environment consisting of a Control Tower with three hosts and a single edge site, also consisting of three hosts. Thus you have the following six hosts:
e1.edgesite0.movie-theater-owner.com
e2.edgesite0.movie-theater-owner.com
e3.edgesite0.movie-theater-owner.com
ct1.control-tower.movie-theater-owner.com
ct2.control-tower.movie-theater-owner.com
ct3.control-tower.movie-theater-owner.com
With the above set of hosts and sites, there will be two separate Volga clusters, one at each site. The topics in these clusters can be accessed in the following ways:
- An application running on `e1` can connect to `e1` and request a topic with `location=local` and `topic=mytopic`. This is the simplest case of them all, since no additional connections are made.
- An application running somewhere on the internet can connect to `e1.edgesite0.movie-theater-owner.com` and request a topic with `location=local` and `topic=mytopic`. This also does not result in any additional connections and will connect to the same topic as in the example above, but connecting to an edge site from the outside might not always be possible.
- If there are NAT constraints or firewall rules preventing direct access to the hosts at `edgesite0`, an application running somewhere on the internet can instead connect to `ct1.control-tower.movie-theater-owner.com` and request `topic=mytopic` with `location=child-site` and `child-site=edgesite0` to connect to the topic `mytopic` at `edgesite0`.
- An application that runs on `e1` can connect to Volga at `e1` and request to open a topic with `topic=foo` and `location=parent`. This will instruct Volga on `e1` to connect to the `foo` topic on one of the hosts at `control-tower`.
Thus the client connection to a host in a Volga cluster is one thing, and the location of the topic another.
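The cases above differ only in how the open request addresses the topic. A sketch of the corresponding JSON bodies (only the location-related fields are shown here; a real open request also carries a name and create options, as in the session example below, and the helper is hypothetical):

```python
def open_msg(topic, location, child_site=None):
    """Build the location-related part of an open-consumer request."""
    msg = {"op": "open-consumer", "topic": topic, "location": location}
    if location == "child-site":
        msg["child-site"] = child_site  # which edge site to reach via the Control Tower
    return msg

local_open = open_msg("mytopic", "local")                           # cases 1 and 2
via_ct = open_msg("mytopic", "child-site", child_site="edgesite0")  # case 3
parent_open = open_msg("foo", "parent")                             # case 4
```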
An example producer/consumer session
In this section we will show the JSON messages required to open a topic, produce your first Volga message, and then consume it. Not all concepts will be explained in this section.
- First, log in to one of the hosts at a site according to the login/Websocket-Upgrade procedure described above.
- Open a producer by sending on the websocket:
{
"op": "open-producer",
"location": "local",
"topic": "mytopic",
"name": "myprod",
"create-options": {
"replication-factor": 1,
"persistence": "disk"
}
}
This will return JSON:
{
"result": "ok"
}
At this point, a Volga producer is attached to the websocket. There can be only one producer per websocket.
- Produce data on the websocket by sending:
{
"op": "produce",
"mode": "sync",
"payload": "Hello Avassa"
}
If mode is `sync`, a response is received, whereas if mode is `async`, no response is received. If successful, you'll receive:
{
"result": "ok"
}
- Now open an additional websocket to a host at the same site, and log in on that socket as well.
- Open a consumer by sending:
{
"op": "open-consumer",
"location": "local",
"topic": "mytopic",
"position": "unread",
"name": "mycons",
"mode": "exclusive",
"create-options": {
"replication-factor": 1,
"persistence": "disk"
}
}
Now you have an active consumer on the websocket.
- A basic Volga concept is that you have to request data by invoking the `more` operation. Send:
{
"op": "more",
"n": 10
}
This means that you will receive at most 10 messages; after that, you'll have to issue another call to `more` in order to get more data. A common user error with Volga is to forget to issue the `more` command. If you are not receiving any messages from a topic, you are either at the end, or have not issued `more`. This shows up nicely as `more-n: 0` in the `supctl show volga topics ...` output for the topic.
- At this point, you'll start receiving messages:
{
"time": "2021-06-07T16:21:11.204+02:00",
"mtime": 1623075671204,
"seqno": 1,
"remain": 9,
"name": "myprod",
"payload": "Hello Avassa"
}
You can use `supctl` to inspect the state of your topic. Assuming the topic resides at an edge site called e0, you can, towards the Control Tower, issue:
supctl show --site e0 volga topics mytopic
topic: mytopic
tenant: popcorn-systems
format: string
seqno: 1
chunkno: 1
number-of-chunks: 1000
created: 2021-06-08T13:13:24.675Z
assigned-hosts:
- e0-002
leader-host: e0-002
worker-hosts: []
requested-replication-factor: 1
current-replication-factor: 1
persistence: disk
size: 976.56 KiB
oldest-entry: 2021-06-08T13:13:24.674Z
dropped-chunks: 0
consumers:
- consumer-name: mycons
more-n: 9
last-ack: 0
buffered: 0
mode: exclusive
consuming-host: e0-001
producers:
- producer-name: myprod
producing-host: e0-001
Additional opts
As you saw in the previous section, a `create-options` struct is passed as a field in the JSON when you open a producer or a consumer. These options are the topic properties, used when the topic is initially created. They are `persistence`, `replication-factor`, `local-placement`, `num-chunks`/`max-size` and `format`.
There is also a set of additional fields:
- `mode`: Either of `exclusive`, `standby` or `shared`. This is described in the Consumer modes section below. This option is only valid for consumers.
- `control-message`: If set to true, the client gets notified of the connection status of the topic. The client will receive JSON messages of the form:
{
"connected" : "true"
}
{
"connected" : "false"
}
This is useful if the client truly needs to know the actual connection status. Usually, client-side programming is easier if the current connection status is left unknown. When a client opens a topic, there are a number of transient errors the client is not exposed to by default. For example, if `replication-factor` is 3 and there is no current majority for the topic, Volga will return an opened topic to the client and continue in the background to establish a majority. If `control-message` is set to true, the client will be notified once a majority for the topic has been established.
- `on-noexists`: When opening topics at remote sites, or opening topics where you are not the "owner" of the topic, i.e. some other application's topic, it is highly recommended to guard against accidentally creating the topic with the wrong options. If `on-noexists` is `wait` and the topic doesn't exist, this is considered a transient error: the topic open operation will succeed, and once the topic is actually created, the topic will start to function. The caller will receive a handle which appears to be a working topic.
- `topic-tag`: Sometimes a topic is shared by multiple applications; this is often the case for "Volga infra", which is described in its own chapter below.
- `end-marker`: If this option is used by a consumer, a message with no payload and the field `end-marker: true` will be delivered to the client when there is no more data to be read from the stream, i.e. when the client would otherwise hang. Note that the end marker is only sent once per session. If the client keeps the connection open after receiving the end marker, it will receive new messages produced on the topic, but no new end marker.
- `local-placement`: If this option is used, it is guaranteed that the host creating the topic will be part of the replication set.
Producing
A producer is attached to an authenticated websocket. Each produced message is either asynchronous or synchronous. In general throughput is considerably higher for asynchronous messages. In asynchronous mode, messages are simply streamed and if the producer is faster than the consuming Volga cluster, eventually the producer will block on TCP write.
If you produce synchronously, the call to `produce` does not return until the data has been written to disk. Data is written with `write()` but without any accompanying `fsync()`. To ensure the data is truly on disk, use the Volga `sync` operation.
It is possible to produce multiple messages asynchronously in sequence and then send one synchronous message; this ensures that all data produced so far is written to disk.
Even with an asynchronous producer, it is possible to issue the `sync` command over the websocket. This guarantees that all produced data is written to disk. This is a fairly costly and slow command.
{
"op": "sync"
}
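The async-then-sync pattern described above can be sketched as a message sequence. The send callback stands in for a real websocket write, and the message shapes follow the produce and sync examples in this chapter:

```python
import json

def produce_batch(send, payloads):
    """Stream payloads asynchronously, then fence the batch with one sync op."""
    for p in payloads:
        send(json.dumps({"op": "produce", "mode": "async", "payload": p}))
    send(json.dumps({"op": "sync"}))  # once this returns, all prior data is on disk

out = []
produce_batch(out.append, ["m1", "m2", "m3"])
assert len(out) == 4 and json.loads(out[-1]) == {"op": "sync"}
```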
If the websocket client produces asynchronous data faster than Volga is able to deliver the data, eventually the client will block on TCP write.
If the websocket is closed, the producer is implicitly closed.
Consuming
Similar to a producer, a consumer needs an authenticated websocket.
To attach a consumer to the websocket, you need to send a JSON structure with op `open-consumer`, a location and a topic name. The typical consumption pattern is, using pseudocode:
topic = "mytopic";
sock = websockets_connect(host, user, password);
mode = "exclusive";
create_opts = {replication_factor = 3;
on_no_exist = create
}
position = "unread";
name = "myconsumer";
c = volga.open_consumer(sock, "local", topic, position,
name, mode, create_opts);
volga.more(c, 100);
while(true) {
msg = read_volga_msg(c);
if (msg.remain == 0) {
volga.more(c, 100);
}
do_process_msg(msg.payload);
volga.ack(c, msg.seqno);
}
The position is required, and you have the following positions to choose from:
- `unread`: This is the mode to choose when you subscribe to a topic and want to consume messages once, and once only. The `unread` mode is typically combined with the `ack` operation. Acknowledging a message, using the sequence number from the Volga message, indicates that you are done with that message. So, if the system restarts and you re-issue the `open-consumer` call with position `unread`, you'll get messages from where you last acked. Each consumer has a unique name; thus you can have arbitrarily many consumers on a topic, and the acknowledgement is kept on disk for each named consumer.
- `end`: This starts consumption at the end of the topic stream.
- `beginning`: This starts consumption at the beginning of the topic stream.
- `seqno`: It is possible to start consuming from a specific sequence number.
- `since`: Start reading events from a relative point in time; the format of the parameter is [Num]s, [Num]m or [Num]h. For example, '10s' will start reading events from 10 seconds back in time.
- `timestamp`: Volga messages contain a timestamp. You can start consuming from an arbitrary millisecond timestamp. This can sometimes be useful for `tail` types of applications if you, say, wish to start consuming a stream from minutes ago and forward.
Consumer modes
The most commonly used consumer mode is probably `exclusive`. The semantics of this mode are that in a single Volga cluster, for a specific topic, there can be only one consumer with a specific name. If a second consumer tries to open the same topic using the same name, the first consumer is closed and taken over by the second one.
On the other hand, if your application runs on multiple hosts at the Avassa site and you wish to have only one consumer, but you don't really care on which host the consumer runs, the `standby` mode is useful. Here you can have multiple consumers, running on multiple hosts in the cluster or on the internet, all consuming from the same topic using the same `name`. Only one of them will receive any Volga messages; the remaining consumers are standing by. If the active consumer closes, one of the standby consumers is chosen and will start to receive messages.
The new consumer, the one that was standing by, will receive messages from where the original consumer last acknowledged a message. Thus it is possible for the following scenario to occur:
- Consumer A receives message with seqno 13.
- Consumer A processes that message.
- Consumer A dies before being able to acknowledge message 13.
- Consumer B gets activated.
- Consumer B gets message 13 one more time.
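Because a takeover resumes from the last acknowledged message, a consumer can therefore see the same message twice. Consumers that must process each message effectively once can deduplicate on the monotonically increasing seqno. A sketch (not a Volga API; in a real standby setup, the last-seen seqno would have to live in shared or persistent storage so the new consumer can see it):

```python
def make_processor(process):
    """Wrap a processing function with seqno-based deduplication."""
    last_seen = 0
    def on_message(msg):
        nonlocal last_seen
        if msg["seqno"] <= last_seen:
            return False                 # duplicate after failover: skip it
        process(msg["payload"])
        last_seen = msg["seqno"]
        return True
    return on_message

seen = []
on_msg = make_processor(seen.append)
on_msg({"seqno": 13, "payload": "x"})
on_msg({"seqno": 13, "payload": "x"})    # redelivered after takeover
assert seen == ["x"]
```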
If processing messages is a CPU-intensive activity, you may choose the `shared` mode for the consumer. Here you can also have multiple consumers on the same topic using the same name. In this mode, consumers get Volga messages in a random fashion. A use case for this could be one where a producer is producing image data and you wish to attach a consumer to that stream for image processing. However, if the producer is producing data faster than the consumers can process it, you need multiple hosts to consume the data fast enough.
A consumer will stay open forever. If it loses majority, the consumer will try to re-establish majority forever. Naturally, you'll not be receiving any messages from it while the majority is gone. If it is important for the application to act when majority is lost, the consumer must use the `control-message` flag when opening the topic.
Open errors
There are quite a few things that can go wrong when opening a topic. Many of them are masked internally by Volga, though, since they are considered transient errors. These include, for example, not being able to gather a majority for the topic. By default, the client gets back what appears to be a functioning consumer or producer, whereas in reality Volga tries to remedy the situation by retrying the open. If this behavior is not desired, use the `control-message` option.
Encryption
The traffic from the websocket client to the REST server in Supd is encrypted using regular HTTPS. It's possible to extract the certificates from Strongbox.
When you connect to the REST server and request to open a topic on another site, i.e. setting `location` to `child-site` or `parent`, Volga at one site will connect, using mutual TLS, to the other site. The certificates to do this are generated, maintained, and distributed internally by Strongbox. Thus, site-to-site Volga traffic is always encrypted.
Volga uses an internal protocol between sites that runs over port 443.
The child-site parameter
It is not unusual for edge sites to reside behind NAT firewalls, making them inaccessible from the internet. However, since all edge sites connect to the Control Tower, you can still access them via a host at the Control Tower.
To connect to a topic at an edge site, connect to a host at the Control Tower and open a producer or consumer with `location=child-site` and `child-site=sitename`, where sitename is the name of the edge site. Apart from the `location` and `child-site` parameters, the API functions exactly the same as when making direct connections.
Volga Status
In order to troubleshoot Volga issues, looking at the Volga status at the different sites is recommended. The detailed meaning of all the fields in the status output is documented in the Volga Reference Documentation.
You can look at clients and topics at a site. These are not quite the same: say that you connect with a websocket to a host in the Control Tower and request a topic at an edge site using `location=child-site`; the client then resides at the Control Tower, whereas the topic resides at the edge site.
supctl show --site edge volga clients foobar
id: 1
name: mycons2
location: local
topic: mytopic2
tenant: popcorn-systems
host: edge-001
kind: consumer
num-messages: 1
status: up
status-info: consuming-locally
reconnects: 0
time-connected: 42m13s
If you look at a topic at a site, you get:
supctl show --site edge volga topics mytopic2
topic: mytopic2
tenant: popcorn-systems
format: string
seqno: 1
chunkno: 1
number-of-chunks: 1000
created: 2021-06-18T08:11:49.916Z
assigned-hosts:
- edge-002
leader-host: edge-002
worker-hosts: []
replication-factor: 1
persistence: disk
size-megabyte: 1
oldest-entry: 2021-06-18T08:11:49.916Z
dropped-chunks: 0
consumers:
- consumer-name: mycons2
more-n: 9
last-ack: 0
buffered: 0
mode: exclusive
consuming-host: edge-001
producers:
- producer-name: myprod2
producing-host: edge-001
Volga Infrastructure
On top of the Volga topics, we have a local Avassa infrastructure that can be used to pass data between Avassa sites. Infra is nothing more than an application built on top of Volga topics. All Avassa sites are part of the same infrastructure. Code can publish a message to the infra, and the message will be automatically propagated to all Avassa sites. The Volga topics needed by the infra are automatically set up by Volga at startup. Infra by default covers all sites.
A tenant doesn't have Volga Infra enabled by default. To use a Volga Infra, it must first be created:
supctl create volga infras << EOF
name: myinfra
format: json
EOF
supctl show volga infras myinfra
name: myinfra
chunks: 1000
persistence: disk
format: json
You can then produce messages:
supctl do volga infra myinfra produce down "Hello Edge"
And consume messages:
supctl do volga infra myinfra consume
A tenant can have arbitrarily many Volga Infras.
Once you create an Infra, a number of normal Volga topics are set up behind the scenes by Volga. These topics are used internally by the Volga infra. There will be:
- An `up` topic which is used by Volga infra to pass data upstream.
- A `down` topic which is used by Volga infra to pass data downstream.
- A `result` topic which is used by Volga infra to write whatever comes from upstream or downstream. This is the topic that is consumed by the infra consume call.
You can view those topics using `supctl`:
supctl show volga topics infra:down:myinfra
name: infra:down:myinfra
tenant: popcorn-systems
format: json
seqno: 0
chunkno: 1
number-of-chunks: 1000
created: 2021-06-18T11:30:32.101Z
assigned-hosts:
- control-tower-001
leader-host: control-tower-001
worker-hosts: []
replication-factor: 1
persistence: disk
size-megabyte: 1
oldest-entry: 2021-06-18T11:30:32.100Z
dropped-chunks: 0
consumers:
- consumer-name: edge-001-popcorn-systems-myinfra-result-disk
more-n: 100
last-ack: 0
buffered: 0
mode: exclusive
consuming-host: edge-001
producers:
- producer-name: control-tower-001-popcorn-systems-myinfra-down-disk
producing-host: control-tower-001
One interesting aspect of these statistics is the consumer. It is a remote consumer, initiated at the edge site. Thus data that gets produced at the Control Tower goes into the topic, and that data is consumed remotely by code running at the edge site. This is important, because it allows Volga to apply filters to the Infra, ensuring that data published on the Infra doesn't necessarily have to be sent to all edge sites, only to the ones that need it.
When debugging your infra code, it can sometimes be useful to look at the individual topics that make up the infra, and even consume from them using the REST consumer as in:
supctl do volga topics infra:down:myinfra consume --ignore-all-filters
in order to see what has been posted on the Infra.
Producing data on a Volga Infra
In Volga Infra all production is local, and consumption is remote. Let's walk through what happens when a client publishes a message at the Control Tower to which the tenant code has connected. We do this by showing what JSON is passed over the websockets.
The first thing required is to create the infra itself. We assume this is done
with the format `string` on the Infra.
Next, user code connecting to the Control Tower acquires an Infra handle to
"myinfra" for publishing. This is done by sending the following JSON structure
on an authenticated websocket:
{
"op": "open-infra-producer",
"infra": "myinfra"
}
This returns JSON
{
"result": "ok"
}
or
{
"result": "error",
"info" : "Some error description ..."
}
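As a sketch, the handshake above can be driven from client code. The helper names below are illustrative, and the actual websocket connection and authentication are omitted; only the JSON frames shown in this section are assumed:

```python
import json


def open_infra_producer_frame(infra: str) -> str:
    """Build the JSON frame that requests an Infra producer handle."""
    return json.dumps({"op": "open-infra-producer", "infra": infra})


def check_reply(raw: str) -> None:
    """Parse a reply frame; raise if the server reported an error."""
    reply = json.loads(raw)
    if reply.get("result") != "ok":
        raise RuntimeError(reply.get("info", "unknown error"))
```

Sending `open_infra_producer_frame("myinfra")` on the authenticated websocket and passing the reply to `check_reply` would correspond to the exchange shown above.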
At this point you can start producing data on the Volga infra. A JSON example could look like:
{
"op": "infra-produce",
"payload": "zzzzzzzzzzz",
"sync" : "async",
"direction": "down",
"destination-sites": [{"site": "edge2"}, {"site": "edge13"}]
}
This would send the payload downwards to the sites edge2 and edge13. Implementation-wise, what happens here is that:
- User code produces into the Infra per above.
- Supd produces the data into a normal stream topic at the Control Tower called `infra:down:myinfra`. This topic was created by Supd when the Infra was created.
- An internal Supd consumer running at all edge sites will consume the message and, at the edge site, produce the data into a topic called `infra:result:myinfra`. There will be as many consumers at the Control Tower as there are edge sites.
- Your private consumer can consume from `infra:result:myinfra` at the edge site. This is done by using a volga infra consumer and opening the `myinfra` infra.
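The naming of the internal topics follows directly from the Infra name, which a small helper can illustrate (the function name is ours, not part of the API):

```python
def infra_topics(infra: str) -> dict:
    """Map an Infra name to the internal topic names that back it."""
    return {d: f"infra:{d}:{infra}" for d in ("up", "down", "result")}
```

For an Infra named `myinfra` this yields `infra:up:myinfra`, `infra:down:myinfra` and `infra:result:myinfra`, the topics inspected with `supctl` earlier in this section.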
Volga infra is thus just an application built on top of regular Volga topics. It can be used for a variety of use cases. One common use case for Volga Infra is when you have a coordination application that wishes to send control instructions to other edge applications. This could be configuration changes or arbitrary requests. If such "commands" are posted downwards on Volga Infra, the coordination application can be assured that its sub-applications are guaranteed to eventually receive the command/request regardless of whether the edge application is currently up or down.
The producer has a number of interesting options that can be provided
to the `infra-produce` operation.
- `direction` - must be `up|down|stitch`.
  - `down` - used to send messages from the Control Tower to edge sites
  - `up` - used to send messages to the Control Tower from an edge site
  - `stitch` - used to send messages from an edge site to other edge sites by passing them through the Control Tower
- `local-site-deliver` - if set to true, the produced data will be delivered not just `down` or `up` but also at the local site.
- `topic-tag` - sometimes a Volga Infra is created and shared by multiple applications. To avoid seeing the other applications' messages sent on the Infra, use `topic-tag` in the Infra producer, and then use that same `topic-tag` in the consumer that consumes from the final `result` topic.
- `destination-sites` - a list of sites that are to consume the message. If omitted, all sites will consume the message.
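The options above can be assembled into a produce frame programmatically. The following sketch builds the JSON shown earlier; the helper name and its validation are ours:

```python
import json

VALID_DIRECTIONS = {"up", "down", "stitch"}


def infra_produce_frame(payload, direction, destination_sites=None,
                        topic_tag=None, local_site_deliver=False,
                        sync="async"):
    """Build an infra-produce frame with the options described above."""
    if direction not in VALID_DIRECTIONS:
        raise ValueError(f"direction must be one of {sorted(VALID_DIRECTIONS)}")
    msg = {"op": "infra-produce", "payload": payload,
           "sync": sync, "direction": direction}
    if local_site_deliver:
        msg["local-site-deliver"] = True
    if topic_tag is not None:
        msg["topic-tag"] = topic_tag
    if destination_sites is not None:
        # Omitting destination-sites means all sites consume the message.
        msg["destination-sites"] = [{"site": s} for s in destination_sites]
    return json.dumps(msg)
```

Calling `infra_produce_frame("zzzzzzzzzzz", "down", destination_sites=["edge2", "edge13"])` reproduces the example frame above.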
Topic queries
Volga `query-topics` is a tool whereby you can issue queries that search
your Volga topics. The entire API is fully documented in the
reference documentation.
A tenant has several searchable topics; for example,
all data that is produced on standard out and standard error
by deployed containers is stored in a per-container topic. These
topics are created by the system when the container is deployed, but the
topic is owned by the tenant that deployed the container.
The topic can of course be read as usual using either `supctl` or
the Websocket API.
A topic query is issued at the Control Tower. The query is then distributed over the internal Volga Infra to the various edge sites where the calling tenant resides. The query is then evaluated at all sites, and matching messages are passed back up to the Control Tower. At the Control Tower, the message streams are merged and then finally delivered back to the caller. Thus it is a tool to do parallel distributed logs search at a large number of sites. This search tool is especially useful when you have multiple containers, possibly at multiple sites, interacting with each other or some external entity.
The default behavior is that all query results are returned as quickly as
possible to the control tower. This means that even though the control tower
does a rudimentary sorting of messages based on timestamps, the end result may
not appear entirely in timestamp order. If the `sort` option is set to true,
query results will be returned in a more controlled fashion which allows the
control tower to sort them properly. This is off by default since it reduces
the performance of queries. Note however the caveat about out-of-order
timestamps in the Messages section. When `sort` is true, `query-topics` will
usually manage to get the order correct even when this occurs, but a message
with an unusually large timestamp discrepancy can still end up out of order in
the merged message stream.
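The sorted merge can be illustrated with a heap merge over per-site streams. This is not Volga's actual implementation, just a sketch of the principle: if every site delivers its messages in timestamp order, a heap merge yields a globally ordered result, while a message far out of order within its own stream would still surface out of order:

```python
import heapq


def merge_site_streams(*streams):
    """Merge per-site streams of (timestamp, message) pairs.

    Each stream must already be sorted by timestamp; heapq.merge then
    interleaves them into one globally sorted stream.
    """
    return list(heapq.merge(*streams, key=lambda m: m[0]))
```

For example, merging `[(1, "a1"), (4, "a2")]` from one site with `[(2, "b1"), (3, "b2")]` from another yields the messages in timestamp order 1, 2, 3, 4.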
In this example we have deployed a container called `cowboy`.
Once the container is deployed, we can
inspect the two topics used for log collection.
The edge site:
supctl show --site udc1 volga topics \
system:container-logs:cowboy-app.cowboy-srv-1.cowboy
name: system:container-logs:cowboy-app.cowboy-srv-1.cowboy
tenant: acme
labels:
service-name: cowboy-srv
service-instance: cowboy-srv-1
container-name: cowboy
application: cowboy-app
format: string
seqno: 4
chunkno: 1
number-of-chunks: 100
creation-time: 2022-09-01T08:14:22.984Z
assigned-hosts:
- udc1-001
leader-host: udc1-001
worker-hosts: []
requested-replication-factor: 1
current-replication-factor: 1
persistence: disk
size: 976.56 KiB
oldest-entry: 2022-09-01T08:14:22.984Z
dropped-chunks: 0
consumers: []
producers:
- producer-name: system:container-logs:cowboy-app.cowboy-srv-1.cowboy-udc1-001
producing-host: udc1-001
The log topic is created with `number-of-chunks` set to 100 and will thus
contain at most 100 megabytes of data per container. The search tool is accessible
through `supctl` and is for example invoked as:
supctl do volga query-topics --topics \
match-topic-labels=application=cowboy-app output-payload-only=true
or
supctl do volga query-topics --all-sites --topics \
topic-names=system:container-logs:cowboy-app.cowboy-srv-1.cowboy \
output-payload-only=true output-format=%t:%h:%p
Using the TAB completion in `supctl` is very useful here. The `query-topics`
API is available over both the REST API (which is used by `supctl`) and
the websocket API.
If you provide the `-v` flag to `supctl`, you see the JSON payload that is
sent by `supctl`. The above example sends:
{"follow": true,
"all-sites": true,
"topics":
[{"topic-names": ["system:container-logs:cowboy-app.cowboy-srv-1.cowboy"],
"output": {"payload-only": true, "format": "%t:%h:%p"}}]}
The commands above will collect the specified topics from all sites we specify in the query. The output will be merged based on timestamp. Output from all topics will be merged. Data is by default raw, but JSON messages can also be requested.
A wide variety of searches are possible, the following arguments can be combined in various ways. In principle we:
- Specify which sites we wish the query to cover, by default all are chosen. Sites can be chosen based on names, regular expression match, site labels, etc.
- Specify a list of topic/filter/output selectors.
- Topics can be chosen in a wide variety of ways, e.g regular expressions, label matching, explicit names, etc.
- Specify filters. Several variants of regular expression matches exist. One especially interesting filter is the JSON match, where a sparse JSON pattern is supplied.
- Specify an optional time span to apply to the query.
- Decide whether to follow the logs, or terminate when all specified topics are exhausted.
This functionality is also available over the websocket API. Connect a websocket to any host at the Control Tower and send a json structure, for example:
{
"op": "query-topics",
"follow": true,
"all-sites": true,
"topics":
[{"topic-names":
["system:container-logs:cowboy-app.cowboy-srv-1.cowboy"],
"output": {"payload-only": false},
"filter": {"drop-until-last-re-match": "ERROR"}}]}
This will produce a stream of JSON structures from the output of the cowboy container.
The same query using `supctl` and the REST API:
supctl -v do volga query-topics --all-sites \
--topics topic-names=system:container-logs:cowboy-app.cowboy-srv-1.cowboy \
output-payload-only=false \
filter-drop-until-last-re-match=ERROR --follow
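A websocket query frame like the ones above can be assembled from the individual selectors. The builder below covers only a few of the selectors described in this section, and its name and parameters are ours; the field names follow the JSON examples:

```python
import json


def query_topics_frame(topic_names, follow=False, all_sites=True,
                       payload_only=True, fmt=None, filter_=None):
    """Assemble a query-topics websocket frame from common selectors."""
    selector = {"topic-names": list(topic_names),
                "output": {"payload-only": payload_only}}
    if fmt is not None:
        selector["output"]["format"] = fmt
    if filter_ is not None:
        # e.g. {"drop-until-last-re-match": "ERROR"}
        selector["filter"] = filter_
    return json.dumps({"op": "query-topics",
                       "follow": follow,
                       "all-sites": all_sites,
                       "topics": [selector]})
```

Calling `query_topics_frame(["system:container-logs:cowboy-app.cowboy-srv-1.cowboy"], follow=True, fmt="%t:%h:%p")` reproduces the shape of the frames shown above.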