Chen_py


import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class FuturesVolumeAnalyzer:
    """
    期指成交量分析器
    分析买多和卖空的连续单量(L)和非连续单量(S)
    """
    
    def __init__(self):
        self.data = []
        
    def add_tick(self, timestamp, side, volume, price):
        """
        添加一个tick数据
        
        参数:
        - timestamp: 时间戳
        - side: 'buy' 或 'sell' (买多或卖空)
        - volume: 成交量
        - price: 成交价格
        """
        self.data.append({
            'timestamp': timestamp,
            'side': side,
            'volume': volume,
            'price': price
        })
    
    def analyze_volume(self, start_time=None, end_time=None):
        """
        分析指定时间段内的成交量结构
        
        返回:
        - 多L: 买多的连续单量(>=2笔)
        - 多S: 买多的非连续单量
        - 空L: 卖空的连续单量(>=2笔)
        - 空S: 卖空的非连续单量
        """
        if not self.data:
            return {"多L": 0, "多S": 0, "空L": 0, "空S": 0}
        
        # 转换为DataFrame便于处理
        df = pd.DataFrame(self.data)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        
        # 筛选时间范围
        if start_time:
            df = df[df['timestamp'] >= start_time]
        if end_time:
            df = df[df['timestamp'] <= end_time]
        
        if df.empty:
            return {"多L": 0, "多S": 0, "空L": 0, "空S": 0}
        
        # 按时间排序
        df = df.sort_values('timestamp').reset_index(drop=True)
        
        # 标记连续相同方向的单子
        df['group'] = (df['side'] != df['side'].shift()).cumsum()
        
        # 计算每个连续组的总量
        group_stats = df.groupby(['group', 'side'])['volume'].agg(['sum', 'count']).reset_index()
        
        # 初始化结果
        result = {"多L": 0, "多S": 0, "空L": 0, "空S": 0}
        
        for _, row in group_stats.iterrows():
            if row['side'] == 'buy':
                if row['count'] >= 2:  # 连续2笔或以上
                    result["多L"] += row['sum']
                else:
                    result["多S"] += row['sum']
            else:  # sell
                if row['count'] >= 2:  # 连续2笔或以上
                    result["空L"] += row['sum']
                else:
                    result["空S"] += row['sum']
        
        return result
    
    def get_volume_summary(self, period='1H'):
        """
        按指定周期统计成交量结构
        
        参数:
        - period: 时间周期,如 '1H'(1小时), '30min'(30分钟), '1D'(1天)
        
        返回: DataFrame,包含每个周期的成交量结构
        """
        if not self.data:
            return pd.DataFrame()
        
        df = pd.DataFrame(self.data)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp').reset_index(drop=True)
        
        # 设置时间索引
        df.set_index('timestamp', inplace=True)
        
        # 标记连续相同方向的单子
        df['group'] = (df['side'] != df['side'].shift()).cumsum()
        
        # 为每个tick计算其所属组的大小
        group_sizes = df.groupby('group').size()
        df['group_size'] = df['group'].map(group_sizes)
        
        # 分类每个tick
        df['category'] = df.apply(lambda x: 
            f"{'多' if x['side'] == 'buy' else '空'}{'L' if x['group_size'] >= 2 else 'S'}", 
            axis=1)
        
        # 按周期分组统计
        result = df.groupby([pd.Grouper(freq=period), 'category'])['volume'].sum().unstack(fill_value=0)
        
        # 确保所有列都存在
        for col in ['多L', '多S', '空L', '空S']:
            if col not in result.columns:
                result[col] = 0
        
        # 添加汇总列
        result['总买多'] = result['多L'] + result['多S']
        result['总卖空'] = result['空L'] + result['空S']
        result['总成交量'] = result['总买多'] + result['总卖空']
        result['多L占比'] = (result['多L'] / result['总买多'] * 100).round(2)
        result['空L占比'] = (result['空L'] / result['总卖空'] * 100).round(2)
        
        return result


# 使用示例
def demo():
    # 创建分析器
    analyzer = FuturesVolumeAnalyzer()
    
    # 模拟添加tick数据
    base_time = datetime.now()
    
    # 模拟一些交易数据
    # 连续买多(3笔)
    analyzer.add_tick(base_time, 'buy', 100, 5000)
    analyzer.add_tick(base_time + timedelta(seconds=1), 'buy', 150, 5001)
    analyzer.add_tick(base_time + timedelta(seconds=2), 'buy', 200, 5002)
    
    # 单独卖空(1笔)
    analyzer.add_tick(base_time + timedelta(seconds=3), 'sell', 80, 4999)
    
    # 连续卖空(2笔)
    analyzer.add_tick(base_time + timedelta(seconds=4), 'sell', 120, 4998)
    analyzer.add_tick(base_time + timedelta(seconds=5), 'sell', 180, 4997)
    
    # 单独买多(1笔)
    analyzer.add_tick(base_time + timedelta(seconds=6), 'buy', 90, 5000)
    
    # 分析结果
    print("=== 整体成交量分析 ===")
    result = analyzer.analyze_volume()
    for key, value in result.items():
        print(f"{key}: {value}")
    
    print(f"\n总买多: {result['多L'] + result['多S']}")
    print(f"总卖空: {result['空L'] + result['空S']}")
    print(f"总成交量: {sum(result.values())}")
    
    # 按时间周期统计
    print("\n=== 按分钟统计 ===")
    summary = analyzer.get_volume_summary('1min')  # 1分钟
    print(summary)


# 实际使用时的数据导入示例
def load_from_csv(filename):
    """
    从CSV文件加载tick数据
    CSV格式应包含: timestamp, side, volume, price
    """
    analyzer = FuturesVolumeAnalyzer()
    
    df = pd.read_csv(filename)
    for _, row in df.iterrows():
        analyzer.add_tick(
            timestamp=row['timestamp'],
            side=row['side'],
            volume=row['volume'],
            price=row['price']
        )
    
    return analyzer


if __name__ == "__main__":
    demo()

python chen.py 
=== 整体成交量分析 ===
多L: 450
多S: 90
空L: 380
空S: 0

总买多: 540
总卖空: 380
总成交量: 920

=== 按分钟统计 ===
category              多L  多S   空L  空S  总买多  总卖空  总成交量   多L占比   空L占比
timestamp                                                          
2025-07-20 12:54:00  450  90  380   0  540  380   920  83.33  100.0