-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patheinkauf.py
More file actions
executable file
·139 lines (119 loc) · 4.27 KB
/
einkauf.py
File metadata and controls
executable file
·139 lines (119 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/sbin/env python
import csv
import sys
from collections import defaultdict
import locale
import itertools
from rapidfuzz import fuzz
import inquirer
import json
# The German locale is used for collating item names when the list is printed.
# A missing locale should not make the whole script unusable, so warn and keep
# the interpreter's default locale instead of aborting.
try:
    locale.setlocale(locale.LC_ALL, 'de_DE.utf-8')
except locale.Error:
    print(
        "WARN: locale de_DE.utf-8 is not available, "
        "item names are sorted with the default locale",
        file=sys.stderr,
    )

# Units accepted in the unit column of the shopping list.
VALID_UNITS = {"g", "kg", "ml", "L", "Stück", "VE", "Pott"}

# item name -> {unit: accumulated amount}; every unit starts at 0.
shopping_items = defaultdict(lambda: {unit: 0 for unit in VALID_UNITS})

# State persisted by write_einkaufdb(); the file does not exist on a first run.
try:
    with open("einkauf.db", "r") as f:
        data = json.load(f)
except FileNotFoundError:
    data = {}

# section name -> list of item names assigned to that section.
items_per_section = defaultdict(list)
items_per_section.update(data.get("items_per_section", {}))

# item name as written in a list -> item name it is merged into.
item_equivalence = data.get("equivalence", {})
def write_einkaufdb(items_per_section, equivalence):
    """Save the section assignments and item equivalences to ``einkauf.db``.

    Both mappings are stored as one JSON object under the keys
    ``"items_per_section"`` and ``"equivalence"``. An existing file is
    overwritten.
    """
    payload = dict(items_per_section=items_per_section, equivalence=equivalence)
    with open("einkauf.db", "w") as db_file:
        json.dump(payload, db_file)
def pretty_print_amounts(amounts):
    """Format the positive amounts of one item as ``"<amount><unit> + ..."``.

    Args:
        amounts: Mapping of unit name to accumulated amount.

    Returns:
        The entries with an amount greater than zero, joined by ``" + "``.
        An empty string is returned when no amount is positive.
    """
    # Units are emitted in sorted order so the output is the same on every
    # run; iterating a set of strings would follow the per-process hash order.
    return " + ".join(
        f"{amount}{unit}"
        for unit, amount in sorted(amounts.items())
        if amount > 0
    )
def load_sections(sec_path):
    """Load the shortcut-key to section-name mapping from a CSV file.

    Each row is read as ``section name, shortcut key``. Blank lines are
    skipped.

    Args:
        sec_path: Path of the CSV file to read.

    Returns:
        A dict mapping each shortcut key to its section name.

    Exits the program with status -1 after printing an error when the same
    shortcut key is assigned to more than one section.
    """
    sections = {}
    # newline="" leaves line-ending handling to the csv module.
    with open(sec_path, "r", newline="") as f:
        for row in csv.reader(f):
            # csv.reader yields an empty list for a blank line.
            if not row:
                continue
            name, key = row[0], row[1]
            if key in sections:
                print(f"ERROR: Key {key} used for {name} was already assigned to {sections[key]}")
                sys.exit(-1)
            sections[key] = name
    return sections
def check_match(products, new_product):
    """Return the known products whose name closely resembles ``new_product``.

    A product is kept when ``fuzz.ratio`` scores it strictly above 85 against
    ``new_product``. The result follows the iteration order of ``products``.
    """
    return [
        candidate
        for candidate in products
        if fuzz.ratio(candidate, new_product) > 85
    ]
def check_equivalence(new_product, products, equivalences):
    """Resolve ``new_product`` to the product name it should be counted under.

    A recorded equivalence is used without asking. Otherwise the similar names
    found by ``check_match`` are offered in an interactive list; choosing one
    records the decision in ``equivalences`` (mutated in place).

    Args:
        new_product: Product name as read from the shopping list.
        products: Iterable of product names already known.
        equivalences: Dict mapping a product name to the name it is merged into.

    Returns:
        The name to use for ``new_product``: the recorded or newly chosen
        equivalent, or ``new_product`` itself when nothing is merged.

    Raises:
        KeyboardInterrupt: If the user cancels the prompt.
    """
    if new_product in equivalences:
        return equivalences[new_product]

    potential_matches = check_match(products, new_product)
    if not potential_matches:
        return new_product

    no_merge = "Do not merge."
    questions = [
        inquirer.List(
            "match",
            message=f"Do you want to merge *{new_product}* with one of the following?",
            choices=potential_matches + [no_merge]
        )
    ]
    answers = inquirer.prompt(questions)
    if answers is None:
        # inquirer.prompt() swallows Ctrl-C and returns None; re-raise so the
        # run is aborted cleanly instead of failing on answers["match"].
        raise KeyboardInterrupt

    chosen = answers["match"]
    if chosen == no_merge:
        return new_product
    equivalences[new_product] = chosen
    return chosen
def select_section(product, sections):
    """Ask on the console which section ``product`` belongs to.

    The question is repeated until the stripped answer is a key of
    ``sections``; the section name stored under that key is returned.
    Answering ``?`` prints every section as ``name [key]``.
    """
    question = f"In which category is {product}? [? for list]"
    while True:
        choice = input(question).strip()
        if choice == "?":
            overview = "\n".join(
                f"{name} [{short}]" for short, name in sections.items()
            )
            print(overview)
        if choice in sections:
            return sections[choice]
def main():
    """Aggregate a shopping-list CSV and print the totals grouped by section.

    The CSV path is taken from the first command-line argument. Each row is
    read as ``ignore flag, item, amount, unit``. A row is skipped when it has
    fewer than four columns, the flag is ``"1"``, the item is empty, the unit
    is not in ``VALID_UNITS`` or the amount is not a number. Amounts given in
    kg and L are converted to g and ml. Items not seen before in this run are
    passed through ``check_equivalence`` so near-duplicates can be merged.

    Items without a section are assigned one interactively; the section
    assignments and equivalences are then saved with ``write_einkaufdb`` and
    the list is printed section by section.
    """
    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} SHOPPING_LIST.csv")

    with open(sys.argv[1], "r") as f:
        for line in csv.reader(f):
            # Blank or truncated rows cannot provide all four columns.
            if len(line) < 4:
                continue
            ignore = line[0]
            item = line[1].strip()
            unit = line[3]
            if ignore == "1" or not item or unit not in VALID_UNITS:
                continue
            # Accept a decimal comma as well as a decimal point.
            normalized_amount = line[2].replace(",", ".")
            try:
                amount = float(normalized_amount)
            except ValueError:
                print(f"WARN: {item} with amount {line[2]} is invalid and ignored")
                continue
            # Store weights in g and volumes in ml so amounts can be summed.
            if unit == "kg":
                amount *= 1000
                unit = "g"
            if unit == "L":
                amount *= 1000
                unit = "ml"
            if item not in shopping_items:
                item = check_equivalence(item, shopping_items.keys(), item_equivalence)
            shopping_items[item][unit] += amount

    sections = load_sections("sections.csv")
    # Build the lookup once instead of re-scanning every section per item.
    assigned_items = set(itertools.chain.from_iterable(items_per_section.values()))
    for item in shopping_items:
        if item in assigned_items:
            continue
        sec = select_section(item, sections)
        items_per_section[sec].append(item)
        assigned_items.add(item)
    # Always save: equivalences may have changed even if no section was added.
    write_einkaufdb(items_per_section, item_equivalence)

    for section, section_items in items_per_section.items():
        wanted = set(section_items)
        in_section = {k: v for k, v in shopping_items.items() if k in wanted}
        print(f"\n* {section}:")
        for item, amounts in sorted(in_section.items(), key=lambda x: locale.strxfrm(x[0])):
            print(f"{item}: {pretty_print_amounts(amounts)}")


if __name__ == "__main__":
    main()