You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
288 lines
12 KiB
288 lines
12 KiB
import json
|
|
from collections import defaultdict
|
|
from decimal import Decimal
|
|
import numpy as np
|
|
import pandas as pd
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Alignment, Font, Border, Side, colors, NamedStyle, PatternFill
|
|
|
|
|
|
class Book:
    """Wrapper around an openpyxl ``Workbook`` shared by all report sheets.

    Holds the workbook, a few report-level settings (title, project,
    currency prefix, label used for totals) and a single reusable
    ``NamedStyle`` called ``"highlight"``.
    """

    def __init__(self, title=None, project=None, currency=None, language=True):
        """Create an empty workbook.

        :param title: stored on the instance as ``self.title``.
        :param project: stored on the instance as ``self.project``.
        :param currency: currency prefix used by sheets in number formats.
        :param language: truthy selects the Chinese totals label,
            falsy the English one.
        """
        self.wb = Workbook()
        # Remove the workbook's active sheet so only sheets created through
        # workbook() remain.
        self.wb.remove(self.wb.active)
        self.highlight = None
        self.title = title
        self.project = project
        self.currency = currency

        self.total_str = '总计:' if language else 'total:'

    def save(self, file_name='freeze.xlsx'):
        """Write the workbook to ``file_name``."""
        self.wb.save(file_name)

    def workbook(self, name=None, index=False):
        """Append a worksheet and return it.

        :param name: sheet title; spaces are replaced by underscores.
            Ignored when ``index`` is True.
        :param index: when True (or when ``name`` is None) the sheet is
            titled ``'ResourcesUsage'``.
        :return: the newly created worksheet.
        """
        # Decide the title before touching ``name``: the previous code called
        # ``name.replace`` first and crashed on the default ``name=None``.
        if name is None or index is True:
            sheet_title = 'ResourcesUsage'
        else:
            sheet_title = name.replace(' ', '_')
        sheet = self.wb.create_sheet(sheet_title)  # appended at the end
        sheet.title = sheet_title
        return sheet

    def format_font(self, background_color="FFFFFF", font_color='2B2B2B', bold=True):
        """Return the shared ``"highlight"`` style, reconfigured for this call.

        The same ``NamedStyle`` object is reused across calls; only its fill
        and font are replaced each time.

        :param background_color: hex colour for the solid background fill.
        :param font_color: hex colour for the font.
        :param bold: whether the font is bold.
        :return: the ``NamedStyle`` instance stored in ``self.highlight``.
        """
        if not self.highlight:
            self.highlight = NamedStyle(name="highlight")
        # Background fill.
        self.highlight.fill = PatternFill("solid", fgColor=background_color)
        self.highlight.font = Font(
            name='微软雅黑', size=11, bold=bold, italic=False, strike=False,
            color=font_color,
        )
        return self.highlight
|
|
|
|
|
|
class Sheet:
    """A worksheet created from a ``Book`` plus cell-formatting helpers.

    ``self.row_number`` is the 1-based row the next helper writes to; the
    base class leaves it as ``None`` and expects a subclass to initialise it.
    """

    def __init__(self, _wb: 'Book', tag: str, table_first_data=None, table_second_data=None, table_data=None,
                 index=False):
        """Create the worksheet and store the table payloads.

        :param _wb: the owning ``Book``; a new sheet is requested from it.
        :param tag: sheet name; ``':'`` is replaced by ``'_'`` for the title.
        :param table_first_data: stored as ``self.table_first_data``.
        :param table_second_data: stored as ``self.table_second_data``.
        :param table_data: stored as ``self.table_data``.
        :param index: forwarded to ``Book.workbook``.
        """
        self.border = None
        # Arguments the cached border was built with (None = nothing recorded).
        self._border_key = None
        self.raw_wb = _wb
        self.wb = _wb.workbook(tag.replace(':', "_"), index=index)
        # Bound method: calling self.highlight(...) returns the book's style.
        self.highlight = _wb.format_font
        self.tag = tag
        self.title_first = []
        self.table_first_data = table_first_data
        self.table_data = table_data
        self.title_second = []
        self.table_second_data = table_second_data
        self.margin_left = 0
        self.margin_top = 1
        self.row_number = None

    def merge(self, left, right, top, down, color=None, value=None):
        """Merge a rectangular cell range and style its anchor cell.

        :param left: 1-based first column of the merged range.
        :param right: 1-based last column of the merged range.
        :param top: first row of the merged range.
        :param down: last row of the merged range.
        :param color: background colour passed to the highlight style.
        :param value: optional value for the anchor cell; non-string values
            get a currency number format.
        :return: None
        """
        self.set_border(left, right, top, down)
        # Numeric coordinates avoid building "A1:B2" strings by hand, which
        # only worked for columns A..Z.
        self.wb.merge_cells(start_row=top, start_column=left, end_row=down, end_column=right)
        # NOTE(review): the styled cell is addressed by self.row_number, not
        # by ``top`` — confirm callers always pass top == self.row_number.
        subtotal = self.wb.cell(row=self.row_number, column=left)
        subtotal.style = self.highlight(color)
        # ``is not None`` so that a legitimate total of 0 is still written.
        if value is not None:
            subtotal.value = value
            if not isinstance(value, str):
                subtotal.number_format = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        subtotal.alignment = Alignment(horizontal='center', vertical='center')
        self.wb.row_dimensions[self.row_number].height = 20

    def fill_table(self, data: list, index_title: list, link=False, index=False):
        """Write data rows starting at ``self.row_number``.

        Rows alternate between two background colours. Each value is written
        to column ``k + 1`` and formatted according to ``index_title[k]``
        (keys read: ``'sub_horizontal'``, ``'format'``).

        :param data: iterable of rows, each an iterable of cell values.
        :param index_title: per-column settings, one dict per column.
        :param link: accepted but not used by this method.
        :param index: accepted but not used by this method.
        :return: None
        """
        for i, row in enumerate(data):
            background = 'F4D9CE' if i % 2 == 0 else 'F9ECE8'
            for k, value in enumerate(row):
                column_spec = index_title[k]
                cell = self.wb.cell(row=self.row_number, column=k + 1)

                cell.style = self.highlight(bold=False, background_color=background)
                cell.alignment = Alignment(
                    horizontal=column_spec.get('sub_horizontal', 'center'),
                    vertical='center',
                )
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                number_format = column_spec.get('format')
                if number_format:
                    cell.number_format = number_format
                cell.value = value

            # Row height is a per-row property; set it once per row.
            self.wb.row_dimensions[self.row_number].height = 20
            self.row_number += 1

    def format_title(self, index_data: list):
        """Write the header row at ``self.row_number`` and advance one row.

        :param index_data: one dict per column; keys read are ``'title'``,
            ``'background_color'`` and optionally ``'width'`` (default 20)
            and ``'font_color'`` (default ``'2B2B2B'``).
        """
        from openpyxl.utils import get_column_letter

        for i, title_info in enumerate(index_data):
            column = 1 + i + self.margin_left
            # Size the same column the title cell is written to, and use the
            # library helper so columns past Z get valid letters (AA, AB, ...).
            self.wb.column_dimensions[get_column_letter(column)].width = title_info.get('width', 20)
            cell = self.wb.cell(row=self.row_number, column=column)
            cell.value = title_info['title']
            cell.style = self.highlight(
                title_info['background_color'],
                font_color=title_info.get('font_color', '2B2B2B'),
            )
            cell.alignment = Alignment(horizontal='center', vertical='center')
        self.set_border(self.margin_left, len(index_data) + self.margin_left, self.row_number, self.row_number)
        self.row_number += 1

    def my_border(self, t_border, b_border, l_border, r_border):
        """Return a black ``Border`` with the given side styles.

        The most recently built border is cached and reused while the
        requested styles stay the same; a different combination builds (and
        caches) a new border instead of silently returning the old one.

        :param t_border: border style of the top side.
        :param b_border: border style of the bottom side.
        :param l_border: border style of the left side.
        :param r_border: border style of the right side.
        """
        key = (t_border, b_border, l_border, r_border)
        cached_key = getattr(self, '_border_key', None)
        # A border assigned without a recorded key is reused as-is.
        if self.border is not None and cached_key in (None, key):
            return self.border
        self.border = Border(
            top=Side(border_style=t_border, color=colors.BLACK),
            bottom=Side(border_style=b_border, color=colors.BLACK),
            left=Side(border_style=l_border, color=colors.BLACK),
            right=Side(border_style=r_border, color=colors.BLACK),
        )
        self._border_key = key
        return self.border

    def redirect_index(self):
        """Put a hyperlink back to ``ResourcesUsage!B2`` into cell A1."""
        cell = self.wb.cell(row=1, column=1)
        cell.style = self.highlight(bold=False)
        cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
        # NOTE(review): the height is applied to self.row_number although the
        # link lives in row 1 — confirm whether row 1 was intended.
        self.wb.row_dimensions[self.row_number].height = 20
        cell.value = '=HYPERLINK("%s", "%s")' % ('#ResourcesUsage!B2', "跳回首页")
        cell.font = Font(underline='single', color='0563C1')

    def set_border(self, left, right, top, down):
        """Apply a thin border and centred alignment to a block of cells.

        Rows ``top..down`` (inclusive) are covered. ``left`` acts as a
        zero-based offset: the affected columns are ``left + 1 .. right``.
        """
        for row in range(top, down + 1):
            for offset in range(left, right):
                cell = self.wb.cell(row=row, column=offset + 1)
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                cell.alignment = Alignment(horizontal='center', vertical='center')

    def fill_index(self, location='B2', title_index: list = None, title=None, bkg=None):
        """Style (and optionally fill) one cell, then advance one row.

        :param location: cell coordinate such as ``'B2'``.
        :param title_index: column settings; only its length is used, to
            decide how many cells of the current row receive a border.
            ``None`` is treated as empty.
        :param title: value written to the cell when truthy.
        :param bkg: background colour; ``'D9D9D9'`` when falsy.
        """
        if title:
            self.wb[location] = title
        self.wb[location].style = self.highlight(background_color=bkg if bkg else 'D9D9D9')
        # The default ``title_index=None`` used to crash on ``len(None)``.
        column_count = len(title_index) if title_index is not None else 0
        self.set_border(self.margin_left + 1, column_count + self.margin_left, self.row_number,
                        self.row_number)
        self.wb[location].border = self.my_border('thin', 'thin', 'thin', 'thin')
        self.wb[location].alignment = Alignment(horizontal='center', vertical='center')
        self.row_number += 1
|
|
|
|
|
|
class IndexSheet(Sheet):
    """The index sheet: a header row followed by one row per table entry.

    Columns are ``region``, ``bucket``, ``Cost`` and ``Account``.
    """

    def __init__(self, *args, **kwargs):
        """Create the sheet with ``index=True`` and define the column layout.

        Positional and keyword arguments are forwarded to ``Sheet.__init__``.
        """
        kwargs['index'] = True
        super().__init__(*args, **kwargs)
        self.title_index = [
            {'title': 'region', 'background_color': "E28433", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'width': 50},
            {'title': 'bucket', 'background_color': "E28433", "font_color": "FFFFFF", 'width': 40},
            {'title': 'Cost', 'background_color': "E28433", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'format': f'{self.raw_wb.currency}#,##0;{self.raw_wb.currency}-#,##0', 'width': 40},
            {'title': 'Account', 'background_color': "E28433", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'width': 40},
        ]
        self.margin_top = 0
        # Writing starts on the first row of the sheet.
        self.row_number = self.margin_top + 1

    def run(self):
        """Write the header row and then every row of ``self.table_data``.

        A missing ``table_data`` (``None``) produces a header-only sheet.
        No totals row is written.
        """
        # Header row.
        self.format_title(self.title_index)
        # Data rows. The earlier loops that re-assigned each value to itself
        # had no effect and were removed.
        self.fill_table(data=self.table_data or [], index_title=self.title_index, index=True)
|
|
|
|
|
|
class Excel:
    """Aggregate billing CSV line items per bucket and render them to Excel."""

    # Only rows of this product and these linked accounts are aggregated.
    PRODUCT_NAME = 'Amazon Simple Storage Service'
    ACCOUNT_IDS = (917004370941, 401263333417)
    # Multiplier applied to per-bucket costs in gen().
    # NOTE(review): presumably a 6% tax uplift — confirm.
    COST_MULTIPLIER = 1.06
    # Number of CSV rows parsed per chunk.
    CHUNK_SIZE = 100000

    @classmethod
    def handle(cls, file_path, res):
        """Accumulate ``UnBlendedCost`` per ``region|resource|account`` key.

        Rows are kept when ``RateId`` is present, ``ProductName`` equals
        ``PRODUCT_NAME`` and ``LinkedAccountId`` is one of ``ACCOUNT_IDS``.
        A missing ``ResourceId`` is reported under ``'Others'``. Rows whose
        cost is missing are skipped; rows whose cost cannot be parsed are
        reported on stdout and skipped.

        :param file_path: path of the billing CSV; a path containing
            ``'Ningxia'`` selects region ``cn-northwest-1``, anything else
            ``cn-north-1``.
        :param res: dict updated in place, mapping key -> ``Decimal`` total.
        :return: None
        """
        region = 'cn-northwest-1' if 'Ningxia' in file_path else 'cn-north-1'
        # Builtin types instead of the ``np.float`` / ``np.object`` aliases,
        # which no longer exist in current NumPy. Costs are read as text so
        # that Decimal sees the exact digits from the file.
        column_types = {
            'BlendedCost': float,
            'UnBlendedCost': object,
            'ProductName': object,
            'UsageStartDate': object,
            'ResourceId': object,
            'RateId': object,
        }

        # Inspect the header with a separate read. Pulling a probe chunk from
        # the real reader consumed the first data row, which was then never
        # aggregated.
        header = pd.read_csv(file_path, sep=',', nrows=0)
        # A single-column header means the first line is not the real header;
        # skip it.
        skiprows = [0] if len(header.columns) == 1 else None

        reader = pd.read_csv(
            file_path, sep=',', skiprows=skiprows, dtype=column_types,
            chunksize=cls.CHUNK_SIZE, low_memory=False,
        )
        processed = 0
        try:
            for chunk in reader:
                wanted = (
                    chunk['RateId'].notna()
                    & (chunk['ProductName'] == cls.PRODUCT_NAME)
                    & chunk['LinkedAccountId'].isin(list(cls.ACCOUNT_IDS))
                )
                rows = chunk.loc[wanted, ['ResourceId', 'LinkedAccountId', 'UnBlendedCost']]
                for resource_id, account_id, cost in rows.itertuples(index=False, name=None):
                    processed += 1
                    if pd.isna(cost):
                        # A missing cost would turn the running total into NaN.
                        continue
                    resource = 'Others' if pd.isna(resource_id) else resource_id
                    # Keep the first 12 characters of the account id; a
                    # float-typed id stringifies with a trailing ".0".
                    key = f'{region}|{resource}|{str(account_id)[:12]}'
                    try:
                        amount = Decimal(cost)
                    except (InvalidOperation, TypeError, ValueError) as e:
                        print(123, e)
                        continue
                    res[key] = amount + res.get(key, Decimal(0))
        finally:
            reader.close()
        # Number of rows that passed the filter.
        print(processed)

    def gen(self, data, output_path=None):
        """Render the aggregated costs to an Excel workbook.

        Keys of ``data`` are split on ``'|'`` into region, bucket and
        account. Entries whose bucket is ``'data-trans'`` are appended
        unchanged after all other rows; the other rows are grouped by
        region, sorted by cost (descending) within each region and
        multiplied by ``COST_MULTIPLIER``.

        :param data: mapping ``'region|bucket|account'`` -> cost (anything
            ``float()`` accepts).
        :param output_path: where to save the workbook; defaults to the
            historical file name in the current directory.
        :return: None
        """
        # Local import: ``os`` is not among the module's top-level imports.
        import os

        book = Book(currency="¥")
        rows_by_region = defaultdict(list)
        data_trans = []

        for key, cost in data.items():
            parts = key.split('|')
            region, bucket, account = parts[0], parts[1], parts[2]
            amount = round(float(cost), 6)
            if bucket == 'data-trans':
                data_trans.append([region, bucket, amount, account])
            else:
                rows_by_region[region].append((bucket, amount, account))

        table_data = [
            [region, bucket, amount * self.COST_MULTIPLIER, account]
            for region, rows in rows_by_region.items()
            for bucket, amount, account in sorted(rows, key=lambda r: r[1], reverse=True)
        ]
        table_data.extend(data_trans)
        # First page.
        IndexSheet(book, "default", table_data=table_data).run()
        if output_path is None:
            output_path = os.path.join('./', "中意2022-3月.xlsx")
        book.save(output_path)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: aggregate the billing CSVs (or reuse the cached
    # totals in ./res.txt) and render the Excel report.
    import os

    excel = Excel()
    res_cost = dict()
    file_paths = [
        './190508708273-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-2022-04.csv',
        './190508708273-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-Ningxia-2022-04.csv',
    ]

    if not os.path.exists('./res.txt'):
        # No cache yet: aggregate every CSV into res_cost.
        for csv_path in file_paths:
            excel.handle(csv_path, res_cost)

        # Totals are stringified so they can be stored as JSON.
        res_cost.update({key: str(total) for key, total in res_cost.items()})
        if res_cost:
            with open('./res.txt', 'w', encoding='utf8') as cache_file:
                cache_file.write(json.dumps(dict(res_cost)))
    else:
        # Reuse the totals cached by a previous run.
        with open('./res.txt', 'r', encoding='utf8') as cache_file:
            res_cost = json.loads(cache_file.read())

    excel.gen(res_cost)
|