Reasoning Model - Time Series
This model identifies directional trends by predicting how likely price action in the selected timeframe is to propagate into lower-frequency timeframes, distinguishing structurally significant market moves from noise.
We recommend submitting data at a higher frequency than your target forecast horizon, as this makes the most of the model's propagation methodology (e.g., use 15- or 30-minute data to predict daily moves).
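For example, if you hold 1-minute bars but want to submit a 15-minute series, a minimal resampling sketch with pandas (assuming the column layout used throughout this guide) could look like this:
import pandas as pd

df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]
df['datetime'] = pd.to_datetime(df['datetime'])

# Aggregate 1-minute bars into 15-minute OHLC bars
df_15min = (
    df.set_index('datetime')
      .resample('15min')
      .agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last'})
      .dropna()
      .reset_index()
)
print(df_15min.head())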
Time Series Data
import pandas as pd

# Read the CSV file and keep only the required columns
df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]
print(df)
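The scripts below serialise timestamps as '%Y-%m-%d %H:%M:%S' strings before submission. If your CSV stores datetimes in another format, a small normalisation step such as the following sketch (assuming pandas can parse your source format) keeps the payload consistent:
import pandas as pd

df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]

# Parse whatever format the CSV uses, then re-serialise consistently
df['datetime'] = pd.to_datetime(df['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')
print(df.dtypes)
print(df.head())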
import requests
import pandas as pd
import io
def download_github_data(url):
# Convert GitHub repo URL to raw content URL
if "github.com" in url and "/blob/" not in url and "/tree/" in url:
# For /tree/ links, convert to raw
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/tree/", "/")
elif "github.com" in url and "/blob/" in url:
# For /blob/ links, convert to raw
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/blob/", "/")
print(f"Downloading from: {url}")
response = requests.get(url)
if response.status_code == 200:
return pd.read_csv(io.StringIO(response.text))
else:
raise Exception(f"Failed to download data: {response.status_code}")
# Example usage
github_url = "https://github.com/sumtyme-ade/sample_data/tree/main/sample/spy_etf_1min.csv"
df = download_github_data(github_url)
print(f"Downloaded {len(df)} rows of data")
Data Preparation
# Import required libraries
import pandas as pd
import json
def time_series_dict(df, interval, interval_unit, reasoning_mode):
"""
Converts a DataFrame with OHLC price data into a dictionary format.
Parameters:
df (DataFrame): DataFrame containing datetime and OHLC price columns
interval (int): The time interval between data points
interval_unit (str): The unit of time for intervals (seconds, minutes, days)
reasoning_mode (str): Reasoning strategy (proactive or reactive)
Returns:
dict: Dictionary with lists of datetime and price data along with metadata
"""
return {
"datetime": df['datetime'].tolist(), # Convert datetime column to list
"open": df['open'].tolist(), # Convert open prices to list
"high": df['high'].tolist(), # Convert high prices to list
"low": df['low'].tolist(), # Convert low prices to list
"close": df['close'].tolist(), # Convert close prices to list
"interval": interval, # Time frequency value
"interval_unit": interval_unit, # Time unit (seconds, minutes, days)
"reasoning_mode": reasoning_mode # Reasoning approach (proactive/reactive)
}
# Read the CSV file and select only the required columns
df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]
# Set the time interval between data points
interval = 1
# Set the unit of time for the interval (options: seconds, minutes, days)
interval_unit = "minutes"
# Set the reasoning mode (options: proactive or reactive)
reasoning_mode = "reactive"
# Create the JSON-compatible dictionary from the DataFrame and parameters
json_payload = time_series_dict(df, interval, interval_unit, reasoning_mode)
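The json import above is useful if you want to inspect or persist the payload before sending it; a quick sketch (the preview filename is arbitrary):
# Serialise the payload to inspect it or save it for later submission
payload_str = json.dumps(json_payload)
print(f"Payload covers {len(json_payload['datetime'])} rows, {len(payload_str)} characters as JSON")

with open("payload_preview.json", "w") as f:
    f.write(payload_str)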
# Import required libraries
import pandas as pd
import json
def time_series_dict(df, interval, interval_unit, reasoning_mode):
"""
Converts a DataFrame with OHLC price data into a dictionary format.
Parameters:
df (DataFrame): DataFrame containing datetime and OHLC price columns
interval (int): The time interval between data points
interval_unit (str): The unit of time for intervals (seconds, minutes, days)
reasoning_mode (str): Reasoning strategy (proactive or reactive)
Returns:
dict: Dictionary with lists of datetime and price data along with metadata
"""
return {
"datetime": df['datetime'].tolist(), # Convert datetime column to list
"open": df['open'].tolist(), # Convert open prices to list
"high": df['high'].tolist(), # Convert high prices to list
"low": df['low'].tolist(), # Convert low prices to list
"close": df['close'].tolist(), # Convert close prices to list
"interval": interval, # Time frequency value
"interval_unit": interval_unit, # Time unit (seconds, minutes, days)
"reasoning_mode": reasoning_mode # Reasoning approach (proactive/reactive)
}
# Read the CSV file and select only the required columns
df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]
# Set the time interval between data points
interval = 1
# Set the unit of time for the interval (options: seconds, minutes, days)
interval_unit = "minutes"
# Set the reasoning mode (options: proactive or reactive)
reasoning_mode = "reactive"
# Split data for efficient submission
chunk_size = 5000
df_list = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]
# Create API-ready payloads for each data chunk
json_payloads = [time_series_dict(chunk, interval, interval_unit, reasoning_mode) for chunk in df_list]
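Before submitting, it can help to sanity-check the chunking, for example:
# Inspect the chunks before submission
print(f"Created {len(json_payloads)} payload(s) of up to {chunk_size} rows each")
for i, p in enumerate(json_payloads):
    print(f"Chunk {i}: {p['datetime'][0]} -> {p['datetime'][-1]} ({len(p['datetime'])} rows)")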
Full API Script
import requests
import pandas as pd
import io
import os
from datetime import datetime
def download_github_data(url):
# Convert GitHub repo URL to raw content URL
if "github.com" in url and "/blob/" not in url and "/tree/" in url:
# For /tree/ links, convert to raw
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/tree/", "/")
elif "github.com" in url and "/blob/" in url:
# For /blob/ links, convert to raw
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/blob/", "/")
print(f"Downloading from: {url}")
response = requests.get(url)
if response.status_code == 200:
return pd.read_csv(io.StringIO(response.text))
else:
raise Exception(f"Failed to download data: {response.status_code}")
def time_series_dict(df, interval, interval_unit, reasoning_mode):
"""
Converts a DataFrame with OHLC price data into a dictionary format.
Parameters:
df (DataFrame): DataFrame containing datetime and OHLC price columns
interval (int): The time interval between data points
interval_unit (str): The unit of time for intervals (seconds, minutes, days)
reasoning_mode (str): Reasoning strategy (proactive or reactive)
Returns:
dict: Dictionary with lists of datetime and price data along with metadata
"""
return {
"datetime": df['datetime'].tolist(), # Convert datetime column to list
"open": df['open'].tolist(), # Convert open prices to list
"high": df['high'].tolist(), # Convert high prices to list
"low": df['low'].tolist(), # Convert low prices to list
"close": df['close'].tolist(), # Convert close prices to list
"interval": interval, # Time frequency value
"interval_unit": interval_unit, # Time unit (seconds, minutes, days)
"reasoning_mode": reasoning_mode # Reasoning approach (proactive/reactive)
}
def log_forecast_check(task_id, forecast_date, log_file="sumtyme_task_ids.log"):
"""
Logs a forecast check to a CSV file.
Parameters:
- task_id (str): The ID of the task
- forecast_date (str): The date of the forecast
- log_file (str): Path to the log file, defaults to "sumtyme_task_ids.log"
"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Create log file with header if it doesn't exist
if not os.path.exists(log_file):
with open(log_file, 'w') as f:
f.write("timestamp,forecast_date,task_id,checked\n")
# Write to log file
with open(log_file, 'a') as f:
f.write(f"{current_time},{forecast_date},{task_id},0\n")
return True
def post_request(df, interval, interval_unit, reasoning_mode):
    # Prepare request
    api_endpoint = "https://www.sumtyme.com/shared/v1/reasoning-model-ts"
    headers = {"Content-Type": "application/json"}

    # Create payload from data
    payload = time_series_dict(df, interval, interval_unit, reasoning_mode)

    # Send POST request to API
    response = requests.post(api_endpoint, json=payload, headers=headers)

    if response.status_code != 200:
        print(f"API request failed with status code {response.status_code}: {response.text}")
        return None

    return response.json()
# Example usage
github_url = "https://github.com/sumtyme-ade/sample_data/tree/main/sample/spy_etf_1min.csv"
df = download_github_data(github_url).tail(5000)
# Get the last datetime as forecast date
forecast_date = df['datetime'].iloc[-1]
# Set the time interval between data points
interval = 1
# Set the unit of time for the interval (options: seconds, minutes, days)
interval_unit = "minutes"
# Set the reasoning mode (options: proactive or reactive)
reasoning_mode = "reactive"
# Send request to API
response = post_request(df,interval,interval_unit,reasoning_mode)
# Process response if successful
if response:
print(response)
task_id = response.get("task_id")
log_forecast_check(task_id,forecast_date)
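After a submission you can confirm what was recorded by reading the log back, for example:
# Read the task log back to verify the submission was logged
log_df = pd.read_csv("sumtyme_task_ids.log")
print(log_df.tail())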
import requests
import pandas as pd
import io
import os
from datetime import datetime
def download_github_data(url):
# Convert GitHub repo URL to raw content URL
if "github.com" in url and "/blob/" not in url and "/tree/" in url:
# For /tree/ links, convert to raw
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/tree/", "/")
elif "github.com" in url and "/blob/" in url:
# For /blob/ links, convert to raw
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/blob/", "/")
print(f"Downloading from: {url}")
response = requests.get(url)
if response.status_code == 200:
return pd.read_csv(io.StringIO(response.text))
else:
raise Exception(f"Failed to download data: {response.status_code}")
def time_series_dict(df, interval, interval_unit, reasoning_mode):
"""
Converts a DataFrame with OHLC price data into a dictionary format.
Parameters:
df (DataFrame): DataFrame containing datetime and OHLC price columns
interval (int): The time interval between data points
interval_unit (str): The unit of time for intervals (seconds, minutes, days)
reasoning_mode (str): Reasoning strategy (proactive or reactive)
Returns:
dict: Dictionary with lists of datetime and price data along with metadata
"""
return {
"datetime": df['datetime'].tolist(), # Convert datetime column to list
"open": df['open'].tolist(), # Convert open prices to list
"high": df['high'].tolist(), # Convert high prices to list
"low": df['low'].tolist(), # Convert low prices to list
"close": df['close'].tolist(), # Convert close prices to list
"interval": interval, # Time frequency value
"interval_unit": interval_unit, # Time unit (seconds, minutes, days)
"reasoning_mode": reasoning_mode # Reasoning approach (proactive/reactive)
}
def create_moving_window(data, window_size, step_size=1):
"""
Creates a moving window for OHLC (Open, High, Low, Close) financial data.
Optimized for speed using NumPy arrays.
This function converts financial time series data into overlapping windows
of fixed size, useful for rolling analysis, feature engineering, or
building datasets for machine learning models.
Parameters:
- data (pd.DataFrame or list): Input OHLC financial data
- window_size (int): Number of time periods in each window
- step_size (int, optional): Number of periods to advance for each new window, defaults to 1
Returns:
- list: List of DataFrame windows, each containing window_size rows of OHLC data
Raises:
- ValueError: If input data format is invalid, window_size is out of range,
step_size is not positive, or required columns are missing
"""
# Validate input is a proper non-empty DataFrame
if not isinstance(data, pd.DataFrame) or data.empty:
raise ValueError("Data must be a non-empty DataFrame or list of dicts.")
# Ensure window size is valid (positive and not larger than data)
if window_size <= 0 or window_size > len(data):
raise ValueError("Window size must be between 1 and length of data.")
# Ensure step size is positive
if step_size <= 0:
raise ValueError("Step size must be greater than 0.")
# Check that all required OHLC columns exist
required_columns = ['datetime', 'open', 'high', 'low', 'close']
if not all(col in data.columns for col in required_columns):
missing = [col for col in required_columns if col not in data.columns]
raise ValueError(f"Missing required columns: {missing}")
# Filter to only keep essential columns for efficiency
data = data[required_columns]
# Convert to NumPy array for faster operations
values = data.values
# Calculate how many windows will be generated
n_samples = (len(values) - window_size) // step_size + 1
if n_samples <= 0:
return [] # Return empty list if no complete windows can be formed
# Create windows using list comprehension with fast NumPy slicing
# Each window is a slice of the original data converted to DataFrame
windows = [
pd.DataFrame(values[i:i + window_size], columns=required_columns)
for i in range(0, len(values) - window_size + 1, step_size)
]
return windows
def post_request(df, interval, interval_unit, reasoning_mode, log_file="sumtyme_task_ids.log"):
"""
Sends time series data to API using a walk-forward approach with 5000-row chunks.
Parameters:
- df (DataFrame): Time series data with datetime and OHLC columns
- interval (int): Time interval between data points
- interval_unit (str): Unit of time interval (seconds, minutes, days)
- reasoning_mode (str): Model reasoning mode (proactive, reactive)
    - log_file (str): Path to the CSV log file, defaults to "sumtyme_task_ids.log"
Returns:
- list: Collection of task IDs from successful API calls
"""
# Check if file exists, if not create it with headers
if not os.path.exists(log_file):
with open(log_file, 'w') as f:
f.write("timestamp,forecast_date,task_id,trend_identified\n")
# Define base API configuration
BASE_URL = "https://www.sumtyme.com/shared"
api_endpoint = f"{BASE_URL}/v1/reasoning-model-ts"
headers = {"Content-Type": "application/json"}
# Split data into 5000 row chunks
df_chunks = create_moving_window(df, window_size=5000)
task_ids = [] # Collect task IDs for return
# Process each chunk with API calls
for chunk in df_chunks:
# Date to forecast for (last timestamp in chunk)
forecast_date = chunk['datetime'].iloc[-1]
# Convert datetime to proper format if needed
chunk['datetime'] = pd.to_datetime(chunk['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')
# Create payload from data
payload = time_series_dict(chunk, interval, interval_unit, reasoning_mode)
try:
# Send POST request to API
response = requests.post(api_endpoint, json=payload, headers=headers)
# Check for successful request
if response.status_code != 200:
print(f"API request failed for {forecast_date} with status code {response.status_code}: {response.text}")
continue
# Parse response
result = response.json()
task_id = result.get("task_id")
if task_id:
task_ids.append(task_id)
# Get current timestamp for logging
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Write to log file
with open(log_file, 'a') as f:
f.write(f"{current_time},{forecast_date},{task_id},0\n")
print(f"Successfully processed forecast date {forecast_date}, task_id: {task_id}")
else:
print(f"Warning: No task ID returned for forecast date {forecast_date}")
except Exception as e:
# Log errors too
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(log_file, 'a') as f:
f.write(f"{current_time},{forecast_date},ERROR: {str(e)},0\n")
print(f"Error processing forecast_date={forecast_date}: {str(e)}")
return task_ids
# Example usage
github_url = "https://github.com/sumtyme-ade/sample_data/tree/main/sample/spy_etf_1min.csv"
df = download_github_data(github_url).tail(5003)
# Set the time interval between data points
interval = 1
# Set the unit of time for the interval (options: seconds, minutes, days)
interval_unit = "minutes"
# Set the reasoning mode (options: proactive or reactive)
reasoning_mode = "reactive"
# Send request to API
task_ids = post_request(df, interval, interval_unit, reasoning_mode)
Retrieve Analysis
import pandas as pd
import requests
import json
import time
def get_analysis_results(task_id, max_attempts=10, delay=30):
"""
Poll the results endpoint until analysis is complete or max attempts reached
Parameters:
task_id (str): Task ID from initial request
max_attempts (int): Maximum number of polling attempts
delay (int): Seconds to wait between polling attempts
Returns:
dict: Analysis results or error message
"""
results_endpoint = f"https://www.sumtyme.com/results/{task_id}"
for attempt in range(max_attempts):
print(f"Checking results (attempt {attempt+1}/{max_attempts})...")
response = requests.get(results_endpoint)
if response.status_code != 200:
print(f"Error checking results: {response.status_code}, {response.text}")
return None
result_data = response.json()
# Check if processing is complete
if "status" in result_data and result_data["status"] == "processing":
print(f"Still processing. Tasks ahead: {result_data.get('tasks_ahead', 'unknown')}")
time.sleep(delay)
continue
# If we get here, we have results
return result_data
print("Maximum polling attempts reached. Try checking results manually later.")
return None
# Function to transform response dict to dataframe
def dict_to_dataframe(response_dict):
# Extract the result dictionary
result_dict = response_dict['result']
# Create lists for datetime and trend_identified values
datetimes = []
trends = []
# Extract values from the nested dictionary
for timestamp, data in result_dict.items():
datetimes.append(timestamp)
trends.append(data['trend_identified'])
# Create the DataFrame
df = pd.DataFrame({
'datetime': datetimes,
'trend_identified': trends
})
# Convert datetime strings to pandas datetime objects
df['datetime'] = pd.to_datetime(df['datetime'])
# Sort by datetime
df = df.sort_values('datetime')
return df
def update_task_results(task_file, output_file=None):
"""
Update task results by checking task_ids against API results.
This function reads a CSV file containing task IDs, queries an API for
results for each task, and updates a status column based on the results.
Parameters:
- task_file (str): Path to CSV file containing task IDs and forecast dates
- output_file (str, optional): Path to save updated results. If None, overwrites the original file
Returns:
- pd.DataFrame: Updated dataframe with checked statuses
"""
# Set default output file if none provided
if output_file is None:
output_file = task_file
# Load data
dataset = pd.read_csv(task_file)
# Create a copy for modifications
change_file = dataset.copy()
# Pre-convert forecast_date column to datetime for efficiency
change_file['forecast_date'] = pd.to_datetime(change_file['forecast_date'])
    # Filter to only process unchecked entries (forecast_date was already converted above)
    blank_entries = change_file['trend_identified'] == 0
    tasks_to_check = change_file[blank_entries]

    # Process each task that needs checking
    for idx, row in tasks_to_check.iterrows():
        print(f"Checking row {idx}")
try:
task_id = str(row['task_id'])
target_datetime = row['forecast_date']
# Retrieve results from API
analysis_results = get_analysis_results(task_id)
            # Convert the API response to a dataframe using dict_to_dataframe (defined above)
df1 = dict_to_dataframe(analysis_results)
# Extract last datetime and direction
last_datetime = pd.to_datetime(df1['datetime'].iloc[-1])
last_direction = df1['trend_identified'].iloc[-1]
# Update status based on datetime match
if last_datetime == target_datetime:
change_file.loc[idx, 'trend_identified'] = last_direction
else:
change_file.loc[idx, 'trend_identified'] = None
except Exception as e:
print(f"Error processing task_id={task_id}, datetime={target_datetime}: {str(e)}")
# Optional: mark errors in the file
change_file.loc[idx, 'trend_identified'] = "ERROR"
# Save updated data
change_file.to_csv(output_file, index=False)
return change_file
task_file = "sumtyme_task_ids.log"
output_file = "sumtyme_results.log"
update_task_results(task_file, output_file=output_file)
import pandas as pd
import requests
def check_task_status(task_id):
"""Check the status of a submitted task"""
api_endpoint = f"https://www.sumtyme.com/results/{task_id}"
# Make the API request
response = requests.get(api_endpoint)
# Check if the request was successful (status code 200)
if response.status_code == 200:
result_data = response.json()
# Check if processing is complete
if "status" in result_data and result_data["status"] == "complete":
return result_data
elif "status" in result_data and result_data["status"] == "processing":
print(f"{task_id}: Still processing")
return None
# If we get here, either the request failed or status wasn't complete
print(f"Failed to retrieve task status for {task_id}: {response.status_code}")
return None
def dict_to_dataframe(response_dict):
"""Transform response dictionary to dataframe more efficiently"""
# Extract the result dictionary
result_dict = response_dict['result']
# Create the DataFrame directly from the dictionary
df = pd.DataFrame([
{'datetime': timestamp, 'trend_identified': data['trend_identified']}
for timestamp, data in result_dict.items()
])
# Convert datetime strings to pandas datetime objects
df['datetime'] = pd.to_datetime(df['datetime'])
# Sort by datetime
return df.sort_values('datetime').reset_index(drop=True)
task_id = ""
response_dict = check_task_status(task_id)
if response_dict is not None:
final = dict_to_dataframe(response_dict)
final.to_csv("sumtyme_analysis.csv",index=False)
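To review the identified trends alongside the underlying prices, one option (a sketch, assuming the original price file is still available) is a simple merge on datetime:
import pandas as pd

prices = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]
prices['datetime'] = pd.to_datetime(prices['datetime'])

trends = pd.read_csv("sumtyme_analysis.csv", parse_dates=['datetime'])

# Left-join trends onto prices; rows without a trend signal remain NaN
merged = prices.merge(trends, on='datetime', how='left')
print(merged.tail())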
Example Responses
{
  "status": "processing",
  "task_id": "task_id_6ed2f11b761d4eb589b1a223efb802d5",
  "tasks_ahead": 10
}
{
"status": "complete",
"result": {
"2025-04-07T13:43:00": {
"trend_identified": 1.0
},
"2025-04-07T13:55:00": {
"trend_identified": 1.0
},
"2025-04-07T14:06:00": {
"trend_identified": 1.0
},
"2025-04-07T14:09:00": {
"trend_identified": 1.0
},
"2025-04-07T14:13:00": {
"trend_identified": 1.0
},
"2025-04-07T14:14:00": {
"trend_identified": 1.0
},
"2025-04-07T14:15:00": {
"trend_identified": 1.0
},
"2025-04-07T14:19:00": {
"trend_identified": 1.0
},
"2025-04-07T14:25:00": {
"trend_identified": 1.0
},
"2025-04-07T14:30:00": {
"trend_identified": 1.0
},
"2025-04-07T14:34:00": {
"trend_identified": 1.0
    }
  }
}
Performance Evaluation
Quantify how many minutes/hours/days before major market events the model provides reliable signals, demonstrating its ability to anticipate systemic shifts before they fully develop in the market.
Measure how quickly the model adjusts after sudden market shocks, calculated as the time between a shock event and when the model’s predictions catch up to the new reality.
Verify that predictions for neighbouring time horizons (e.g. 1 hour, 2 hours, 3 hours) consistently build upon each other without logical contradictions or conflicting signals.
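As an illustration of the consistency check above, the sketch below compares results from two horizons; the filenames are hypothetical and assume each result set was saved with datetime and trend_identified columns as in the Retrieve Analysis step:
import pandas as pd

# Hypothetical result files, one per forecast horizon
h1 = pd.read_csv("sumtyme_analysis_1h.csv", parse_dates=['datetime'])
h2 = pd.read_csv("sumtyme_analysis_2h.csv", parse_dates=['datetime'])

# Align the two horizons on timestamp and flag conflicting signals
merged = h1.merge(h2, on='datetime', suffixes=('_1h', '_2h'))
conflicts = merged[merged['trend_identified_1h'] != merged['trend_identified_2h']]

agreement = 1 - len(conflicts) / len(merged) if len(merged) else float('nan')
print(f"Overlapping timestamps: {len(merged)}, agreement rate: {agreement:.2%}")
print(conflicts.head())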
Key Concepts
The reasoning model operates as a unified system where predictions across multiple time horizons work together rather than independently. This integrated approach reveals how market movements propagate across timeframes, uncovering interconnected relationships that traditional single-timeframe approaches fundamentally cannot detect.
The reasoning model assumes that all price movements originate in high-frequency time series before cascading into lower-frequency time series. Therefore, the model analyses price action and only returns trends that it believes will translate into lower frequency timeframes.
The reasoning model begins its analysis with as few as 10 historical data points. Your results include both the forecast (if identified) and the trend context that shaped it, all in one view. This helps users understand whether predictions occur at the beginning, middle, or end of broader trends, making forecasts easier to interpret and increasing confidence in decisions.
Each forecast should be interpreted within its trend context: the first signal in a new direction marks the start of a trend, while consecutive forecasts in the same direction confirm the consistency of the current trend.
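A minimal sketch of this interpretation, assuming the analysis was saved as sumtyme_analysis.csv in the Retrieve Analysis step, is to flag rows where the signal direction changes:
import pandas as pd

# Load the analysis produced in the Retrieve Analysis step
final = pd.read_csv("sumtyme_analysis.csv", parse_dates=['datetime'])

# A signal whose direction differs from the previous one marks a new trend;
# repeats of the same direction indicate the trend is still intact
final['trend_start'] = final['trend_identified'] != final['trend_identified'].shift(1)
print(final[['datetime', 'trend_identified', 'trend_start']].head(10))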
The reasoning model delivers immediate market insights without requiring market-specific training data. While predictive out of the box, it can be fine-tuned to optimise performance for specific time horizons through customised post-training. This flexibility allows users to incorporate multiple timeframes or any combination of traditional and alternative datasets of their choosing.
The sandbox processes API requests in a FIFO (first-in, first-out) queue, with each request taking approximately 5 seconds to complete. Wait times may extend during high-usage periods. For testing without queue delays, contact us at [email protected] to arrange a dedicated environment.
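Given the roughly 5-second processing time per request, a back-of-the-envelope estimate of total queue time for a planned batch (the batch size below is illustrative) is straightforward:
# Rough estimate only: actual times vary with queue depth and load
seconds_per_request = 5
n_payloads = 120  # illustrative: number of chunks you plan to submit
estimated_minutes = n_payloads * seconds_per_request / 60
print(f"Estimated processing time: ~{estimated_minutes:.0f} minutes for {n_payloads} requests")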