Skip to main content
This model identifies directional trends by analysing price action within the selected timeframe in isolation, responding quickly to timeframe specific movements and generating frequent trend signals without broader context.
Due to this model’s reliance on isolated timeframe analysis, analyse multiple time horizons simultaneously for a holistic understanding of market behaviour.

Time Series Data

import pandas as pd

df = pd.read_csv("filename.csv")[['datetime','open','high','low','close']]

print(df)

import requests
import pandas as pd
import io

def download_github_data(url):

    # Convert GitHub repo URL to raw content URL
    if "github.com" in url and "/blob/" not in url and "/tree/" in url:
    
        # For /tree/ links, convert to raw
        url = url.replace("github.com", "raw.githubusercontent.com")
        url = url.replace("/tree/", "/")
        
    elif "github.com" in url and "/blob/" in url:
    
        # For /blob/ links, convert to raw
        url = url.replace("github.com", "raw.githubusercontent.com")
        url = url.replace("/blob/", "/")
        
    print(f"Downloading from: {url}")
    
    response = requests.get(url)
    
    if response.status_code == 200:
    
        return pd.read_csv(io.StringIO(response.text))
    
    else:
    
        raise Exception(f"Failed to download data: {response.status_code}")

# Example usage
github_url = "https://github.com/sumtyme-ade/sample_data/tree/main/sample/spy_etf_1min.csv"
df = download_github_data(github_url)
print(f"Downloaded {len(df)} rows of data")

Data Preparation

# Import required libraries
import pandas as pd
import json

def time_series_dict(df):
    """
    Converts a DataFrame with OHLC price data into a dictionary format.
    
    Parameters:
    df (DataFrame): DataFrame containing datetime and OHLC price columns
    
    Returns:
    dict: Dictionary with lists of datetime and price data along with metadata
    """
    return {
        "datetime": df['datetime'].tolist(),  # Convert datetime column to list
        "open": df['open'].tolist(),          # Convert open prices to list
        "high": df['high'].tolist(),          # Convert high prices to list
        "low": df['low'].tolist(),            # Convert low prices to list
        "close": df['close'].tolist()         # Convert close prices to list
    }

# Read the CSV file and select only the required columns
df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]

# Create the JSON-compatible dictionary from the DataFrame and parameters
json_payload = time_series_dict(df)

# Import required libraries
import pandas as pd
import json

def time_series_dict(df, interval, interval_unit, reasoning_mode):
    """
    Converts a DataFrame with OHLC price data into a dictionary format.
    
    Parameters:
    df (DataFrame): DataFrame containing datetime and OHLC price columns
    
    Returns:
    dict: Dictionary with lists of datetime and price data
    """
    return {
        "datetime": df['datetime'].tolist(),  # Convert datetime column to list
        "open": df['open'].tolist(),          # Convert open prices to list
        "high": df['high'].tolist(),          # Convert high prices to list
        "low": df['low'].tolist(),            # Convert low prices to list
        "close": df['close'].tolist()         # Convert close prices to list
    }

# Read the CSV file and select only the required columns
df = pd.read_csv("filename.csv")[['datetime', 'open', 'high', 'low', 'close']]

# Split data for efficient submission
chunk_size = 5000
df_list = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Create API-ready payloads for each data chunk
json_payloads = [time_series_dict(chunk) for chunk in df_list]

Full API Script

  import requests
  import pandas as pd
  import io
  import os
  from datetime import datetime
  
  def download_github_data(url):
      # Convert GitHub repo URL to raw content URL
      if "github.com" in url and "/blob/" not in url and "/tree/" in url:
          # For /tree/ links, convert to raw
          url = url.replace("github.com", "raw.githubusercontent.com")
          url = url.replace("/tree/", "/")
      elif "github.com" in url and "/blob/" in url:
          # For /blob/ links, convert to raw
          url = url.replace("github.com", "raw.githubusercontent.com")
          url = url.replace("/blob/", "/")
          
      print(f"Downloading from: {url}")
      
      response = requests.get(url)
      if response.status_code == 200:
          return pd.read_csv(io.StringIO(response.text))
      else:
          raise Exception(f"Failed to download data: {response.status_code}")
  
  def time_series_dict(df):
      """
      Converts a DataFrame with OHLC price data into a dictionary format.
      
      Parameters:
      df (DataFrame): DataFrame containing datetime and OHLC price columns

      
      Returns:
      dict: Dictionary with lists of datetime and price data
      """
      return {
          "datetime": df['datetime'].tolist(),  # Convert datetime column to list
          "open": df['open'].tolist(),          # Convert open prices to list
          "high": df['high'].tolist(),          # Convert high prices to list
          "low": df['low'].tolist(),            # Convert low prices to list
          "close": df['close'].tolist()         # Convert close prices to list

      }
  
  def log_forecast_check(task_id, forecast_date, log_file="sumtyme_task_ids.log"):
      """
      Logs a forecast check to a CSV file.
      
      Parameters:
      - task_id (str): The ID of the task 
      - forecast_date (str): The date of the forecast
      - log_file (str): Path to the log file, defaults to "sumtyme_task_ids.log"
      """
      current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
      
      # Create log file with header if it doesn't exist
      if not os.path.exists(log_file):
          with open(log_file, 'w') as f:
              f.write("timestamp,forecast_date,task_id,checked\n")
  
      # Write to log file
      with open(log_file, 'a') as f:
          f.write(f"{current_time},{forecast_date},{task_id},0\n")
      
      return True
  
  def time_series_dict(df):
      """
      Converts a DataFrame with OHLC price data into a dictionary format.
      
      Parameters:
      df (DataFrame): DataFrame containing datetime and OHLC price columns

      
      Returns:
      dict: Dictionary with lists of datetime and price data 
      """
      return {
          "datetime": df['datetime'].tolist(),  # Convert datetime column to list
          "open": df['open'].tolist(),          # Convert open prices to list
          "high": df['high'].tolist(),          # Convert high prices to list
          "low": df['low'].tolist(),            # Convert low prices to list
          "close": df['close'].tolist()        # Convert close prices to list

      }
  
  def post_request(df):
  
    # Prepare request
    api_endpoint = f"https://www.sumtyme.com/shared/v1/base-model-ts"
    headers = {"Content-Type": "application/json"}
    
    # Create payload from data
    payload = time_series_dict(df,interval,interval_unit,reasoning_mode)
    
    # Send POST request to API
    response = requests.post(api_endpoint, json=payload, headers=headers)
  
    if response.status_code != 200:
      print(f"API request failed for {forecast_date} with status code {response.status_code}: {response.text}")
    else:
      return response.json()

  
  # Example usage
  github_url = "https://github.com/sumtyme-ade/sample_data/tree/main/sample/spy_etf_1min.csv"
  df = download_github_data(github_url).tail(5000)
  
  # Get the last datetime as forecast date
  forecast_date = df['datetime'].iloc[-1]
  
  # Send request to API
  response = post_request(df)
  
  # Process response if successful
  if response:
      print(response)
      task_id = response.get("task_id")
  
      log_forecast_check(task_id,forecast_date)
    
  import requests
  import pandas as pd
  import io
  import os
  from datetime import datetime
  
  def download_github_data(url):
      # Convert GitHub repo URL to raw content URL
      if "github.com" in url and "/blob/" not in url and "/tree/" in url:
          # For /tree/ links, convert to raw
          url = url.replace("github.com", "raw.githubusercontent.com")
          url = url.replace("/tree/", "/")
      elif "github.com" in url and "/blob/" in url:
          # For /blob/ links, convert to raw
          url = url.replace("github.com", "raw.githubusercontent.com")
          url = url.replace("/blob/", "/")
          
      print(f"Downloading from: {url}")
      
      response = requests.get(url)
      if response.status_code == 200:
          return pd.read_csv(io.StringIO(response.text))
      else:
          raise Exception(f"Failed to download data: {response.status_code}")
  
  def time_series_dict(df):
      """
      Converts a DataFrame with OHLC price data into a dictionary format.
      
      Parameters:
      df (DataFrame): DataFrame containing datetime and OHLC price columns
      
      Returns:
      dict: Dictionary with lists of datetime and price data
      """
      return {
          "datetime": df['datetime'].tolist(),  # Convert datetime column to list
          "open": df['open'].tolist(),          # Convert open prices to list
          "high": df['high'].tolist(),          # Convert high prices to list
          "low": df['low'].tolist(),            # Convert low prices to list
          "close": df['close'].tolist()         # Convert close prices to list

      }
  
  def create_moving_window(data, window_size, step_size=1):
     """
     Creates a moving window for OHLC (Open, High, Low, Close) financial data.
     Optimized for speed using NumPy arrays.
     
     This function converts financial time series data into overlapping windows
     of fixed size, useful for rolling analysis, feature engineering, or 
     building datasets for machine learning models.
     
     Parameters:
     - data (pd.DataFrame or list): Input OHLC financial data
     - window_size (int): Number of time periods in each window
     - step_size (int, optional): Number of periods to advance for each new window, defaults to 1
     
     Returns:
     - list: List of DataFrame windows, each containing window_size rows of OHLC data
     
     Raises:
     - ValueError: If input data format is invalid, window_size is out of range,
                   step_size is not positive, or required columns are missing
     """
     
     # Validate input is a proper non-empty DataFrame
     if not isinstance(data, pd.DataFrame) or data.empty:
         raise ValueError("Data must be a non-empty DataFrame or list of dicts.")
     
     # Ensure window size is valid (positive and not larger than data)
     if window_size <= 0 or window_size > len(data):
         raise ValueError("Window size must be between 1 and length of data.")
     
     # Ensure step size is positive
     if step_size <= 0:
         raise ValueError("Step size must be greater than 0.")
     
     # Check that all required OHLC columns exist
     required_columns = ['datetime', 'open', 'high', 'low', 'close']
     if not all(col in data.columns for col in required_columns):
         missing = [col for col in required_columns if col not in data.columns]
         raise ValueError(f"Missing required columns: {missing}")
     
     # Filter to only keep essential columns for efficiency
     data = data[required_columns]
     
     # Convert to NumPy array for faster operations
     values = data.values
     
     # Calculate how many windows will be generated
     n_samples = (len(values) - window_size) // step_size + 1
     if n_samples <= 0:
         return []  # Return empty list if no complete windows can be formed
     
     # Create windows using list comprehension with fast NumPy slicing
     # Each window is a slice of the original data converted to DataFrame
     windows = [
         pd.DataFrame(values[i:i + window_size], columns=required_columns)
         for i in range(0, len(values) - window_size + 1, step_size)
     ]
     
     return windows
  
  def post_request(df, log_file="sumtyme_task_ids.log"):
      """
      Sends time series data to API using a walk-forward approach with 5000-row chunks.
      
      Parameters:
      - df (DataFrame): Time series data with datetime and OHLC columns
      
      Returns:
      - list: Collection of task IDs from successful API calls
      """
      
      # Check if file exists, if not create it with headers
      if not os.path.exists(log_file):
          with open(log_file, 'w') as f:
              f.write("timestamp,forecast_date,task_id,trend_identified\n")
  
      # Define base API configuration
      BASE_URL = "https://www.sumtyme.com/shared"
      api_endpoint = f"{BASE_URL}/v1/base-model-ts"
      headers = {"Content-Type": "application/json"}
  
      # Split data into 5000 row chunks
      df_chunks = create_moving_window(df, window_size=5000)
      
      task_ids = []  # Collect task IDs for return
  
      # Process each chunk with API calls
      for chunk in df_chunks:
  
          # Date to forecast for (last timestamp in chunk)
          forecast_date = chunk['datetime'].iloc[-1]
         
          # Convert datetime to proper format if needed
          chunk['datetime'] = pd.to_datetime(chunk['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')
  
          # Create payload from data
          payload = time_series_dict(chunk, interval, interval_unit, reasoning_mode)
  
          try:
              # Send POST request to API
              response = requests.post(api_endpoint, json=payload, headers=headers)
  
              # Check for successful request
              if response.status_code != 200:
                  print(f"API request failed for {forecast_date} with status code {response.status_code}: {response.text}")
                  continue
  
              # Parse response
              result = response.json()
              task_id = result.get("task_id")
              
              if task_id:
                  task_ids.append(task_id)
        
                  # Get current timestamp for logging
                  current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                  
                  # Write to log file
                  with open(log_file, 'a') as f:
                      f.write(f"{current_time},{forecast_date},{task_id},0\n")
                      
                  print(f"Successfully processed forecast date {forecast_date}, task_id: {task_id}")
              else:
                  print(f"Warning: No task ID returned for forecast date {forecast_date}")
             
          except Exception as e:
              # Log errors too
              current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
              with open(log_file, 'a') as f:
                  f.write(f"{current_time},{forecast_date},ERROR: {str(e)},0\n")
              print(f"Error processing forecast_date={forecast_date}: {str(e)}")
      
      return task_ids
  
  # Example usage
  github_url = "https://github.com/sumtyme-ade/sample_data/tree/main/sample/spy_etf_1min.csv"
  df = download_github_data(github_url).tail(5003)
  
  # Send request to API
  task_ids = post_request(df)
    

Retrieve Analysis



import pandas as pd
import requests
import json

def get_analysis_results(task_id, max_attempts=10, delay=30):
    """
    Poll the results endpoint until analysis is complete or max attempts reached
    
    Parameters:
    task_id (str): Task ID from initial request
    max_attempts (int): Maximum number of polling attempts
    delay (int): Seconds to wait between polling attempts
    
    Returns:
    dict: Analysis results or error message
    """
    results_endpoint = f"https://www.sumtyme.com/results/{task_id}"
    
    for attempt in range(max_attempts):
        print(f"Checking results (attempt {attempt+1}/{max_attempts})...")
        
        response = requests.get(results_endpoint)
        
        if response.status_code != 200:
            print(f"Error checking results: {response.status_code}, {response.text}")
            return None
            
        result_data = response.json()
        
        # Check if processing is complete
        if "status" in result_data and result_data["status"] == "processing":
            print(f"Still processing. Tasks ahead: {result_data.get('tasks_ahead', 'unknown')}")
            time.sleep(delay)
            continue
            
        # If we get here, we have results
        return result_data
        
    print("Maximum polling attempts reached. Try checking results manually later.")
    return None

# Function to transform response dict to dataframe
def dict_to_dataframe(response_dict):
    # Extract the result dictionary
    result_dict = response_dict['result']
    
    # Create lists for datetime and trend_identified values
    datetimes = []
    trends = []
    
    # Extract values from the nested dictionary
    for timestamp, data in result_dict.items():
        datetimes.append(timestamp)
        trends.append(data['trend_identified'])
    
    # Create the DataFrame
    df = pd.DataFrame({
        'datetime': datetimes,
        'trend_identified': trends
    })
    
    # Convert datetime strings to pandas datetime objects
    df['datetime'] = pd.to_datetime(df['datetime'])
    
    # Sort by datetime
    df = df.sort_values('datetime')
    
    return df

def update_task_results(task_file, output_file=None):
    """
    Update task results by checking task_ids against API results.
    
    This function reads a CSV file containing task IDs, queries an API for 
    results for each task, and updates a status column based on the results.
    
    Parameters:
    - task_file (str): Path to CSV file containing task IDs and forecast dates
    - output_file (str, optional): Path to save updated results. If None, overwrites the original file
    
    Returns:
    - pd.DataFrame: Updated dataframe with checked statuses
    """
    # Set default output file if none provided
    if output_file is None:
        output_file = task_file
    
    # Load data
    dataset = pd.read_csv(task_file)
    
    # Create a copy for modifications
    change_file = dataset.copy()
    
    # Pre-convert forecast_date column to datetime for efficiency
    change_file['forecast_date'] = pd.to_datetime(change_file['forecast_date'])
    
    # Filter to only process unchecked entries
    blank_entries = change_file['trend_identified'] == 0
    tasks_to_check = change_file[blank_entries]
    tasks_to_check['forecast_date'] = pd.to_datetime(tasks_to_check['forecast_date'])
    
    # Process each task that needs checking
    for idx, row in tasks_to_check.iterrows():
        print(idx)
        try:
            task_id = str(row['task_id'])
            target_datetime = row['forecast_date']
            
            # Retrieve results from API
            analysis_results = get_analysis_results(task_id)
            
            # Convert to dataframe (assuming dict_to_dataframe function exists)
            df1 = dict_to_dataframe(analysis_results)
            
            # Extract last datetime and direction
            last_datetime = pd.to_datetime(df1['datetime'].iloc[-1])
            last_direction = df1['trend_identified'].iloc[-1]
            
            # Update status based on datetime match
            if last_datetime == target_datetime:
                change_file.loc[idx, 'trend_identified'] = last_direction
            else:
                change_file.loc[idx, 'trend_identified'] = None
                
        except Exception as e:
            print(f"Error processing task_id={task_id}, datetime={target_datetime}: {str(e)}")
            # Optional: mark errors in the file
            change_file.loc[idx, 'trend_identified'] = "ERROR"
    
    # Save updated data
    change_file.to_csv(output_file, index=False)
    
    return change_file



task_file = "sumtyme_task_ids.log"

output_file = "sumtyme_results.log"

update_task_results(task_file, output_file=output_file)
  
    


import pandas as pd
import requests

def check_task_status(task_id):
  """Check the status of a submitted task"""
  api_endpoint = f"https://www.sumtyme.com/results/{task_id}"
  
  # Make the API request
  response = requests.get(api_endpoint)
  
  # Check if the request was successful (status code 200)
  if response.status_code == 200:
      result_data = response.json()
      
      # Check if processing is complete
      if "status" in result_data and result_data["status"] == "complete":
          return result_data

      elif "status" in result_data and result_data["status"] == "processing":
           print(f"{task_id}: Still processing") 
           return None
  
  # If we get here, either the request failed or status wasn't complete
  print(f"Failed to retrieve task status for {task_id}: {response.status_code}")
  return None

def dict_to_dataframe(response_dict):
  """Transform response dictionary to dataframe more efficiently"""
  # Extract the result dictionary
  result_dict = response_dict['result']
  
  # Create the DataFrame directly from the dictionary
  df = pd.DataFrame([
      {'datetime': timestamp, 'trend_identified': data['trend_identified']}
      for timestamp, data in result_dict.items()
  ])
  
  # Convert datetime strings to pandas datetime objects
  df['datetime'] = pd.to_datetime(df['datetime'])
  
  # Sort by datetime
  return df.sort_values('datetime').reset_index(drop=True)    

task_id = ""

response_dict = check_task_status(task_id)

if response_dict is not None:

  final = dict_to_dataframe(response_dict)

  final.to_csv("sumtyme_analysis.csv",index=False)

    

Example Responses

  {
   'status': 'processing',
   'task_id': 'task_id_6ed2f11b761d4eb589b1a223efb802d5',
   'tasks_ahead': 10
   }
    
  {
    "status": "complete",
    "result": {
    "2025-04-07T13:43:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T13:55:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:06:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:09:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:13:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:14:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:15:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:19:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:25:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:30:00": {
      "trend_identified": 1.0
    },
    "2025-04-07T14:34:00": {
      "trend_identified": 1.0
    }}
  }
    

Performance Evaluation

Quantify how many minutes/hours/days before major market events the model provides reliable signals, demonstrating its ability to anticipate systemic shifts before they fully develop in the market.
Measure how quickly the model adjusts after sudden market shocks, calculated as the time between a shock event and when the model’s predictions catch up to the new reality.
Verify that predictions for neighbouring time periods (e.g. 1 hour, 2 hour, 3 hour) consistently build upon each other without logical contradictions or conflicting signals.

Key Concepts

The base model operates as a unified system where predictions across multiple time horizons work together rather than independently. This integrated approach reveals how market movements propagate across timeframes, uncovering interconnected relationships that traditional single-timeframe approaches fundamentally cannot detect.
The base model assumes that all price movements originate in high-frequency time series before cascading into lower-frequency time series. Therefore, the model analyses the selected timeframe in isolation with the assumption that it will be combined with other time horizons to capture these interconnected effects.
The base model begins its analysis with as little as 10 historical data points. Your results include both the forecast (if identified) and the trend context that shaped it, all in one view. This helps users understand if predictions occur at the beginning, middle, or end of broader trends, making forecasts easier to understand and increasing your confidence in decisions.
Each forecast should be interpreted within its trend context: the first signal in a consecutive trend marks the start of a trend, while consecutive forecasts in the same direction indicate the consistency of the current trend.
The base model delivers immediate market insights without requiring market-specific training data. While predictive out of the box, it can be fine-tuned to optimise performance for specific time horizons through customised post-training. This flexibility allows users to incorporate multiple timeframes or any combination of traditional and alternative datasets of their choosing.
The sandbox processes API requests in a FIFO (first-in, first-out) queue, with each request taking approximately 5 seconds to complete. Wait times may extend during high-usage periods. For testing without queue delays, contact us at [email protected] to arrange a dedicated environment.