# memcached2Influx.py
import argparse
from influxdb import InfluxDBClient
from datetime import datetime
import sys  
from pymemcache.client import base
import os
import re
import json
import time
def jsonKey2Influx(key, clientMemcached, name):
    """Build an InfluxDB point list from a JSON document stored in memcached.

    Args:
        key: memcached key to read; it is also stored as the point's
            ``key`` tag.
        clientMemcached: object exposing ``get(key)``; expected to return the
            stored JSON text (str/bytes) or ``None`` when the key is absent.
        name: measurement name of the generated point.

    Returns:
        A one-element list containing the point dict with ``measurement``,
        ``tags`` and ``fields`` entries; the decoded JSON value is used
        as ``fields``.

    Raises:
        KeyError: if ``get`` returns ``None`` for ``key``.
        json.JSONDecodeError: if the stored value is not valid JSON.
    """
    raw = clientMemcached.get(key)
    if raw is None:
        # Fail with an explicit message instead of the opaque TypeError that
        # json.loads(None) would raise.
        raise KeyError("memcached key not found: " + repr(key))
    return [{
        "measurement": name,
        "tags": {
            "key": key
        },
        "fields": json.loads(raw)
    }]

def jsonData2Influx(fileData, clientMemcached):
    """Build the payload for a configuration entry whose value is JSON.

    Args:
        fileData: dict describing the entry; ``name`` and ``keybind`` are
            read. The dict is modified in place: ``rate`` is set to 5 and
            ``currentTime`` to 0.0.
        clientMemcached: object exposing ``get(key)``; expected to return the
            stored JSON text (str/bytes) or ``None`` when the key is absent.

    Returns:
        A one-element list containing a dict with ``measurement``, ``tags``,
        ``fields`` (the decoded JSON) and ``parameter`` (the same
        ``fileData`` object, not a copy).

    Raises:
        KeyError: if ``name``/``keybind`` is missing from ``fileData`` or the
            memcached key is absent.
        json.JSONDecodeError: if the stored value is not valid JSON.
    """
    raw = clientMemcached.get(fileData['keybind'])
    if raw is None:
        # Fail with an explicit message instead of the opaque TypeError that
        # json.loads(None) would raise.
        raise KeyError("memcached key not found: " + repr(fileData['keybind']))
    memcached = json.loads(raw)
    # NOTE(review): the rate is forced to 5 here, overriding any "rate" value
    # parsed from the configuration file (byteData2Influx honours it) --
    # confirm this is intended.
    fileData["rate"] = 5
    fileData["currentTime"] = 0.0
    return [{
        "measurement": fileData["name"],
        "tags": {
            "key": fileData["keybind"]
        },
        "fields": memcached,
        "parameter": fileData
    }]
def byteData2Influx(fileData, clientMemcached):
    """Build the payload for a configuration entry read from a byte array.

    Args:
        fileData: dict describing the entry; ``name``, ``keybind`` and
            ``offset`` are read. The dict is modified in place:
            ``currentTime`` is set to 0.0 and ``rate`` is converted to int
            (falling back to 5 when it is missing or not convertible).
        clientMemcached: object exposing ``get(key)``; expected to return an
            indexable byte sequence or ``None`` when the key is absent.

    Returns:
        A one-element list containing a dict with ``measurement``, ``tags``,
        ``fields`` (``{name: <element at offset>}``) and ``parameter`` (the
        same ``fileData`` object, not a copy).

    Raises:
        KeyError: if a required entry is missing from ``fileData`` or the
            memcached key is absent.
        ValueError: if ``offset`` cannot be converted to int.
        IndexError: if ``offset`` is outside the stored value.
    """
    byteArray = clientMemcached.get(fileData['keybind'])
    if byteArray is None:
        # Fail with an explicit message instead of "NoneType is not
        # subscriptable".
        raise KeyError("memcached key not found: " + repr(fileData['keybind']))
    # Only the single element at `offset` is read.
    memcached = byteArray[int(fileData['offset'])]
    fileData["currentTime"] = 0.0
    try:
        fileData["rate"] = int(fileData["rate"])
    except (KeyError, TypeError, ValueError):
        # Missing or malformed rate: use the default of 5. Only conversion
        # failures are caught so that e.g. KeyboardInterrupt still propagates.
        fileData["rate"] = 5

    return [{
        "measurement": fileData["name"],
        "tags": {
            "key": fileData["keybind"]
        },
        "fields": {
            fileData["name"]: memcached
        },
        "parameter": fileData
    }]
def findTheKey(configFile):
    """Read the next entry from the configuration file.

    Lines are consumed from ``configFile`` until a line containing
    ``keybind`` (and not matching the generic parameter pattern) is found;
    that line closes the entry. Because the iterable is consumed
    incrementally, calling the function again on the same open file returns
    the following entry.

    Args:
        configFile: iterable of text lines, e.g. an open file object.

    Returns:
        A dict mapping parameter names to their string values (including
        ``keybind``), or ``None`` when the input is exhausted before a
        closing ``keybind`` line is seen. Parameters collected before the
        end of input are discarded in that case.

    Raises:
        ValueError: if the closing line contains ``keybind`` but is not of
            the form ``"name":"value"``.
    """
    data = {}
    # Generic parameter line: `"name":value,` followed by either the literal
    # " ←" marker or a character that is neither a letter nor a space
    # (typically the newline).
    parameterPattern = r'^"(.+)":(.+,(( ←)|[^a-zA-Z ]))'
    # Strips commas, double quotes, the " ←" marker and newlines, except
    # where they are followed by a closing ']' with no bracket in between,
    # so separators inside [...] lists survive.
    cleanupPattern = r'(,|"|( ←)|\n)(?![^\[\]]*\])'
    keybindPattern = r'"(.+)":"(.+)"'

    for line in configFile:
        match = re.search(parameterPattern, line)
        if match:
            data[match.group(1)] = re.sub(cleanupPattern, "", match.group(2))
        elif 'keybind' in line:
            # The keybind is expected to be the last value of an entry (no
            # trailing comma), so the entry is returned right after it.
            cleaned = re.sub(r',|( ←)', "", line)
            keybindMatch = re.search(keybindPattern, cleaned)
            if keybindMatch is None:
                raise ValueError("malformed keybind line: " + repr(line))
            data[keybindMatch.group(1)] = keybindMatch.group(2)
            return data
    # Input exhausted without a closing keybind line.
    return None
            

# Command line interface. Exactly one of --key (single-key mode) or --file
# (configuration-file mode) is expected to select what gets published.
parser = argparse.ArgumentParser()

# InfluxDB credentials and location.
parser.add_argument("-u", "--username", required=False, default="", help="the username needed to log in the db")
parser.add_argument("-p", "--password", required=False, default="", help="the password needed to log in the db")
parser.add_argument("-is", "--influxServer", required=True, help="the address of the influx server")
parser.add_argument("-id", "--influxDatabase", required=True, help="the name of the database you want to log in")
parser.add_argument("-ip", "--influxPort", required=False, default=8086, help="the port associated with the server address")

# Memcached location.
parser.add_argument("-ms", "--memcachedServer", required=True, help="the address of the memcached server")
parser.add_argument("-mp", "--memcachedPort", required=False, default=11211, help="the port of the memcached server")

# Single-key mode options. The rate is converted to int later, by the code
# that uses it.
parser.add_argument("-k", "--key", required=False, help="the key needed to find the data in the memcached DB")
parser.add_argument("-kn", "--keyName", required=False, help="if you're not using a configuration file, use this parameter to chose the name of the measurement")
parser.add_argument("-kr", "--keyRate", required=False, default=5, help="when the program is not working with a configuration file, specify the seconds between 2 different push in the influx db")

# Configuration-file mode options. The rate is converted to float later, by
# the code that uses it.
parser.add_argument("-f", "--file", required=False, help="the configuration file path")
parser.add_argument("-fr", "--fileRate", required=False, default=0.1, help="when using a configuration file, specify the sleep time of the loop that check when and wich one of the key must be pushed")

args = parser.parse_args()

# Log in to the InfluxDB server and select the target database.
# Example invocation (database name presumably "dcsMemDb" -- confirm):
#   python memcached2Influx.py -is vldantemon003.lnf.infn.it -ip 8086 \
#       -id dcsMemDb -ms <memcached host> \
#       -f '/home/riccardo/Random bs go/Test/cofnigurationFile.txt'
try:
    clientInflux = InfluxDBClient(host=args.influxServer, port=args.influxPort, username=args.username, password=args.password)
    clientInflux.switch_database(args.influxDatabase)
    print("succesfully logged in\n")
except Exception as exc:
    print("Error: impossible to connect to influx with the following parameter")
    print("username = " + str(args.username))
    # The password is deliberately not echoed to the terminal.
    print("password = " + ("<provided>" if args.password else "<empty>"))
    print("server = " + str(args.influxServer))
    print("database = " + str(args.influxDatabase))
    # str() is required: the default port is an int.
    print("port = " + str(args.influxPort))
    print("reason = " + str(exc))
    # Actually terminate: without a client the rest of the script cannot run.
    sys.exit(1)

# Create the memcached client.
# Example values: memcached server "192.168.198.20", port 11211.
try:
    # The port may arrive as a string from the command line; normalise it.
    clientMemcached = base.Client((args.memcachedServer, int(args.memcachedPort)), connect_timeout=20.0)
except Exception:
    # `Exception` (not a bare except) so Ctrl-C and SystemExit still propagate.
    sys.exit('Cannot reach the memcached server, try again later')

if args.key:
    # Single-key mode: publish the JSON stored under one memcached key every
    # `keyRate` seconds, forever.
    try:
        args.keyRate = int(args.keyRate)
    except (TypeError, ValueError):
        print("the inserted rate value is not valid\nThe default value of 5 second will be set")
        args.keyRate = 5

    # The measurement is named after the key unless a name was given.
    name = args.keyName if args.keyName else args.key

    while True:
        payload = jsonKey2Influx(args.key, clientMemcached, name)
        print('Publishing data')
        print(payload)
        clientInflux.write_points(payload)
        time.sleep(args.keyRate)

elif args.file:
    # Configuration-file mode: every entry of the file has its own publishing
    # rate; the loop below wakes up every `fileRate` seconds and publishes
    # the entries that are due.
    try:
        args.fileRate = float(args.fileRate)
    except (TypeError, ValueError):
        print("the inserted rate value is not valid\nThe default value of 0.1 second will be set")
        args.fileRate = 0.1

    try:
        configFile = open(args.file, 'r')
    except OSError:
        # Covers a missing file, a directory, and permission problems.
        sys.exit("Error: " + args.file + " is not valid or does not point to a DBFile.")

    payloadList = []
    with configFile:
        while True:
            fileData = findTheKey(configFile)
            if fileData is None:
                print('list done')
                break
            entryType = fileData.get("type")
            if entryType == "json":
                payloadList.append(jsonData2Influx(fileData, clientMemcached))
            elif entryType == "double":
                payloadList.append(byteData2Influx(fileData, clientMemcached))
            else:
                print("skipping entry with unsupported type: " + repr(fileData))

    if not payloadList:
        sys.exit("Error: no usable entry found in " + args.file)

    while True:
        time.sleep(args.fileRate)
        for payload in payloadList:
            point = payload[0]
            parameter = point["parameter"]
            if parameter["currentTime"] >= parameter["rate"]:
                # Re-read the value so the published point reflects the
                # current memcached content, not the one read at start-up.
                if parameter["type"] == "json":
                    refreshed = jsonData2Influx(parameter, clientMemcached)
                else:
                    refreshed = byteData2Influx(parameter, clientMemcached)
                point["fields"] = refreshed[0]["fields"]
                parameter["currentTime"] = 0

                name = parameter["name"]
                # NOTE(review): only the field called like the measurement is
                # published; for "json" entries the stored document must
                # therefore contain a key equal to `name` -- confirm.
                data = {
                    "measurement": name,
                    "tags": {
                        "key": parameter["keybind"]
                    },
                    "fields": {
                        name: point["fields"][name]
                    },
                }
                print("\nPublishing to influx:")
                print(data)
                print()
                clientInflux.write_points([data])
            # Advance the entry's timer on every tick. Doing this only after
            # a publish would leave the timer at 0 and nothing would ever be
            # published.
            parameter["currentTime"] = parameter["currentTime"] + args.fileRate

else:
    sys.exit("Error: nothing to publish, provide either --key or --file")