# 134 lines, 5.1 KiB, Python
from dataclasses import dataclass
|
|
import json
|
|
import re
|
|
|
|
|
|
@dataclass
class Field:
    """Descriptor for one scalar column declared in the CSV header."""

    # Column name exactly as captured from the header; used as the JSON key.
    name: str
|
|
|
|
|
|
@dataclass
class ListField(Field):
    """Descriptor for a list column declared as ``name{min,max}`` in the header.

    A list column occupies ``max_size`` consecutive cells on every data row;
    at least ``min_size`` of them must be non-empty.
    """

    # Minimum number of non-empty values a row must supply.
    min_size: int
    # Number of cells the list spans on each row (equals min_size for ``name{N}``).
    max_size: int
    # Name of the aggregation to apply to the values, or None to keep the raw list.
    associated_function: str | None
|
|
|
|
|
|
# Aggregation names a list column may request with the ``::name`` header suffix.
valid_functions = [
    "sum",
    "media",
    "min",
    "max",
]
|
|
|
|
|
|
def main():
    """Convert ``alunos.csv`` to JSON using the column layout from its header."""
    source = "alunos.csv"
    headers = read_header(source)
    read_to_json(source, headers)
|
|
|
|
# Read the first line of the CSV file and return the list of fields it declares.
|
|
def read_header(file: str) -> list[Field]:
    """Parse the first line of a CSV file into field descriptors.

    Each comma-separated header entry is one of:

    * ``name``               -- a scalar column (``Field``)
    * ``name{N}``            -- a list column of exactly N cells
    * ``name{N,M}``          -- a list column of N to M cells
    * ``name{...}::function`` -- a list column reduced with ``function``

    Empty entries (e.g. the padding commas that follow a list column) are
    skipped.

    Args:
        file: Path of the CSV file; it is read as UTF-8.

    Returns:
        The parsed descriptors, in header order.

    Raises:
        ValueError: If an entry does not start with a valid field name, or
            if a list column names a function not in ``valid_functions``.
    """
    with open(file, "r", encoding="utf-8") as f:
        header = f.readline()

    # Split on commas, except the comma inside a ``{min,max}`` size spec.
    fields = re.split(r",(?![^{]*})", header)
    # Groups: 1 = name, 2 = min size, 3 = max size, 4 = function name.
    regex = r"(\w+)(?:{(\d+)(?:,(\d+))?}(?:::(\w+))?)?"

    parsed_headers: list[Field] = []
    for field in fields:
        field = field.strip()
        if field == "":
            continue

        match = re.match(regex, field)
        if match is None:
            raise ValueError(f"Invalid field: {field}")
        name, min_size, max_size, function = match.groups()

        # No size spec at all means a plain scalar column.
        if min_size is None:
            parsed_headers.append(Field(name))
            continue

        # ``name{N}`` is a fixed-size list: min and max coincide.
        if max_size is None:
            max_size = min_size

        # The function suffix is optional; only validate it when present.
        if function is not None and function not in valid_functions:
            raise ValueError(f"Invalid function: {function}")

        parsed_headers.append(
            ListField(name, int(min_size), int(max_size), function)
        )

    return parsed_headers
|
|
|
|
|
|
def read_to_json(
    file: str, headers: list[Field], output_file: str = "alunos.json"
):
    """Convert the data rows of a CSV file to a JSON array of objects.

    Each row is split on commas and consumed left to right following
    ``headers``: a scalar field takes one cell, a list field takes
    ``max_size`` cells (empty cells are dropped). A list field with an
    associated function is stored under ``<name>_<function>`` as the result
    of ``apply_function``; otherwise the list of strings is stored under
    ``<name>``.

    Args:
        file: Path of the CSV file (UTF-8); its first line is the header and
            is skipped.
        headers: Field descriptors describing the column layout.
        output_file: Path of the JSON file to write.

    Raises:
        ValueError: If a list field has fewer than ``min_size`` non-empty
            values, or a row runs out of cells before a scalar field.
    """
    with open(file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    output = []
    # Slicing (rather than pop(0)) also copes with a completely empty file.
    for line in lines[1:]:
        # Drop the line terminator so the last scalar cell does not keep it.
        line = line.rstrip("\r\n")
        # Blank lines (e.g. a trailing newline at end of file) carry no record.
        if line.strip() == "":
            continue

        values = line.split(",")
        position = 0  # index of the next unread cell in this row
        output_line = {}

        for header in headers:
            if isinstance(header, ListField):
                # A list always spans max_size cells; unused ones are empty.
                window = values[position : position + header.max_size]
                position += header.max_size
                list_values = [x.strip() for x in window if x.strip() != ""]

                if len(list_values) < header.min_size:
                    raise ValueError(f"Invalid list size: {header.name}")

                if header.associated_function is not None:
                    key = header.name + "_" + header.associated_function
                    output_line[key] = apply_function(
                        header.associated_function, list_values
                    )
                else:
                    output_line[header.name] = list_values
            else:
                if position >= len(values):
                    raise ValueError(
                        f"Missing value for field: {header.name}"
                    )
                output_line[header.name] = values[position]
                position += 1

        output.append(output_line)

    # Pretty-print, keeping non-ASCII characters readable.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
|
|
|
|
def apply_function(fn: str, values: list[str]):
    """Reduce a list of integer strings with the named aggregation.

    Args:
        fn: One of ``"sum"``, ``"media"`` (arithmetic mean), ``"min"`` or
            ``"max"``.
        values: Strings that each parse as an integer.

    Returns:
        An ``int`` for ``sum``/``min``/``max``; a ``float`` for ``media``.

    Raises:
        ValueError: If a value is not an integer, if ``fn`` is not a known
            function, or if ``min``/``max`` is applied to an empty list.
        ZeroDivisionError: If ``media`` is applied to an empty list.
    """
    vals = [int(value) for value in values]

    operations = {
        "sum": sum,
        "media": lambda xs: sum(xs) / len(xs),
        "min": min,
        "max": max,
    }
    operation = operations.get(fn)
    if operation is None:
        # Fail loudly instead of returning None, which would become JSON null.
        raise ValueError(f"Invalid function: {fn}")
    return operation(vals)
|
|
|
|
|
|
# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|