Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update #898

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rqalpha/cmds/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ def update_bundle(data_bundle_path, rqdatac_uri, compression, concurrency):
return 1

from rqalpha.data.bundle import update_bundle as update_bundle_
status = update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency)
if status != 0:
sys.exit(status)
succeed = update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency)
if not succeed:
sys.exit(1)


@cli.command(help=_("Download bundle (monthly updated)"))
Expand Down
60 changes: 35 additions & 25 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from itertools import chain
from typing import Callable, Optional, Union, List
from filelock import FileLock, Timeout
import multiprocessing
from multiprocessing.sharedctypes import Synchronized
from ctypes import c_bool

import h5py
import numpy as np
Expand All @@ -31,8 +34,7 @@
from rqalpha.utils.logger import init_logger, system_log
from rqalpha.environment import Environment
from rqalpha.model.instrument import Instrument
import multiprocessing
from multiprocessing.sharedctypes import Synchronized


START_DATE = 20050104
END_DATE = 29991231
Expand Down Expand Up @@ -313,24 +315,32 @@ def __call__(self, path, fields, **kwargs):

class GenerateDayBarTask(DayBarTask):
def __call__(self, path, fields, **kwargs):
with h5py.File(path, 'w') as h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break
try:
h5 = h5py.File(path, "w")
except OSError:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
sval.value = False
yield 1
else:
with h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break


class UpdateDayBarTask(DayBarTask):
Expand Down Expand Up @@ -362,7 +372,7 @@ def __call__(self, path, fields, **kwargs):
except OSError:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
sval.value = 1
sval.value = False
yield 1
else:
is_futures = "futures" == os.path.basename(path).split(".")[0]
Expand All @@ -375,7 +385,7 @@ def __call__(self, path, fields, **kwargs):
except OSError:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
sval.value = 1
sval.value = False
yield 1
break
except ValueError:
Expand Down Expand Up @@ -446,16 +456,16 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info
)

status_code = multiprocessing.Value("i", 0)
succeed = multiprocessing.Value(c_bool, True)
with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=process_init, initargs=(status_code, )
max_workers=concurrency, initializer=process_init, initargs=(succeed, )
) as executor:
# On Windows the child processes need to call rqdatac.init, while on other OSes they need to call rqdatac.reset; rqdatac.init already includes the functionality of rqdatac.reset
for func in gen_file_funcs:
executor.submit(GenerateFileTask(func), path)
for file, order_book_id, field in day_bar_args:
executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs)
return status_code.value
return succeed.value


class AutomaticUpdateBundle(object):
Expand Down
Loading