pytdx多线程示例

发布时间 2023-04-11 15:21:50作者: pu369com
# encoding=utf-8
import math
from pytdx.hq import TdxHq_API
import pathlib
import multiprocessing as mp
from multiprocessing import Pool


class myTdx:
    def __init__(self):
        self.HqHOSTS = pathlib.Path("HqHOSTS.txt").read_text().split()
        self.SYMBOLS = pathlib.Path("A.txt").read_text().split()
        self.SYMBOLS2 = self.getsymbols2()
        self.HSDICT = dict(zip(self.HqHOSTS, self.SYMBOLS2))
        self.manager = mp.Manager
        self.RESULT = self.manager().list()

    def getsymbols2(self):
        symbols = self.SYMBOLS
        total = len(symbols)
        page = len(self.HqHOSTS)
        perpage = math.ceil(total / page)
        return [symbols[i:i + perpage] for i in range(0, len(symbols), perpage)]

    def getdata(self, host, slist):
        # 数据获取接口一般返回list结构,
        api = TdxHq_API()
        liveapi = api.connect(host, 7709)
        if liveapi:
            r_list = []
            for symbol in slist:
                market_code = 1 if str(symbol[0]) == '6' else 0
                data = liveapi.get_transaction_data(market_code, symbol, 0, 30)
                rdic = {symbol: data}
                r_list.append(rdic)
            self.RESULT.extend(r_list)
        else:
            print(host, "服務器不可用...")
        liveapi.disconnect()

    def save(self):
        # print(self.RESULT)
        log = open('r.txt', 'a')
        print(self.RESULT, file=log)


if __name__ == '__main__':
    a = myTdx()
    pool = Pool(len(a.HqHOSTS))
    for host in a.HqHOSTS:
        alist = a.HSDICT.get(host)
        pool.apply_async(func=a.getdata, args=(host, alist))
    pool.close()
    pool.join()
    a.save()

  

# encoding=utf-8
from pytdx.hq import TdxHq_API
import pathlib
import re

if __name__ == '__main__':
    ips_text = pathlib.Path("HQHOSTA.txt").read_text(encoding='utf-8')  # 读取文件
    ips_list = re.compile(r'\d+\.\d+\.\d+\.\d+').findall(ips_text)  # 正则查找所有ip
    ips_listOK = []
    api = TdxHq_API()
    for ip in ips_list:
        if api.connect(ip, 7709):
            ips_listOK.append(ip)
    if len(ips_listOK) > 0:
        ips_listOld = pathlib.Path("HqHOSTS.txt").read_text(encoding='utf-8').split()
        ips_listOld.extend(ips_listOK)
        ips_new = list(set(ips_listOld))  # 去重
    pathlib.Path("HqHOSTS.txt").write_text("\n".join(ips_new))
    pathlib.Path('HQHOSTA.txt').unlink()
    print("完成")