I didn’t want more complexity than I needed, so I’m using the same Raspberry Pi 5 for the flow meter.
First, setup your flow meter in the web interface. Go to Admin→Database Admin→Flow meters→Add. I named mine ‘flowmeter’ and used the port name ‘1’.
I started with GitHub - stevenpinewebair/kegbot-pi-flowmeter: Pushes flowmeter results from pi to kegbot server . Here’s what I used:
cd ~
git clone https://github.com/stevenpinewebair/kegbot-pi-flowmeter.git
cd kegbot-pi-flowmeter
nano config.py
Here is my config.py:
#!/usr/bin/env python
KEY = '<put your key here>'
BASE_URL = 'http://127.0.0.1:8000/'
Get your API key from the web interface ‘Account’ page and put it in the above spot for KEY.
ChatGPT recommended I replace RPi.GPIO with gpiozero. Here is my beerapi-gpio.py:
import requests
import json
#import RPi.GPIO as GPIO
from gpiozero import Button
import os, sys, math, logging
import time
import datetime
from threading import Timer
#from datetime import datetime
from decimal import *
from flowmeter import *
from config import *
#config file is defining KEY and BASE_URL
logging.basicConfig(
filename='gpioDaemon.log',
level=logging.DEBUG,
format='%(asctime)s:%(levelname)s:%(message)s')
DEBUG=True
ML_PER_CLICK = 1.989
logging.info('Running gpio-daemon')
logging.info('DEBUG %s', DEBUG)
# Tell GPIO library to use GPIO references
#GPIO.setmode(GPIO.BCM)
# Set the correct pin
#GPIO.setup(17 , GPIO.IN)
logging.info('Setup GPIO17 as flowmeter input')
GPIO = Button(17)
logging.info('Setup Flowmeter')
#pulling from flowmeter.py
fm = FlowMeter('', ['beer'])
#Class GpioDaemon:
def sensorCallback1(channel):
# Called if sensor output goes LOW
timestamp = time.time()
stamp = datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
currentTime = int(time.time() * FlowMeter.MS_IN_A_SECOND)
#print "Sensor LOW " + stamp
if fm.enabled == True:
fm.update(currentTime)
logging.info(fm.lastClick)
def pourDrinkEvent(clicks):
#drink is being poured
#need to wait for it to finish
logging.info('in drink loop')
logging.info('initial clicks %s', clicks)
logging.info('initial fmclicks %s', fm.clicks)
time.sleep(3)
logging.info('after sleep clicks %s', clicks)
logging.info('after sleep fmclicks %s', fm.clicks)
if(fm.clicks <= 3):
#ghost pour
logging.info('ghost pour, %s', fm.clicks)
fm.clear()
elif(fm.clicks == clicks and fm.clicks > 3):
#not pouring?
logging.info(fm.clicks)
reportDrinkEvent(fm.clicks)
else:
#return to loop
logging.info('in pourDrinkEvent else condition')
pourDrinkEvent(fm.clicks)
def reportDrinkEvent(clicks):
#the server should calculate oz from ticks but I was getting bad readings from it.
mlVolume = clicks*ML_PER_CLICK
myDrink = {'api_key': KEY, 'ticks': clicks, 'volume_ml': mlVolume}
p = requests.post(BASE_URL+'api/taps/flowmeter.1', data=myDrink)
#drink reported, resetting indictator
fm.clear()
logging.info("Status: %s", p.status_code)
logging.info("Response text: %s", p.text)
if p.status_code != 200:
logging.info("Request failed")
return
if not p.text.strip():
print("Empty response from server")
return
data = p.json()
logging.info(data)
return data
def pourGet():
return fm.getFormattedTotalPour()
def pourReset():
fm.clear()
def stillAlive():
Timer(600, stillAlive).start()
logging.info('still alive')
def main():
#initial build out
# GPIO.add_event_detect(17, GPIO.FALLING, callback=sensorCallback1)
# Taken from https://gpiozero.readthedocs.io/en/stable/migrating_from_rpigpio.html
GPIO.when_released = sensorCallback1
stillAlive()
try:
while True:
time.sleep(1)
#logging.info('in sleep loop')
#push this off to definition
if(fm.clicks > 0):
pourDrinkEvent(fm.clicks)
except KeyboardInterrupt:
#GPIO.cleanup() # GPIO.cleanup not needed with gpiozero, see link above under section 11.5
logging.info('exiting...')
if __name__=="__main__":
main()