You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1153 lines
50 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import calendar
import os
import string
from collections import OrderedDict, defaultdict
from datetime import datetime, date
from decimal import Decimal
import numpy as np
import pandas as pd
from numpy import object
from openpyxl import Workbook
from openpyxl import load_workbook
from openpyxl.chart import Reference, PieChart3D
from openpyxl.chart.marker import DataPoint
from openpyxl.styles import NamedStyle, Font, Border, Side, PatternFill, Alignment, colors
from pytz import timezone
# Placeholder tag value; Excel.run() files records carrying it together with
# the records that have no tag at all.
fill_null_value = 'Others'
# Bucket name under which Excel.run() collects the untagged records; also the
# label ServiceSheet.reset_order() moves to the end of a table.
other_display = 'OtherNullTagResource'
# Tag value -> [display name, owning department].
# Look-ups in this file fall back to [tag, '-'] for tags that are not listed.
application_dict = {
'sapdr': ['sapdr', '基础架构管理处'],
'dfsbackup': ['文件服务器云异地备份', '基础架构管理处'],
'ka': ['KA现渠自动化管理平台', '资讯应用规划处'],
'tpm': ['TPM系统', '资讯应用规划处'],
'wtx': ['旺天下测试', '资讯技术研发处'],
'newsrm': ['新供应商管理系统', '资讯技术研发处'],
'wantmediaback': ['CC备份上传云端', '基础架构管理处'],
'wantclub': ['微信广告', '资讯技术研发处'],
'watergod': ['水神系统', '/神旺控股/资讯处'],
'srm': ['企企通供应商管理系统', '资讯应用规划处'],
'vpn server': ['VPN服务', '基础架构管理处'],
'ecrm': ['ECRM系统', '资讯应用规划处'],
'ytwms': ['优豚WMS系统', '企业资源应用处'],
'sfa': ['营销通SFA系统', '资讯应用规划处'],
'dljs': ['多啦金税系统', '企业资源应用处'],
'firewall': ['CP测试', '基础架构管理处'],
'awsbackup': ['GIT数据备份', '-'],
'hrrpa': ['人资自动化机器人', '资讯应用规划处'],
'tczs': ['华南统仓追溯系统', '资讯应用规划处'],
'cas': ['统一认证系统', '资讯应用规划处'],
'nyerp': ['农业ERP系统', '农业发展事业部'],
'other': ['其他', '-'],
'iwantwant': ['爱旺旺广告', '资讯技术研发处'],
'wwgw': ['集团官网', '传媒服务处'],
'awslog': ['AWS服务日志', '-'],
'hollywant': ['售卖机', '-'],
}
# Environment code (upper- and lower-case spellings) -> display label.
# NOTE(review): not referenced in the visible part of this file - confirm it is still used.
venv_name_dict = {
'PRD': '生产环境',
'UAT': '测试环境',
'prd': '生产环境',
'uat': '测试环境',
}
def new_round(_float, _len=2):
    """
    Round a number to ``_len`` decimals, pushing a trailing 5 upwards.

    Parameters
    ----------
    _float: float, numpy.float64, str (thousands separators ``,`` allowed)
        or any other object accepted by the builtin ``round``
    _len: int, number of digits to keep after the decimal point

    Returns
    -------
    The rounded value.  A float that already has at most ``_len`` decimals is
    returned unchanged.  ``None`` is returned when the value cannot be
    processed (every exception is swallowed on purpose).
    """
    try:
        if isinstance(_float, np.float64):
            # Normalise numpy scalars to a plain float and start over.
            return new_round(float(_float), _len)
        if isinstance(_float, str):
            return new_round(float(_float.replace(',', '')), _len)
        if not isinstance(_float, float):
            return round(_float, _len)

        text = str(_float)
        if 'e' in text.lower():
            # Scientific notation: the digit tricks below do not apply.
            return round(_float, _len)
        if text[::-1].find('.') <= _len:
            # Not more decimals than requested: nothing to round.
            return _float
        if text[-1] == '5':
            # Replace the final 5 by 6 so that round() goes up instead of
            # following the binary representation / banker's rounding.
            return round(float(text[:-1] + '6'), _len)
        return round(_float, _len)
    except Exception:
        return None
class Book:
    """
    Thin wrapper around an openpyxl ``Workbook`` shared by every report sheet.

    Holds the report meta data (title, project, currency symbol), the
    localised sub-total / total labels and the billing period string.
    """

    def __init__(self, _time: list, title=None, project=None, currency=None, language=True):
        """
        :param _time: list with exactly two ``'YYYY-MM'`` strings (start and end month);
                      the list is sorted in place
        :param title: report title
        :param project: project name, used in sheet headers
        :param currency: currency symbol put in front of amounts
        :param language: truthy for Chinese footer labels, falsy for English ones
        :raises Exception: when ``_time`` does not contain exactly two entries
        """
        self.wb = Workbook()
        # Start from an empty workbook: drop the sheet openpyxl creates by default.
        self.wb.remove(self.wb.active)
        self.highlight = None
        self.title = title
        self.project = project
        self.currency = currency
        _time.sort()
        if len(_time) != 2:
            raise Exception('时间数组不对')
        self.subtotal_str = '小计:' if language else 'subtotal:'
        self.total_str = '总计:' if language else 'total:'
        end_parts = _time[1].split('-')
        last_day = calendar.monthrange(int(end_parts[0]), int(end_parts[1]))[1]
        # "<first day of start month>_<last day of end month>"
        self.time = "%s_%s-%d" % (_time[0] + '-01', _time[1], last_day)

    def save(self, file_name='freeze.xlsx'):
        """Write the workbook to ``file_name``."""
        self.wb.save(file_name)

    def workbook(self, name=None, index=False):
        """
        Append a worksheet and return it.

        :param name: sheet name; blanks are replaced by underscores
        :param index: when ``True`` the sheet is named ``'ResourcesUsage'`` regardless of ``name``
        :return: the new worksheet
        """
        # Decide on the name first: the previous code called name.replace()
        # before the None check and crashed for the default name=None.
        if name is None or index is True:
            sheet_name = 'ResourcesUsage'
        else:
            sheet_name = name.replace(' ', '_')
        sheet = self.wb.create_sheet(sheet_name, )  # appended as last sheet
        sheet.title = sheet_name
        return sheet

    def format_font(self, background_color="FFFFFF", font_color='2B2B2B', bold=True):
        """
        Return the shared ``highlight`` named style configured with the given colours.

        The same ``NamedStyle`` object is reused and re-configured on every call.
        """
        if not self.highlight:
            self.highlight = NamedStyle(name="highlight")
        self.highlight.fill = PatternFill("solid", fgColor=background_color)  # background fill
        self.highlight.font = Font(name='微软雅黑', size=11, bold=bold, italic=False, strike=False,
                                   color=font_color)
        return self.highlight
class Sheet:
    """
    Base class for one worksheet of the report.

    Creates the worksheet inside the shared ``Book`` and offers helpers for
    title rows, data rows, merged footer rows and cell borders.
    ``row_number`` is the next row to write; it starts as ``None`` and has to
    be initialised by the subclass.
    """

    def __init__(self, _wb: "Book", tag: str, table_first_data=None, table_second_data=None, table_data=None,
                 index=False):
        """
        :param _wb: the ``Book`` the sheet is added to
        :param tag: tag name; used (with ``:`` replaced by ``_``) as sheet name
        :param table_first_data: rows of the first table (used by subclasses)
        :param table_second_data: rows of the second table (used by subclasses)
        :param table_data: generic table payload (used by subclasses)
        :param index: passed on to ``Book.workbook``
        """
        self.border = None
        self.raw_wb = _wb
        self.wb = _wb.workbook(tag.replace(':', "_"), index=index)
        self.highlight = _wb.format_font
        self.tag = tag
        self.title_first = []
        self.table_first_data = table_first_data
        self.table_data = table_data
        self.title_second = []
        self.table_second_data = table_second_data
        self.margin_left = 1
        self.margin_top = 1
        self.row_number = None

    @staticmethod
    def _column_letter(index):
        """Return the letter(s) of the 1-based column ``index`` (1 -> 'A', 27 -> 'AA')."""
        letters = ''
        while index > 0:
            index, offset = divmod(index - 1, 26)
            letters = chr(65 + offset) + letters
        return letters

    def merge(self, left, right, top, down, color=None, value=None):
        """
        Merge a rectangular cell range and style its anchor cell.

        :param left: first column (1-based)
        :param right: last column (1-based)
        :param top: first row
        :param down: last row
        :param color: background colour handed to the highlight style
        :param value: value for the merged cell; non-string values get the currency number format
        :return: None
        """
        self.set_border(left, right, top, down)
        self.wb.merge_cells(
            '%s%d:%s%d' % (self._column_letter(left), top, self._column_letter(right), down))
        # The anchor cell is taken from the current row, as before.
        subtotal = self.wb.cell(row=self.row_number, column=left)
        subtotal.style = self.highlight(color)
        # "is not None" instead of truthiness: a total of 0 must still be written.
        if value is not None:
            subtotal.value = value
            if not isinstance(value, str):
                subtotal.number_format = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        subtotal.alignment = Alignment(horizontal='center', vertical='center')
        self.wb.row_dimensions[self.row_number].height = 20

    def fill_table(self, data: list, index_title: list, link=False, index=False):
        """
        Write the data rows, one sheet row per entry, starting at ``row_number``.

        :param data: list of rows; each row is a sequence of cell values
        :param index_title: column descriptions (``sub_horizontal`` and ``format`` are read)
        :param link: when ``True`` (and ``index`` is falsy) the first column becomes a
                     hyperlink to cell B2 of the sheet named like the value
        :param index: truthy on the index sheet; suppresses the hyperlink
        :return: None
        """
        for row in data:
            for k, value in enumerate(row):
                column_spec = index_title[k]
                cell = self.wb.cell(row=self.row_number, column=k + self.margin_left + 1)
                cell.style = self.highlight(bold=False)
                cell.alignment = Alignment(
                    horizontal=column_spec.get('sub_horizontal', 'center'),
                    vertical='center'
                )
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                self.wb.row_dimensions[self.row_number].height = 20
                if column_spec.get('format'):
                    cell.number_format = column_spec.get('format')
                if k == 0 and link is True and not index:
                    cell.value = '=HYPERLINK("%s", "%s")' % (f'#{value.replace(" ", "_")}!B2', value)
                    cell.font = Font(underline='single', color='0563C1')
                else:
                    cell.value = value
            self.row_number += 1

    def fill_footer(self, footer_data: list, color='A9D08E'):
        """
        Write a footer row made of merged blocks, then leave blank rows below it.

        :param footer_data: list of ``(column_count, value)`` pairs, laid out left to right
        :param color: background colour of the footer cells
        :return: None
        """
        first_column = self.margin_left + 1
        for width, value in footer_data:
            last_column = first_column + width - 1
            # merge the block ...
            self.merge(first_column, last_column, self.row_number, self.row_number, value=value, color=color)
            # ... and draw its grid lines
            self.set_border(first_column, last_column, self.row_number, self.row_number)
            first_column += width
        # move below the footer
        self.row_number += 1
        # two blank rows
        for _ in range(2):
            self.wb.append([])
            self.row_number += 1

    def format_title(self, index_data: list):
        """
        Write the column header row and set the column widths.

        :param index_data: column descriptions (``title``, ``background_color`` and
                           optionally ``font_color`` / ``width``)
        """
        for i, title_info in enumerate(index_data):
            column = i + self.margin_left + 1
            self.wb.column_dimensions[self._column_letter(column)].width = title_info.get('width', 20)
            cell = self.wb.cell(row=self.row_number, column=column)
            cell.value = title_info['title']
            cell.style = self.highlight(
                title_info['background_color'],
                font_color=title_info.get('font_color', '2B2B2B')
            )
            cell.alignment = Alignment(horizontal='center', vertical='center')
        self.set_border(self.margin_left + 1, len(index_data) + self.margin_left, self.row_number, self.row_number)
        self.row_number += 1

    def my_border(self, t_border, b_border, l_border, r_border):
        """
        Return the sheet's border object.

        The border is built on the first call and cached; the arguments of
        later calls are ignored.
        """
        if not self.border:
            self.border = Border(top=Side(border_style=t_border, color=colors.BLACK),
                                 bottom=Side(border_style=b_border, color=colors.BLACK),
                                 left=Side(border_style=l_border, color=colors.BLACK),
                                 right=Side(border_style=r_border, color=colors.BLACK))
        return self.border

    def redirect_index(self):
        """Put a hyperlink back to ``ResourcesUsage!B2`` into cell A1."""
        cell = self.wb.cell(row=1, column=1)
        cell.style = self.highlight(bold=False)
        cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
        self.wb.row_dimensions[self.row_number].height = 20
        cell.value = '=HYPERLINK("%s", "%s")' % ('#ResourcesUsage!B2', "跳回首页")
        cell.font = Font(underline='single', color='0563C1')

    def set_border(self, left, right, top, down):
        """Give every cell of the rectangle a thin border and centre its content."""
        for row in range(top, down + 1):
            for column in range(left, right + 1):
                cell = self.wb.cell(row=row, column=column)
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                cell.alignment = Alignment(horizontal='center', vertical='center')

    def fill_index(self, location='B2', title_index: list = None, title=None, bkg=None):
        """
        Write the sheet caption and advance to the next row.

        :param location: cell address of the caption
        :param title_index: column descriptions; only their count is used, to border the caption row
        :param title: caption text; defaults to ``"<project>:<period>"`` of the book
        :param bkg: background colour, ``'D9D9D9'`` when omitted
        """
        if title:
            self.wb[location] = title
        else:
            self.wb[location] = self.raw_wb.project + ':' + self.raw_wb.time
        self.wb[location].style = self.highlight(background_color=bkg if bkg else 'D9D9D9')
        # title_index defaults to None: treat that as "no columns" instead of failing in len().
        column_count = len(title_index or [])
        self.set_border(self.margin_left + 1, column_count + self.margin_left, self.row_number,
                        self.row_number)
        self.wb[location].border = self.my_border('thin', 'thin', 'thin', 'thin')
        self.wb[location].alignment = Alignment(horizontal='center', vertical='center')
        self.row_number += 1
class AppSheet(Sheet):
    """
    Detail sheet with two tables, a sub-total under each and a grand total.

    ``table_first_data`` rows follow ``title_first`` (12 columns, total cost in
    the fourth column from the right); ``table_second_data`` rows follow
    ``title_second`` (resource cost at index 3, total cost at index 4).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        money = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        grey = {'background_color': "808080", "font_color": "FFFFFF"}
        blue = {'background_color': "1F4E78", "font_color": "FFFFFF"}
        self.title_first = [
            {'title': 'NO.', **grey, 'sub_horizontal': 'center', 'width': 5},
            {'title': 'Name', **grey, 'sub_horizontal': 'left', 'width': 30},
            {'title': '所属环境', **grey, 'sub_horizontal': 'left', 'width': 30},
            {'title': 'Product', **blue, 'sub_horizontal': 'left', 'width': 30},
            {'title': 'Resource Cost', 'format': money, 'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 20},
            {'title': 'Other Resource', **blue, 'width': 30},
            {'title': 'Resource Cost', 'format': money, 'background_color': "D9D9D9", 'width': 15},
            {'title': 'Data Transfer', 'format': money, 'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 15},
            {'title': 'Total Cost', 'format': money, 'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 15},
            {'title': 'Application', **grey, 'width': 20},
            {'title': 'Owner', **grey, 'width': 10},
            {'title': 'Remarks', **grey, 'width': 10},
        ]
        self.title_second = [
            {"title": 'NO.', **grey, 'width': 5},
            {"title": 'Name', **grey, 'width': 30},
            {"title": 'Product', **blue, 'width': 30},
            {"title": 'Resource Cost', 'background_color': "D9D9D9", 'width': 20, 'format': money},
            {"title": 'Total Cost', 'background_color': "D9D9D9", 'width': 30, 'format': money},
            {"title": self.tag, **grey, 'width': 20},
        ]
        self.wb.sheet_view.showGridLines = False  # hide the grid lines
        self.wb.freeze_panes = 'C4'  # freeze the header area
        self.wb.merge_cells('B2:M2')
        self.row_number = self.margin_top + 1

    def run(self):
        """Render caption, both tables, their sub-totals, the grand total and the back link."""
        # Missing tables (constructor default None) are rendered as empty tables.
        first_rows = self.table_first_data or []
        second_rows = self.table_second_data or []

        # caption row
        self.fill_index(title_index=self.title_first)
        # header row of the first table
        self.format_title(self.title_first)
        total_1 = new_round(sum([item[-4] for item in first_rows]), 6)
        self.fill_table(data=first_rows, index_title=self.title_first)
        self.set_border(2, len(self.title_first) + self.margin_left, self.row_number, self.row_number)
        # sub-total row of the first table
        self.fill_footer([(len(self.title_first) - 4, self.raw_wb.subtotal_str), (4, total_1)])

        # second table
        self.format_title(self.title_second)
        start = self.row_number
        total_2 = new_round(sum([item[3] for item in second_rows]), 6)
        total_3 = new_round(sum([item[4] for item in second_rows]), 6)
        self.fill_table(second_rows, index_title=self.title_second)
        end = self.row_number - 1
        if end > start:
            # merge the third sheet column over all rows of the second table
            column = chr(64 + self.margin_left + 2)
            self.wb.merge_cells('%s%d:%s%d' % (column, start, column, end))
        # sub-total row of the second table
        self.fill_footer([(3, self.raw_wb.subtotal_str), (1, total_2), (1, total_3)])
        # grand total row
        self.fill_footer([
            (len(self.title_first) - 2, self.raw_wb.total_str),
            (2, new_round(float(total_1) + float(total_3), 2))
        ], color='FFC000')
        self.redirect_index()
class ServiceSheet(Sheet):
    """
    Sheet with one cost-distribution table (and pie chart) per billing date.

    ``table_data`` is read as ``{date: {tag: {service: cost}}}``.  Services
    contained in the module-level ``display_set`` get their own column, all
    others are summed into "Others Services".
    NOTE(review): ``display_set`` is not defined in the visible part of the file.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.wb.sheet_view.showGridLines = False  # hide the grid lines
        self.wb.freeze_panes = 'B2'  # freeze the header area
        self.row_number = self.margin_top + 1

    def run(self):
        """Write one table per date, then the link back to the index sheet."""
        for _date, date_data in self.table_data.items():
            self.write(_date, self.get_tag_index(date_data))
        self.redirect_index()

    def get_tag_index(self, date_data: dict):
        """
        Return ``(tag, cost)`` pairs with a positive cost, most expensive first.

        The cost is the sum of the tag's service costs multiplied by 1.06.
        NOTE(review): 1.06 looks like a tax/markup factor - confirm.
        """
        costs = []
        for tag, data in date_data.items():
            cost = new_round(float(sum(data.values())) * 1.06, 6)
            if cost > 0:
                costs.append((tag, cost))
        return sorted(costs, key=lambda pair: pair[1], reverse=True)

    def get_pie(self, min_row=None, max_row=None, min_col=None, max_col=None):
        """Build a 3D pie chart: labels from ``min_col``, values from ``max_col``."""
        pie = PieChart3D()
        labels = Reference(self.wb, min_col=min_col, max_col=min_col, min_row=min_row,
                           max_row=max_row)  # label cells
        # The data range starts one row earlier so that the header cell names the series.
        data = Reference(self.wb, max_col=max_col, min_col=max_col, min_row=min_row - 1,
                         max_row=max_row)  # value cells
        pie.add_data(data, titles_from_data=True)
        pie.set_categories(labels)
        pie.title = "项目费用分布图"
        first_slice = DataPoint(idx=0, explosion=20)
        pie.series[0].data_points = [first_slice]
        return pie

    def get_date_tag_value(self, inner_date, tag, service):
        """
        Return the cost cell for one tag / service column.

        A column that is not in ``display_set`` (the "others" column) sums all
        non-displayed services and yields ``'-'`` when that sum is 0; a
        displayed service yields ``0`` when it has no cost.  Non-zero costs are
        multiplied by 1.06 and rounded to 6 decimals.
        """
        tag_costs = self.table_data[inner_date][tag]
        if service not in display_set:
            total = sum([v for k, v in tag_costs.items() if k not in display_set])
            if total == 0:
                return '-'
        else:
            total = tag_costs.get(service, 0)
            if total == 0:
                return 0
        return new_round(float(total) * 1.06, 6)

    @staticmethod
    def len_byte(value):
        """
        Return a display length where every non-ASCII character counts wider.

        ``None`` and the empty string yield 10; non-string values are measured
        through ``str(value)``.
        """
        if value is None or value == "":
            return 10
        if isinstance(value, str):
            length = len(value)
            utf8_length = len(value.encode('utf-8'))
            # every byte beyond one per character adds half a unit
            length = (utf8_length - length) / 2 + length
        else:
            length = len(str(value))
        return int(length)

    def write(self, inner_date, _date_tag_order):
        """
        Write the distribution table of one date and attach its pie chart.

        :param inner_date: key into ``table_data``; its first 7 characters go into the caption
        :param _date_tag_order: ``(tag, total)`` pairs in display order (see ``get_tag_index``)
        """
        shown = set()
        for item_dict in self.table_data[inner_date].values():
            shown.update(name for name in item_dict if name in display_set)
        # Sorted so that the column order does not depend on set iteration order.
        services_list = [*sorted(shown), "Others Services"]
        service_count = len(services_list)
        # column widths derived from the service names
        adapt_width = [self.get_width(item) for item in services_list]
        serve_dict = {
            'Amazon Elastic Compute Cloud': 'EC2服务器',
            'Amazon Simple Storage Service': 'S3存储桶',
            'Amazon Relational Database Service': 'RDS数据库',
            'Amazon ElastiCache': 'Redis数据库',
            'Others Services': '其他云服务',
        }
        money = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        grey = {'background_color': "808080", "font_color": "FFFFFF", 'sub_horizontal': 'center'}
        service_titles = [
            {'title': serve_dict.get(service, ''), **grey, 'width': adapt_width[k], 'format': money}
            for k, service in enumerate(services_list)
        ]
        self.title_first = [
            {'title': self.tag, **grey, 'width': 37},
            {'title': '所属部门', **grey, 'width': 37},
            *service_titles,
            {'title': 'Total Cost', **grey, 'width': self.get_width('Total Cost'), 'format': money},
            {'title': 'Percentage', **grey, 'width': self.get_width('Total Cost')},
        ]
        # caption row, merged over all table columns
        self.wb.merge_cells(f'B{self.row_number}:{self.digit2alphabet(len(self.title_first))}{self.row_number}')
        self.fill_index(title_index=self.title_first, title=f"{inner_date[:7]} Resource Distribution Details",
                        bkg='E26B0A')
        # header row
        self.format_title(self.title_first)

        table_data = []
        total_cost = new_round(sum([item[1] for item in _date_tag_order]), 6)
        for tag, total in _date_tag_order:
            application = application_dict.get(tag, [tag, '-'])
            row = [application[0], application[1]]
            for service in services_list:
                row.append(self.get_date_tag_value(inner_date, tag, service))
            row.append(total)
            if total_cost:
                row.append("%.2f%%" % new_round(total * 100 / total_cost, 4))
            else:
                row.append("%.2f%%" % new_round(0, 6))
            table_data.append(row)
        table_data = self.reset_order(table_data)

        # Column sums over the service and total columns; the two leading
        # label columns and the trailing percentage column are skipped.
        resource_li = ['资源费用', '-', *[0] * service_count, 0, '100%']
        for item_data in table_data:
            last = len(item_data) - 1
            for k, v in enumerate(item_data):
                if k < 2 or k == last:
                    continue
                if v == '-' or v == '':
                    continue
                resource_li[k] += v
        resource_num = resource_li[-2]
        table_data.append(resource_li)
        # NOTE(review): 0.106 here and 1.106 in get_total() look like a service-fee
        # rate on top of the resource cost - confirm with the billing rules.
        table_data.insert(0, ['服务费用', '-', *['-'] * service_count, resource_num * 0.106, '-'])
        total_list = self.get_total(table_data, resource_num, services_list)

        min_row = self.row_number
        self.fill_table(table_data, self.title_first, link=True)
        max_row = self.row_number - 1
        min_col = 2
        max_col = 3 + service_count + 1
        if max_row > min_row:
            pie = self.get_pie(max_row=max_row, min_row=min_row, max_col=max_col, min_col=min_col)
            self.wb.add_chart(pie, f"{self.digit2alphabet(7 + service_count)}3")
        self.fill_total([total_list], self.title_first)

    def get_width(self, service):
        """Estimate a column width: 1 per digit, lower-case letter or punctuation mark, 2 for anything else."""
        narrow = string.digits + string.ascii_lowercase + string.punctuation
        total = 0
        for item in service:
            total += 1 if item in narrow else 2
        return total

    def fill_total(self, total_list, index_title: list):
        """Write the given rows with the grey total-row styling, one sheet row each."""
        for row in total_list:
            for k, value in enumerate(row):
                cell = self.wb.cell(row=self.row_number, column=k + self.margin_left + 1)
                cell.value = value
                cell.style = self.highlight(background_color='A6A6A6', bold=False)
                cell.alignment = Alignment(horizontal=index_title[k].get('sub_horizontal', 'center'),
                                           vertical='center')
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                self.wb.row_dimensions[self.row_number].height = 20
                if index_title[k].get('format'):
                    cell.number_format = index_title[k].get('format')
            self.row_number += 1

    def get_other_total(self, table_data, services_list):
        """
        Sum all rows except the ``other_display`` row.

        The last element of every row must be a percentage string such as
        ``'12.34%'``.  NOTE(review): not called in the visible part of the file.
        """
        other_percent = 0
        others_total_list = ['Total(exclude Other resource cost)', *[0] * len(services_list), 0, 0]
        for item_data in table_data:
            # float() replaces the former eval(): the text is a plain number.
            percent = float(item_data[-1][:-1])
            if item_data[0] == other_display:
                other_percent = percent
                continue
            for k, v in enumerate(item_data):
                if k == 0 or k > len(item_data) - 2:
                    continue
                if v == '-' or v == '':
                    continue
                others_total_list[k] += v
            others_total_list[-1] += percent
        others_total_list[-1] = f"{new_round(100 - other_percent, 2)}%"
        return others_total_list

    def get_total(self, table_data, resource_num, services_list):
        """Return the final 'Total' row: ``resource_num * 1.106`` in the total column, '-' elsewhere."""
        return ['Total', '-', *['-'] * len(services_list), resource_num * 1.106, '']

    def reset_order(self, data):
        """Return the rows with the ``other_display`` row (if present) moved to the end."""
        res = []
        other_row = []
        for item in data:
            if item[0] == other_display:
                other_row = item
            else:
                res.append(item)
        if other_row:
            res.append(other_row)
        return res

    @staticmethod
    def digit2alphabet(digit):
        """
        Convert a 0-based column index into spreadsheet letters.

        0 -> 'A', 25 -> 'Z', 26 -> 'AA'.  Column letters have no zero digit, so
        a plain base-26 conversion (which produced 'BA' for 26) is wrong.
        """
        letters = ''
        number = digit + 1
        while number > 0:
            number, offset = divmod(number - 1, 26)
            letters = chr(ord('A') + offset) + letters
        return letters
class SharedSheet(Sheet):
    """
    Detail sheet for shared resources: two tables, a sub-total under each and a grand total.

    ``table_first_data`` rows follow ``title_first`` (11 columns, total cost in
    the fourth column from the right); ``table_second_data`` rows follow
    ``title_second`` (resource cost at index 3, total cost at index 4).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        money = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        grey = {'background_color': "808080", "font_color": "FFFFFF"}
        blue = {'background_color': "1F4E78", "font_color": "FFFFFF"}
        self.title_first = [
            {'title': 'NO.', **grey, 'sub_horizontal': 'center', 'width': 5},
            {'title': 'Name', **grey, 'sub_horizontal': 'left', 'width': 30},
            {'title': 'Product', **blue, 'sub_horizontal': 'left', 'width': 30},
            {'title': 'Resource Cost', 'format': money, 'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 20},
            {'title': 'Other Resource', **blue, 'width': 30},
            {'title': 'Resource Cost', 'format': money, 'background_color': "D9D9D9", 'width': 15},
            {'title': 'Data Transfer', 'background_color': "D9D9D9", 'sub_horizontal': 'right', 'width': 15},
            {'title': 'Total Cost', 'format': money, 'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 15},
            {'title': self.tag, **grey, 'width': 20},
            {'title': 'Owner', **grey, 'width': 10},
            {'title': 'Remarks', **grey, 'width': 10},
        ]
        self.title_second = [
            {"title": 'NO.', **grey, 'width': 5},
            {"title": 'Name', **grey, 'width': 30},
            {"title": 'Product', **blue, 'width': 30},
            {"title": 'Resource Cost', 'format': money, 'background_color': "D9D9D9", 'width': 20},
            {"title": 'Total Cost', 'format': money, 'background_color': "D9D9D9", 'width': 30},
            {"title": 'Application', **grey, 'width': 20},
        ]
        self.wb.sheet_view.showGridLines = False  # hide the grid lines
        self.wb.freeze_panes = 'C4'  # freeze the header area
        self.wb.merge_cells('B2:L2')
        self.row_number = self.margin_top + 1

    def run(self):
        """Render caption, both tables, their sub-totals and the grand total."""
        # Missing tables (constructor default None) are rendered as empty tables.
        first_rows = self.table_first_data or []
        second_rows = self.table_second_data or []

        # caption row
        self.fill_index(title_index=self.title_first)
        # header row of the first table
        self.format_title(self.title_first)
        total_1 = new_round(sum([item[-4] for item in first_rows]), 6)
        self.fill_table(data=first_rows, index_title=self.title_first)
        self.set_border(2, len(self.title_first) + self.margin_left, self.row_number, self.row_number)
        # sub-total row of the first table
        self.fill_footer([(len(self.title_first) - 4, self.raw_wb.subtotal_str), (4, total_1)])

        # second table
        self.format_title(self.title_second)
        start = self.row_number
        total_2 = new_round(sum([item[3] for item in second_rows]), 6)
        total_3 = new_round(sum([item[4] for item in second_rows]), 6)
        self.fill_table(second_rows, index_title=self.title_second)
        end = self.row_number - 1
        if end > start:
            # merge the third sheet column over all rows of the second table
            column = chr(64 + self.margin_left + 2)
            self.wb.merge_cells('%s%d:%s%d' % (column, start, column, end))
        # sub-total row of the second table
        self.fill_footer([(3, self.raw_wb.subtotal_str), (1, total_2), (1, total_3)])
        # grand total row; both totals are converted to float first (as AppSheet
        # does) so that a Decimal total and a float total can be added.
        self.fill_footer([
            (len(self.title_first) - 2, self.raw_wb.total_str),
            (2, new_round(float(total_1) + float(total_3), 2))
        ], color='FFC000')
class IndexSheet(Sheet):
    """
    Front sheet (``ResourcesUsage``): billing period plus one row per tag value
    with its cost and share of the overall cost.
    """

    def __init__(self, *args, **kwargs):
        # The index sheet always gets the fixed sheet name.
        kwargs['index'] = True
        super().__init__(*args, **kwargs)
        grey = {'background_color': "808080", "font_color": "FFFFFF"}
        self.title_index = [
            {'title': 'NO.', **grey, 'sub_horizontal': 'center', 'width': 20},
            {'title': self.tag, **grey, 'width': 30},
            {'title': 'Total Cost', **grey, 'sub_horizontal': 'center',
             'format': f'{self.raw_wb.currency}#,##0;{self.raw_wb.currency}-#,##0', 'width': 20},
            {'title': 'Percentage', **grey, 'sub_horizontal': 'center', 'width': 20},
        ]
        self.margin_top = 4
        self.wb.sheet_view.showGridLines = False  # hide the grid lines
        self.wb.merge_cells('B5:E5')
        self.row_number = self.margin_top + 1

        # Yellow "Billing duration" label in A2/B2, the period itself in C2.
        label = self.wb.cell(row=2, column=1)
        label.value = 'Billing duration'
        label.style = self.highlight(background_color='FFFF00', bold=False)
        self.wb.cell(row=2, column=2).style = self.highlight(background_color='FFFF00', bold=False)
        self.wb.cell(row=3, column=1).value = 'tag%s' % self.tag
        self.wb.cell(row=2, column=3).value = self.raw_wb.time
        self.wb.sheet_properties.tabColor = "FF0000"

    def run(self):
        """
        Render caption, header, data rows and the total row.

        The rows of ``table_data`` are modified in place: the cost at index 2
        is multiplied by 1.06 and a percentage string is appended.
        """
        rows = self.table_data
        # caption row
        self.fill_index(location="B5", title_index=self.title_index)
        # header row
        self.format_title(self.title_index)
        for row in rows:
            row[2] = float(row[2]) * 1.06
        total_cost = new_round(sum([item[2] for item in rows]), 6)
        for row in rows:
            share = new_round(row[2] * 100 / total_cost, 4) if total_cost else new_round(0, 6)
            row.append("%.2f%%" % share)
        # data rows
        self.fill_table(data=rows, index_title=self.title_index, index=True)
        self.set_border(2, len(self.title_index) + self.margin_left, self.row_number, self.row_number)
        # total row
        self.fill_footer([(2, self.raw_wb.total_str), (1, total_cost)])

    def insert(self):
        """Placeholder; does nothing."""
        # ws.insert_rows(7)
        pass
class OtherSheet(Sheet):
    """Flat listing of cost records: account, product, description and unblended cost."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        grey = {'background_color': "808080", "font_color": "FFFFFF"}
        money = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        self.title_index = [
            {'title': 'LinkedAccountId', **grey, 'width': 20, 'sub_horizontal': 'left'},
            {'title': 'Product', **grey, 'sub_horizontal': 'left', 'width': 40},
            {'title': 'ItemDescription', **grey, 'width': 60, 'sub_horizontal': 'left'},
            {'title': 'UnBlendedCost', **grey, 'sub_horizontal': 'right', 'width': 20, 'format': money},
        ]
        self.margin_top = 1
        self.wb.sheet_view.showGridLines = False  # hide the grid lines
        self.row_number = self.margin_top + 1
        self.wb.freeze_panes = 'B3'  # freeze the header area

    def run(self, ):
        """Render header, data rows and a total row summing the cost column (index 3)."""
        # header row
        self.format_title(self.title_index)
        # data rows
        self.fill_table(data=self.table_data, index_title=self.title_index)
        self.set_border(2, len(self.title_index) + self.margin_left, self.row_number, self.row_number)
        # total row
        cost_sum = sum([item[3] for item in self.table_data])
        self.fill_footer([(3, self.raw_wb.total_str), (1, cost_sum)])
class Excel:
"""
生成EXCEL
"""
ROUND_NUM = 6
def __init__(self, **kwargs):
    """
    Store the report parameters.

    Required keys: ``regions``, ``project``, ``project_id``, ``title``, ``tag``,
    ``linked_accounts``, ``currency``, ``create_time`` and ``file_path``
    (kept as ``file_paths``).  ``shared`` is optional and stored as a set.
    A missing required key raises ``KeyError``.
    """
    self.raw_kwargs = kwargs
    for key in ('regions', 'project', 'project_id', 'title', 'tag', 'linked_accounts'):
        setattr(self, key, kwargs[key])
    self.shared = set(kwargs.get('shared', []))
    # filled by get_file()
    self.start_date = []
    for attribute, key in (('currency', 'currency'), ('create_time', 'create_time'),
                           ('file_paths', 'file_path')):
        setattr(self, attribute, kwargs[key])
def get_file(self):
    """
    Load the tag workbook and parse every billing file.

    ``start_date`` is set to two copies of characters ``[-11:-4]`` of the first
    file path.  The tag workbook ``旺旺EBStag信息.xlsx`` (sheet ``Sheet1``, first
    row skipped) maps the value in column 1 to the tags taken from columns
    2, 3 and 5.

    :return: the concatenated records returned by ``handle`` for each file
    """
    month = self.file_paths[0][-11:-4]
    self.start_date = [month, month]

    tag_book = load_workbook('旺旺EBStag信息.xlsx', read_only=False, data_only=True)
    tag_sheet = tag_book['Sheet1']
    v_tag = {}
    # rows come back as (A1, B1, C1, ...); the first row is skipped
    for cells in list(tag_sheet.rows)[1:]:
        cells = list(cells)
        v_tag[cells[0].value] = {
            'user:Name': cells[1].value,
            'user:Application': cells[2].value,
            # 'user:appenv': cells[3].value,
            'user:Vendor': cells[4].value,
        }

    res_data = []
    for file in self.file_paths:
        res_data.extend(self.handle(file, v_tag))
    return res_data
def handle(self, file_path, res):
    """
    Parse one billing CSV into a list of record dicts.

    Only rows with ``RecordType == 'LineItem'``, a non-empty ``ProductName``
    and a ``LinkedAccountId`` contained in ``self.linked_accounts`` are kept.
    Each record holds the CSV columns (after ``self.pop_empty``) as strings,
    ``UnBlendedCost`` as ``Decimal``, ``usage_start_date`` set to the first day
    of the month, ``usage_end_date``, ``region`` and the tags found in ``res``
    for the record's ``ResourceId``.

    :param file_path: CSV path; a path containing ``'Ningxia'`` is mapped to
                      region ``cn-northwest-1``, anything else to ``cn-north-1``
    :param res: mapping ``ResourceId -> dict of extra tags``
    :return: list of record dicts
    """
    region = 'cn-northwest-1' if 'Ningxia' in file_path else 'cn-north-1'
    _type = {
        'UnBlendedCost': object,
        'ProductName': object,
        'UsageStartDate': object,
        'ResourceId': object,
        'RateId': object,
        'InvoiceID': object,
    }
    read_options = dict(sep=',', dtype=_type, low_memory=False)
    # Look at the header with a separate one-row read.  Peeking through the
    # chunk reader itself consumed the first data row, which was then missing
    # from the result whenever the file started with a regular header.
    preview = pd.read_csv(file_path, nrows=1, **read_options)
    if len(preview.columns) == 1:
        # A single-column first line is not the real header: skip it.
        read_options['skiprows'] = [0, ]
    reader = pd.read_csv(file_path, iterator=True, chunksize=10000, **read_options)

    result = []
    try:
        for data in reader:
            data['UsageStartDate'] = pd.to_datetime(data['UsageStartDate']).dt.strftime('%Y-%m-%d')
            data.insert(1, 'usage_start_date', pd.to_datetime(data['UsageStartDate']).dt.strftime('%Y-%m-%d'))
            data.insert(1, 'usage_end_date', pd.to_datetime(data['UsageEndDate']).dt.strftime('%Y-%m-%d'))
            data = data[(data['RecordType'] == 'LineItem') & (data['ProductName'].notna())]
            data = data[data['LinkedAccountId'].isin(self.linked_accounts)]
            data = data.reset_index(drop=True)
            columns = list(data.columns)
            for i in range(data.shape[0]):
                pack = dict()
                for k, v in enumerate(data.loc[i]):
                    # a bare '*' value is replaced by 'Others'
                    if str(v).strip() == '*':
                        v = 'Others'
                    pack[columns[k]] = v
                pack["region"] = region
                _temp = self.pop_empty(pack)
                for k, v in _temp.items():
                    _temp[k] = Decimal(v) if k == 'UnBlendedCost' else str(v)
                # normalise the start date to the first day of its month
                _temp['usage_start_date'] = _temp['usage_start_date'][:7] + "-01"
                _temp.update(res.get(_temp.get('ResourceId', ''), {}))
                result.append(_temp)
    finally:
        reader.close()
    return result
def run(self):
# Build the whole workbook: load the billing line items, group them by the
# value of the ``self.tag`` column, create the index / service / per-group
# sheets and save the result to ``./<filename>``.
# NOTE(review): this copy of the file has lost its indentation; the comments
# below describe the most plausible nesting - confirm against the real file.
# Any currency other than 'USD' is rendered with the yen/yuan sign.
currency = '$' if self.currency == 'USD' else '¥'
response = self.get_file()
start_date = self.get_range_date(self.start_date)
# Keep only the items whose region is one of self.regions.
temp = []
for item in response:
if item.get('region', '') in self.regions:
temp.append(item)
response = temp
service_data = self.get_service_data(start_date, response)
# Split off shared items: tag value 'shared', or a product listed in self.shared.
shared_data = []
response2 = []
for item in response:
if item.get(self.tag) == 'shared':
shared_data.append(item)
elif item.get('ProductName', '') in self.shared:
shared_data.append(item)
else:
response2.append(item)
response = response2
# temp_dict: group name -> list of items. Items without a tag value, or tagged
# with fill_null_value, are collected under other_display.
temp_dict = defaultdict(list)
temp_dict['sharing resources'] = shared_data
other = []
for item in response:
if item.get(self.tag) and item.get(self.tag) != fill_null_value:
temp_dict[item.get(self.tag).strip()].append(item)
else:
other.append(item)
temp_dict[other_display] = other
# ``language`` is passed as a boolean: True when raw_kwargs['language'] is 'zh'.
sheet = Book(
title = self.title,
_time = self.start_date,
project = self.project,
currency = currency,
language = self.raw_kwargs.get("language") == 'zh'
)
# One row per group: [rank placeholder, group name, rounded UnBlendedCost total].
table_data = []
for k, items in temp_dict.items():
table_data.append(
[0, k, new_round((sum([item.get('UnBlendedCost', Decimal("0.0")) for item in items])), self.ROUND_NUM)]
)
# Sort table_data by total cost, highest first, then number the rows from 1.
table_data = sorted(table_data, key=lambda _item: _item[2], reverse=True)
for i in range(len(table_data)):
table_data[i][0] = i + 1
# First page (index sheet)
IndexSheet(sheet, self.tag, table_data=table_data).run()
# Application / service share chart
ServiceSheet(sheet, self.tag, table_data=service_data).run()
# Re-key the groups by display name (first element of the application_dict
# entry, or the group name itself when it is not listed), in ranked order.
# A later group with the same display name replaces the earlier entry.
order_dict = OrderedDict()
for item in [item[1] for item in table_data]:
application = application_dict.get(item, [item, '-'])[0]
order_dict[application] = temp_dict[item]
# Remaining pages: one sheet per group
for key, items in order_dict.items():
# NOTE(review): the untagged bucket is stored under other_display above, while
# this branch tests for fill_null_value - confirm this branch is reachable.
if key == fill_null_value:
table_data = []
for item in items:
table_data.append([
item.get('LinkedAccountId'),
item.get('ProductName'),
item.get('ItemDescription'),
new_round(float(item.get('UnBlendedCost', '0.0')), self.ROUND_NUM),
])
merge_table_data = []
# Sort on the concatenated account/product/description key so that equal rows
# become adjacent for the merge below.
table_data = sorted(table_data, key = lambda xx: "%s%s%s" % (xx[0], xx[1], xx[2]), reverse = True)
# Merge adjacent rows with identical account, product and description by
# summing their cost.
if len(table_data) == 1:
merge_table_data = table_data
elif table_data:
merge_table_data.append(table_data[0])
for item in table_data[1:]:
if merge_table_data[-1][0] == item[0] and \
merge_table_data[-1][1] == item[1] and \
merge_table_data[-1][2] == item[2]:
merge_table_data[-1][3] += item[3]
else:
merge_table_data.append(item)
merge_table_data = sorted(merge_table_data, key=lambda xx: xx[3], reverse=True)
OtherSheet(sheet, other_display, table_data=merge_table_data).run()
else:
# is_key is True only for the shared-resources group.
is_key = key == 'sharing resources'
app = AppSheet(sheet, key)
table_first_data = []
table_second_data = []
blow_data = []
venv_dict = defaultdict(dict)
# Upper table: user:Name -> ProductName -> summed UnBlendedCost.
# Items that do not qualify are collected in blow_data for the lower table.
front_table = defaultdict(dict)
if not is_key:
for item in items:
if item.get('user:Name') and item.get('user:Name') != fill_null_value:
# NOTE(review): the value is stored under item.get('ProductName', '') but the
# running total is read back with item.get('ProductName', Decimal("0")); the two
# keys differ when ProductName is missing - confirm whether that is intended.
front_table[item.get('user:Name')][item.get('ProductName', '')] = front_table[item.get('user:Name')].get(item.get('ProductName', Decimal("0")),Decimal("0")) + item.get('UnBlendedCost', Decimal("0"))
venv_dict[item.get('user:Name')] = item.get('user:appenv', '')
else:
blow_data.append(item)
else:
for item in items:
# Shared group: only named items tagged 'shared' whose product is not in
# self.shared go to the upper table.
if item.get('user:Name') and item.get(self.tag) == 'shared' and \
item.get('ProductName') not in self.shared:
front_table[item.get('user:Name')][item.get('ProductName', '')] = \
front_table[item.get('user:Name')].get(item.get('ProductName', Decimal("0")),
Decimal("0")) + \
item.get('UnBlendedCost', Decimal("0"))
venv_dict[item.get('user:Name')] = item.get('user:appenv', '')
else:
blow_data.append(item)
# Build one upper-table row per resource name. Row layout as assigned below:
# [0] rank, [1] name, [2] environment label, [3] first non-transfer product,
# [4] its cost, [5] last other product, [6] summed cost of the other products,
# [7] cost of a product whose name contains 'transfer', [8] total, [9] tag.
for name, values in front_table.items():
venv = venv_dict.get(name, '')
venv_name = venv_name_dict.get(venv.strip(), '')
temp = [Decimal("0"), name, venv_name, '', Decimal("0"), '', Decimal("0"), Decimal("0"), Decimal("0"),
self.tag, '', '', ]
i = 1
for k, v in values.items():
if 'transfer' in k.lower():
# Assigned, not accumulated: a later transfer product replaces an earlier one.
temp[7] = v
elif i == 1:
temp[3] = k
temp[4] += v
i += 1
else:
temp[5] = k
temp[6] += v
temp[8] = temp[4] + temp[6] + temp[7]
temp[4] = new_round(temp[4], self.ROUND_NUM)
temp[6] = new_round(temp[6], self.ROUND_NUM)
temp[7] = new_round(temp[7], self.ROUND_NUM)
temp[8] = new_round(temp[8], self.ROUND_NUM)
table_first_data.append(temp)
# Lower table: ProductName -> lists of CostBeforeTax and UnBlendedCost values.
x = defaultdict(lambda: defaultdict(list))
for item in blow_data:
x[item.get('ProductName', '')]['CostBeforeTax'].append(item.get('CostBeforeTax', Decimal("0")))
x[item.get('ProductName', '')]['UnBlendedCost'].append(item.get('UnBlendedCost', Decimal("0")))
for k, item in x.items():
temp = [
0,
fill_null_value,
k,
new_round(sum(item['CostBeforeTax']), self.ROUND_NUM),
new_round(sum(item['UnBlendedCost']), self.ROUND_NUM),
'shared' if is_key else self.tag
]
table_second_data.append(temp)
# Order the upper table by name + environment label, number the rows and
# scale columns 4 and 8 by 1.06 (presumably a 6% tax uplift - TODO confirm;
# columns 6 and 7 are not scaled).
table_first_data = sorted(table_first_data, key = lambda _item: _item[1] + _item[2])
for i in range(len(table_first_data)):
table_first_data[i][0] = i + 1
table_first_data[i][4] = float(table_first_data[i][4]) * 1.06
table_first_data[i][8] = float(table_first_data[i][8]) * 1.06
app.table_first_data = table_first_data
# Order the lower table by product name, number the rows and scale column 4
# by the same 1.06 factor.
table_second_data = sorted(table_second_data, key = lambda _item: _item[2])
for i in range(len(table_second_data)):
table_second_data[i][0] = i + 1
table_second_data[i][4] = float(table_second_data[i][4]) * 1.06
app.table_second_data = table_second_data
app.run()
# ``filename`` is not defined in this method; it is read as a module-level name.
sheet.save(os.path.join('./', filename))
@staticmethod
def pop_empty(data: dict):
    """Return a copy of *data* without its empty entries.

    An entry is dropped when its value is NaN, prints as ``'nan'`` or
    ``'None'``, or equals the empty string.

    ``numpy.float64`` values that hold a whole number are rewritten as an
    integer string (``694341593692.0`` -> ``'694341593692'``); presumably
    these are id columns that pandas widened to float. Non-integral or
    non-finite floats are kept unchanged; they used to be truncated by
    ``int()`` (``0.5`` became ``'0'``) or to raise ``OverflowError`` for inf.

    Parameters
    ----------
    data : dict
        Column name -> raw cell value.

    Returns
    -------
    dict
        New dict holding only the non-empty entries; *data* is not modified.
    """
    cleaned = {}
    for key, value in data.items():
        if value is np.nan or str(value) in ('nan', 'None'):
            continue
        if isinstance(value, np.float64):
            if float(value).is_integer():
                cleaned[key] = str(int(value))
            else:
                cleaned[key] = value
            continue
        if value != "":
            cleaned[key] = value
    return cleaned
def get_service_data(self, date_list, response):
# Sum UnBlendedCost per date, per group and per product.
# date_list: the dates to report on; each is compared for equality with an
#   item's 'usage_start_date'.
# response: iterable of line-item dicts.
# Returns an OrderedDict: date -> {group name -> {ProductName -> Decimal total}},
# filled in the order of date_list.
# NOTE(review): this copy of the file has lost its indentation; the nesting
# described below is the most plausible reading - confirm against the real file.
date_dict = OrderedDict()
def handler(inner_date, inner_res):
# Build the per-group totals for one date and store them in date_dict.
temp_dict = defaultdict(list)
other = []
for item in inner_res:
# Only items whose usage_start_date equals this date are counted.
if item['usage_start_date'] != inner_date:
continue
if item.get(self.tag) and item.get(self.tag) != fill_null_value:
# display_set is not defined in this method; it is read as a module-level name.
if item['ProductName'] in display_set:
temp_dict[item.get(self.tag).strip()].append(item)
# NOTE(review): whether this ``continue`` belongs to the inner or the outer
# ``if`` cannot be told from this copy; it decides whether tagged items with a
# product outside display_set fall through to ``other`` or are skipped.
continue
other.append(item)
temp_dict[other_display] = other
# res: group name -> product name -> running Decimal total (starts at 0).
res = defaultdict(lambda: defaultdict(lambda: Decimal('0')))
for app, data_list in temp_dict.items():
for item in data_list:
res[app][item.get('ProductName', 'others')] += item['UnBlendedCost']
date_dict[inner_date] = res
for _date in date_list:
handler(_date, response)
return date_dict
@staticmethod
def get_range_date(date_list: list):
    """Expand a ``[first_month, last_month]`` pair into month-start dates.

    Parameters
    ----------
    date_list : list
        Two strings whose characters 0-3 are the year and 5-6 the month
        (for example ``'2021-03'``). A list of any other length is returned
        unchanged.

    Returns
    -------
    list
        One ``'YYYY-MM-01'`` string per month from the first to the last
        month, both inclusive; empty when the first month is after the last.

    Raises
    ------
    ValueError
        If a year or month is not numeric or is out of range for ``date``.
    """
    if len(date_list) != 2:
        return date_list

    first_text, last_text = date_list
    # Going through ``date`` validates the year and month ranges.
    first = date(year=int(first_text[:4]), month=int(first_text[5:7]), day=1)
    last = date(year=int(last_text[:4]), month=int(last_text[5:7]), day=1)

    months = []
    year, month = first.year, first.month
    # Step one month at a time, rolling over into the next year after December.
    while (year, month) <= (last.year, last.month):
        months.append("%02d-%02d-01" % (year, month))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return months
# Module-level settings read by the report methods: ``run`` saves the workbook
# to ``filename`` and ``get_service_data`` filters products with
# ``display_set``. They are defined outside the ``__main__`` guard so that the
# methods also work when this module is imported instead of run as a script.
filename = '旺旺账单定制开发2022-4月2.xlsx'
# display_set = {'AmazonEC2', 'AmazonElastiCache', 'AmazonRDS', 'AmazonS3', 'AWSDataTransfer'}
display_set = {
    'Amazon Elastic Compute Cloud', 'Amazon ElastiCache',
    'Amazon Relational Database Service',
    'Amazon Simple Storage Service', 'AWSDataTransfer'
}

if __name__ == '__main__':
    # Billing CSV files to load; a path containing 'Ningxia' is reported
    # under the cn-northwest-1 region by ``handle``.
    file_path = [
        '190508708273-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-2022-04.csv',
        '190508708273-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-Ningxia-2022-04.csv',
        # '190841870523-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-2022-04.csv',
        # '190841870523-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-Ningxia-2022-04.csv',
    ]
    Excel(**{
        "project": '旺旺',
        "project_id": '3419daba-0ff1-4463-8069-34b8df839c78',
        "title": '------',
        "tag": 'user:Application',
        "regions": ['cn-north-1', 'cn-northwest-1'],
        "linked_accounts": {
            # 295524321018,
            # 538779945615,
            # 559133870160,
            # 559312183200,
            # 123192574065,
            # 284149824167,
            # 557490265248,
            694341593692,  # 0.0001272639; reported separately
            # 123502220024  # the "0523" account; reported separately
        },
        # "start_date": ['2021-03', '2021-03'],
        'create_time': datetime.now(tz=timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S"),
        'language': 'zh',
        'currency': '¥',
        'file_path': file_path
    }).run()