Commit

Update bundle data: support unified output of errors to a log file
Lin-Dongzhao committed Aug 8, 2024
1 parent 762bc15 commit 9100819
Showing 4 changed files with 122 additions and 68 deletions.
13 changes: 11 additions & 2 deletions rqalpha/cmds/bundle.py
@@ -66,7 +66,8 @@ def create_bundle(data_bundle_path, rqdatac_uri, compression, concurrency):
help='rqdatac uri, eg user:password or tcp://user:password@ip:port')
@click.option('--compression', default=False, type=click.BOOL, help='enable compression to reduce file size')
@click.option('-c', '--concurrency', type=click.INT, default=1)
def update_bundle(data_bundle_path, rqdatac_uri, compression, concurrency):
@click.option("--error-log-file", "-ef", default=None, help="file in which errors raised during the data update are stored; defaults to update_failed.log under the bundle directory")
def update_bundle(data_bundle_path, rqdatac_uri, compression, concurrency, error_log_file):
try:
import rqdatac
except ImportError:
@@ -89,7 +90,15 @@ def update_bundle(data_bundle_path, rqdatac_uri, compression, concurrency):
return 1

from rqalpha.data.bundle import update_bundle as update_bundle_
update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency)
from rqalpha.data.bundle import push_errors_log as push_errors_log_
path = os.path.join(data_bundle_path, "bundle")
if not error_log_file:
error_log_file = os.path.join(path, "update_failed.log")
elif os.path.isdir(error_log_file):
error_log_file = os.path.join(error_log_file, "update_failed.log")
failed_msg = update_bundle_(path, False, compression, concurrency)
if failed_msg:
push_errors_log_(error_log_file, failed_msg)


@cli.command(help=_("Download bundle (monthly updated)"))
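
With this change a failing file no longer aborts the whole update: failures are collected, written to a log file, and summarized on the console. A hypothetical invocation of the new option (assuming the usual rqalpha CLI entry point and its -d/--data-bundle-path option; paths are illustrative):

    # errors go to the default <bundle>/update_failed.log
    rqalpha update-bundle -d ~/.rqalpha
    # or to an explicitly chosen file
    rqalpha update-bundle -d ~/.rqalpha --error-log-file /tmp/bundle_errors.log
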
156 changes: 98 additions & 58 deletions rqalpha/data/bundle.py
@@ -20,6 +20,10 @@
from itertools import chain
from typing import Callable, Optional, Union, List
from filelock import FileLock, Timeout
from collections import defaultdict
import logbook
from logbook import Logger
import click

import h5py
import numpy as np
@@ -35,48 +39,48 @@
END_DATE = 29991231


def gen_instruments(d):
def gen_instruments(d: str, file: str):
stocks = sorted(list(rqdatac.all_instruments().order_book_id))
instruments = [i.__dict__ for i in rqdatac.instruments(stocks)]
with open(os.path.join(d, 'instruments.pk'), 'wb') as out:
with open(os.path.join(d, file), 'wb') as out:
pickle.dump(instruments, out, protocol=2)


def gen_yield_curve(d):
def gen_yield_curve(d: str, file: str):
yield_curve = rqdatac.get_yield_curve(start_date=START_DATE, end_date=datetime.date.today())
yield_curve.index = [convert_date_to_date_int(d) for d in yield_curve.index]
yield_curve.index.name = 'date'
with h5py.File(os.path.join(d, 'yield_curve.h5'), 'w') as f:
with h5py.File(os.path.join(d, file), 'w') as f:
f.create_dataset('data', data=yield_curve.to_records())


def gen_trading_dates(d):
def gen_trading_dates(d: str, file: str):
dates = rqdatac.get_trading_dates(start_date=START_DATE, end_date='2999-01-01')
dates = np.array([convert_date_to_date_int(d) for d in dates])
np.save(os.path.join(d, 'trading_dates.npy'), dates, allow_pickle=False)
np.save(os.path.join(d, file), dates, allow_pickle=False)


def gen_st_days(d):
def gen_st_days(d: str, file: str):
from rqdatac.client import get_client
stocks = rqdatac.all_instruments('CS').order_book_id.tolist()
st_days = get_client().execute('get_st_days', stocks, START_DATE,
convert_date_to_date_int(datetime.date.today()))
with h5py.File(os.path.join(d, 'st_stock_days.h5'), 'w') as h5:
with h5py.File(os.path.join(d, file), 'w') as h5:
for order_book_id, days in st_days.items():
h5[order_book_id] = days


def gen_suspended_days(d):
def gen_suspended_days(d: str, file: str):
from rqdatac.client import get_client
stocks = rqdatac.all_instruments('CS').order_book_id.tolist()
suspended_days = get_client().execute('get_suspended_days', stocks, START_DATE,
convert_date_to_date_int(datetime.date.today()))
with h5py.File(os.path.join(d, 'suspended_days.h5'), 'w') as h5:
with h5py.File(os.path.join(d, file), 'w') as h5:
for order_book_id, days in suspended_days.items():
h5[order_book_id] = days


def gen_dividends(d):
def gen_dividends(d: str, file: str):
stocks = rqdatac.all_instruments().order_book_id.tolist()
dividend = rqdatac.get_dividend(stocks)
need_cols = ["dividend_cash_before_tax", "book_closure_date", "ex_dividend_date", "payable_date", "round_lot"]
@@ -86,12 +90,12 @@ def gen_dividends(d):
for f in ('book_closure_date', 'ex_dividend_date', 'payable_date', 'announcement_date'):
dividend[f] = [convert_date_to_date_int(d) for d in dividend[f]]
dividend.set_index(['order_book_id', 'book_closure_date'], inplace=True)
with h5py.File(os.path.join(d, 'dividends.h5'), 'w') as h5:
with h5py.File(os.path.join(d, file), 'w') as h5:
for order_book_id in dividend.index.levels[0]:
h5[order_book_id] = dividend.loc[order_book_id].to_records()


def gen_splits(d):
def gen_splits(d: str, file: str):
stocks = rqdatac.all_instruments().order_book_id.tolist()
split = rqdatac.get_split(stocks)
split['split_factor'] = split['split_coefficient_to'] / split['split_coefficient_from']
@@ -101,12 +105,12 @@ def gen_splits(d):
split['ex_date'] = [convert_date_to_int(d) for d in split['ex_date']]
split.set_index(['order_book_id', 'ex_date'], inplace=True)

with h5py.File(os.path.join(d, 'split_factor.h5'), 'w') as h5:
with h5py.File(os.path.join(d, file), 'w') as h5:
for order_book_id in split.index.levels[0]:
h5[order_book_id] = split.loc[order_book_id].to_records()


def gen_ex_factor(d):
def gen_ex_factor(d: str, file: str):
stocks = rqdatac.all_instruments().order_book_id.tolist()
ex_factor = rqdatac.get_ex_factor(stocks)
ex_factor.reset_index(inplace=True)
@@ -120,25 +124,25 @@ def gen_ex_factor(d):
initial['start_date'] = 0
initial['ex_cum_factor'] = 1.0

with h5py.File(os.path.join(d, 'ex_cum_factor.h5'), 'w') as h5:
with h5py.File(os.path.join(d, file), 'w') as h5:
for order_book_id in ex_factor.index.levels[0]:
h5[order_book_id] = np.concatenate([initial, ex_factor.loc[order_book_id].to_records()])


def gen_share_transformation(d):
def gen_share_transformation(d: str, file: str):
df = rqdatac.get_share_transformation()
df.drop_duplicates("predecessor", inplace=True)
df.set_index('predecessor', inplace=True)
df.effective_date = df.effective_date.astype(str)
df.predecessor_delisted_date = df.predecessor_delisted_date.astype(str)

json_file = os.path.join(d, 'share_transformation.json')
json_file = os.path.join(d, file)
with open(json_file, 'w') as f:
f.write(df.to_json(orient='index'))


def gen_future_info(d):
future_info_file = os.path.join(d, 'future_info.json')
def gen_future_info(d: str, file: str):
future_info_file = os.path.join(d, file)

def _need_to_recreate():
if not os.path.exists(future_info_file):
@@ -270,7 +274,7 @@ def update_margin_rate(file):
future_dict['tick_size'] = instruemnts_data.tick_size()
all_futures_info.append(future_dict)

with open(os.path.join(d, 'future_info.json'), 'w') as f:
with open(os.path.join(d, file), 'w') as f:
json.dump(all_futures_info, f, separators=(',', ':'), indent=2)


@@ -280,13 +284,16 @@ def __init__(self, func):
self._step = 100

@property
def total_steps(self):
# type: () -> int
def total_steps(self) -> int:
return self._step

def __call__(self, *args, **kwargs):
self._func(*args, **kwargs)
yield self._step
try:
self._func(*args, **kwargs)
except Exception as e:
yield (args, e)
else:
yield self._step
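
The contract this introduces: a GenerateFileTask yields its step count when the wrapped function succeeds, and an (args, exception) tuple when it raises; the consumer tells the two cases apart by type. A minimal, self-contained sketch of that generator protocol (flaky_task and its failure are made up for illustration):

    import random

    def flaky_task(path):
        # stand-in for GenerateFileTask.__call__: succeed, or yield the error
        try:
            if random.random() < 0.5:
                raise OSError("file is locked")
        except Exception as e:
            yield ((path,), e)  # failure: (call args, exception)
        else:
            yield 100           # success: the completed step count

    for msg in flaky_task("dividends.h5"):
        print("error:" if isinstance(msg, tuple) else "progress:", msg)
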


STOCK_FIELDS = ['open', 'close', 'high', 'low', 'prev_close', 'limit_up', 'limit_down', 'volume', 'total_turnover']
@@ -300,8 +307,7 @@ def __init__(self, order_book_ids):
self._order_book_ids = order_book_ids

@property
def total_steps(self):
# type: () -> int
def total_steps(self) -> int:
return len(self._order_book_ids)

def __call__(self, path, fields, **kwargs):
@@ -310,24 +316,27 @@ def __call__(self, path, fields, **kwargs):

class GenerateDayBarTask(DayBarTask):
def __call__(self, path, fields, **kwargs):
with h5py.File(path, 'w') as h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break
try:
with h5py.File(path, 'w') as h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break
except Exception as e:
yield (path, e)


class UpdateDayBarTask(DayBarTask):
@@ -355,20 +364,13 @@ def __call__(self, path, fields, **kwargs):
else:
try:
h5 = h5py.File(path, 'a')
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
try:
is_futures = "futures" == os.path.basename(path).split(".")[0]
for order_book_id in self._order_book_ids:
# special case: pre-adjusted contracts (888) need a full update
is_pre = is_futures and "888" in order_book_id
if order_book_id in h5 and not is_pre:
try:
last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
except ValueError:
h5.pop(order_book_id)
start_date = START_DATE
Expand All @@ -395,6 +397,8 @@ def __call__(self, path, fields, **kwargs):
else:
h5.create_dataset(order_book_id, data=df.to_records(), **kwargs)
yield 1
except Exception as e:
yield (path, e)
finally:
h5.close()

@@ -426,18 +430,54 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
rqdatac.reset()

gen_file_funcs = (
gen_instruments, gen_trading_dates, gen_dividends, gen_splits, gen_ex_factor, gen_st_days,
gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info
(gen_instruments, "instruments.pk"),
(gen_trading_dates, "trading_dates.npy"),
(gen_dividends, "dividends.h5"),
(gen_splits, "split_factor.h5"),
(gen_ex_factor, "ex_cum_factor.h5"),
(gen_st_days, "st_stock_days.h5"),
(gen_suspended_days, "suspended_days.h5"),
(gen_yield_curve, "yield_curve.h5"),
(gen_share_transformation, "share_transformation.json"),
(gen_future_info, "future_info.json"),
)

with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=init_rqdatac_with_warnings_catch
) as executor:
# on Windows, child processes must call rqdatac.init; other OSes only need rqdatac.reset (rqdatac.init covers rqdatac.reset)
for func in gen_file_funcs:
executor.submit(GenerateFileTask(func), path)
for file, order_book_id, field in day_bar_args:
executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs)
gen_func_results = [
executor.submit(GenerateFileTask(func), path, file) for func, file in gen_file_funcs
]
day_bar_update_results = [
executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs) for file, order_book_id, field in day_bar_args
]
errors = []
for result in gen_func_results:
if result.result():
(p, file), message = result.result()
errors.append((os.path.join(p, file), message))
for result in day_bar_update_results:
if result.result():
errors.append(result.result())
return errors
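
Because update_bundle now returns the collected failures instead of raising on the first one, the caller decides where they end up. A sketch of the shape of that return value and its hand-off to push_errors_log (all paths and exception values here are illustrative):

    # each entry pairs the file that failed with the exception it raised
    failed_msg = [
        ("/home/user/.rqalpha/bundle/dividends.h5", OSError("file is locked")),
        ("/home/user/.rqalpha/bundle/stocks.h5", ValueError("corrupted dataset")),
    ]
    if failed_msg:
        push_errors_log("/home/user/.rqalpha/bundle/update_failed.log", failed_msg)
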


def push_errors_log(log_file, results):
logger = Logger("UpdateBundle")
error_dict = defaultdict(list)
logbook.FileHandler(filename=log_file, mode="a").push_application()
for result in results:
if result is not None:
error_dict[result[1].__class__.__name__].append(result[0])
if isinstance(result[1], OSError):
logger.error(f"Failed to update {result[0]}: please check the file's permissions and whether it is occupied by another process.\n{result[1]}")
else:
logger.error(f"Failed to update {result[0]}: the file may be corrupted; you can delete it and run the update again.\n{result[1]}")
if error_dict:
click.echo(f"WARNING: some files failed to update. The failure reasons and file lists are shown below; full details have been written to {log_file}:")
for reason, file_list in error_dict.items():
click.echo(f"{reason}: {file_list}")
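
A design note: logbook's push_application() installs the file handler process-wide and never pops it. That is harmless at the tail of a CLI command that exits right afterwards; in longer-lived code, the scoped form is the safer pattern. A sketch, assuming the same logger and log_file as above and only logbook's standard context-manager API:

    # scoped alternative: the handler is popped automatically on exit
    with logbook.FileHandler(filename=log_file, mode="a").applicationbound():
        logger.error("handler active only inside this block")
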


class AutomaticUpdateBundle(object):
21 changes: 13 additions & 8 deletions rqalpha/utils/concurrent.py
@@ -11,6 +11,7 @@


def _process_worker(call_queue, result_queue, progress_queue, initializer, initargs):
error = None
if initializer is not None:
try:
initializer(*initargs)
@@ -26,20 +27,26 @@ def _process_worker(call_queue, result_queue, progress_queue, initializer, initargs):
r = call_item.fn(*call_item.args, **call_item.kwargs)
if isinstance(call_item.fn, ProgressedTask):
for step in r:
progress_queue.put(step)
if isinstance(step, int):
progress_queue.put(step)
elif isinstance(step, tuple):  # some tasks yield a tuple when they fail
error = step
r = None
else:
progress_queue.put(1)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(call_item.work_id, exception=exc))
else:
result_queue.put(_ResultItem(call_item.work_id, result=r))
if error:
result_queue.put(_ResultItem(call_item.work_id, result=error))
error = None
else:
result_queue.put(_ResultItem(call_item.work_id, result=r))


class ProgressedProcessPoolExecutor(ProcessPoolExecutor):
def __init__(self, max_workers=None, initializer=None, initargs=()):
# type: (Optional[int], Optional[Callable], Optional[Tuple]) -> None
def __init__(self, max_workers: Optional[int] = None, initializer: Optional[Callable] = None, initargs: Optional[tuple] = ()):
super(ProgressedProcessPoolExecutor, self).__init__(max_workers)
self._initializer = initializer
self._initargs = initargs
@@ -102,10 +109,8 @@ def shutdown(self, wait=True):

class ProgressedTask:
@property
def total_steps(self):
# type: () -> int
def total_steps(self) -> int:
raise NotImplementedError

def __call__(self, *args, **kwargs):
# type: (*Any, **Any) -> Generator
def __call__(self, *args: Any, **kwargs: Any) -> Generator:
raise NotImplementedError
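
For orientation, a minimal concrete task under this updated contract could look like the following sketch (CopyFilesTask and its copy_one helper are hypothetical, purely for illustration):

    from typing import Any, Generator

    class CopyFilesTask(ProgressedTask):
        def __init__(self, files):
            self._files = files

        @property
        def total_steps(self) -> int:
            return len(self._files)

        def __call__(self, dest: str) -> Generator:
            for f in self._files:
                try:
                    copy_one(f, dest)      # hypothetical helper doing the real work
                except Exception as e:
                    yield ((f, dest), e)   # failure: worker forwards this as the result
                else:
                    yield 1                # success: one progress step
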
Binary file modified tests/outs/test_f_mean_reverting.pkl
