
pmacct / sfacct + influxdb + grafana

Written by Arnaud

I spent a lot of time messing with all of this, which is why I decided to share my notes.
My aim was to collect sFlow samples, aggregate them (sfacct), store them in a TSDB (influxDB) and graph network traffic (grafana).

sFlow

sFlow does packet sampling (whereas netflow is flow oriented, sampled or not). An sFlow packet contains several flow samples; you can easily read sFlow packets with wireshark or sflowtool, as shown after the list below.
(s)Flow samples can carry several kinds of information :

  • standard information : input interface, output interface, sampling_rate...
  • raw packet header : from layer 2 (MAC...) to layer 4 (protocol and port)
  • extended switch data : incoming/outgoing VLAN tag
  • extended router data : next-hop router address
  • extended gateway data : BGP next-hop, peer AS, source AS, AS sequence, localpref, communities...
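
If you want to eyeball what your devices are actually sending before involving pmacct, sflowtool's line mode is handy (a quick sketch; 6343 is the default sFlow port, and the sample line below is abridged) :

sflowtool -p 6343 -l
# one CSV-style line per sample, roughly :
# FLOW,agent_ip,in_ifIndex,out_ifIndex,src_mac,dst_mac,etype,...,sampling_rate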

 

PMACCT

PMACCT is a project that provides several binaries :

  • pmacct : the CLI client, used to query collected data
  • pmacctd : libpcap collector
  • nfacctd : NetFlow/IPFIX collector
  • sfacctd : sFlow collector

Please note that, at the time of writing, the up-to-date documentation can be found at https://github.com/pmacct/pmacct (wiki section).

Installation

You can use your package manager to install it, or compile it yourself (example for Debian) :

git clone https://github.com/pmacct/pmacct.git
apt-get install pkg-config libtool autoconf automake
apt-get install gcc make libstdc++-6-dev libssl-dev libpcap-dev libmariadbclient18 libmariadbd-dev librabbitmq-dev libjansson-dev librdkafka-dev libtool

cd pmacct
./autogen.sh
# make sure to enable all the plugins you need
./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --enable-rabbitmq --enable-l2 --enable-debug --enable-jansson

make
mkdir /etc/pmacct
make install
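
You can quickly check that the build went fine and which plugins were compiled in (the -V flag prints the version and compile-time options) :

sfacctd -V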

I will talk about sfacct, but it's almost the same for nfacct. sfacct collects sFlow samples and aggregates them by primitives over a time interval. Primitives are the keys you are interested in (IP_SRC, IP_DST, AS_SRC, AS_DST...); the whole list of primitives can be found in CONFIG-KEYS (look at the "KEY: aggregate" section). sfacct aggregates packets sharing the same primitives and reports PACKETS and BYTES counters.
You should also know that sfacct has many plugins (the way data is exported) : in-memory, print to a file, SQL, RabbitMQ...

There are plenty of information and examples in QUICKSTART.

Here is a basic configuration example (/etc/pmacct/sfacctd.conf) :

debug: false
! for this example, I want to run sfacctd by hand and look at the output
daemonize: false
pidfile: /var/run/sfacctd.pid
! remember to configure logrotate if you use logfile
!logfile : /var/log/sfacct.log

! returns warning messages in case of data loss
! look at CONFIG-KEYS for details
! bufferization of data transfers between core process and active plugins (default 4MB)
plugin_pipe_size: 10240000

! The value has to be <= the size defined by 'plugin_pipe_size', keeping a ratio < 1:1000 between the two
! Once a buffer is filled, it is delivered to the plugin
plugin_buffer_size: 10240

! automatically renormalizes byte/packet counter values based on acquired information,
! i.e. takes the sampling_rate of the (s)Flow sample into account
sfacctd_renormalize: true

! I don't use it here, but you can run several plugins at once : "plugins: amqp[foo], memory[bar]"
! and set options to plugins with aggregate[foo]:
plugins: memory
! check primitives list in CONFIG-KEYS
aggregate: peer_src_as, peer_dst_as, src_as, dst_as, etype

 

We can now run sfacctd with the sfacctd -f /etc/pmacct/sfacctd.conf command; if you enabled debug, it should print a lot of things.
As we are using the memory plugin, we can easily check what sfacct is collecting :

  • pmacct -s ::  Show full statistics
  • pmacct -s -T bytes :: Output statistics ordered by decreasing value of bytes
  • pmacct -e :: Clear all statistics
  • pmacct -e -s -T bytes -O json :: combo! Show full statistics ordered by bytes in JSON format, then clear them
#pmacct -e -s -T bytes | head
ETYPE  SRC_AS      DST_AS      PEER_SRC_AS PEER_DST_AS  PACKETS               BYTES
800    64496       64500       64496       64500        304324608             406280773632
800    64497       64500       64496       64500        175841280             241821032448
800    64500       64496       64500       64496        142401536             195753091072

Not bad ;) But the memory plugin is not "production grade"... even with plugin_pipe_size tuned, it still loses records :/
Instead, we will use the print plugin (extract from sfacctd.conf) :

plugins: print[print]
aggregate[print]: peer_src_as, peer_dst_as, src_as, dst_as, etype

! by default file is overwritten
print_output_file[print]: /tmp/5m_avg.json
print_output[print]: json
print_history[print]: 5m
print_history_roundoff[print]: m
print_refresh_time[print]: 300
! we want to run this script after each cache purge (more on this below) :
print_trigger_exec[print]: /opt/pma2influx.sh
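
For reference, every print_refresh_time (300 s here) sfacctd purges its cache to that file as one JSON object per aggregate, along these lines (values invented; event_type and stamp_inserted are the two fields the import script below strips out) :

{"event_type": "purge", "stamp_inserted": "2018-01-15 10:05:00", "etype": "800", "as_src": 64496, "as_dst": 64500, "peer_as_src": 64496, "peer_as_dst": 64500, "packets": 112910336, "bytes": 150642491392}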

 

InfluxDB

InfluxDB has really well done documentation that you should follow to get started (these links may be outdated; use up-to-date versions) :

#installation
https://docs.influxdata.com/influxdb/v1.4/introduction/installation

#first steps
https://docs.influxdata.com/influxdb/v1.4/introduction/getting_started/
https://docs.influxdata.com/influxdb/v1.4/query_language/database_manage...
https://docs.influxdata.com/influxdb/v1.4/guides/downsampling_and_retent...

Here is how to create a new database and check basics :

influx
> CREATE DATABASE sflow
> SHOW DATABASES
> SHOW RETENTION POLICIES on sflow
> use sflow
> SHOW MEASUREMENTS

As you can see, the default retention policy is named autogen and its duration is set to 0s (records are never deleted).
That's ok if you want to keep all records forever, but because records are imported every 5 minutes, you could hit performance issues after some time (weeks or months...). You can do really awesome things with RETENTION POLICY and CONTINUOUS QUERY... read the doc! Let's make a 3-month retention policy that we will use by default :

influx
> CREATE RETENTION POLICY "three_months_only" on sflow duration 13w REPLICATION 1 DEFAULT
> SHOW RETENTION POLICIES on sflow
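
If you also want a long-lived, coarser view, a CONTINUOUS QUERY can downsample the 5-minute records into another database. A sketch (the sflow_1h database name is my assumption here, but it is the second database the cleanup script at the end of this post accepts) :

influx
> CREATE DATABASE sflow_1h
> CREATE CONTINUOUS QUERY cq_traffic_1h ON sflow BEGIN SELECT sum(bytes) AS bytes INTO sflow_1h.autogen.traffic FROM sflow.three_months_only.traffic GROUP BY time(1h), * END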

You may need to edit your InfluxDB’s config to set max-values-per-tag = 0 in the [data] section, otherwise you won’t be able to store a lot of flows (thanks FHR).
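
For reference, that setting lives in the [data] section of /etc/influxdb/influxdb.conf :

[data]
  max-values-per-tag = 0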
As you saw with SHOW MEASUREMENTS, there is no measurement yet, but that's where pma2influx.sh comes in!

 

pma2influx.sh

In sfacctd.conf, we told sfacctd to run a script via print_trigger_exec after writing the cache to /tmp/5m_avg.json, but we cannot pass arguments to our script.

As you guessed, this script parses the JSON and imports the result into InfluxDB. Here is the script I wrote (in bash!).
It was inspired by https://github.com/pierky/pmacct-to-elasticsearch/blob/master/CONFIGURATION.md
You may do much fancier things with queuing/messaging, but because everything runs on the same host, this does the job for me.

#!/usr/bin/env bash

DATABASE='sflow'
# must match print_output_file in sfacctd.conf
SRC_FILE='/tmp/5m_avg.json'
DST_FILE='/tmp/pma2influx-sflow-5m.txt'

# Header for influx import
echo -e "# DML \n# CONTEXT-DATABASE: $DATABASE" > $DST_FILE

# We import all the sfacctd primitives as tags into InfluxDB (with the same names);
# only bytes is saved as a field value.
# Records are stored in a MEASUREMENT named "traffic".
# sfacctd's BYTES counter excludes L2 headers; we add them back to better match SNMP interface counters :
# (26 bytes per frame without VLAN tag, 30 with) * PACKETS count

cat $SRC_FILE | sed 's/{//; s/}//; s/"//g; s/:/=/g; s/\ //g;' | sed -r 's/event_type=purge,//; s/stamp_inserted=\S+,packets/packets/;' | sed 's/,packets=/ /;s/,bytes=/ /'| awk '{print "traffic,"$1,"bytes="$2*26+$3}' >> $DST_FILE

# This is the JSON for one aggregation
# {"etype": "800", "as_src": 64496, "as_dst": 64500, "peer_as_src": 64496, "peer_as_dst": 64500, "packets": 112910336, "bytes": 150642491392}

# This is a record to add to influx; no timestamp is specified, so InfluxDB will timestamp it at import time
#traffic,etype=800,as_src=64496,as_dst=64500,peer_as_src=64496,peer_as_dst=64500 bytes=153578160128

# This is how we import data into influx
influx -import -path=$DST_FILE

After a few minutes we can check the result in influx :

influx -database 'sflow' -execute 'SELECT * FROM traffic' | wc -l
9313

Bingo!

 

Grafana

The installation is straightforward : http://docs.grafana.org/installation/

The configuration does not need many edits; by default the grafana web server is listening on port 3000.
The default user is admin/admin; make sure to edit the config file or change the password on the http://host:3000/profile/password page.
Details are available at : http://docs.grafana.org/installation/configuration/

To configure Grafana with InfluxDB, I followed these instructions : http://docs.grafana.org/features/datasources/influxdb/
Adding a Data Source is really easy : set the URL to http://localhost:8086 and Access to proxy mode, and make sure to set the Min time interval (5m for me).

Once the Data Source is configured, the last thing to do is to build a dashboard; make sure to save it before closing the page, otherwise you will lose all your work.
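
For the graph panels, a simple raw InfluxQL query does the trick. A sketch against the traffic measurement from above ($timeFilter is the macro Grafana substitutes with the dashboard's time range; *8/300 converts 5-minute byte sums into bits per second) :

SELECT sum("bytes")*8/300 FROM "traffic" WHERE $timeFilter GROUP BY time(5m), "peer_as_src"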

And tadaaa! You will find some pictures in this other blog post I wrote : https://afenioux.fr/blog/article/see-traffic-according-to-rpki-roa-validation-state-with-pmacct

 

Going further

I now want the 95th percentile of traffic over the last month from AS64500 to AS64496, which is quite easy to do with InfluxDB (remember we store one value every 5 minutes; bytes*8/300 gives an average in bits per second, so the values below are in Mbps -10^6- and not Mibps -2^20-) :

SELECT PERCENTILE("bytes",95)*8/300/1000/1000,peer_as_src,as_dst,peer_as_dst FROM traffic WHERE peer_as_src = '64500' AND time > now() - 30d GROUP BY etype,as_dst

name: traffic
tags: as_dst=64496, etype=800
time                percentile        peer_as_src as_dst peer_as_dst
----                ----------        ----------- ------ -----------
1516046701202122812 78744.20395833334 64500       64496   64496

name: traffic
tags: as_dst=64496, etype=86dd
time                percentile        peer_as_src as_dst peer_as_dst
----                ----------        ----------- ------ -----------
1516046701202122812 1484.853541666667 64500       64496   64496

As you can see, I got two series in the result for traffic from 64500 to 64496 (one per ethertype : 800 is IPv4, 86dd is IPv6). It is extremely important here to use "GROUP BY etype", otherwise you will mix values from both protocols and the result won't be what you expect it to be.

Following the same idea, we cannot so easily get the global 95th percentile from AS64500 to any destination (I mean the 95th percentile of the input on its port) : using "GROUP BY peer_as_src" alone would compute the percentile over the individual per-aggregate records and result in a very low value.
To overcome this issue, we will aggregate data per peer AS every 5 minutes in a CONTINUOUS QUERY :

> SHOW CONTINUOUS QUERIES
name: sflow
name                      query
----                      -----
cq_sum_per_peer_as_dst_5m CREATE CONTINUOUS QUERY cq_sum_per_peer_as_dst_5m ON sflow BEGIN SELECT sum(bytes) AS bytes INTO sflow.three_months_only.sum_per_peer_as_dst FROM sflow.three_months_only.traffic GROUP BY peer_as_dst, time(5m) END
cq_sum_per_peer_as_src_5m CREATE CONTINUOUS QUERY cq_sum_per_peer_as_src_5m ON sflow BEGIN SELECT sum(bytes) AS bytes INTO sflow.three_months_only.sum_per_peer_as_src FROM sflow.three_months_only.traffic GROUP BY peer_as_src, time(5m) END

Let's wait a bit, check with SHOW MEASUREMENTS that the new measurements have appeared, and voila!

> SELECT * FROM sum_per_peer_as_src where peer_as_src = '64500';
name: sum_per_peer_as_src
time                bytes         peer_as_src
----                -----         -----------
1516228500000000000 7771814551552 64500
1516228800000000000 7627846352896 64500
1516229100000000000 7526702882816 64500

> SELECT PERCENTILE("bytes",95)*8/300/1000/1000,peer_as_src FROM sum_per_peer_as_src WHERE peer_as_src = '64500'  AND time > now() - 30d;
name: sum_per_peer_as_src
time                percentile         peer_as_src
----                ----------         -----------
1516228500000000000 197647.46479166666 64500

We wouldn't need all this if we were using SUM(); remember to take extra care with PERCENTILE().

 

The PEER_AS_SRC :

One point I should have mentioned before is how PEER_AS_SRC is determined : it is actually computed, via a FIB lookup, on the router that is sending the Flow.
In other words, it is where that router would have sent traffic destined to the source address seen in the packet; if the traffic is asymmetrical, that is a wrong assumption.

Here is how to fix it (example of sfacctd.conf) :

! Even if you don't set up a BGP session, you MUST have these lines for bgp_peer_src_as_map to work
bgp_daemon: true
bgp_daemon_ip: 127.0.0.1
bgp_daemon_max_peers: 1
bgp_peer_src_as_map: /etc/pmacct/peers.map
bgp_peer_src_as_type: map

And the example of peers.map :

! id is the ASN value we want to set as PEER_AS_SRC (a.k.a. PEER_SRC_AS... naming varies)
! I chose src_mac as the discriminant; note that I use a very wide prefix for ip
! (ip is a mandatory field...) : it matches the source IP of the router sending the sFlow packets.
id=64496            src_mac=11:22:33:44:55:66    ip=0.0.0.0/0
id=64500            src_mac=aa:bb:cc:dd:ee:11    ip=0.0.0.0/0
! catch-all
id=999999          ip=0.0.0.0/0
! you could also do a BGP lookup as a last-chance catch-all with :
! (but of course you need an iBGP session from your router to pmacct's listening IP)
id=bgp                      ip=0.0.0.0/0

All usable fields are detailed in peers.map.example. You can write your own automation script or look at https://github.com/pierky/mactopeer, which is a napalm wrapper working wonderfully well with Arista (well... until I upgraded to EOS 4.24... Napalm may have fixed the issue by now, but I still give you my fix below).

 

The Arista CLI now uses sh ipv6 bgp peers (vs. neighbors), and mactopeer uses Python 2.7 by default. Here is my patch :

/usr/local/lib/python2.7/dist-packages/napalm/eos/eos.py
#       #2020-06-26 Afenioux : change to comply with new Arista cli
#        NEIGHBOR_FILTER = 'bgp neighbors vrf all | include remote AS | remote router ID |IPv[46] Unicast:.*[0-9]+|^Local AS|Desc|BGP state'  # noqa
#        PEERS_FILTER = 'bgp peers vrf all | include remote AS | remote router ID |IPv[46] Unicast:.*[0-9]+|^Local AS|Desc|BGP state'  # noqa
#        output_summary_cmds = self.device.run_commands(
#            ['show ipv6 bgp summary vrf all', 'show ip bgp summary vrf all'],
#            encoding='json')
#        output_neighbor_cmds = self.device.run_commands(
#            ['show ip ' + PEERS_FILTER, 'show ipv6 ' + PEERS_FILTER],
#            #['show ip ' + NEIGHBOR_FILTER, 'show ipv6 ' + NEIGHBOR_FILTER],
#            encoding='text')

        #2020-06-26 Afenioux : change to comply with new Arista cli - and remove IPv6
        NEIGHBOR_FILTER = 'bgp neighbors vrf all | include remote AS | remote router ID |IPv[46] Unicast:.*[0-9]+|^Local AS|Desc|BGP state'  # noqa
        output_summary_cmds = self.device.run_commands(
            ['show ip bgp summary vrf all'],
            encoding='json')
        output_neighbor_cmds = self.device.run_commands(
            ['show ip ' + NEIGHBOR_FILTER],
            encoding='text')

 

 

Even further, better bgp lookup with pmacct :

Sometimes you will need pmacct to get BGP information (as_path, bgp_communities...), so you will need to set up a BGP session between each router and pmacct. Here is an example for Arista (which is IOS-XE like) :

sflow sample xxxx
sflow destination 10.11.12.13
sflow source-interface Loopback0
sflow run
sflow extension bgp

router bgp xxxx
   neighbor 10.11.12.13 description PMACCT
   neighbor 10.11.12.13 remote-as 65535
   neighbor 10.11.12.13 update-source Loopback0
   neighbor 10.11.12.13 route-reflector-client
   ! avoid add-path with pmacct; it wasn't working that well during my tests
   no neighbor 10.11.12.13 additional-paths send any

sfacctd.conf :

bgp_daemon: true
bgp_daemon_ip: 10.11.12.13
bgp_daemon_as: 65535
bgp_daemon_max_peers: 10
bgp_peer_src_as_map: /etc/pmacct/peers.map
bgp_peer_src_as_type: map

! sfacctd populates the 'src_as', 'dst_as', 'peer_src_as' and 'peer_dst_as' primitives from BGP information
! 'longest' behaves as : networks_file < sFlow/NetFlow <= BGP
sfacctd_as: longest
sfacctd_net: longest
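
With the BGP daemon feeding sfacctd, BGP attributes also become available as aggregation primitives. A sketch (as_path and std_comm are both listed under "KEY: aggregate" in CONFIG-KEYS) :

aggregate: peer_src_as, peer_dst_as, src_as, dst_as, etype, as_path, std_comm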

 

Still further, reduce the number of records in InfluxDB:
Here is a script I wrote to reduce the number of records in the DB, because I had a lot of ASNs doing only a very small amount of traffic.
I chose to delete the insignificant records, but you may want to consolidate them under AS0 instead; that variant is left as an exercise for the reader.

#!/usr/bin/env python
"""
Deleting records older than duration with ASN traffic under min_traff
MUST be run on the VM that runs InfluxDB (currently)
"""
#https://influxdb-python.readthedocs.io/en/latest/api-documentation.html
from influxdb import InfluxDBClient
from operator import itemgetter
from pprint import pprint
import sys

__author__ = "Arnaud Fenioux"
__version__ = "0.1"

debug = False

db_name = sys.argv[1] if len(sys.argv) > 1 else ""
if db_name in ("sflow", "sflow_1h") :
  db = InfluxDBClient(database=db_name)
else :
  exit("db in argv[1] must be sflow or sflow_1h")

# calculation of the Sum of Bytes on the duration
# records older than duration get deleted
duration = "1w"

# Min traff on duration in Bytes
min_traff = 10*1024*1024

#
# Get total traff by asn
# asn_type can be 'as_dst' or 'as_src'
#
def get_traffic_for(asn_type) :
  query = "SELECT sum(bytes) FROM traffic WHERE time > now() - {} GROUP BY {}".format(duration, asn_type)
  resultSet = db.query(query)
  result = {}
  for keyset, gp in resultSet.items():
    asn = keyset[1][asn_type]
    traff = list(gp)[0]['sum']
    if debug : print "AS: {} \t traff: {}GB".format(asn, traff/(1024*1024*1024))
    result[asn] = traff
  return result

#
# Deleting records older than duration with ASN traffic under min_traff
def clean_records(asn_type, top_list, min_traff):
  deleted_asn = 0
  for (asn, traff) in top_list:
    if traff < min_traff :
      query = "DELETE FROM traffic WHERE {} = '{}' AND time < now() - {}".format(asn_type, asn, duration)
      if debug : print query
      print "AS: {} \t traff: {}MB - records older than {} deleted".format(asn, traff/(1024*1024), duration)
      db.query(query)
      deleted_asn+=1
  print "number of asn: {} - deleted asn: {}\n".format(len(top_list),deleted_asn)

for asn_type in ('as_src','as_dst'):
  # this dict maps asn -> traffic (in bytes over the duration)
  traff = get_traffic_for(asn_type)

  # sort ASNs by traffic, biggest first
  top_as = sorted(traff.items(), key=itemgetter(1), reverse=True)

  #if debug: pprint(top_as)
  print "cleaning {}...".format(asn_type)
  clean_records(asn_type, top_as, min_traff)
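
The script needs the influxdb Python client (pip install influxdb) and takes the database name as its only argument; the file name below is just whatever you saved the script as :

pip install influxdb
python clean_small_asn.py sflow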

---
I would like to thank @CorsoAlexandre for his help and advice.
