# Directory Structure
```
├── .gitignore
├── jest.config.js
├── package.json
├── README.md
├── src
│ ├── config
│ │ └── env.ts
│ ├── index.ts
│ ├── services
│ │ ├── api.ts
│ │ ├── mongodb.ts
│ │ └── tools.ts
│ ├── tools
│ │ └── analyze-volume-walls.ts
│ └── types
│ └── tools.ts
├── tsconfig.json
├── vld-logo.png
└── volume_wall_detector.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Dependencies
node_modules/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
package-lock.json
yarn.lock
# Build outputs
dist/
build/
*.tsbuildinfo
# Environment variables
.env
.env.local
.env.*.local
# IDE and editor files
.idea/
.vscode/
*.swp
*.swo
.DS_Store
# Test coverage
coverage/
.nyc_output/
# Logs
logs/
*.log
# Temporary files
tmp/
temp/
# Azure specific
*.pem
*.pfx
*.cer
*.crt
*.key
# TypeScript
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Volume Wall Detector MCP Server 📊
> 🔌 **Compatible with Cline, Cursor, Claude Desktop, and any other MCP Clients!**
>
> Volume Wall Detector MCP works seamlessly with any MCP client
<p align="center">
<img src="vld-logo.png" width="300" alt="VLD Logo">
</p>
The Model Context Protocol (MCP) is an open standard that enables AI systems to interact seamlessly with various data sources and tools, facilitating secure, two-way connections.
The Volume Wall Detector MCP server provides:
* Real-time stock trading volume analysis
* Detection of significant price levels (volume walls)
* Trading imbalance tracking and analysis
* After-hours trading analysis
* MongoDB-based data persistence
## Prerequisites 🔧
Before you begin, ensure you have:
* MongoDB instance running
* Stock market API access
* Node.js (v20 or higher)
* Git installed (only needed if using Git installation method)
## Volume Wall Detector MCP Server Installation ⚡
### Running with NPX
```bash
npx -y volume-wall-detector-mcp@latest
```
### Installing via Smithery
To install Volume Wall Detector MCP Server for Claude Desktop automatically via Smithery:
```bash
npx -y @smithery/cli install volume-wall-detector-mcp --client claude
```
## Configuring MCP Clients ⚙️
### Configuring Cline 🤖
1. Open the Cline MCP settings file:
```bash
# For macOS:
code ~/Library/Application\ Support/Code/User/globalStorage/saoudrizwan.claude-dev/settings/cline_mcp_settings.json
# For Windows:
code %APPDATA%\Code\User\globalStorage\saoudrizwan.claude-dev\settings\cline_mcp_settings.json
```
2. Add the Volume Wall Detector server configuration:
```json
{
"mcpServers": {
"volume-wall-detector-mcp": {
"command": "npx",
"args": ["-y", "volume-wall-detector-mcp@latest"],
"env": {
"TIMEZONE": "GMT+7",
"API_BASE_URL": "your-api-url-here",
"MONGO_HOST": "localhost",
"MONGO_PORT": "27017",
"MONGO_DATABASE": "volume_wall_detector",
"MONGO_USER": "admin",
"MONGO_PASSWORD": "password",
"MONGO_AUTH_SOURCE": "admin",
"MONGO_AUTH_MECHANISM": "SCRAM-SHA-1",
"PAGE_SIZE": "50",
"TRADES_TO_FETCH": "10000",
"DAYS_TO_FETCH": "1",
"TRANSPORT_TYPE": "stdio",
"PORT": "8080"
},
"disabled": false,
"autoApprove": []
}
}
}
```
### Configuring Cursor 🖥️
> **Note**: Requires Cursor version 0.45.6 or higher
1. Open Cursor Settings
2. Navigate to Open MCP
3. Click on "Add New Global MCP Server"
4. Fill out the following information:
* **Name**: "volume-wall-detector-mcp"
* **Type**: "command"
* **Command**:
```bash
env TIMEZONE=GMT+7 API_BASE_URL=your-api-url-here MONGO_HOST=localhost MONGO_PORT=27017 MONGO_DATABASE=volume_wall_detector MONGO_USER=admin MONGO_PASSWORD=password MONGO_AUTH_SOURCE=admin MONGO_AUTH_MECHANISM=SCRAM-SHA-1 PAGE_SIZE=50 TRADES_TO_FETCH=10000 DAYS_TO_FETCH=1 npx -y volume-wall-detector-mcp@latest
```
### Configuring Claude Desktop 🖥️
Create or edit the Claude Desktop configuration file:
#### For macOS:
```bash
code "$HOME/Library/Application Support/Claude/claude_desktop_config.json"
```
#### For Windows:
```bash
code %APPDATA%\Claude\claude_desktop_config.json
```
Add the configuration:
```json
{
"mcpServers": {
"volume-wall-detector-mcp": {
"command": "npx",
"args": ["-y", "volume-wall-detector-mcp@latest"],
"env": {
"TIMEZONE": "GMT+7",
"API_BASE_URL": "your-api-url-here",
"MONGO_HOST": "localhost",
"MONGO_PORT": "27017",
"MONGO_DATABASE": "volume_wall_detector",
"MONGO_USER": "admin",
"MONGO_PASSWORD": "password",
"MONGO_AUTH_SOURCE": "admin",
"MONGO_AUTH_MECHANISM": "SCRAM-SHA-1",
"PAGE_SIZE": "50",
"TRADES_TO_FETCH": "10000",
"DAYS_TO_FETCH": "1",
"TRANSPORT_TYPE": "stdio",
"PORT": "8080"
}
}
}
}
```
## License
MIT
```
--------------------------------------------------------------------------------
/jest.config.js:
--------------------------------------------------------------------------------
```javascript
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
moduleFileExtensions: ["ts", "js"],
transform: {
"^.+\\.ts$": "ts-jest",
},
testMatch: ["**/__tests__/**/*.test.ts"],
moduleNameMapper: {
"^@/(.*)$": "<rootDir>/src/$1",
},
};
```
--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------
```json
{
"compilerOptions": {
"target": "ES2020",
"module": "CommonJS",
"lib": ["ES2020"],
"declaration": true,
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"moduleResolution": "node",
"resolveJsonModule": true,
"isolatedModules": true,
"noEmit": false
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "**/*.test.ts"]
}
```
--------------------------------------------------------------------------------
/src/index.ts:
--------------------------------------------------------------------------------
```typescript
#!/usr/bin/env node
"use strict";
import { FastMCP } from "fastmcp";
import { tools } from "./services/tools";
import { Tool } from "./types/tools";
import { getConfig } from "./config/env";
const config = getConfig();
const server = new FastMCP({
name: "Volume Wall Detector MCP",
version: "1.0.0"
});
// Register all tools
tools.forEach((tool) => {
(server.addTool as Tool)(tool);
});
// Start server with appropriate transport
if (config.TRANSPORT_TYPE === "sse") {
server.start({
transportType: "sse",
sse: {
endpoint: "/sse",
port: parseInt(config.PORT, 10)
}
});
} else {
server.start({
transportType: "stdio"
});
}
```
--------------------------------------------------------------------------------
/src/config/env.ts:
--------------------------------------------------------------------------------
```typescript
"use strict";
import { z } from "zod";
const envSchema = z.object({
TIMEZONE: z.string().default("GMT+7"),
API_BASE_URL: z.string().url(),
MONGO_HOST: z.string(),
MONGO_PORT: z.string(),
MONGO_DATABASE: z.string(),
MONGO_USER: z.string().optional(),
MONGO_PASSWORD: z.string().optional(),
MONGO_AUTH_SOURCE: z.string().optional(),
MONGO_AUTH_MECHANISM: z.string().optional(),
PAGE_SIZE: z.string().transform(Number).default("50"),
TRADES_TO_FETCH: z.string().transform(Number).default("10000"),
DAYS_TO_FETCH: z.string().transform(Number).default("1"),
TRANSPORT_TYPE: z.enum(["stdio", "sse"]).default("stdio"),
PORT: z.string().default("8080")
});
export const getConfig = () => {
const result = envSchema.safeParse(process.env);
if (!result.success) {
throw new Error(`Configuration error: ${result.error.message}`);
}
return result.data;
};
```
--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------
```json
{
"name": "volume-wall-detector-mcp",
"version": "1.0.1",
"description": "Volume Wall Detector MCP Server",
"main": "dist/index.js",
"type": "commonjs",
"bin": {
"volume-wall-detector-mcp": "dist/index.js"
},
"files": [
"dist",
"README.md",
"LICENSE"
],
"scripts": {
"dev": "tsx src/index.ts",
"build": "tsc",
"start": "node dist/index.js",
"test": "jest",
"test:watch": "jest --watch",
"test:coverage": "jest --coverage",
"prepare": "npm run build"
},
"keywords": ["trading", "volume", "analysis", "mcp"],
"author": "",
"license": "ISC",
"dependencies": {
"axios": "^1.6.7",
"dotenv": "^16.4.5",
"fastmcp": "^1.0.0",
"mongodb": "^6.3.0",
"zod": "^3.22.4"
},
"devDependencies": {
"@types/jest": "^29.5.12",
"@types/node": "^20.11.19",
"jest": "^29.7.0",
"ts-jest": "^29.1.2",
"tsx": "^4.7.1",
"typescript": "^5.3.3"
}
}
```
--------------------------------------------------------------------------------
/src/services/tools.ts:
--------------------------------------------------------------------------------
```typescript
"use strict";
import { z } from "zod";
import { ToolConfig } from "../types/tools";
import { analyzeVolumeWalls } from "../tools/analyze-volume-walls";
import { fetchOrderBook, fetchTrades } from "./api";
import { storeStockData } from "./mongodb";
export const tools: ToolConfig[] = [
{
name: "fetch-order-book",
description: "Fetch current order book data for a symbol",
parameters: z.object({
symbol: z.string().describe("Stock symbol to fetch order book for")
}),
execute: async (args) => {
const orderBook = await fetchOrderBook(args.symbol);
const result = await storeStockData(orderBook, "order_books");
return JSON.stringify(result);
}
},
{
name: "fetch-trades",
description: "Fetch recent trades for a symbol",
parameters: z.object({
symbol: z.string().describe("Stock symbol to fetch trades for")
}),
execute: async (args) => {
const trades = await fetchTrades(args.symbol);
const result = await storeStockData(trades, "trades");
return JSON.stringify(result);
}
},
{
name: "analyze-stock",
description: "Analyze stock data including volume and value analysis",
parameters: z.object({
symbol: z.string().describe("Stock symbol to analyze"),
days: z.number().optional().describe("Number of days to analyze (optional)")
}),
execute: async (args) => {
const result = await analyzeVolumeWalls(args.symbol, args.days);
return JSON.stringify(result);
}
}
];
```
--------------------------------------------------------------------------------
/src/types/tools.ts:
--------------------------------------------------------------------------------
```typescript
"use strict";
import { z } from "zod";
import { FastMCP } from "fastmcp";
export type ToolConfig = {
name: string;
description: string;
parameters: z.ZodObject<any>;
execute: (args: any) => Promise<string>;
};
export type Tool = FastMCP["addTool"];
// Data Models
export const OrderBookLevelSchema = z.object({
price: z.number(),
volume: z.number()
});
export const OrderBookSchema = z.object({
symbol: z.string(),
timestamp: z.string(),
match_price: z.number(),
bid_1: OrderBookLevelSchema,
ask_1: OrderBookLevelSchema,
change_percent: z.number(),
volume: z.number()
});
export const TradeSchema = z.object({
trade_id: z.string(),
symbol: z.string(),
price: z.number(),
volume: z.number(),
side: z.enum(["bu", "sd", "after-hour"]),
time: z.number()
});
export const PriceVolumeDataSchema = z.object({
buy_volume: z.number().default(0),
sell_volume: z.number().default(0),
after_hour_buy: z.number().default(0),
after_hour_sell: z.number().default(0),
after_hour_unknown: z.number().default(0),
buy_value: z.number().default(0),
sell_value: z.number().default(0),
after_hour_buy_value: z.number().default(0),
after_hour_sell_value: z.number().default(0),
after_hour_unknown_value: z.number().default(0),
total_volume: z.number().default(0),
total_value: z.number().default(0),
volume_imbalance: z.number().default(0),
value_imbalance: z.number().default(0),
total_trades: z.number().default(0),
last_trade_time: z.string().optional()
});
export type OrderBookLevel = z.infer<typeof OrderBookLevelSchema>;
export type OrderBook = z.infer<typeof OrderBookSchema>;
export type Trade = z.infer<typeof TradeSchema>;
export type PriceVolumeData = z.infer<typeof PriceVolumeDataSchema>;
```
--------------------------------------------------------------------------------
/src/services/api.ts:
--------------------------------------------------------------------------------
```typescript
"use strict";
import axios from "axios";
import { getConfig } from "../config/env";
import { OrderBook, Trade } from "../types/tools";
const config = getConfig();
const headers = {
"User-Agent": "Mozilla/5.0",
"Accept": "application/json"
};
export const fetchOrderBook = async (symbol: string): Promise<OrderBook> => {
const url = `${config.API_BASE_URL}/v2/stock/${symbol}`;
const response = await axios.get(url, { headers });
const data = response.data.data;
return {
symbol,
timestamp: new Date().toISOString(),
match_price: data.mp,
bid_1: {
price: data.b1,
volume: data.b1v
},
ask_1: {
price: data.o1,
volume: data.o1v
},
change_percent: data.lpcp,
volume: data.lv
};
};
export const fetchTrades = async (symbol: string): Promise<Trade[]> => {
const trades: Trade[] = [];
let lastId: string | undefined;
while (trades.length < config.TRADES_TO_FETCH) {
const params: Record<string, any> = {
stockSymbol: symbol,
pageSize: Math.min(config.PAGE_SIZE, config.TRADES_TO_FETCH - trades.length)
};
if (lastId) {
params.lastId = lastId;
}
const url = `${config.API_BASE_URL}/le-table`;
const response = await axios.get(url, { headers, params });
const items = response.data.data.items;
if (!items || items.length === 0) {
break;
}
const batchTrades = items.map((item: any) => ({
trade_id: item._id,
symbol: item.stockSymbol,
price: item.price,
volume: item.vol,
side: item.side === "bu" || item.side === "sd" ? item.side : "after-hour",
time: Math.floor(new Date().setHours(
Number(item.time.split(":")[0]),
Number(item.time.split(":")[1]),
Number(item.time.split(":")[2] || 0)
) / 1000)
}));
trades.push(...batchTrades);
lastId = items[items.length - 1]._id;
// Add small delay to avoid hitting rate limits
await new Promise(resolve => setTimeout(resolve, 100));
}
return trades.slice(0, config.TRADES_TO_FETCH);
};
```
--------------------------------------------------------------------------------
/src/services/mongodb.ts:
--------------------------------------------------------------------------------
```typescript
"use strict";
import { MongoClient } from "mongodb";
import { getConfig } from "../config/env";
import { OrderBook, Trade } from "../types/tools";
const config = getConfig();
export const getMongoUrl = () => {
if (config.MONGO_USER && config.MONGO_PASSWORD) {
return `mongodb://${config.MONGO_USER}:${config.MONGO_PASSWORD}@${config.MONGO_HOST}:${config.MONGO_PORT}/${config.MONGO_DATABASE}?authSource=${config.MONGO_AUTH_SOURCE}&authMechanism=${config.MONGO_AUTH_MECHANISM}`;
}
return `mongodb://${config.MONGO_HOST}:${config.MONGO_PORT}/${config.MONGO_DATABASE}`;
};
export const storeStockData = async (data: OrderBook | Trade[], collectionName: string) => {
const client = new MongoClient(getMongoUrl());
try {
await client.connect();
const db = client.db(config.MONGO_DATABASE);
const collection = db.collection(collectionName);
// Setup indexes if they don't exist
if (collectionName === "order_books") {
await collection.createIndex({ symbol: 1, timestamp: -1 });
} else if (collectionName === "trades") {
await collection.createIndex({ symbol: 1, time: -1 });
await collection.createIndex({ trade_id: 1 }, { unique: true });
}
if (Array.isArray(data)) {
if (data.length === 0) {
return { success: true, inserted_count: 0 };
}
const operations = data.map(doc => ({
updateOne: {
filter: { trade_id: doc.trade_id },
update: { $set: doc },
upsert: true
}
}));
const result = await collection.bulkWrite(operations);
return {
success: true,
inserted_count: result.upsertedCount + result.modifiedCount
};
} else {
const result = await collection.insertOne(data);
return {
success: result.acknowledged,
inserted_count: result.acknowledged ? 1 : 0
};
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : "Unknown error"
};
} finally {
await client.close();
}
};
export const getLatestOrderBook = async (symbol: string): Promise<OrderBook | null> => {
const client = new MongoClient(getMongoUrl());
try {
await client.connect();
const db = client.db(config.MONGO_DATABASE);
const doc = await db.collection("order_books")
.findOne({ symbol }, { sort: { timestamp: -1 } });
return doc as OrderBook | null;
} finally {
await client.close();
}
};
export const getRecentTrades = async (
symbol: string,
limit: number = 100,
days: number = config.DAYS_TO_FETCH
): Promise<Trade[]> => {
const client = new MongoClient(getMongoUrl());
try {
await client.connect();
const db = client.db(config.MONGO_DATABASE);
const startTime = new Date();
startTime.setDate(startTime.getDate() - days);
startTime.setHours(0, 0, 0, 0);
const trades = await db.collection("trades")
.find({
symbol,
time: { $gte: Math.floor(startTime.getTime() / 1000) }
})
.sort({ time: -1 })
.limit(limit)
.toArray();
return trades.map(trade => ({
symbol: trade.symbol,
price: trade.price,
volume: trade.volume,
trade_id: trade.trade_id,
side: trade.side,
time: trade.time
}));
} finally {
await client.close();
}
};
```
--------------------------------------------------------------------------------
/src/tools/analyze-volume-walls.ts:
--------------------------------------------------------------------------------
```typescript
"use strict";
import { OrderBook, Trade, PriceVolumeData } from "../types/tools";
import { getConfig } from "../config/env";
import { getLatestOrderBook, getRecentTrades } from "../services/mongodb";
const config = getConfig();
export const analyzeVolumeAtPrice = (
trades: Trade[],
orderBook: OrderBook
): Record<number, PriceVolumeData> => {
const priceVolumes: Record<number, PriceVolumeData> = {};
for (const trade of trades.reverse()) {
const price = trade.price;
if (!priceVolumes[price]) {
priceVolumes[price] = {
buy_volume: 0,
sell_volume: 0,
after_hour_buy: 0,
after_hour_sell: 0,
after_hour_unknown: 0,
buy_value: 0,
sell_value: 0,
after_hour_buy_value: 0,
after_hour_sell_value: 0,
after_hour_unknown_value: 0,
total_volume: 0,
total_value: 0,
volume_imbalance: 0,
value_imbalance: 0,
total_trades: 0
};
}
const data = priceVolumes[price];
const value = trade.price * trade.volume;
if (trade.side === "bu") {
data.buy_volume += trade.volume;
data.buy_value += value;
} else if (trade.side === "sd") {
data.sell_volume += trade.volume;
data.sell_value += value;
} else {
if (trade.price >= orderBook.ask_1.price) {
data.after_hour_buy += trade.volume;
data.after_hour_buy_value += value;
} else if (trade.price <= orderBook.bid_1.price) {
data.after_hour_sell += trade.volume;
data.after_hour_sell_value += value;
} else {
data.after_hour_unknown += trade.volume;
data.after_hour_unknown_value += value;
}
}
data.total_trades += 1;
data.last_trade_time = new Date(trade.time * 1000).toISOString();
}
// Calculate totals and imbalances
for (const data of Object.values(priceVolumes)) {
data.total_volume =
data.buy_volume +
data.sell_volume +
data.after_hour_buy +
data.after_hour_sell +
data.after_hour_unknown;
data.total_value =
data.buy_value +
data.sell_value +
data.after_hour_buy_value +
data.after_hour_sell_value +
data.after_hour_unknown_value;
data.volume_imbalance =
(data.buy_volume + data.after_hour_buy) -
(data.sell_volume + data.after_hour_sell);
data.value_imbalance =
(data.buy_value + data.after_hour_buy_value) -
(data.sell_value + data.after_hour_sell_value);
}
return priceVolumes;
};
export const analyzeVolumeWalls = async (symbol: string, days?: number) => {
const orderBook = await getLatestOrderBook(symbol);
if (!orderBook) {
throw new Error("No order book data available");
}
const trades = await getRecentTrades(symbol, config.TRADES_TO_FETCH, days);
const priceVolumes = analyzeVolumeAtPrice(trades, orderBook);
// Sort by total value
const sortedLevels = Object.entries(priceVolumes)
.sort(([, a], [, b]) => b.total_value - a.total_value)
.slice(0, 5);
// Calculate trading summaries
const buyVolume = trades
.filter(t => t.side === "bu")
.reduce((sum, t) => sum + t.volume, 0);
const sellVolume = trades
.filter(t => t.side === "sd")
.reduce((sum, t) => sum + t.volume, 0);
const afterHourTrades = trades.filter(t => t.side === "after-hour");
const afterHourBuy = afterHourTrades
.filter(t => t.price >= orderBook.ask_1.price)
.reduce((sum, t) => sum + t.volume, 0);
const afterHourSell = afterHourTrades
.filter(t => t.price <= orderBook.bid_1.price)
.reduce((sum, t) => sum + t.volume, 0);
const afterHourUnknown = afterHourTrades
.filter(t => orderBook.bid_1.price < t.price && t.price < orderBook.ask_1.price)
.reduce((sum, t) => sum + t.volume, 0);
const totalVolume = buyVolume + sellVolume + afterHourBuy + afterHourSell + afterHourUnknown;
return {
timestamp: orderBook.timestamp,
symbol,
market_status: {
current_price: orderBook.match_price,
bid_price: orderBook.bid_1.price,
bid_volume: orderBook.bid_1.volume,
ask_price: orderBook.ask_1.price,
ask_volume: orderBook.ask_1.volume,
spread: orderBook.ask_1.price - orderBook.bid_1.price
},
volume_analysis: {
significant_levels: sortedLevels.map(([price, data]) => ({
price: Number(price),
...data
})),
current_bid_accumulated: priceVolumes[orderBook.bid_1.price] || {},
current_ask_accumulated: priceVolumes[orderBook.ask_1.price] || {}
},
trading_summary: {
period: `last ${config.TRADES_TO_FETCH} trades`,
total_trades: trades.length,
volume: {
buy: buyVolume,
sell: sellVolume,
after_hour: {
buy: afterHourBuy,
sell: afterHourSell,
unknown: afterHourUnknown,
total: afterHourBuy + afterHourSell + afterHourUnknown
},
total: totalVolume,
buy_ratio: (buyVolume + afterHourBuy) /
(buyVolume + sellVolume + afterHourBuy + afterHourSell) || 0
}
}
};
};
```
--------------------------------------------------------------------------------
/volume_wall_detector.py:
--------------------------------------------------------------------------------
```python
import os
from datetime import date, datetime, timedelta, timezone
from dataclasses import dataclass, asdict
from typing import List, Optional, Union, Dict, Any
import pymongo
import requests
from pymongo import MongoClient
from dotenv import load_dotenv
import time
from pydantic import BaseModel, Field
load_dotenv()
# Mandatory environment variables
TIMEZONE = os.getenv("TIMEZONE", "GMT+7") # Default to GMT+7 if not specified
API_BASE_URL = os.getenv("API_BASE_URL")
MONGO_HOST = os.getenv("MONGO_HOST")
MONGO_PORT = os.getenv("MONGO_PORT")
MONGO_DATABASE = os.getenv("MONGO_DATABASE")
MONGO_USER = os.getenv("MONGO_USER")
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD")
MONGO_AUTH_SOURCE = os.getenv("MONGO_AUTH_SOURCE")
MONGO_AUTH_MECHANISM = os.getenv("MONGO_AUTH_MECHANISM")
# Optional environment variables
PAGE_SIZE = os.getenv("PAGE_SIZE", 50)
TRADES_TO_FETCH = int(os.getenv("TRADES_TO_FETCH", "10000"))
DAYS_TO_FETCH = int(os.getenv("DAYS_TO_FETCH", "1")) # Default to 1 day if not specified
# Headers for API requests
HEADERS: dict = {
"User-Agent": "Mozilla/5.0",
"Accept": "application/json"
}
def MONGO_URL() -> str:
"""Build MongoDB connection URL from components"""
if MONGO_USER and MONGO_PASSWORD:
return (
f"mongodb://{MONGO_USER}:{MONGO_PASSWORD}@{MONGO_HOST}:{MONGO_PORT}/{MONGO_DATABASE}"
f"?authSource={MONGO_AUTH_SOURCE}"
f"&authMechanism={MONGO_AUTH_MECHANISM}"
)
return f"mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DATABASE}"
def parse_timezone(tz_str: str) -> timezone:
"""Parse timezone string (e.g., 'GMT+7' or 'GMT-5') into timezone object"""
try:
if not tz_str.startswith(('GMT+', 'GMT-')):
raise ValueError("Timezone must be in format 'GMT+n' or 'GMT-n'")
offset = int(tz_str[4:]) if tz_str[3] == '+' else -int(tz_str[4:])
return timezone(timedelta(hours=offset))
except Exception as e:
raise ValueError(f"Invalid timezone format: {tz_str}. Error: {str(e)}")
class OrderBookLevel(BaseModel):
"""Order book level with price and volume"""
price: float
volume: int
class OrderBook(BaseModel):
"""Order book data"""
symbol: str
timestamp: str
match_price: float
bid_1: OrderBookLevel
ask_1: OrderBookLevel
change_percent: float
volume: int
class Trade(BaseModel):
"""Trade data"""
trade_id: str
symbol: str
price: float
volume: int
side: str # "bu" or "sd" or "after-hour"
time: int
@property
def value(self) -> float:
"""Calculate trade value"""
return self.price * self.volume
class PriceVolumeData(BaseModel):
"""Volume and value data at a price level"""
buy_volume: int = 0
sell_volume: int = 0
after_hour_buy: int = 0
after_hour_sell: int = 0
after_hour_unknown: int = 0
buy_value: float = 0.0
sell_value: float = 0.0
after_hour_buy_value: float = 0.0
after_hour_sell_value: float = 0.0
after_hour_unknown_value: float = 0.0
total_volume: int = 0
total_value: float = 0.0
volume_imbalance: int = 0
value_imbalance: float = 0.0
total_trades: int = 0
last_trade_time: Optional[str] = None
class AfterHourVolume(BaseModel):
"""After-hour trading volume data"""
buy: int
sell: int
unknown: int
total: int
class AfterHourValue(BaseModel):
"""After-hour trading value data"""
buy: float
sell: float
unknown: float
total: float
class VolumeAnalysis(BaseModel):
"""Volume analysis data"""
buy: int
sell: int
after_hour: AfterHourVolume
total: int
buy_ratio: float
class ValueAnalysis(BaseModel):
"""Value analysis data"""
buy: float
sell: float
after_hour: AfterHourValue
total: float
buy_ratio: float
class MarketStatus(BaseModel):
"""Current market status"""
current_price: float
bid_price: float
bid_volume: int
ask_price: float
ask_volume: int
spread: float
class VolumeAnalysisResult(BaseModel):
"""Volume analysis results"""
significant_levels: list[Dict[str, Any]]
current_bid_accumulated: PriceVolumeData
current_ask_accumulated: PriceVolumeData
class TradingSummary(BaseModel):
"""Trading summary data"""
period: str
total_trades: int
volume: VolumeAnalysis
value: ValueAnalysis
unique_price_levels: int
average_price: float
# class StockAnalysis(BaseModel):
# """Complete stock analysis result"""
# timestamp: str
# symbol: str
# market_status: MarketStatus
# volume_analysis: VolumeAnalysisResult
# trading_summary: TradingSummary
class MongoResult(BaseModel):
"""MongoDB operation result"""
success: bool = False
inserted_count: int = 0
error: Optional[str] = None
class TradesResult(MongoResult):
"""Result of trades operation"""
trades_fetched: int = 0
class StoreResult(BaseModel):
"""Result of storing stock data"""
order_book: MongoResult
trades: TradesResult
def store_stock_data(data: Union[OrderBook, List[Trade]], collection_name: str) -> MongoResult:
"""Store stock data into MongoDB"""
result = MongoResult()
try:
client = MongoClient(MONGO_URL())
db = client[MONGO_DATABASE]
collection = db[collection_name]
# Setup indexes if they don't exist
if collection_name == "order_books":
collection.create_index([("symbol", 1), ("timestamp", -1)])
elif collection_name == "trades":
collection.create_index([("symbol", 1), ("time", -1)])
collection.create_index([("trade_id", 1)], unique=True)
# Convert and store data
if isinstance(data, OrderBook):
insert_result = collection.insert_one(data.model_dump())
result.success = insert_result.acknowledged
result.inserted_count = 1 if insert_result.acknowledged else 0
elif isinstance(data, list):
if not data:
result.success = True
return result
trade_docs = [trade.model_dump() for trade in data]
try:
operations = [
pymongo.UpdateOne(
{"trade_id": doc["trade_id"]},
{"$set": doc},
upsert=True
) for doc in trade_docs
]
# Delete today's records using configured timezone
tz = parse_timezone(TIMEZONE)
today_start = datetime.now(tz).replace(
hour=0,
minute=0,
second=0,
microsecond=0
).timestamp()
collection.delete_many({
"symbol": trade_docs[0]["symbol"],
"time": {"$gte": today_start}
})
bulk_result = collection.bulk_write(operations, ordered=False)
result.success = True
result.inserted_count = bulk_result.upserted_count + bulk_result.modified_count
except Exception as e:
result.success = False
result.error = f"Bulk upsert failed: {str(e)}"
except Exception as e:
result.error = str(e)
finally:
client.close()
return result
def fetch_order_book(symbol) -> OrderBook:
"""Fetch current order book data for a symbol"""
url = f"{API_BASE_URL}/v2/stock/{symbol}"
response = requests.get(url, headers=HEADERS)
response.raise_for_status()
data = response.json().get("data", {})
return OrderBook(
symbol=symbol,
timestamp=datetime.now().isoformat(),
match_price=data.get("mp"),
bid_1=OrderBookLevel(
price=data.get("b1"),
volume=data.get("b1v")
),
ask_1=OrderBookLevel(
price=data.get("o1"),
volume=data.get("o1v")
),
change_percent=data.get("lpcp"),
volume=data.get("lv")
)
def fetch_trades(symbol: str) -> List[Trade]:
"""
Fetch specified number of trades for a symbol using lastId pagination
Args:
symbol: Stock symbol
Returns:
List[Trade]: List of trades, newest first
"""
trades = []
last_id = None
while len(trades) < TRADES_TO_FETCH:
# Prepare request parameters
params = {
"stockSymbol": symbol,
"pageSize": min(PAGE_SIZE, TRADES_TO_FETCH - len(trades))
}
if last_id:
params["lastId"] = last_id
# Make API request
url = f"{API_BASE_URL}/le-table"
response = requests.get(url, headers=HEADERS, params=params)
response.raise_for_status()
# Process response
items = response.json().get("data", {}).get("items", [])
if not items: # No more trades available
break
# Convert items to Trade objects
batch_trades = [
Trade(
trade_id=item["_id"],
symbol=item["stockSymbol"],
price=item["price"],
volume=item["vol"],
side=item["side"] if item.get("side") in ["bu", "sd"] else "after-hour",
time=datetime.combine(date.today(), datetime.strptime(item["time"], "%H:%M:%S").time()).timestamp()
) for item in items
]
trades.extend(batch_trades)
# Update last_id for next iteration
last_id = items[-1]["_id"]
# Add small delay to avoid hitting rate limits
time.sleep(0.1)
return trades[:TRADES_TO_FETCH] # Ensure we don't return more than requested
def fetch_and_store_stock_data(symbol: str) -> StoreResult:
"""
Fetch and store both order book and trades data
Returns:
StoreResult: Results of both operations
"""
# Fetch and store order book
order_book = fetch_order_book(symbol)
order_book_result = store_stock_data(order_book, "order_books")
# Fetch and store trades
trades = fetch_trades(symbol)
trades_result = store_stock_data(trades, "trades")
return StoreResult(
order_book=order_book_result,
trades=TradesResult(
success=trades_result.success,
inserted_count=trades_result.inserted_count,
error=trades_result.error,
trades_fetched=len(trades)
)
)
def get_latest_order_book(symbol: str) -> Optional[OrderBook]:
"""Get the latest order book from MongoDB"""
try:
client = MongoClient(MONGO_URL())
db = client[MONGO_DATABASE]
doc = db.order_books.find_one(
{"symbol": symbol},
sort=[("timestamp", -1)]
)
if doc:
return OrderBook(
symbol=doc["symbol"],
timestamp=doc["timestamp"],
match_price=doc["match_price"],
bid_1=OrderBookLevel(**doc["bid_1"]),
ask_1=OrderBookLevel(**doc["ask_1"]),
change_percent=doc["change_percent"],
volume=doc["volume"]
)
return None
finally:
client.close()
def get_recent_trades(symbol: str, limit: int = 100, days: int = None) -> List[Trade]:
"""
Get recent trades from MongoDB
Args:
symbol: Stock symbol
limit: Maximum number of trades to return
days: Number of days to look back (defaults to DAYS_TO_FETCH from env)
Returns:
List[Trade]: List of trades, newest first
"""
try:
client = MongoClient(MONGO_URL())
db = client[MONGO_DATABASE]
# Use provided days or fall back to environment variable
days_to_fetch = days if days is not None else DAYS_TO_FETCH
# Calculate the timestamp for N days ago using configured timezone
tz = parse_timezone(TIMEZONE)
now = datetime.now(tz)
days_ago = now - timedelta(days=days_to_fetch)
start_timestamp = days_ago.replace(
hour=0,
minute=0,
second=0,
microsecond=0
).timestamp()
# Query with date filter
trades = list(db.trades.find(
{
"symbol": symbol,
"time": {"$gte": start_timestamp}
},
sort=[("time", -1)],
limit=limit
))
return [Trade(
trade_id=t["trade_id"],
symbol=t["symbol"],
price=t["price"],
volume=t["volume"],
side=t["side"],
time=t["time"]
) for t in trades]
finally:
client.close()
def analyze_volume_at_price(trades: List[Trade], order_book: OrderBook) -> Dict[float, PriceVolumeData]:
"""Analyze accumulated volume and value at each price level"""
price_volumes: Dict[float, PriceVolumeData] = {}
for trade in reversed(trades):
price = trade.price
if price not in price_volumes:
price_volumes[price] = PriceVolumeData()
data = price_volumes[price]
# Accumulate volumes and values by side
if trade.side == "bu":
data.buy_volume += trade.volume
data.buy_value += trade.value
elif trade.side == "sd":
data.sell_volume += trade.volume
data.sell_value += trade.value
else: # after-hour trade classification
if price >= order_book.ask_1.price:
data.after_hour_buy += trade.volume
data.after_hour_buy_value += trade.value
elif price <= order_book.bid_1.price:
data.after_hour_sell += trade.volume
data.after_hour_sell_value += trade.value
else:
data.after_hour_unknown += trade.volume
data.after_hour_unknown_value += trade.value
tz = parse_timezone(TIMEZONE)
data.total_trades += 1
data.last_trade_time = str(datetime.fromtimestamp(trade.time, tz=tz))
# Calculate additional metrics
for price_data in price_volumes.values():
price_data.total_volume = (
price_data.buy_volume +
price_data.sell_volume +
price_data.after_hour_buy +
price_data.after_hour_sell +
price_data.after_hour_unknown
)
price_data.total_value = (
price_data.buy_value +
price_data.sell_value +
price_data.after_hour_buy_value +
price_data.after_hour_sell_value +
price_data.after_hour_unknown_value
)
# Volume imbalance includes classified after-hour trades
price_data.volume_imbalance = (
price_data.buy_volume +
price_data.after_hour_buy
) - (
price_data.sell_volume +
price_data.after_hour_sell
)
# Value imbalance includes classified after-hour trades
price_data.value_imbalance = (
price_data.buy_value +
price_data.after_hour_buy_value
) - (
price_data.sell_value +
price_data.after_hour_sell_value
)
return price_volumes
def analyze_stock_data(symbol: str, days: int = None) -> dict:
"""Analyze stock data including volume and value analysis"""
order_book = get_latest_order_book(symbol)
trades = get_recent_trades(symbol, limit=TRADES_TO_FETCH, days=days)
if not order_book:
raise ValueError("No order book data available")
# Analyze volumes at each price level
price_volumes = analyze_volume_at_price(trades, order_book)
# Sort prices for significant levels
sorted_levels = sorted(
[(price, data) for price, data in price_volumes.items()],
key=lambda x: x[1].total_value,
reverse=True
)
# Get top 5 levels
significant_levels = [
{
"price": price,
"buy_volume": data.buy_volume,
"sell_volume": data.sell_volume,
"after_hour_buy": data.after_hour_buy,
"after_hour_sell": data.after_hour_sell,
"after_hour_unknown": data.after_hour_unknown,
"buy_value": data.buy_value,
"sell_value": data.sell_value,
"after_hour_buy_value": data.after_hour_buy_value,
"after_hour_sell_value": data.after_hour_sell_value,
"after_hour_unknown_value": data.after_hour_unknown_value,
"total_volume": data.total_volume,
"total_value": data.total_value,
"volume_imbalance": data.volume_imbalance,
"value_imbalance": data.value_imbalance,
"total_trades": data.total_trades,
"last_trade_time": data.last_trade_time
}
for price, data in sorted_levels[:5]
]
# Calculate trading summaries
buy_volume = sum(t.volume for t in trades if t.side == "bu")
sell_volume = sum(t.volume for t in trades if t.side == "sd")
after_hour_trades = [t for t in trades if t.side == "after-hour"]
after_hour_buy = sum(t.volume for t in after_hour_trades if t.price >= order_book.ask_1.price)
after_hour_sell = sum(t.volume for t in after_hour_trades if t.price <= order_book.bid_1.price)
after_hour_unknown = sum(t.volume for t in after_hour_trades
if order_book.bid_1.price < t.price < order_book.ask_1.price)
buy_value = sum(t.value for t in trades if t.side == "bu")
sell_value = sum(t.value for t in trades if t.side == "sd")
after_hour_buy_value = sum(t.value for t in after_hour_trades if t.price >= order_book.ask_1.price)
after_hour_sell_value = sum(t.value for t in after_hour_trades if t.price <= order_book.bid_1.price)
after_hour_unknown_value = sum(t.value for t in after_hour_trades
if order_book.bid_1.price < t.price < order_book.ask_1.price)
total_volume = buy_volume + sell_volume + after_hour_buy + after_hour_sell + after_hour_unknown
total_value = buy_value + sell_value + after_hour_buy_value + after_hour_sell_value + after_hour_unknown_value
return {
"timestamp": order_book.timestamp,
"symbol": symbol,
"market_status": {
"current_price": order_book.match_price,
"bid_price": order_book.bid_1.price,
"bid_volume": order_book.bid_1.volume,
"ask_price": order_book.ask_1.price,
"ask_volume": order_book.ask_1.volume,
"spread": order_book.ask_1.price - order_book.bid_1.price
},
"volume_analysis": {
"significant_levels": significant_levels,
"current_bid_accumulated": price_volumes.get(order_book.bid_1.price, PriceVolumeData()).model_dump(),
"current_ask_accumulated": price_volumes.get(order_book.ask_1.price, PriceVolumeData()).model_dump()
},
"trading_summary": {
"period": f"last {TRADES_TO_FETCH} trades",
"total_trades": len(trades),
"volume": {
"buy": buy_volume,
"sell": sell_volume,
"after_hour": {
"buy": after_hour_buy,
"sell": after_hour_sell,
"unknown": after_hour_unknown,
"total": after_hour_buy + after_hour_sell + after_hour_unknown
},
"total": total_volume,
"buy_ratio": (buy_volume + after_hour_buy) /
(buy_volume + sell_volume + after_hour_buy + after_hour_sell)
if (buy_volume + sell_volume + after_hour_buy + after_hour_sell) > 0 else 0
},
"value": {
"buy": buy_value,
"sell": sell_value,
"after_hour": {
"buy": after_hour_buy_value,
"sell": after_hour_sell_value,
"unknown": after_hour_unknown_value,
"total": after_hour_buy_value + after_hour_sell_value + after_hour_unknown_value
},
"total": total_value,
"buy_ratio": (buy_value + after_hour_buy_value) /
(buy_value + sell_value + after_hour_buy_value + after_hour_sell_value)
if (buy_value + sell_value + after_hour_buy_value + after_hour_sell_value) > 0 else 0
},
"unique_price_levels": len(price_volumes),
"average_price": total_value / total_volume if total_volume > 0 else 0
}
}
if __name__ == "__main__":
# Test the functions
symbol = "VIC"
fetch_and_store_stock_data(symbol)
print(analyze_stock_data(symbol))
```