|
|
# -*- coding: utf-8 -*-
|
|
|
import calendar
|
|
|
import datetime
|
|
|
import os
|
|
|
from collections import defaultdict, OrderedDict
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from openpyxl import Workbook
|
|
|
from openpyxl.styles import NamedStyle, PatternFill, Font, Alignment, Border, Side, colors
|
|
|
import collections
|
|
|
import sys
|
|
|
|
|
|
import boto3
|
|
|
import time
|
|
|
#############################################下载附件部分
|
|
|
# Fallback access keys that get_credentials() hands to STS when a payer account
# is not configured with its own access key.
#
# SECURITY: long-lived access keys must never live in source control. Every
# entry is read from the environment variable of the same name; an unset
# variable yields an empty string so all four keys are always present. Any key
# pair that was ever committed in this file must be treated as leaked and
# rotated.
_dict = {
    # Key pair noted as "account 6910"; get_credentials() uses it for payers
    # whose account_type is 'amazon-china'.
    'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID', ''),
    'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY', ''),
    # Key pair noted as "account 7422"; get_credentials() uses it for every
    # other account_type.
    'GLOBAL_AWS_ACCESS_KEY_ID': os.environ.get('GLOBAL_AWS_ACCESS_KEY_ID', ''),
    'GLOBAL_AWS_SECRET_ACCESS_KEY': os.environ.get('GLOBAL_AWS_SECRET_ACCESS_KEY', ''),
}
|
|
|
# Per-payer download configuration, keyed by the payer account id (the part of
# a billing file name before the first '-').
#
# SECURITY: the access key pair is read from environment variables instead of
# being committed here; an unset variable yields an empty string. Any key pair
# that was ever committed in this file must be treated as leaked and rotated.
aws_aksk = {
    "441271182313": {
        # 'amazon-china' makes get_credentials() pick the cn-northwest-1 region.
        "account_type": "amazon-china",
        "aws_access_key_id": os.environ.get("AWS_AKSK_441271182313_ACCESS_KEY_ID", ""),
        "aws_secret_access_key": os.environ.get("AWS_AKSK_441271182313_SECRET_ACCESS_KEY", ""),
        # Bucket the billing CSV files are downloaded from.
        "bucket": "fc-billing",
        # 'Amazon Access Key' means the key pair above is used directly; any
        # other value makes get_credentials() assume role_arn through STS.
        "credential_type": "Amazon Access Key",
        "role_arn": "",
        "external_id": "",
    },
}
|
|
|
def auto_download_file_form_s3(file: tuple):
    """Download each named billing file from S3 into the working directory.

    Parameters
    ----------
    file:
        Iterable of S3 object keys. The text before the first '-' of each key
        is taken as the payer account id used to look up credentials, and the
        key is also used as the local file name.

    Raises
    ------
    ValueError
        If no credentials are configured for a payer account id. Without this
        check the ``None`` returned by ``get_credentials`` would surface as an
        unrelated ``AttributeError`` on ``.pop``.
    """
    for key in file:
        payer_id = key.split('-')[0]
        credential = get_credentials(payer_id)
        if not credential:
            raise ValueError(
                'no AWS credentials configured for payer {!r} (file {!r})'.format(payer_id, key)
            )
        # 'bucket' is not a boto3.client() argument, so take it out before the
        # remaining entries are passed on as keyword arguments.
        bucket = credential.pop('bucket')
        s3_client = boto3.client('s3', **credential)
        print('{}开始下载'.format(key))
        s3_client.download_file(bucket, key, key)
        print('{}下载完成'.format(key))
|
|
|
def get_credentials(payer_id):
    """Build the boto3 client arguments (plus bucket name) for a payer account.

    Parameters
    ----------
    payer_id:
        Key into the module-level ``aws_aksk`` mapping.

    Returns
    -------
    dict or None
        ``None`` when the payer is not configured. Otherwise a dict holding
        ``aws_access_key_id``, ``aws_secret_access_key``, ``region_name`` and
        ``bucket``; when the credentials come from STS it also holds
        ``aws_session_token``.
    """
    account = aws_aksk.get(payer_id, None)
    if not account:
        return

    # The account type selects both the region and which fallback key pair
    # from ``_dict`` would be used to talk to STS.
    in_china = account.get('account_type', None) == 'amazon-china'
    region_name = 'cn-northwest-1' if in_china else 'us-east-2'
    key_prefix = '' if in_china else 'GLOBAL_'
    fallback_key_id = _dict[key_prefix + 'AWS_ACCESS_KEY_ID']
    fallback_secret = _dict[key_prefix + 'AWS_SECRET_ACCESS_KEY']

    # Accounts configured with their own access key need no STS round trip.
    if account.get('credential_type', None) == 'Amazon Access Key':
        return {
            'aws_access_key_id': account['aws_access_key_id'],
            'aws_secret_access_key': account['aws_secret_access_key'],
            'region_name': region_name,
            'bucket': account['bucket']
        }

    # Every other credential type: assume the configured role for 900 seconds.
    sts = boto3.client(
        'sts',
        region_name=region_name,
        aws_access_key_id=fallback_key_id,
        aws_secret_access_key=fallback_secret
    )
    response = sts.assume_role(
        RoleArn=account['role_arn'],
        # The timestamp suffix gives each call its own session name.
        RoleSessionName='crawl-billing' + str(time.time()),
        DurationSeconds=900,
        ExternalId=account['external_id']
    )
    issued = response['Credentials']
    return {
        'aws_access_key_id': issued['AccessKeyId'],
        'aws_secret_access_key': issued['SecretAccessKey'],
        'region_name': region_name,
        'aws_session_token': issued['SessionToken'],
        'bucket': account['bucket']
    }
|
|
|
#############################################
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def new_round(_float, _len=2):
    """Round a value to ``_len`` decimal places, nudging trailing-5 ties upward.

    Parameters
    ----------
    _float:
        ``float``, ``numpy.float64``, numeric string (thousands separators
        ``,`` are removed first) or any other object accepted by ``round()``.
    _len: int
        Number of digits to keep after the decimal point.

    Returns
    -------
    The rounded value, or ``None`` when the input cannot be converted or
    rounded (``float()`` / ``round()`` raised ``TypeError`` or ``ValueError``).
    A float that already has at most ``_len`` decimals is returned unchanged;
    non-float inputs are passed straight to ``round()``.
    """
    try:
        if isinstance(_float, np.float64):
            # np.float64 subclasses float; convert so the checks below work on
            # the plain Python representation.
            return new_round(float(_float), _len)
        if isinstance(_float, str):
            return new_round(float(_float.replace(',', '')), _len)
        if not isinstance(_float, float):
            return round(_float, _len)

        text = str(_float)
        if 'e' in text.lower():
            # Scientific notation: the digit inspection below does not apply.
            return round(_float, _len)
        if text[::-1].find('.') <= _len:
            # Position of '.' from the right == number of decimals shown;
            # nothing to round (also covers 'nan' / 'inf', where find() is -1).
            return _float
        if text[-1] == '5':
            # round() resolves ties on the binary value, so e.g. 2.675 would
            # become 2.67. Replacing the final 5 with 6 forces it upward.
            return round(float(text[:-1] + '6'), _len)
        return round(_float, _len)
    except (TypeError, ValueError):
        # Best effort: callers receive None for values that are not numeric.
        return None
|
|
|
|
|
|
|
|
|
class Book:
    """Wrapper around an openpyxl ``Workbook`` plus report-wide settings.

    Keyword arguments read by the constructor: ``project``, ``tag``,
    ``tagname``, ``currency`` (default ``'¥'``), ``time`` and ``language``
    (any truthy value switches the subtotal/total labels to Chinese).
    """

    def __init__(self, *args, **kwargs):
        self.wb = Workbook()
        # Drop the sheet openpyxl creates by default; sheets are added
        # explicitly through workbook().
        self.wb.remove(self.wb.active)
        # Shared NamedStyle, created lazily by format_font().
        self.highlight = None
        self.project = kwargs.get('project', '')
        self.tag = kwargs.get('tag', '')
        self.tagname = kwargs.get('tagname', '')
        self.currency = kwargs.get('currency', '¥')
        self.time = kwargs.get('time', '')
        localized = kwargs.get('language', None)
        self.subtotal_str = '小计:' if localized else 'Subtotal:'
        self.total_str = '总计:' if localized else 'Total:'

    def save(self, file_name='freeze.xlsx'):
        """Write the workbook to ``file_name``."""
        self.wb.save(file_name)

    def workbook(self, name=None, index=False):
        """Append a worksheet and return it.

        When ``name`` is ``None`` or ``index`` is ``True`` the sheet is titled
        ``'ResourcesUsage'``; otherwise it is titled ``name`` with spaces
        replaced by underscores.

        The ``None`` check now runs before ``name.replace``; previously the
        default ``name=None`` raised ``AttributeError``.
        """
        if name is None or index is True:
            ws = self.wb.create_sheet('ResourcesUsage')
            ws.title = 'ResourcesUsage'
        else:
            name = name.replace(' ', '_')
            ws = self.wb.create_sheet(name)  # appended after existing sheets
            ws.title = name
        return ws

    def format_font(self, background_color="FFFFFF", font_color='2B2B2B', bold=True):
        """Return the shared "highlight" style configured with the given look.

        A single ``NamedStyle`` is created on first use and its fill and font
        are overwritten on every call, so every call returns the same object.
        NOTE(review): callers assign the result to ``cell.style`` immediately;
        this relies on openpyxl capturing the style at assignment time --
        confirm before changing the caching.
        """
        if not self.highlight:
            self.highlight = NamedStyle(name="highlight")
        # Background fill.
        self.highlight.fill = PatternFill("solid", fgColor=background_color)
        self.highlight.font = Font(name='微软雅黑', size=11, bold=bold, italic=False, strike=False,
                                   color=font_color)
        return self.highlight
|
|
|
|
|
|
class Sheet:
    """Base class for one worksheet of the report.

    Holds the openpyxl worksheet (``self.wb``), the owning ``Book``
    (``self.raw_wb``) and a write cursor ``self.row_number`` that the fill_*
    methods advance as they emit rows. Subclasses set ``row_number`` and the
    column definitions, then implement ``run()``.

    Column definitions are lists of dicts with the keys ``title`` and
    ``background_color`` and the optional keys ``font_color``, ``width``,
    ``sub_horizontal`` (alignment of data cells) and ``format`` (number
    format of data cells).
    """

    def __init__(self, _wb: Book, tag: str, table_first_data=None, table_second_data=None, table_data=None,
                 index=False, **kwargs):
        # Most recently built border, kept for backward compatibility.
        self.border = None
        # Borders already built, keyed by their four side styles.
        self._border_cache = {}
        self.raw_wb = _wb
        self.wb = _wb.workbook(tag.replace(':', "_"), index=index)
        # Bound method: calling self.highlight(...) returns a style object.
        self.highlight = _wb.format_font
        self.tag = tag
        self.tagname = kwargs.get('tagname', None)
        self.title_first = []
        self.table_first_data = table_first_data
        self.table_data = table_data
        self.title_second = []
        self.table_second_data = table_second_data
        # Number of empty columns left of the tables / rows above them.
        self.margin_left = 1
        self.margin_top = 5
        self.row_number = None

    @staticmethod
    def _column_letter(index):
        """Return the spreadsheet letter(s) for a 1-based column index."""
        # chr(64 + index) is only valid up to column 26 ("Z").
        from openpyxl.utils import get_column_letter
        return get_column_letter(index)

    def merge(self, left, right, top, down, color=None, value=None):
        """Merge a rectangle of cells and style its top-left cell.

        :param left: first column (1-based)
        :param right: last column (1-based)
        :param top: first row
        :param down: last row
        :param color: background colour passed to the highlight style
        :param value: value written to the merged cell; non-string values get
            the book's currency number format
        :return: None
        """
        self.set_border(left, right, top, down)
        self.wb.merge_cells(
            '%s%d:%s%d' % (self._column_letter(left), top, self._column_letter(right), down)
        )
        # Anchor on the rectangle's own top row (the previous code used
        # self.row_number, which is only correct when top == row_number).
        anchor = self.wb.cell(row=top, column=left)
        anchor.style = self.highlight(color)
        # NOTE(review): a total of 0 is falsy, so the cell is left blank in
        # that case -- confirm this is intended.
        if value:
            anchor.value = value
            if not isinstance(value, str):
                anchor.number_format = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'
        anchor.alignment = Alignment(horizontal='center', vertical='center')
        self.wb.row_dimensions[top].height = 20

    def fill_table(self, data: list, index_title: list, link=False, index=False):
        """Write the data rows of a table, one sheet row per item of ``data``.

        :param data: list of rows; each row is a sequence of cell values
        :param index_title: column definitions matching the row positions
        :param link: when True, the first value of each row is written as a
            HYPERLINK formula pointing at cell B2 of the sheet named after it
        :param index: accepted for compatibility; not used
        :return: None
        """
        for row in data:
            for k, value in enumerate(row):
                column_info = index_title[k]
                cell = self.wb.cell(row=self.row_number, column=k + self.margin_left + 1)
                cell.style = self.highlight(bold=False)
                cell.alignment = Alignment(
                    horizontal=column_info.get('sub_horizontal', 'center'),
                    vertical='center'
                )
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                self.wb.row_dimensions[self.row_number].height = 20
                if column_info.get('format'):
                    cell.number_format = column_info.get('format')
                if k == 0 and link is True:
                    cell.value = '=HYPERLINK("%s", "%s")' % (f'#{value.replace(" ", "_")}!B2', value)
                    cell.font = Font(underline='single', color='0563C1')
                else:
                    cell.value = value
            self.row_number += 1

    def fill_footer(self, footer_data: list, color='A9D08E'):
        """Write a footer row made of merged segments, then skip blank rows.

        :param footer_data: iterable of ``(span, value)`` pairs; each pair
            becomes one merged block ``span`` columns wide, left to right
        :param color: background colour of the footer blocks
        :return: None
        """
        column = self.margin_left + 1
        for span, value in footer_data:
            # Merge and fill this segment, then outline it.
            self.merge(column, column + span - 1, self.row_number, self.row_number, value=value, color=color)
            self.set_border(column, column + span - 1, self.row_number, self.row_number)
            column += span
        # Move past the footer row, then leave blank rows below it.
        self.row_number += 1
        for _ in range(2):
            self.wb.append([])
            self.row_number += 1

    def format_title(self, index_data: list):
        """Write the header row described by ``index_data`` and set column widths."""
        for i, title_info in enumerate(index_data):
            column = i + self.margin_left + 1
            self.wb.column_dimensions[self._column_letter(column)].width = title_info.get('width', 20)
            cell = self.wb.cell(row=self.row_number, column=column)
            cell.value = title_info['title']
            cell.style = self.highlight(
                title_info['background_color'],
                font_color=title_info.get('font_color', '2B2B2B')
            )
            cell.alignment = Alignment(horizontal='center', vertical='center')
        self.set_border(self.margin_left + 1, len(index_data) + self.margin_left, self.row_number, self.row_number)
        self.row_number += 1

    def my_border(self, t_border, b_border, l_border, r_border):
        """Return a black ``Border`` with the given top/bottom/left/right styles.

        Borders are cached per combination of styles. Previously the first
        border ever built was returned for every later call regardless of the
        arguments.
        """
        key = (t_border, b_border, l_border, r_border)
        border = self._border_cache.get(key)
        if border is None:
            border = Border(top=Side(border_style=t_border, color=colors.BLACK),
                            bottom=Side(border_style=b_border, color=colors.BLACK),
                            left=Side(border_style=l_border, color=colors.BLACK),
                            right=Side(border_style=r_border, color=colors.BLACK))
            self._border_cache[key] = border
        self.border = border
        return border

    def redirect_index(self):
        """Put a "back to index" hyperlink (to ResourcesUsage!B2) in cell A1."""
        cell = self.wb.cell(row=1, column=1)
        cell.style = self.highlight(bold=False)
        cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
        # NOTE(review): the height is set on the cursor row, not on row 1
        # where the link is written -- confirm which row was meant.
        self.wb.row_dimensions[self.row_number].height = 20
        cell.value = '=HYPERLINK("%s", "%s")' % ('#ResourcesUsage!B2', "跳回首页")
        cell.font = Font(underline='single', color='0563C1')

    def set_border(self, left, right, top, down):
        """Give every cell in the inclusive rectangle a thin border and centre it."""
        for row in range(top, down + 1):
            for column in range(left, right + 1):
                cell = self.wb.cell(row=row, column=column)
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                cell.alignment = Alignment(horizontal='center', vertical='center')

    def fill_index(self, location='B2', title_index: list = None, title=None, bkg=None):
        """Write the caption cell above a table and advance the cursor one row.

        :param location: cell reference of the caption, e.g. ``'B6'``
        :param title_index: column definitions of the table below; only their
            count is used, to outline the caption row. ``None`` is treated as
            no columns.
        :param title: caption text; defaults to ``"<project>:<time>"``
        :param bkg: background colour; defaults to ``'D9D9D9'``
        """
        title_index = title_index or []
        if title:
            self.wb[location] = title
        else:
            self.wb[location] = self.raw_wb.project + ':' + self.raw_wb.time
        if bkg:
            self.wb[location].style = self.highlight(background_color=bkg)
        else:
            self.wb[location].style = self.highlight(background_color='D9D9D9')
        self.set_border(self.margin_left + 1, len(title_index) + self.margin_left, self.row_number,
                        self.row_number)
        self.wb[location].border = self.my_border('thin', 'thin', 'thin', 'thin')
        self.wb[location].alignment = Alignment(horizontal='center', vertical='center')
        self.row_number += 1

    def set_solid_border(self, area):
        """Draw a left edge on one column and a right edge on another.

        :param area: ``[start_column, start_row, end_column, end_row]`` with
            column letters and 1-based row numbers
        """
        s_column, s_index, e_column, e_index = area[0], area[1], area[2], area[3]
        # Left edge.
        for cell in self.wb[s_column][s_index - 1:e_index]:
            cell.border = Border(left=Side(border_style='thin', color=colors.BLACK))
        # Right edge.
        for cell in self.wb[e_column][s_index - 1:e_index]:
            cell.border = Border(right=Side(border_style='thin', color=colors.BLACK))
|
|
|
|
|
|
class IndexSheet(Sheet):
    """Summary sheet: one row per tag value with its total cost and share.

    ``table_data`` must be a mapping of tag value -> cost.
    """

    def __init__(self, *args, **kwargs):
        # Forces the sheet title chosen by Book.workbook() to 'ResourcesUsage'.
        kwargs['index'] = True
        super().__init__(*args, **kwargs)

        self.title_index = [
            {'title': 'NO.', 'background_color': "808080", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'width': 26},
            {'title': self.raw_wb.tagname, 'background_color': "808080", "font_color": "FFFFFF", 'width': 35},
            {'title': 'Total Cost', 'background_color': "808080", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'format': f'{self.raw_wb.currency}#,##0;{self.raw_wb.currency}-#,##0', 'width': 35},
            {'title': 'Percentage', 'background_color': "808080", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'width': 30},
        ]
        self.margin_top = 7
        self.wb.sheet_view.showGridLines = False  # hide grid lines
        self.wb.merge_cells('B8:E8')
        self.row_number = self.margin_top + 1

        # Page heading.
        self.wb.merge_cells('A1:G1')
        self.wb.merge_cells('B2:E4')
        b2 = self.wb.cell(row=2, column=2)
        b2.value = 'The Monthly Bill for {}'.format(self.raw_wb.project)
        b2.font = Font(bold=False, italic=False, size=20)
        b2.alignment = Alignment(horizontal='center', vertical='center')

        # Billing period and tag lines.
        b5 = self.wb.cell(row=5, column=2)
        b5.value = 'Billing duration:'
        b5.style = self.highlight(background_color='FFFF00', bold=False)
        c5 = self.wb.cell(row=5, column=3)
        c5.style = self.highlight(bold=False)
        b6 = self.wb.cell(row=6, column=2)
        b6.value = 'Tag=%s' % self.tagname
        d5 = self.wb.cell(row=5, column=4)
        d5.value = self.raw_wb.time
        self.wb.sheet_properties.tabColor = "FF0000"

    def run(self):
        """Write the caption, header, one row per tag value and the total row."""
        # Caption row.
        self.fill_index(location="B8", title_index=self.title_index)
        # Header row.
        self.format_title(self.title_index)

        total_cost = sum(self.table_data.values())
        self.fill_data = []
        for num, (name, cost) in enumerate(self.table_data.items(), start=1):
            # Costs that sum to zero would otherwise divide by zero.
            share = cost / total_cost * 100 if total_cost else 0.0
            self.fill_data.append([num, name, cost, "%.2f%%" % share])

        # Data rows.
        self.fill_table(data=self.fill_data, index_title=self.title_index, index=True)
        self.set_border(2, len(self.title_index) + self.margin_left, self.row_number, self.row_number)
        # Total row.
        self.fill_footer([(2, self.raw_wb.total_str), (1, total_cost)])
|
|
|
|
|
|
class AppSheet(Sheet):
    """Detail sheet for one tag value, made of two tables.

    ``table_first_data`` rows follow ``title_first`` (11 columns) and
    ``table_second_data`` rows follow ``title_second`` (6 columns). Both are
    normally assigned by the caller after construction; ``None`` is treated
    as an empty table.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Number format shared by every money column of this sheet.
        money = f'{self.raw_wb.currency}#,##0.00;{self.raw_wb.currency}-#,##0.00'

        self.title_first = [
            {'title': 'NO.', 'background_color': "808080", "font_color": "FFFFFF", 'sub_horizontal': 'center',
             'width': 5},
            {'title': 'Name', 'background_color': "808080", "font_color": "FFFFFF", 'sub_horizontal': 'left',
             'width': 30},
            {'title': 'Product', 'background_color': "1F4E78", "font_color": "FFFFFF", 'sub_horizontal': 'left',
             'width': 30},
            {'title': 'Resource Cost', 'format': money,
             'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 20},
            {'title': 'Other Resource', 'background_color': "1F4E78", "font_color": "FFFFFF", 'width': 30},
            {'title': 'Resource Cost', 'format': money,
             'background_color': "D9D9D9", 'width': 15},
            {'title': 'Data Transfer', 'format': money,
             'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 15},
            {'title': 'Total Cost', 'format': money,
             'background_color': "D9D9D9",
             'sub_horizontal': 'right', 'width': 15},
            {'title': 'Application', 'background_color': "808080", "font_color": "FFFFFF", 'width': 20},
            {'title': 'Owner', 'background_color': "808080", "font_color": "FFFFFF", 'width': 10},
            {'title': 'Remarks', 'background_color': "808080", "font_color": "FFFFFF", 'width': 10},
        ]
        self.title_second = [
            {"title": 'NO.', 'background_color': "808080", "font_color": "FFFFFF", 'width': 5},
            {"title": 'Name', 'background_color': "808080", "font_color": "FFFFFF", 'width': 30},
            {"title": 'Product', 'background_color': "1F4E78", "font_color": "FFFFFF", 'width': 30},
            {"title": 'Resource Cost', 'background_color': "D9D9D9", 'width': 20,
             'format': money, },
            {"title": 'Total Cost', 'background_color': "D9D9D9", 'width': 30,
             'format': money, },
            {"title": self.tagname, 'background_color': "808080", "font_color": "FFFFFF", 'width': 20},
        ]
        self.wb.sheet_view.showGridLines = False  # hide grid lines

        # Page heading.
        self.wb.merge_cells('B3:L5')
        b3 = self.wb.cell(row=3, column=2)
        b3.value = '{} program fee details'.format(self.tag)
        b3.font = Font(bold=False, italic=False, size=20)
        b3.alignment = Alignment(horizontal='center', vertical='center')
        self.wb.merge_cells('B6:L6')
        self.row_number = self.margin_top + 1

    def run(self):
        """Write both tables with their subtotal rows, then the grand total."""
        first_rows = self.table_first_data or []
        second_rows = self.table_second_data or []

        # Caption and header of the first table.
        self.fill_index(location='B6', title_index=self.title_first)
        self.format_title(self.title_first)

        # item[-4] is the 'Total Cost' column of an 11-column first-table row.
        total_1 = new_round(sum(item[-4] for item in first_rows), 6)
        self.fill_table(data=first_rows, index_title=self.title_first)
        self.set_border(2, len(self.title_first) + self.margin_left, self.row_number, self.row_number)
        # Subtotal row of the first table.
        self.fill_footer([(len(self.title_first) - 4, self.raw_wb.subtotal_str), (4, total_1)])

        # Second table.
        self.format_title(self.title_second)
        start = self.row_number
        total_2 = new_round(sum(item[3] for item in second_rows), 6)
        total_3 = new_round(sum(item[4] for item in second_rows), 6)
        self.fill_table(second_rows, index_title=self.title_second)
        end = self.row_number - 1
        if end > start:
            # Merge the second column of the table over all of its data rows.
            self.wb.merge_cells(
                '%s%d:%s%d' % (chr(64 + self.margin_left + 2), start, chr(64 + self.margin_left + 2), end))
        # Subtotal row of the second table.
        self.fill_footer([(3, self.raw_wb.subtotal_str), (1, total_2), (1, total_3)])

        # Grand total row.
        self.fill_footer([
            (len(self.title_first) - 2, self.raw_wb.total_str),
            (2, new_round(float(total_1) + float(total_3), 2))
        ], color='FFC000')
|
|
|
|
|
|
|
|
|
class OtherSheet(Sheet):
    """Sheet listing the cost rows that carry no tag value.

    ``table_data`` is a list of ``[LinkedAccountId, Product, ItemDescription,
    UnBlendedCost]`` rows.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        currency = self.raw_wb.currency
        header_look = {'background_color': "808080", "font_color": "FFFFFF"}
        self.title_index = [
            dict(header_look, title='LinkedAccountId', width=20, sub_horizontal='left'),
            dict(header_look, title='Product', sub_horizontal='left', width=40),
            dict(header_look, title='ItemDescription', width=60, sub_horizontal='left'),
            dict(header_look, title='UnBlendedCost', sub_horizontal='right', width=20,
                 format=f'{currency}#,##0.00;{currency}-#,##0.00'),
        ]

        self.margin_top = 5
        self.row_number = self.margin_top + 1
        self.wb.sheet_view.showGridLines = False  # hide grid lines

        # Page heading across B3:E5.
        self.wb.merge_cells('B3:E5')
        heading = self.wb.cell(row=3, column=2)
        heading.value = 'Others resource cost program fee details'
        heading.font = Font(bold=False, italic=False, size=20)
        heading.alignment = Alignment(horizontal='center', vertical='center')

    def run(self, ):
        """Write the header, the data rows and the total row."""
        columns = self.title_index
        # Header row.
        self.format_title(columns)
        # Data rows.
        self.fill_table(data=self.table_data, index_title=columns)
        last_column = len(columns) + self.margin_left
        self.set_border(2, last_column, self.row_number, self.row_number)
        # Total row: the label spans three columns, the sum takes the fourth.
        grand_total = sum(row[3] for row in self.table_data)
        self.fill_footer([(3, self.raw_wb.total_str), (1, grand_total)])
|
|
|
|
|
|
class Custom_Excel(object):
    """Build the monthly billing workbook for a set of linked accounts.

    Keyword arguments: ``project`` (name used in headings and the file name),
    ``tag`` (CSV column to group by, also used as ``tagname``),
    ``linkedAccountIds`` (account ids to keep) and ``csv_list`` (paths of the
    cost-allocation CSV files; the first one must end in ``YYYY-MM.csv``).
    """

    def __init__(self, *args, **kwargs):
        self.project = kwargs.get('project', None)
        self.tag = kwargs.get('tag', None)
        self.tagname = self.tag
        self.linkedAccountIds = kwargs.get('linkedAccountIds', None)
        self.csv_list = kwargs.get('csv_list', [])

    def get_data(self):
        """Load the CSV files and aggregate ``TotalCost``.

        Returns
        -------
        (tags_dict, other_dict)
            Both are ``{'TotalCost': {key_tuple: cost}}``. ``tags_dict`` is
            keyed by ``(tag value, user:Name, ProductName)`` sorted by cost
            descending; ``other_dict`` holds the rows without a tag value,
            keyed by ``(LinkedAccountId, ProductName, ItemDescription)``.
        """
        # Read every file first and concatenate once; concatenating inside
        # the loop re-copies the accumulated frame for each file.
        frames = [
            pd.read_csv(
                _csv,
                sep=',',
                encoding='utf-8',
                skiprows=[0, ],  # the first line of each file is dropped
                dtype={'LinkedAccountId': object}
            )
            for _csv in self.csv_list if _csv
        ]
        df = pd.concat(frames, axis=0) if frames else pd.DataFrame()

        df = df[(df['RecordType'] == 'LinkedLineItem') & (df['ProductName'].notna()) &
                (df['LinkedAccountId'].isin(self.linkedAccountIds))]
        print('和'.join(self.linkedAccountIds)+"----处理中")

        df = df.apply(self.alter_tag_name, axis=1)
        df[self.tag] = df[self.tag].fillna('Other')
        # Only the shared products blanked by alter_tag_name are still NaN
        # here; they become 'other' (lower case), distinct from 'Others'.
        df['user:Name'] = df['user:Name'].fillna('other')

        tag_df = df[df[self.tag] != 'Other']
        other_df = df[df[self.tag] == 'Other']
        tag_result = tag_df.groupby(by=[self.tag, 'user:Name', 'ProductName']).agg({'TotalCost': 'sum'})
        other_result = other_df.groupby(by=['LinkedAccountId', 'ProductName', 'ItemDescription']).agg({'TotalCost': 'sum'})
        tag_result = tag_result.sort_values(by='TotalCost', ascending=False)
        return tag_result.to_dict(orient='dict'), other_result.to_dict(orient='dict')

    @staticmethod
    def alter_tag_name(arrLike):
        """Normalise the tag columns of one CSV row and return the row.

        * ``user:application == 'shared'`` becomes ``'sharing resources'``.
        * A missing ``user:Name`` becomes ``'Others'``.
        * Rows of the products listed below are forced to application
          ``'sharing resources'`` with ``user:Name`` set to NaN.
        """
        productli = ['AmazonCloudWatch', 'Amazon Simple Storage Service', 'Amazon CloudTrail', 'Amazon Config',
                     'Amazon Lambda', 'Amazon Directory Service']
        # Read all three inputs before any of them is overwritten.
        username = arrLike['user:Name']
        productname = arrLike['ProductName']
        application = arrLike['user:application']
        if application == 'shared':
            arrLike['user:application'] = 'sharing resources'
        if pd.isna(username):
            arrLike['user:Name'] = 'Others'
        if productname in productli:
            arrLike['user:application'] = 'sharing resources'
            arrLike['user:Name'] = np.nan
        return arrLike

    def create_excel(self):
        """Aggregate the data, build every sheet and save the workbook.

        The file is written to ``./<ids joined by '和'>/<project>-<month>账单.xlsx``.
        """
        start_dt, end_dt = self.get_period()
        currency = '¥'
        sheet = Book(
            project=self.project,
            tag=self.tag,
            tagname=self.tagname,
            time='{}_{}'.format(start_dt, end_dt),
            currency=currency
        )
        tag_data, other_data = self.get_data()
        data = tag_data['TotalCost']
        other = other_data['TotalCost']

        # Untagged rows, most expensive first.
        other_list = [
            [key[0], key[1], key[2], cost]
            for key, cost in sorted(other.items(), key=lambda t: t[1], reverse=True)
        ]

        name_list = defaultdict(set)    # tag value -> resource names already listed
        table_data = defaultdict(float)  # tag value -> total cost (index sheet)
        tag_dict = defaultdict(list)    # tag value -> first-table rows
        other_dict = defaultdict(list)  # tag value -> second-table rows
        for k, v in data.items():
            app, name, product = k[0], k[1], k[2]
            table_data[app] += v
            if product != 'Amazon Data Transfer' and name != 'other':
                if name in name_list[app]:
                    # A further product of an already listed resource goes
                    # into the "Other Resource" columns of its existing row.
                    for row in tag_dict[app]:
                        if row[0] == name:
                            row[3] = product
                            row[4] += float(v)
                            row[6] += float(v)
                else:
                    # Data transfer is shown as a column of the resource row,
                    # not as a row of its own.
                    data_transfer = data.get((app, name, 'Amazon Data Transfer'), 0.00)
                    total = float(v) + float(data_transfer)
                    tag_dict[app].append([name, product, v, '', 0.00, data_transfer, total, self.tagname, '', ''])
                    name_list[app].add(name)
            elif name == 'other':
                other_dict[app].append([name, product, v, v, self.tagname])

        # The 'Others' entry only exists when there are untagged rows.
        for v in other.values():
            table_data['Others'] += v

        result = collections.OrderedDict(sorted(table_data.items(), key=lambda t: t[1], reverse=True))
        IndexSheet(sheet, self.tag, table_data=result, tagname=self.tagname).run()

        # Prepend the running "NO." column to every row.
        for rows in tag_dict.values():
            for number, row in enumerate(rows, start=1):
                row.insert(0, number)
        for rows in other_dict.values():
            for number, row in enumerate(rows, start=1):
                row.insert(0, number)

        for app in result:
            if app == 'Others':
                continue
            app_sheet = AppSheet(sheet, app, tagname=self.tagname)
            app_sheet.table_first_data = tag_dict[app]
            app_sheet.table_second_data = other_dict[app]
            app_sheet.run()

        other_display = 'Other resource cost·'
        OtherSheet(sheet, other_display, table_data=other_list, tagname=self.tagname).run()

        # One output directory per group of linked accounts.
        out_dir = './' + '和'.join(self.linkedAccountIds)
        check_path_creat(out_dir)
        sheet.save(os.path.join(out_dir, '{}-{}账单.xlsx'.format(self.project, self.data_month)))
        print('数据处理完成')

    def get_period(self):
        """Derive the billing period from the first CSV file name.

        Sets ``self.data_month`` to the ``YYYY-MM`` text found just before the
        4-character extension and returns the first and last day of that month
        as ``'YYYY-MM-DD'`` strings.
        """
        first_csv = self.csv_list[0]
        self.data_month = first_csv[-11:-4]
        first_day = datetime.datetime.strptime(self.data_month, "%Y-%m")
        _, days_in_month = calendar.monthrange(first_day.year, first_day.month)
        last_day = first_day + datetime.timedelta(days_in_month - 1)
        return first_day.strftime("%Y-%m-%d"), last_day.strftime("%Y-%m-%d")
|
|
|
|
|
|
############方法 -判断目录是否存在,不存在则创建
|
|
|
def check_path_creat(path):
    """Create the directory ``path`` if nothing exists at that location.

    Missing parent directories are created as well, and a directory that
    appears between the existence check and the creation is tolerated.

    :param path: directory path
    :return: None
    """
    if not os.path.exists(path):
        # os.mkdir would fail on nested paths and on the check/create race.
        os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
if __name__ == '__main__':
    # Billing month as "YYYY-MM"; the first command-line argument overrides
    # the built-in default.
    month = '2024-06'
    if len(sys.argv) > 1:
        month = sys.argv[1]
    print('执行月份:'+month)

    # 1. Cost-allocation files for the month.
    csv_list = [
        '441271182313-aws-cost-allocation-ACTS-' + month + '.csv',
        '441271182313-aws-cost-allocation-ACTS-Ningxia-' + month + '.csv',
    ]

    # 1.1 Download every file that is not already in the working directory.
    # (The earlier `csv.format(month)` call discarded its result and is gone;
    # the loop variable no longer shadows the stdlib module name `csv`.)
    for csv_name in csv_list:
        if not os.path.isfile('./' + csv_name):
            print('附件不存在:', csv_name)
            auto_download_file_form_s3([csv_name])

    # 2. Build one workbook per group of linked accounts.
    project = '菲仕兰'
    tag = 'user:application'
    accountIds = [['183995181010', '609863357525'], ['515743265704']]
    for i in accountIds:
        Custom_Excel(csv_list=csv_list, project=project, tag=tag, linkedAccountIds=i).create_excel()
|