Post

InfluxDB pour les développeurs SQL : petit guide de transition vers le langage Flux

InfluxDB pour les développeurs SQL : petit guide de transition vers le langage Flux

Présentation d’InfluxDB

InfluxDB est un système de gestion de base de données spécialisé dans les séries temporelles — c’est-à-dire des données horodatées, comme les relevés de capteurs (température, humidité…), les métriques système, etc. Contrairement aux SGBD relationnels (comme SQL Server, MySQL ou PostgreSQL), InfluxDB est conçu pour ingérer, stocker et interroger des flux de données chronologiques à haute fréquence. Il est très pratique avec son langage de requêtes pour manipuler des flux de données dans le temps : interroger les dernières 24h, faire des moyennes sur des tranches horaires, etc.

Le problème est qu’InfluxDB a sorti trois versions 1.x, 2.x et maintenant 3.x, avec à chaque fois des langages différents.

  • InfluxDB v1 est toujours disponible sur le site d’InfluxData, mais la maintenance est minimale et il ne présente plus de nouvelles fonctionnalités. Le langage de manipulation des données est InfluxQL, un langage simple qui se veut proche de SQL, mais qui reste limité.

  • InfluxDB v2 est toujours maintenu, même s’il ne recevra plus d’innovations majeures. La version 2.7.12 est la dernière à ce jour et est sortie en mai 2025. Le langage de manipulation de données se nomme Flux, un langage de script fonctionnel puissant, mais qui ressemble à SQL de très loin.

  • InfluxDB v3 a opéré un revirement stratégique avec un retour au langage SQL, finalement bien plus familier des développeurs.

Encore et toujours avec un wagon de retard, je travaille actuellement avec la version 2.7.11, bien suffisante pour mes petits besoins, et toujours d’actualité, même si InfluxData recommande évidemment aujourd’hui le passage à InfluxDB v3.

Le langage fonctionnel Flux d’InfluxDB v2

Prenons comme base la configuration utilisée dans mon projet LoRaWAN, qui stocke les mesures de température et d’humidité relevées par une sonde SHT31. En Python (grâce à la bibliothèque influxdb-client), un enregistrement est effectué à intervalles réguliers :

1
2
3
4
5
6
7
    ...
    # Création du point
    point = Point("lora_data") \
        .tag("device", "heltec-v3-perso") \
        .field("temperature", round(float(temperature), 1)) \
        .field("humidity", round(float(humidity), 1))
    ...

Lors de l’insertion, un champ spécial time d’horodatage est automatiquement ajouté à la création de l’enregistrement.

  • Un tag “device” (une métadonnée de l’enregistrement) est un élément indexé qui permet de filtrer ou regrouper les séries (par capteur, par localisation, etc.). Chaque combinaison différente de tags crée une série distincte : le capteur de température de la chambre, le capteur de température du salon, le capteur de pression dans le jardin, etc.

  • “temperature” et “humidity” sont des fields pour stocker les valeurs mesurées.

Ce point qui fait partie d’une série va être enregistré dans un measurement. Disons une table si on fait le parallèle avec SQL.

Les measurements sont regroupés dans un bucket, le conteneur global.

Schématiquement, cela donne :

1
2
3
4
Bucket
  └── Measurement ("lora_data")
        └── Series ("device" = "heltec-v3-perso")
              └── Point (fields + time)

Mon bucket ne contient qu’un seul measurement, nommé “lora_data”, et celui-ci ne contient qu’une seule serie (car je n’ai qu’un seul capteur).

Ce modèle permet d’interroger facilement les données par capteur, par période, ou par type de mesure, tout en assurant une grande efficacité en écriture et en lecture.

Pour lancer quelques requêtes en langage Flux avec l’interface d’InfluxDB en ligne de commande, je passe par un petit script exécutable :

run_query_influxdb.sh

1
2
3
4
5
6
7
8
#!/bin/bash

source .env
influx query \
  --host $INFLUXDB_URL \
  --org $INFLUXDB_ORG \
  --token $INFLUXDB_TOKEN \
  --file querytest.flux

Les identifiants et token de connexion sont dans des variables d’environnement dans un fichier .env.

La requête est écrite dans un fichier .flux, par exemple :

querytest.flux

1
2
3
4
from(bucket: "lora-ttn")
  |> range(start: -2h)
  |> filter(fn: (r) => r._measurement == "lora_data")
  |> keep(columns: ["_time", "_field", "_value", "device"])

Et on lance l’exécution du script avec la commande ./run_query_influxdb.sh, ce qui me renvoie les données suivantes :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Result: _result
Table: keys: [_field, device]
         _field:string           device:string                      _time:time        _value:float
----------------------  ----------------------  ------------------------------  ------------------
              humidity         heltec-v3-perso  2025-08-29T14:42:21.774570306Z                65.7
              humidity         heltec-v3-perso  2025-08-29T14:54:19.728220928Z                65.3
              humidity         heltec-v3-perso  2025-08-29T15:06:18.632352441Z                61.8
              humidity         heltec-v3-perso  2025-08-29T15:18:17.004567109Z                63.7
              humidity         heltec-v3-perso  2025-08-29T15:30:15.959644094Z                59.8
              humidity         heltec-v3-perso  2025-08-29T15:42:23.182885912Z                  61
              humidity         heltec-v3-perso  2025-08-29T15:54:12.147956696Z                60.6
              humidity         heltec-v3-perso  2025-08-29T16:06:10.564498597Z                60.4
              humidity         heltec-v3-perso  2025-08-29T16:18:08.341589471Z                58.2
              humidity         heltec-v3-perso  2025-08-29T16:30:07.798405170Z                57.4
Table: keys: [_field, device]
         _field:string           device:string                      _time:time        _value:float
----------------------  ----------------------  ------------------------------  ------------------
           temperature         heltec-v3-perso  2025-08-29T14:42:21.774570306Z                21.8
           temperature         heltec-v3-perso  2025-08-29T14:54:19.728220928Z                22.2
           temperature         heltec-v3-perso  2025-08-29T15:06:18.632352441Z                22.7
           temperature         heltec-v3-perso  2025-08-29T15:18:17.004567109Z                  22
           temperature         heltec-v3-perso  2025-08-29T15:30:15.959644094Z                23.7
           temperature         heltec-v3-perso  2025-08-29T15:42:23.182885912Z                22.3
           temperature         heltec-v3-perso  2025-08-29T15:54:12.147956696Z                21.9
           temperature         heltec-v3-perso  2025-08-29T16:06:10.564498597Z                22.4
           temperature         heltec-v3-perso  2025-08-29T16:18:08.341589471Z                23.2
           temperature         heltec-v3-perso  2025-08-29T16:30:07.798405170Z                23.2

Comment lire une requête Flux

Reprenons la dernière requête Flux :

1
2
3
4
from(bucket: "lora-ttn")
  |> range(start: -2h)
  |> filter(fn: (r) => r._measurement == "lora_data")
  |> keep(columns: ["_time", "_field", "_value", "device"])

Et décortiquons cette requête ligne par ligne :

  • from(bucket: "lora-ttn") : la source de données est le bucket “lora-ttn”.

  • |> range(start: -2h) : la taille de la fenêtre temporelle est limitée, en demandant seulement les données des deux dernières heures.

  • |> filter(fn: (r) => r._measurement == "lora_data") : filtre pour des mesures spécifiques. Ici, on ne restitue que les données dont la mesure (measurement) porte le nom “lora_data”.

  • |> keep(columns: ["_time", "_field", "_value", "device"]) : réduire le nombre de colonnes à l’essentiel - _time pour l’horodatage de la mesure, _field pour le nom du champ mesuré (ex. température, humidité…), _value pour la valeur mesurée, device pour l’identifiant du capteur ou module LoRa.

Le symbole |> dans Flux est l’opérateur de pipeline, et il fonctionne un peu comme le | en bash. Il permet de chaîner les opérations les unes après les autres, en passant les résultats d’une étape à la suivante.

La partie fn: (r) => ... évoque une fonction (anonyme ou lambda) avec le paramère r, le plus souvent une ligne (row) du tableau de données.

En SQL, on aurait un équivalent qui ressemblerait à :

1
2
3
4
SELECT time AS _time, field AS _field, value AS _value, device
FROM lora_data
WHERE time >= NOW() - INTERVAL '2 hours'
ORDER BY time DESC;

Agrégation temporelle de données avec Flux

Soit la requête :

1
2
3
4
5
6
    from(bucket: "lora-ttn")
      |> range(start: -6h)
      |> filter(fn: (r) => r._measurement == "lora_data")
      |> filter(fn: (r) => r._field == "temperature" or r._field == "humidity")
      |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
      |> keep(columns: ["_time", "_field", "_value"])

Au résultat :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Result: _result
Table: keys: [_field]
         _field:string                      _time:time                  _value:float
----------------------  ------------------------------  ----------------------------
              humidity  2025-08-29T12:00:00.000000000Z             62.56666666666666
              humidity  2025-08-29T13:00:00.000000000Z            56.120000000000005
              humidity  2025-08-29T14:00:00.000000000Z                         70.12
              humidity  2025-08-29T15:00:00.000000000Z                          66.6
              humidity  2025-08-29T16:00:00.000000000Z             61.38000000000001
              humidity  2025-08-29T17:00:00.000000000Z             58.32000000000001
              humidity  2025-08-29T17:23:49.397923842Z                         58.45
Table: keys: [_field]
         _field:string                      _time:time                  _value:float
----------------------  ------------------------------  ----------------------------
           temperature  2025-08-29T12:00:00.000000000Z            22.366666666666664
           temperature  2025-08-29T13:00:00.000000000Z                          24.6
           temperature  2025-08-29T14:00:00.000000000Z                         19.72
           temperature  2025-08-29T15:00:00.000000000Z            21.259999999999998
           temperature  2025-08-29T16:00:00.000000000Z                         22.52
           temperature  2025-08-29T17:00:00.000000000Z            22.919999999999998
           temperature  2025-08-29T17:23:49.397923842Z                         22.35

La ligne |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) est la fonction d’agrégation temporelle. Elle regroupe les données par fenêtres de temps (ici, des tranches d’1 heure) et applique une fonction d’agrégation (ici, la moyenne). Avec createEmpty: false, on ignore les tranches sans données.

En résumé, cette requête va regrouper les points par tranches horaires (ex. 14h–15h, 15h–16h…), calculer la moyenne de chaque champ (ex. température, humidité) dans chaque tranche, mais en ignorant les tranches sans données (grâce à createEmpty: false).

Un peu plus longue, la requête suivante va compter le nombre de jours à forte chaleur (≥ 30°C) par semaine entre deux dates :

1
2
3
4
5
6
7
8
from(bucket: "lora-ttn")
  |> range(start: 2025-08-06T00:00:00Z, stop: 2025-08-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "lora_data" and r._field == "temperature" and r.device == "heltec-v3-perso")
  |> aggregateWindow(every: 1d, fn: max, createEmpty: false)
  |> filter(fn: (r) => r._value >= 30.0)
  |> aggregateWindow(every: 1w, fn: count, createEmpty: true)
  |> fill(column: "_value", value: 0)
  |> keep(columns: ["_time", "_field", "_value"])

Avec le résultat :

1
2
3
4
5
6
7
8
9
Result: _result
Table: keys: [_field]
         _field:string                      _time:time        _value:int
----------------------  ------------------------------  ----------------
           temperature  2025-08-07T00:00:00.000000000Z                 0
           temperature  2025-08-14T00:00:00.000000000Z                 5
           temperature  2025-08-21T00:00:00.000000000Z                 5
           temperature  2025-08-28T00:00:00.000000000Z                 1
           temperature  2025-08-30T00:00:00.000000000Z                 0

Avec l’option createEmpty: true et l’instruction fill(column: "_value", value: 0), on prend en compte les semaines sans fortes chaleurs. 11 journées chaudes quand même, soif

Les développeurs SQL sentent les opérations COUNT et AVG avec GROUP BY, mais des fonctions anonymes, des filter, cela ressemble davantage à de la programmation fonctionnelle, non ? Pourquoi pas des map ou des reduce tant qu’on y est…

Du SQL à la programmation fonctionnelle

… Puisque vous le demandez, prenez la requête Flux suivante :

1
2
3
4
5
6
7
8
9
10
from(bucket: "lora-ttn")
  |> range(start: -14d)
  |> filter(fn: (r) => r._measurement == "lora_data" and r._field == "temperature" 
                                         and r.device == "heltec-v3-perso")
  |> aggregateWindow(every: 1d, fn: max, createEmpty: false)
  |> map(fn: (r) => ({r with tempLevel: if r._value < 15.0 then "froid"
                                        else if r._value < 30.0 then "modéré"
                                        else "chaud"
                     }))
  |> keep(columns: ["_time", "_field", "_value", "tempLevel"])

Requête qui renvoie le résultat suivant :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Result: _result
Table: keys: [_field]
         _field:string                      _time:time        _value:float        tempLevel:string
----------------------  ------------------------------  ------------------  ----------------------
           temperature  2025-08-17T00:00:00.000000000Z                29.1                modéré
           temperature  2025-08-18T00:00:00.000000000Z                29.8                modéré
           temperature  2025-08-19T00:00:00.000000000Z                31.1                   chaud
           temperature  2025-08-20T00:00:00.000000000Z                26.6                modéré
           temperature  2025-08-21T00:00:00.000000000Z                21.8                modéré
           temperature  2025-08-22T00:00:00.000000000Z                25.5                modéré
           temperature  2025-08-23T00:00:00.000000000Z                24.6                modéré
           temperature  2025-08-24T00:00:00.000000000Z                25.6                modéré
           temperature  2025-08-25T00:00:00.000000000Z                27.1                modéré
           temperature  2025-08-26T00:00:00.000000000Z                29.9                modéré
           temperature  2025-08-27T00:00:00.000000000Z                30.1                   chaud
           temperature  2025-08-28T00:00:00.000000000Z                29.5                modéré
           temperature  2025-08-30T00:00:00.000000000Z                25.9                modéré
           temperature  2025-08-30T17:23:24.295947920Z                25.4                modéré

Avec map on applique une fonction pour chaque ligne r du tableau. Et avec { r with ... }, on renvoie une nouvelle version de r, enrichie d’une nouvelle colonne calculée tempLevel.

Plutôt que de chercher un équivalent en SQL, on ferait mieux de se mettre à la programmation fonctionnelle, par exemple en Python :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
data = [
    {"_value": 31.5},
    {"_value": 26.2},
    {"_value": 14.5}
]

def classify_temperature(r):
    if r["_value"] < 15.0:
        r["tempLevel"] = "froid"
    elif r["_value"] < 30.0:
        r["tempLevel"] = "modéré"
    else:
        r["tempLevel"] = "chaud"
    return r

result = list(map(classify_temperature, data))

Conclusion

La syntaxe de Flux est inspirée de la programmation fonctionnelle avec des opérateurs comme map(), filter(), reduce() ou aggregateWindow(). Flux permet de construire des pipelines évolués de requêtes finalement assez lisibles avec l’habitude.

Pour autant, j’aurais beaucoup de mal à traduire certaines requêtes SQL en Flux : requêtes avec sous-requêtes, agégations conditionnelles, etc.

Si la logique orientée flux vous semble trop déroutante, il vous reste à passer à la version 3 d’InfluxDB avec son langage SQL orienté bases de données temporelles ;-)

Cet article est sous licence CC BY 4.0 par l'auteur.