Skip to content

Commit

Permalink
Merge pull request #900 from ricequant/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
Cuizi7 authored Sep 12, 2024
2 parents d703cff + b09703b commit e608e57
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 54 deletions.
5 changes: 4 additions & 1 deletion rqalpha/cmds/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import time
import datetime
import dateutil
import sys

import click
import requests
Expand Down Expand Up @@ -89,7 +90,9 @@ def update_bundle(data_bundle_path, rqdatac_uri, compression, concurrency):
return 1

from rqalpha.data.bundle import update_bundle as update_bundle_
update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency)
succeed = update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency)
if not succeed:
sys.exit(1)


@cli.command(help=_("Download bundle (monthly updated)"))
Expand Down
80 changes: 54 additions & 26 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from itertools import chain
from typing import Callable, Optional, Union, List
from filelock import FileLock, Timeout
import multiprocessing
from multiprocessing.sharedctypes import Synchronized
from ctypes import c_bool

import h5py
import numpy as np
Expand All @@ -28,9 +31,11 @@
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
from rqalpha.utils.i18n import gettext as _
from rqalpha.utils.functools import lru_cache
from rqalpha.utils.logger import init_logger, system_log
from rqalpha.environment import Environment
from rqalpha.model.instrument import Instrument


START_DATE = 20050104
END_DATE = 29991231

Expand Down Expand Up @@ -310,24 +315,32 @@ def __call__(self, path, fields, **kwargs):

class GenerateDayBarTask(DayBarTask):
def __call__(self, path, fields, **kwargs):
with h5py.File(path, 'w') as h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break
try:
h5 = h5py.File(path, "w")
except OSError:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
sval.value = False
yield 1
else:
with h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break


class UpdateDayBarTask(DayBarTask):
Expand All @@ -353,12 +366,15 @@ def __call__(self, path, fields, **kwargs):
if need_recreate_h5:
yield from GenerateDayBarTask(self._order_book_ids)(path, fields, **kwargs)
else:
h5 = None
try:
h5 = h5py.File(path, 'a')
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
try:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
sval.value = False
yield 1
else:
is_futures = "futures" == os.path.basename(path).split(".")[0]
for order_book_id in self._order_book_ids:
# 特殊处理前复权合约,需要全量更新
Expand All @@ -367,8 +383,11 @@ def __call__(self, path, fields, **kwargs):
try:
last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
sval.value = False
yield 1
break
except ValueError:
h5.pop(order_book_id)
start_date = START_DATE
Expand Down Expand Up @@ -396,14 +415,20 @@ def __call__(self, path, fields, **kwargs):
h5.create_dataset(order_book_id, data=df.to_records(), **kwargs)
yield 1
finally:
h5.close()
if h5:
h5.close()


def init_rqdatac_with_warnings_catch():
def process_init(args: Optional[Synchronized] = None):
import warnings
with warnings.catch_warnings(record=True):
# catch warning: rqdatac is already inited. Settings will be changed
rqdatac.init()
init_logger()
# Initialize process shared variables
if args:
global sval
sval = args


def update_bundle(path, create, enable_compression=False, concurrency=1):
Expand All @@ -412,6 +437,7 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
else:
_DayBarTask = UpdateDayBarTask

init_logger()
kwargs = {}
if enable_compression:
kwargs['compression'] = 9
Expand All @@ -430,14 +456,16 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info
)

succeed = multiprocessing.Value(c_bool, True)
with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=init_rqdatac_with_warnings_catch
max_workers=concurrency, initializer=process_init, initargs=(succeed, )
) as executor:
# windows上子进程需要执行rqdatac.init, 其他os则需要执行rqdatac.reset; rqdatac.init包含了rqdatac.reset的功能
for func in gen_file_funcs:
executor.submit(GenerateFileTask(func), path)
for file, order_book_id, field in day_bar_args:
executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs)
return succeed.value


class AutomaticUpdateBundle(object):
Expand Down
6 changes: 6 additions & 0 deletions rqalpha/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
from rqalpha.utils.logger import system_log, user_log, user_system_log
from rqalpha.core.global_var import GlobalVars
from rqalpha.utils.i18n import gettext as _
from rqalpha.const import SIDE
if TYPE_CHECKING:
from rqalpha.model.order import Order



class Environment(object):
Expand Down Expand Up @@ -183,6 +185,10 @@ def _get_transaction_cost_decider(self, order_book_id):

def get_trade_tax(self, trade):
return self._get_transaction_cost_decider(trade.order_book_id).get_trade_tax(trade)

def get_transaction_cost_with_value(self, value: float) -> float:
side = SIDE.BUY if value >= 0 else SIDE.SELL
return self._transaction_cost_decider_dict[INSTRUMENT_TYPE.CS].get_transaction_cost_with_value(abs(value), side)

def get_trade_commission(self, trade):
return self._get_transaction_cost_decider(trade.order_book_id).get_trade_commission(trade)
Expand Down
17 changes: 10 additions & 7 deletions rqalpha/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from rqalpha.model.order import Order
from rqalpha.model.trade import Trade
from rqalpha.model.instrument import Instrument
from rqalpha.const import POSITION_DIRECTION, TRADING_CALENDAR_TYPE, INSTRUMENT_TYPE
from rqalpha.const import POSITION_DIRECTION, TRADING_CALENDAR_TYPE, INSTRUMENT_TYPE, SIDE


class AbstractPosition(with_metaclass(abc.ABCMeta)):
Expand Down Expand Up @@ -675,25 +675,28 @@ class AbstractTransactionCostDecider((with_metaclass(abc.ABCMeta))):
订单税费计算接口,通过实现次接口可以定义不同市场、不同合约的个性化税费计算逻辑。
"""
@abc.abstractmethod
def get_trade_tax(self, trade):
# type: (Trade) -> float
def get_trade_tax(self, trade: Trade) -> float:
"""
计算指定交易应付的印花税
"""
raise NotImplementedError

@abc.abstractmethod
def get_trade_commission(self, trade):
# type: (Trade) -> float
def get_trade_commission(self, trade: Trade) -> float:
"""
计算指定交易应付的佣金
"""
raise NotImplementedError

@abc.abstractmethod
def get_order_transaction_cost(self, order):
# type: (Order) -> float
def get_order_transaction_cost(self, order: Order) -> float:
"""
计算指定订单应付的交易成本(税 + 费)
"""
raise NotImplementedError

def get_transaction_cost_with_value(self, value: float, side: SIDE) -> float:
"""
计算指定价格交易应付的交易成本(税 + 费)
"""
raise NotImplementedError
53 changes: 39 additions & 14 deletions rqalpha/mod/rqalpha_mod_sys_accounts/api/api_stock.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import datetime
from decimal import Decimal, getcontext
from itertools import chain
from typing import Dict, List, Optional, Union, Tuple
from typing import Dict, List, Optional, Union, Tuple, Callable
import math
from collections import defaultdict

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -70,14 +72,14 @@ def _get_account_position_ins(id_or_ins):
return account, position, ins


def _round_order_quantity(ins, quantity) -> int:
def _round_order_quantity(ins, quantity, method: Callable = int) -> int:
if ins.type == "CS" and ins.board_type == "KSH":
# KSH can buy(sell) 201, 202 shares
return 0 if abs(quantity) < KSH_MIN_AMOUNT else int(quantity)
else:
round_lot = ins.round_lot
try:
return int(Decimal(quantity) / Decimal(round_lot)) * round_lot
return method(Decimal(quantity) / Decimal(round_lot)) * round_lot
except ValueError:
raise

Expand Down Expand Up @@ -372,8 +374,18 @@ def order_target_portfolio(
order_book_id, quantity, SIDE.SELL, MarketOrder(), POSITION_EFFECT.CLOSE
))

account_value = account.total_value
account_value = account.total_value
if total_percent == 1:
# 在此处形成的订单不包含交易费用,需要预留一点余额以供交易费用使用
estimate_transaction_cost = 0
for order_book_id, (target_percent, open_style, close_style, last_price) in target.items():
current_value = current_quantities.get(order_book_id, 0) * last_price
change_value = target_percent * account_value - current_value
estimate_transaction_cost += env.get_transaction_cost_with_value(change_value)
account_value = account_value - estimate_transaction_cost

close_orders, open_orders = [], []
waiting_to_buy = defaultdict()
for order_book_id, (target_percent, open_style, close_style, last_price) in target.items():
open_price = _get_order_style_price(order_book_id, open_style)
close_price = _get_order_style_price(order_book_id, close_style)
Expand All @@ -383,23 +395,36 @@ def order_target_portfolio(
)
env.order_creation_failed(order_book_id=order_book_id, reason=reason)
continue

delta_quantity = (account_value * target_percent / close_price) - current_quantities.get(order_book_id, 0)
delta_quantity = _round_order_quantity(env.data_proxy.instrument(order_book_id), delta_quantity)
delta_quantity = _round_order_quantity(env.data_proxy.instrument(order_book_id), delta_quantity, method=round)

# 优先生成卖单,以便计算出剩余现金,进行买单数量的计算
if delta_quantity == 0:
continue
elif delta_quantity > 0:
quantity, side, position_effect = delta_quantity, SIDE.BUY, POSITION_EFFECT.OPEN
order_list = open_orders
target_style = open_style
waiting_to_buy[order_book_id] = (delta_quantity, POSITION_EFFECT.OPEN, open_style, last_price)
continue
else:
quantity, side, position_effect = abs(delta_quantity), SIDE.SELL, POSITION_EFFECT.CLOSE
order_list = close_orders
target_style = close_style
order = Order.__from_create__(order_book_id, quantity, side, target_style, position_effect)
if isinstance(target_style, MarketOrder):
order = Order.__from_create__(order_book_id, quantity, side, close_style, position_effect)
if isinstance(close_style, MarketOrder):
order.set_frozen_price(last_price)
close_orders.append(order)

estimate_cash = account.cash + sum([o.quantity * o.frozen_price - env.get_order_transaction_cost(o) for o in close_orders])
for order_book_id, (delta_quantity, position_effect, open_style, last_price) in waiting_to_buy.items():
cost = delta_quantity * last_price + env.get_transaction_cost_with_value(delta_quantity * last_price)
if cost > estimate_cash:
delta_quantity = estimate_cash / last_price
delta_quantity = _round_order_quantity(env.data_proxy.instrument(order_book_id), delta_quantity)
if delta_quantity == 0:
continue
cost = delta_quantity * last_price + env.get_transaction_cost_with_value(delta_quantity * last_price)
order = Order.__from_create__(order_book_id, delta_quantity, SIDE.BUY, open_style, position_effect)
if isinstance(open_style, MarketOrder):
order.set_frozen_price(last_price)
order_list.append(order)
open_orders.append(order)
estimate_cash -= cost

return list(env.submit_order(o) for o in chain(close_orders, open_orders))

Expand Down
4 changes: 2 additions & 2 deletions rqalpha/mod/rqalpha_mod_sys_simulation/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def match(self, account, order, open_auction):
trade._commission = self._env.get_trade_commission(trade)
trade._tax = self._env.get_trade_tax(trade)

if order.side == SIDE.BUY and self._slippage_decider.decider.rate != 0:
if order.position_effect == POSITION_EFFECT.OPEN and self._slippage_decider.decider.rate != 0:
# 标的价格经过滑点处理后,账户资金可能不够买入,需要进行验证
cost_money = instrument.calc_cash_occupation(price, order.quantity, order.position_direction, order.trading_datetime.date())
cost_money += trade.transaction_cost
Expand Down Expand Up @@ -467,7 +467,7 @@ def match(self, account, order, open_auction): # type: (Account, Order, bool) -
trade._commission = self._env.get_trade_commission(trade)
trade._tax = self._env.get_trade_tax(trade)

if order.side == SIDE.BUY and self._slippage_decider.decider.rate != 0:
if order.position_effect == POSITION_EFFECT.OPEN and self._slippage_decider.decider.rate != 0:
cost_money = instrument.calc_cash_occupation(price, order.quantity, order.position_direction, order.trading_datetime.date())
cost_money += trade.transaction_cost
if cost_money > account.cash + order.init_frozen_cash:
Expand Down
8 changes: 8 additions & 0 deletions rqalpha/mod/rqalpha_mod_sys_transaction_cost/deciders.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ def get_order_transaction_cost(self, order):
commission = self._get_order_commission(order.order_book_id, order.side, order.frozen_price, order.quantity)
tax = self._get_tax(order.order_book_id, order.side, order.frozen_price * order.quantity)
return tax + commission

def get_transaction_cost_with_value(self, value: float, side: SIDE) -> float:
raise NotImplementedError


class CNStockTransactionCostDecider(StockTransactionCostDecider):
Expand All @@ -99,6 +102,11 @@ def set_tax_rate(self, event):
else:
self.tax_rate = 0.0005

def get_transaction_cost_with_value(self, value: float, side: SIDE) -> float:
tax = value * self.tax_rate * self.tax_multiplier if side == SIDE.SELL else 0
commission = max(value * self.commission_rate * self.commission_multiplier, self.min_commission)
return tax + commission


class CNFutureTransactionCostDecider(AbstractTransactionCostDecider):
def __init__(self, commission_multiplier):
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[metadata]
name = rqalpha
version = 5.4.1
version = 5.4.2

[versioneer]
VCS = git
Expand Down
Loading

0 comments on commit e608e57

Please sign in to comment.