Files
JSETC/bot_new.py
2026-05-09 16:16:07 +09:00

370 lines
14 KiB
Python

#!/usr/bin/env python3
import argparse
from collections import deque
from enum import Enum
import time
import socket
import json
from itertools import chain
# Team identifier: sent upper-cased in the "hello" handshake and appended to
# the test-exchange hostname when --test is used.
team_name = "HanyangFloorFunction"
class Dir(str, Enum):
    """Side of an order.

    Mixing in ``str`` makes each member compare equal to its plain string
    value, so members and raw strings can be used interchangeably in
    comparisons and in JSON payloads.
    """

    # Member values are the literal strings used on the wire.
    BUY = "BUY"
    SELL = "SELL"
class ExchangeConnection:
    """Newline-delimited JSON connection to the exchange.

    Every message, in both directions, is a single JSON object followed by a
    newline. Constructing an instance connects the socket and immediately
    sends the "hello" handshake.
    """

    def __init__(self, args):
        """Connect to the exchange and send the hello message.

        Args:
            args: Object exposing ``exchange_hostname``, ``port`` and
                ``add_socket_timeout`` attributes.
        """
        # Send times of the most recent messages; only used for the
        # "sending too frequently" warning in _write_message.
        self.message_timestamps = deque(maxlen=500)
        self.exchange_hostname = args.exchange_hostname
        self.port = args.port
        exchange_socket = self._connect(add_socket_timeout=args.add_socket_timeout)
        # Line-buffered text reader on top of the same socket used for writes.
        self.reader = exchange_socket.makefile("r", 1)
        self.writer = exchange_socket
        self._write_message({"type": "hello", "team": team_name.upper()})

    def read_message(self):
        """Read and decode the next message from the exchange.

        Returns:
            The decoded message dict. If it has a "dir" key, the value is
            converted to a ``Dir`` member.

        Raises:
            ConnectionError: If the exchange closed the connection (EOF).
        """
        line = self.reader.readline()
        if not line:
            # readline() returns "" at EOF; report that explicitly instead of
            # letting json.loads fail with an unrelated-looking decode error.
            raise ConnectionError("Exchange closed the connection")
        message = json.loads(line)
        if "dir" in message:
            message["dir"] = Dir(message["dir"])
        return message

    def send_add_message(self, order_id: int, symbol: str, dir: "Dir", price: int, size: int):
        """Send an "add" (new order) request with time-in-force DAY."""
        self._write_message(
            {
                "type": "add",
                "order_id": order_id,
                "symbol": symbol,
                "dir": dir,
                "price": price,
                "size": size,
                "tif": "DAY",
            }
        )

    def send_convert_message(self, order_id: int, symbol: str, dir: "Dir", size: int):
        """Send a "convert" request for ``size`` units of ``symbol``."""
        self._write_message(
            {
                "type": "convert",
                "order_id": order_id,
                "symbol": symbol,
                "dir": dir,
                "size": size,
            }
        )

    def send_cancel_message(self, order_id: int):
        """Send a "cancel" request for a previously sent order."""
        self._write_message({"type": "cancel", "order_id": order_id})

    def _connect(self, add_socket_timeout):
        """Open a TCP connection to the configured host and port.

        Args:
            add_socket_timeout: When true, socket operations time out after
                five seconds.

        Returns:
            The connected socket.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if add_socket_timeout:
            s.settimeout(5)
        s.connect((self.exchange_hostname, self.port))
        return s

    def _write_message(self, message):
        """Serialize ``message`` as one JSON line and send all of it.

        Raises:
            Exception: If the socket accepts zero bytes for a send.
        """
        # Encode once and count in bytes: send() reports bytes, not characters.
        data = (json.dumps(message) + "\n").encode("utf-8")
        total_sent = 0
        while total_sent < len(data):
            sent_this_time = self.writer.send(data[total_sent:])
            if sent_this_time == 0:
                raise Exception("Unable to send data to exchange")
            total_sent += sent_this_time

        now = time.time()
        self.message_timestamps.append(now)
        # The deque is full and its oldest entry is under a second old, i.e.
        # `maxlen` messages were sent within the last second.
        if (
            len(self.message_timestamps) == self.message_timestamps.maxlen
            and self.message_timestamps[0] > (now - 1)
        ):
            print("WARNING: You are sending messages too frequently.")
class Order:
    """Plain mutable record describing one order.

    Attributes:
        symbol: Instrument the order is for.
        size: Order size, as given by the creator.
        price: Limit price of the order.
        dir: Side of the order.
    """

    def __init__(self, symbol, size, price, dir):
        self.symbol, self.size, self.price, self.dir = symbol, size, price, dir
class StateManager:
    """Track positions and orders, and run the trading strategies.

    The manager mirrors exchange state from incoming messages (hello, ack,
    fill, out, reject), keeps short sliding windows of recent trade prices,
    and places orders through the ``exchange`` object it is given.
    """

    # Number of recent trade prices averaged per symbol. The dict order is
    # also the order in which execute_strategies quotes the symbols.
    PRICE_WINDOW_SIZES = {"VALBZ": 5, "VALE": 5, "XLF": 25}

    def __init__(self, exchange):
        """Create an empty state bound to ``exchange``.

        Args:
            exchange: Object providing ``send_add_message``,
                ``send_convert_message`` and ``send_cancel_message``.
        """
        self.exchange = exchange
        self.order_id_counter = -1  # next_order_id() pre-increments, so ids start at 0
        self.positions_by_symbol = {}
        self.unacked_orders = {}  # order_id -> Order, sent but not yet acked
        self.open_orders = {}  # order_id -> Order, acked by the exchange
        self.pending_cancels = set()  # order ids with a cancel in flight
        # Sliding windows of recent trade prices; a full deque drops its
        # oldest entry automatically on append.
        self.average_vale_price = deque(maxlen=self.PRICE_WINDOW_SIZES["VALE"])
        self.average_valbz_price = deque(maxlen=self.PRICE_WINDOW_SIZES["VALBZ"])
        self.average_xlf_price = deque(maxlen=self.PRICE_WINDOW_SIZES["XLF"])
        self.CONVERSION_FEE = 10
        # ETF_COMPONENTS / ETF_SHARES are not referenced by the methods below.
        self.ETF_COMPONENTS = {"XLF": {"BOND": 3, "GS": 2, "MS": 3, "WFC": 2}}
        self.ETF_SHARES = {"XLF": 10}
        # Maximum absolute position per symbol, checked by can_add_position.
        self.POSITION_LIMIT = {
            "BOND": 100,
            "VALBZ": 10,
            "VALE": 10,
            "GS": 100,
            "MS": 100,
            "WFC": 100,
            "XLF": 100,
        }
        self.last_strategy_time = 0
        self.strategy_interval = 0.1  # minimum seconds between strategy runs

    def _price_windows(self):
        """Return the symbol -> price-window mapping, in quoting order."""
        return {
            "VALBZ": self.average_valbz_price,
            "VALE": self.average_vale_price,
            "XLF": self.average_xlf_price,
        }

    def position_for_symbol(self, symbol):
        """Return the tracked position in ``symbol`` (0 if unknown)."""
        return self.positions_by_symbol.get(symbol, 0)

    def next_order_id(self):
        """Return a fresh, monotonically increasing order id."""
        self.order_id_counter += 1
        return self.order_id_counter

    def on_ack(self, message):
        """Move an acknowledged order from unacked to open."""
        order_id = message["order_id"]
        if order_id in self.unacked_orders:
            self.open_orders[order_id] = self.unacked_orders.pop(order_id)

    def on_fill(self, message):
        """Apply a fill: shrink the open order and update the position."""
        order_id = message["order_id"]
        symbol = message["symbol"]
        side = message["dir"]
        raw_size = message["size"]
        # Buys increase the position, sells decrease it.
        signed_size = raw_size if side == Dir.BUY.value else -raw_size
        if order_id in self.open_orders:
            self.open_orders[order_id].size -= raw_size
        self.positions_by_symbol[symbol] = self.positions_by_symbol.get(symbol, 0) + signed_size

    def on_out(self, message):
        """Forget an order the exchange reports as no longer live."""
        order_id = int(message["order_id"])
        self.open_orders.pop(order_id, None)
        self.pending_cancels.discard(order_id)

    def on_hello(self, message):
        """Initialise positions from the exchange's hello message."""
        for symbol_position in message["symbols"]:
            self.positions_by_symbol[symbol_position["symbol"]] = symbol_position["position"]

    def on_reject(self, message):
        """Drop a rejected order from the unacked set."""
        self.unacked_orders.pop(message["order_id"], None)

    def send_order(self, symbol, dir, price, size):
        """Send a new order and record it as unacked.

        Returns:
            The order id assigned to the new order.
        """
        order_id = self.next_order_id()
        self.unacked_orders[order_id] = Order(symbol, size, price, Dir(dir))
        self.exchange.send_add_message(order_id, symbol, dir, price, size)
        return order_id

    def cancel_order(self, order_id):
        """Request cancellation of ``order_id`` and mark it as pending."""
        self.pending_cancels.add(order_id)
        self.exchange.send_cancel_message(order_id)

    def open_and_pending_orders_in_symbol_and_direction_by_price_level(self, symbol, dir):
        """Group live orders for ``symbol``/``dir`` by price.

        Open and unacked orders are included; orders with a cancel in flight
        are excluded.

        Returns:
            ``{price: {order_id: Order}}``.
        """
        output = {}
        for order_id, order in chain(self.open_orders.items(), self.unacked_orders.items()):
            if order.symbol == symbol and order.dir == dir and order_id not in self.pending_cancels:
                output.setdefault(order.price, {})[order_id] = order
        return output

    def set_orders_in_symbol_for_direction(self, symbol, dir, size_by_price_level):
        """Make live orders for ``symbol``/``dir`` match the desired sizes.

        Args:
            symbol: Instrument to adjust.
            dir: Side to adjust.
            size_by_price_level: Desired total size per price. Prices not
                listed are treated as desired size 0.
        """
        current_orders = self.open_and_pending_orders_in_symbol_and_direction_by_price_level(symbol, dir)
        for price_level in set(size_by_price_level) | set(current_orders):
            orders_at_level = current_orders.get(price_level, {})
            current_size = sum(order.size for order in orders_at_level.values())
            desired_size = size_by_price_level.get(price_level, 0)
            if current_size < desired_size:
                # Top up with one order for the missing size.
                self.send_order(symbol, dir, price_level, desired_size - current_size)
            elif current_size > desired_size:
                # Too much resting size: cancel everything at this level and
                # re-send the desired size. current_orders already excludes
                # orders with a cancel in flight, so no re-check is needed.
                for order_id in orders_at_level:
                    self.cancel_order(order_id)
                if desired_size != 0:
                    self.send_order(symbol, dir, price_level, desired_size)

    def get_average_price(self, average_list):
        """Return the truncated integer mean of ``average_list`` (0 if empty)."""
        if not average_list:
            return 0
        return int(sum(average_list) / len(average_list))

    def convert(self, symbol, dir, size):
        """Send a convert request using a fresh order id.

        NOTE(review): the request is not recorded in ``unacked_orders`` and
        ``positions_by_symbol`` is not adjusted here. Confirm how the exchange
        reports conversions; if it does not send fill messages for them,
        tracked positions will drift.
        """
        order_id = self.next_order_id()
        self.exchange.send_convert_message(order_id, symbol, dir, size)

    def update_avg_prices(self, trade_message):
        """Record a trade price in the window for its symbol, if tracked."""
        symbol = trade_message["symbol"]
        price = trade_message["price"]
        window = self._price_windows().get(symbol)
        if window is not None:
            window.append(price)

    def can_add_position(self, symbol, size, is_buy):
        """Return whether trading ``size`` keeps the position within limits.

        Only the current position is considered; resting orders are not.
        Symbols without a configured limit use 100.
        """
        current = self.position_for_symbol(symbol)
        limit = self.POSITION_LIMIT.get(symbol, 100)
        if is_buy:
            return current + size <= limit
        return current - size >= -limit

    def execute_strategies(self):
        """Run market making and arbitrage, at most once per interval.

        For every tracked symbol whose price window is full, keep a one-lot
        sell order one above and a one-lot buy order one below the window
        average, as long as the position limit allows that side.
        """
        now = time.time()
        if now - self.last_strategy_time < self.strategy_interval:
            return
        self.last_strategy_time = now

        for symbol, window in self._price_windows().items():
            if len(window) < self.PRICE_WINDOW_SIZES[symbol]:
                continue
            average = self.get_average_price(window)
            sell_size = 1 if self.can_add_position(symbol, 1, False) else 0
            buy_size = 1 if self.can_add_position(symbol, 1, True) else 0
            self.set_orders_in_symbol_for_direction(symbol, "SELL", {average + 1: sell_size})
            self.set_orders_in_symbol_for_direction(symbol, "BUY", {average - 1: buy_size})

        self.execute_arb()

    def execute_arb(self):
        """Trade the VALE/VALBZ spread when it exceeds the conversion fee.

        Requires both price windows to be full. When average VALBZ exceeds
        average VALE by more than ``CONVERSION_FEE``: buy VALE, convert, sell
        VALBZ. When it is lower by more than the fee: the mirror image.
        """
        if (
            len(self.average_valbz_price) < self.PRICE_WINDOW_SIZES["VALBZ"]
            or len(self.average_vale_price) < self.PRICE_WINDOW_SIZES["VALE"]
        ):
            return

        avg_valbz = self.get_average_price(self.average_valbz_price)
        avg_vale = self.get_average_price(self.average_vale_price)
        spread = avg_valbz - avg_vale

        if spread > self.CONVERSION_FEE:
            if self.can_add_position("VALE", 1, True) and self.can_add_position("VALBZ", 1, False):
                self.send_order("VALE", "BUY", avg_vale, 1)
                self.convert("VALE", Dir.SELL, 1)
                self.send_order("VALBZ", "SELL", avg_valbz, 1)
        elif spread < -self.CONVERSION_FEE:
            if self.can_add_position("VALE", 1, False) and self.can_add_position("VALBZ", 1, True):
                self.send_order("VALE", "SELL", avg_vale, 1)
                self.convert("VALE", Dir.BUY, 1)
                self.send_order("VALBZ", "BUY", avg_valbz, 1)

    def on_startup(self):
        """Place an order to flatten any initial BOND position.

        A long position is offered at 1001 and a short position is bid at
        999, capped at the BOND position limit.
        """
        bond_pos = self.position_for_symbol("BOND")
        if bond_pos > 0:
            self.send_order("BOND", "SELL", 1001, min(bond_pos, self.POSITION_LIMIT["BOND"]))
        elif bond_pos < 0:
            self.send_order("BOND", "BUY", 999, min(-bond_pos, self.POSITION_LIMIT["BOND"]))
def parse_arguments():
    """Parse the command line and resolve the exchange address.

    Exactly one of ``--production``, ``--test`` or ``--specific-address``
    must be given.

    Returns:
        The argparse namespace, extended with ``exchange_hostname``, ``port``
        and ``add_socket_timeout`` (false only for the "empty" test exchange).
    """
    test_exchange_port_offsets = {"prod-like": 0, "slower": 1, "empty": 2}

    parser = argparse.ArgumentParser(description="Trade on an ETC exchange!")
    exchange_address_group = parser.add_mutually_exclusive_group(required=True)
    exchange_address_group.add_argument(
        "--production",
        action="store_true",
        help="Connect to the production exchange.",
    )
    exchange_address_group.add_argument(
        "--test",
        type=str,
        choices=test_exchange_port_offsets.keys(),
        help="Connect to a test exchange.",
    )
    exchange_address_group.add_argument(
        "--specific-address",
        type=str,
        metavar="HOST:PORT",
        help=argparse.SUPPRESS,
    )
    args = parser.parse_args()

    args.add_socket_timeout = True
    if args.production:
        args.exchange_hostname = "production"
        args.port = 25000
    elif args.test:
        args.exchange_hostname = "test-exch-" + team_name
        args.port = 22000 + test_exchange_port_offsets[args.test]
        if args.test == "empty":
            args.add_socket_timeout = False
    else:
        # The group is required, so reaching here means --specific-address
        # was supplied. Split on the LAST colon so hosts that themselves
        # contain colons do not break the unpacking.
        host, separator, port = args.specific_address.rpartition(":")
        if not separator:
            parser.error("--specific-address must have the form HOST:PORT")
        try:
            args.port = int(port)
        except ValueError:
            parser.error("--specific-address port must be an integer")
        args.exchange_hostname = host
    return args
def main():
    """Connect to the exchange and process messages until the round closes.

    After the hello handshake and startup orders, every incoming message is
    routed by its "type" field to the matching StateManager handler. Error,
    reject and fill messages are also printed. A "close" message ends the loop.
    """
    args = parse_arguments()
    exchange = ExchangeConnection(args=args)
    state_manager = StateManager(exchange)

    hello_message = exchange.read_message()
    print("First message from exchange:", hello_message)
    state_manager.on_hello(hello_message)
    state_manager.on_startup()

    def handle_trade(trade):
        # A trade first updates the price windows, then triggers the strategies.
        state_manager.update_avg_prices(trade)
        state_manager.execute_strategies()

    # Message types that are echoed to stdout before being handled.
    printed_types = ("error", "reject", "fill")
    # Message type -> handler; types without an entry are ignored.
    handlers = {
        "reject": state_manager.on_reject,
        "fill": state_manager.on_fill,
        "trade": handle_trade,
        "ack": state_manager.on_ack,
        "out": state_manager.on_out,
    }

    while True:
        message = exchange.read_message()
        message_type = message["type"]
        if message_type == "close":
            print("The round has ended")
            return
        if message_type in printed_types:
            print(message)
        handler = handlers.get(message_type)
        if handler is not None:
            handler(message)
if __name__ == "__main__":
    # Explicit check instead of `assert`, which is stripped when Python runs
    # with -O and would silently let the placeholder name through.
    if team_name == "REPLACEME":
        raise SystemExit("Please put your team name in the variable [team_name].")
    main()