Building Your First Prediction Market Trading Bot with Python

A complete Python tutorial for building a prediction market trading bot using the Propheseer API — from API client setup to arbitrage detection, alerts, and continuous deployment.

Propheseer Team

Introduction

A prediction market trading bot monitors markets across platforms, identifies opportunities, and takes action — whether that's alerting you to a spread, logging data for analysis, or executing trades. In this tutorial, you'll build a complete Python bot from scratch using the Propheseer API.

By the end, you'll have a working bot that:

  • Authenticates with the Propheseer API using a reusable client class
  • Fetches and filters markets across Polymarket, Kalshi, and Gemini
  • Detects arbitrage opportunities and unusual market activity
  • Sends notifications when opportunities arise
  • Runs continuously with proper error handling and rate limit management

This tutorial assumes basic Python knowledge. No prior experience with prediction markets or trading is required — we'll explain the concepts as we go. If you're new to prediction markets, start with our complete guide.

Prerequisites and Setup

Requirements

  • Python 3.9+
  • A Propheseer API key (sign up free)
  • For alerting: a Slack webhook URL, Discord webhook, or email (optional)

Project Setup

Create a new directory and install dependencies:

mkdir prediction-bot && cd prediction-bot
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install requests python-dotenv

Create a .env file for your credentials:

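PROPHESEER_API_KEY=pk_live_your_key_here
# Optional: Slack webhook for alerts (kept on its own line because systemd's
# EnvironmentFile does not strip inline comments)
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...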

Project Structure

prediction-bot/
├── .env
├── client.py          # Reusable API client
├── bot.py             # Main bot logic
├── alerts.py          # Notification system
└── requirements.txt

Building the API Client

First, let's build a reusable PropheseerClient class that handles authentication, pagination, rate limits, and error retries.

# client.py
import requests
import time
import os
from dotenv import load_dotenv

load_dotenv()

class PropheseerClient:
    """Reusable client for the Propheseer prediction market API."""

    def __init__(self, api_key=None, base_url="https://api.propheseer.com/v1"):
        self.api_key = api_key or os.getenv("PROPHESEER_API_KEY")
        if not self.api_key:
            raise ValueError("API key required. Set PROPHESEER_API_KEY env var or pass api_key.")
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        })
        self.rate_limit_remaining = None
        self.rate_limit_minute_remaining = None

    def _request(self, method, endpoint, params=None, max_retries=3, timeout=10):
        """Make an API request with retry logic and rate limit tracking."""
        url = f"{self.base_url}{endpoint}"

        for attempt in range(max_retries):
            try:
                # A timeout keeps a stalled connection from hanging the bot
                response = self.session.request(
                    method, url, params=params, timeout=timeout
                )

                # Track rate limits from response headers (None if absent)
                day = response.headers.get("X-RateLimit-Remaining-Day")
                minute = response.headers.get("X-RateLimit-Remaining-Minute")
                self.rate_limit_remaining = int(day) if day is not None else None
                self.rate_limit_minute_remaining = (
                    int(minute) if minute is not None else None
                )

                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 60))
                    print(f"Rate limited. Waiting {retry_after}s...")
                    time.sleep(retry_after)
                    continue

                response.raise_for_status()
                return response.json()

            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout):
                if attempt < max_retries - 1:
                    wait = 2 ** attempt
                    print(f"Connection error. Retrying in {wait}s...")
                    time.sleep(wait)
                else:
                    raise

        # Every attempt was rate limited
        return None

    def get_markets(self, **kwargs):
        """Fetch markets with optional filters.

        Args:
            q: Search query
            source: polymarket, kalshi, or gemini
            category: politics, crypto, economics, etc.
            status: open, closed, or resolved
            limit: Max results (default 50, max 200)
            offset: Pagination offset
        """
        return self._request("GET", "/markets", params=kwargs)

    def get_all_markets(self, **filters):
        """Fetch ALL markets using pagination."""
        all_markets = []
        offset = 0
        limit = 200

        while True:
            result = self.get_markets(limit=limit, offset=offset, **filters)
            if not result:
                break

            batch = result["data"]
            all_markets.extend(batch)

            if len(batch) < limit:
                break
            offset += limit

            # Respect rate limits (0 remaining must also trigger the pause)
            remaining = self.rate_limit_minute_remaining
            if remaining is not None and 0 <= remaining < 3:
                print("Approaching minute rate limit, pausing...")
                time.sleep(10)

        return all_markets

    def get_arbitrage(self, min_spread=0.03, category=None):
        """Find arbitrage opportunities across platforms."""
        params = {"min_spread": min_spread}
        if category:
            params["category"] = category
        result = self._request("GET", "/arbitrage", params=params)
        return result["data"] if result else []

    def get_unusual_trades(self, min_score=None, source=None, limit=50):
        """Fetch unusual/whale trades."""
        params = {"limit": limit}
        if min_score is not None:
            params["min_score"] = min_score
        if source:
            params["source"] = source
        result = self._request("GET", "/unusual-trades", params=params)
        return result["data"] if result else []

    def get_categories(self):
        """List available market categories."""
        result = self._request("GET", "/categories")
        return result["data"] if result else []
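
The retry loop in `_request` waits `2 ** attempt` seconds between connection failures. A minimal sketch of that schedule as a standalone helper follows. The `backoff_delay` name is hypothetical, and the `cap` and `jitter` parameters are illustrative additions rather than part of the client above:

```python
import random

def backoff_delay(attempt, base=1.0, cap=60.0, jitter=0.0):
    """Return seconds to wait before retry number `attempt` (0-based).

    Doubles the delay on each attempt, caps it at `cap`, then adds up to
    `jitter * delay` of random extra wait so that many clients do not
    retry in lockstep. `cap` and `jitter` are illustrative additions.
    """
    delay = min(cap, base * (2 ** attempt))
    return delay + random.uniform(0, jitter * delay)

# With jitter disabled, the schedule matches the client's 2 ** attempt waits.
print([backoff_delay(a) for a in range(4)])  # [1.0, 2.0, 4.0, 8.0]
```

Capping the delay matters mostly if you raise `max_retries`, since an uncapped doubling schedule grows quickly.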

Testing the Client

# Quick test
from client import PropheseerClient

client = PropheseerClient()

# Fetch 5 markets
result = client.get_markets(limit=5, status="open")
for market in result["data"]:
    prob = market["outcomes"][0]["probability"]
    print(f"[{market['source']:>10}] {market['question'][:60]}... ({prob:.0%})")
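
To exercise the pagination logic without spending API requests, one option is to run the same offset loop against a stub. The sketch below mirrors the loop in `get_all_markets`. The `fetch_all` and `fake_page` names and the 450 fake records are hypothetical stand-ins, not part of the Propheseer API:

```python
def fetch_all(fetch_page, limit=200):
    """Collect every item by calling fetch_page(limit, offset) until a short page."""
    items, offset = [], 0
    while True:
        batch = fetch_page(limit, offset)
        items.extend(batch)
        if len(batch) < limit:   # a short (or empty) page means we reached the end
            break
        offset += limit
    return items

# Stub that serves 450 fake records, standing in for the API.
FAKE = [{"id": i} for i in range(450)]

def fake_page(limit, offset):
    return FAKE[offset:offset + limit]

print(len(fetch_all(fake_page)))  # 450 (pages of 200, 200, and 50)
```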

Fetching and Filtering Markets

Now let's build market monitoring functions that go beyond basic fetching.

# bot.py
from client import PropheseerClient
from alerts import send_alert, format_arbitrage_alert, format_whale_alert
import time

client = PropheseerClient()

def find_high_volume_markets(min_volume=500000):
    """Find markets with significant trading volume."""
    markets = client.get_all_markets(status="open")

    high_volume = [
        m for m in markets
        if m.get("volume", 0) and m["volume"] >= min_volume
    ]

    # Sort by volume descending
    high_volume.sort(key=lambda m: m.get("volume", 0), reverse=True)
    return high_volume

def find_close_markets(max_probability_distance=0.05):
    """Find markets near 50/50 — the most uncertain and potentially volatile."""
    markets = client.get_all_markets(status="open")

    close_markets = []
    for m in markets:
        yes_prob = m["outcomes"][0]["probability"]
        distance_from_50 = abs(yes_prob - 0.50)
        if distance_from_50 <= max_probability_distance:
            close_markets.append({
                "market": m,
                "yes_prob": yes_prob,
                "distance": distance_from_50,
            })

    close_markets.sort(key=lambda x: x["distance"])
    return close_markets

def find_markets_by_keyword(keywords):
    """Search for markets matching specific keywords."""
    results = {}
    for keyword in keywords:
        result = client.get_markets(q=keyword, status="open", limit=20)
        if result and result["data"]:
            results[keyword] = result["data"]
    return results
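
Both `find_high_volume_markets` and `find_close_markets` call `get_all_markets`, so running them back to back pages through the full market list twice. One way to save requests is to fetch once and apply both filters to the same list. This sketch assumes market records carry the `volume` and `outcomes[0]["probability"]` fields used above. `classify_markets` and the sample records are hypothetical:

```python
def classify_markets(markets, min_volume=500_000, max_distance=0.05):
    """Split an already-fetched market list into high-volume and near-50/50 sets."""
    high_volume, close = [], []
    for m in markets:
        if (m.get("volume") or 0) >= min_volume:
            high_volume.append(m)
        outcomes = m.get("outcomes") or []   # skip markets with no outcomes
        if outcomes and abs(outcomes[0]["probability"] - 0.5) <= max_distance:
            close.append(m)
    high_volume.sort(key=lambda m: m["volume"], reverse=True)
    close.sort(key=lambda m: abs(m["outcomes"][0]["probability"] - 0.5))
    return high_volume, close

# Made-up records shaped like the fields the tutorial reads.
sample = [
    {"question": "A", "volume": 900_000, "outcomes": [{"probability": 0.52}]},
    {"question": "B", "volume": 10_000, "outcomes": [{"probability": 0.49}]},
    {"question": "C", "volume": None, "outcomes": []},
    {"question": "D", "volume": 2_000_000, "outcomes": [{"probability": 0.91}]},
]
high, close = classify_markets(sample)
print([m["question"] for m in high])   # ['D', 'A']
print([m["question"] for m in close])  # ['B', 'A']
```

In the bot, you would pass `client.get_all_markets(status="open")` in place of `sample`.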

Arbitrage Detection Logic

The Propheseer /v1/arbitrage endpoint handles cross-platform matching for you. The functions below wrap it, along with the unusual-trades endpoint, and add local filtering so that only actionable results reach your alerts.

def monitor_arbitrage(min_spread=0.03, alert_spread=0.05):
    """Check for arbitrage opportunities using the built-in endpoint."""
    opportunities = client.get_arbitrage(min_spread=min_spread)

    actionable = []
    for opp in opportunities:
        # Only alert on opportunities with meaningful return (5%+ by default)
        spread = opp["spread"]
        if spread < alert_spread or len(opp["markets"]) < 2:
            continue

        # Buy where YES is cheapest and sell where it is most expensive,
        # rather than relying on the order the API returns the markets in
        by_price = sorted(opp["markets"], key=lambda m: m["yesPrice"])
        buy, sell = by_price[0], by_price[-1]
        actionable.append({
            "question": opp["question"],
            "spread": spread,
            "return": opp["potentialReturn"],
            "buy_platform": buy["source"],
            "buy_price": buy["yesPrice"],
            "sell_platform": sell["source"],
            "sell_price": sell["yesPrice"],
        })

    return actionable

def monitor_unusual_activity(min_score=75):
    """Monitor for unusual/whale trades that might signal opportunities."""
    trades = client.get_unusual_trades(min_score=min_score, limit=20)

    significant = []
    for trade in trades:
        usdc_value = trade["trade"]["usdc_value"]
        if usdc_value >= 10000:  # $10K+ trades
            significant.append({
                "market": trade["market"]["question"],
                "source": trade["market"]["source"],
                "side": trade["trade"]["side"],
                "size": usdc_value,
                "score": trade["detection"]["anomaly_score"],
                "reason": trade["detection"]["reason"],
            })

    return significant

For a deeper dive into how arbitrage detection works and how to build a more sophisticated scanner, see our dedicated arbitrage guide.
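
To see why a spread can translate into profit, consider a simplified two-platform case: buy YES where it is cheaper and buy NO on the other platform. Exactly one of the two contracts pays out $1, so the profit per pair is $1 minus the combined cost. The sketch below assumes binary markets that resolve identically, prices quoted in dollars per $1 payout, a NO price of `1 - yes_price`, and no fees or slippage. It is not confirmed to be how the API computes `potentialReturn`, and `arbitrage_return` is a hypothetical helper:

```python
def arbitrage_return(yes_price_a, yes_price_b):
    """Return (cost, profit, roi) for a two-leg hedge across two platforms.

    Buys YES on the cheaper platform and NO on the more expensive one.
    Assumes NO costs 1 - YES, identical resolution, and zero fees.
    """
    low, high = sorted((yes_price_a, yes_price_b))
    cost = low + (1 - high)   # YES leg + NO leg
    profit = 1 - cost         # exactly one leg pays $1; equals high - low
    return cost, profit, profit / cost

cost, profit, roi = arbitrage_return(0.45, 0.52)
print(f"cost=${cost:.2f} profit=${profit:.2f} roi={roi:.1%}")
# cost=$0.93 profit=$0.07 roi=7.5%
```

Real fees, differing resolution rules, and thin order books can erase a spread of this size, so treat the result as an upper bound.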

Alert Notifications

Set up alerts so your bot notifies you when it finds something interesting.

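# alerts.py
import requests
import os
from datetime import datetime
from dotenv import load_dotenv

# Load .env here too so the webhook is found regardless of import order
load_dotenv()

SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")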

def send_alert(title, details, level="info"):
    """Send an alert via available channels."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    message = f"[{timestamp}] [{level.upper()}] {title}\n{details}"

    # Always log to console
    print(message)

    # Send to Slack if configured
    if SLACK_WEBHOOK:
        send_slack_alert(title, details, level)

def send_slack_alert(title, details, level="info"):
    """Send alert to Slack via webhook."""
    color_map = {"info": "#36a64f", "warning": "#ff9500", "alert": "#ff0000"}

    payload = {
        "attachments": [{
            "color": color_map.get(level, "#36a64f"),
            "title": title,
            "text": details,
            "footer": "Propheseer Trading Bot",
            "ts": int(datetime.now().timestamp()),
        }]
    }

    try:
        response = requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
        response.raise_for_status()  # surface non-2xx replies from Slack
    except requests.exceptions.RequestException as e:
        print(f"Failed to send Slack alert: {e}")

def format_arbitrage_alert(opportunity):
    """Format an arbitrage opportunity as a readable alert."""
    return (
        f"Question: {opportunity['question']}\n"
        f"Spread: {opportunity['spread']:.1%}\n"
        f"Return: {opportunity['return']}\n"
        f"Buy on {opportunity['buy_platform']} @ {opportunity['buy_price']:.1%}\n"
        f"Sell on {opportunity['sell_platform']} @ {opportunity['sell_price']:.1%}"
    )

def format_whale_alert(trade):
    """Format an unusual trade as a readable alert."""
    return (
        f"Market: {trade['market']}\n"
        f"Platform: {trade['source']}\n"
        f"Side: {trade['side']} | Size: ${trade['size']:,.0f}\n"
        f"Anomaly Score: {trade['score']:.0f}/100\n"
        f"Reason: {trade['reason']}"
    )
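
Because the bot rescans on a fixed interval, an opportunity that persists across scans would trigger the same alert every cycle. A minimal sketch of one way to suppress repeats uses an in-memory cache keyed by a string you choose, for example the market question. `AlertDeduper` and its cooldown are hypothetical and not part of `alerts.py` above:

```python
import time

class AlertDeduper:
    """Remember recently sent alert keys and suppress repeats for `cooldown` seconds."""

    def __init__(self, cooldown=3600, clock=time.monotonic):
        self.cooldown = cooldown
        self.clock = clock        # injectable so the class is easy to test
        self._last_sent = {}      # key -> time the alert was last sent

    def should_send(self, key):
        """Return True (and record the send) unless `key` was sent recently."""
        now = self.clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_sent[key] = now
        return True

deduper = AlertDeduper(cooldown=3600)
print(deduper.should_send("Will X happen?"))  # True: first time seen
print(deduper.should_send("Will X happen?"))  # False: still within cooldown
```

In a scan loop, a check such as `deduper.should_send(opp["question"])` before calling `send_alert` would limit each opportunity to one alert per cooldown window. The cache is lost when the process restarts.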

Running Continuously

Now let's tie everything together into a bot that runs on a schedule.

# bot.py (continued — main loop)
import time
from datetime import datetime

# Configuration
SCAN_INTERVAL = 120       # seconds between scans
MIN_ARBITRAGE_SPREAD = 0.05  # 5% minimum
MIN_WHALE_SCORE = 75      # anomaly score threshold
MIN_WHALE_SIZE = 10000    # $10K minimum trade size

def run_scan():
    """Run a single scan cycle."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"\n[{timestamp}] Running scan...")

    # 1. Check for arbitrage
    arb_opportunities = monitor_arbitrage(min_spread=MIN_ARBITRAGE_SPREAD)
    if arb_opportunities:
        for opp in arb_opportunities:
            send_alert(
                f"Arbitrage: {opp['spread']:.1%} spread",
                format_arbitrage_alert(opp),
                level="alert"
            )
    else:
        print(f"  No arbitrage opportunities above {MIN_ARBITRAGE_SPREAD:.0%}")

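    # 2. Check for whale trades (monitor_unusual_activity already drops
    #    trades under $10K; MIN_WHALE_SIZE can raise that floor)
    whale_trades = [
        t for t in monitor_unusual_activity(min_score=MIN_WHALE_SCORE)
        if t["size"] >= MIN_WHALE_SIZE
    ]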
    if whale_trades:
        for trade in whale_trades:
            send_alert(
                f"Whale Trade: ${trade['size']:,.0f} on {trade['source']}",
                format_whale_alert(trade),
                level="warning"
            )
    else:
        print(f"  No unusual trades above score {MIN_WHALE_SCORE}")

    # 3. Log rate limit status
    remaining = client.rate_limit_remaining
    # Skip the report when the header was missing (None or a negative sentinel)
    if remaining is not None and remaining >= 0:
        print(f"  Rate limit remaining: {remaining} requests today")
        if remaining < 100:
            print("  WARNING: Running low on daily requests!")

    return {
        "arbitrage": len(arb_opportunities),
        "whales": len(whale_trades),
    }

def main():
    """Main bot loop with error handling."""
    print("=" * 50)
    print("Prediction Market Trading Bot")
    print(f"Scan interval: {SCAN_INTERVAL}s")
    print(f"Min arbitrage spread: {MIN_ARBITRAGE_SPREAD:.0%}")
    print(f"Min whale score: {MIN_WHALE_SCORE}")
    print("=" * 50)

    # Verify API connection
    try:
        result = client.get_markets(limit=1)
        total = result["meta"]["total"]
        print(f"Connected. {total} markets available.\n")
    except Exception as e:
        print(f"Failed to connect: {e}")
        return

    scan_count = 0
    try:
        while True:
            scan_count += 1
            print(f"--- Scan #{scan_count} ---")
            try:
                run_scan()
                delay = SCAN_INTERVAL
            except Exception as e:
                # Don't exit on transient errors; retry sooner than usual
                print(f"Error during scan: {e}")
                delay = 30
            time.sleep(delay)
    except KeyboardInterrupt:
        # Catches Ctrl+C during a scan or while sleeping between scans
        print("\nBot stopped by user.")

if __name__ == "__main__":
    main()

Run the bot:

python bot.py

You'll see output like:

==================================================
Prediction Market Trading Bot
Scan interval: 120s
Min arbitrage spread: 5%
Min whale score: 75
==================================================
Connected. 1250 markets available.

--- Scan #1 ---
[14:30:05] Running scan...
  No arbitrage opportunities above 5%
  No unusual trades above score 75
  Rate limit remaining: 9985 requests today

Deployment Tips

Running on a Server

For 24/7 operation, deploy your bot to a cloud server. A small VPS ($5-10/month) is more than sufficient:

# Using screen for persistent sessions
screen -S trading-bot
python bot.py
# Ctrl+A, D to detach

# Reattach later
screen -r trading-bot

Using systemd (Linux)

For production deployments, create a systemd service:

# /etc/systemd/system/prediction-bot.service
[Unit]
Description=Prediction Market Trading Bot
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/prediction-bot
ExecStart=/home/ubuntu/prediction-bot/venv/bin/python bot.py
Restart=always
RestartSec=30
EnvironmentFile=/home/ubuntu/prediction-bot/.env

[Install]
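WantedBy=multi-user.target

Then reload systemd so it picks up the new unit file, and enable and start the service:

sudo systemctl daemon-reload
sudo systemctl enable prediction-bot
sudo systemctl start prediction-bot
sudo journalctl -u prediction-bot -f  # View logs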

Rate Limit Considerations

Plan        Daily Limit    Max Scan Frequency
Free        100/day        ~Every 15 minutes (96 scans × 1 req)
Pro         10,000/day     ~Every 30 seconds
Business    100,000/day    ~Every 3 seconds

For continuous monitoring, the Pro plan ($19.99/month) gives you comfortable headroom at 10,000 requests per day. Compare plans.
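
The scan in this tutorial issues two requests per cycle (one to the arbitrage endpoint, one to unusual trades), so the sustainable interval depends on both the daily limit and the requests per scan. Here is a rough sketch of the arithmetic using the daily limits listed above. `min_scan_interval` is a hypothetical helper, and it ignores per-minute limits, retries, and the startup connection check:

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60

def min_scan_interval(daily_limit, requests_per_scan=2):
    """Smallest whole-second interval that keeps a day's scans within daily_limit."""
    max_scans = daily_limit // requests_per_scan
    if max_scans == 0:
        raise ValueError("daily_limit is too low for even one scan")
    return math.ceil(SECONDS_PER_DAY / max_scans)

for plan, limit in [("Free", 100), ("Pro", 10_000), ("Business", 100_000)]:
    print(f"{plan}: every {min_scan_interval(limit)}s")
# Free: every 1728s
# Pro: every 18s
# Business: every 2s
```

By this estimate, the default `SCAN_INTERVAL` of 120 seconds uses about 1,440 requests per day at two requests per scan, which exceeds the Free plan's limit. On the Free plan the interval would need to be roughly 29 minutes.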

Logging and Monitoring

Add file logging for production:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("bot.log"),
        logging.StreamHandler(),
    ]
)

logger = logging.getLogger(__name__)
# Replace print() calls with logger.info(), logger.warning(), etc.
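
A bot that runs for weeks will grow `bot.log` without bound. One option is the standard library's `RotatingFileHandler`, sketched here under the assumption that a few megabytes of history is enough. The `build_logger` name, logger name, size limit, and backup count are illustrative choices:

```python
import logging
from logging.handlers import RotatingFileHandler

def build_logger(path="bot.log", max_bytes=5_000_000, backups=3):
    """Return a logger that writes to the console and a size-capped log file."""
    logger = logging.getLogger("prediction-bot")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid attaching duplicate handlers on repeat calls
        fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
        handlers = (
            RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backups),
            logging.StreamHandler(),
        )
        for handler in handlers:
            handler.setFormatter(fmt)
            logger.addHandler(handler)
    return logger
```

Calling `logger = build_logger()` once at startup and then using `logger.info(...)` keeps at most `backups + 1` files of roughly `max_bytes` each.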

Next Steps

You now have a working trading bot. Here's where to take it from here:

  • Add more strategies — Monitor specific categories, track probability momentum, or watch for resolution-date arbitrage
  • Build a dashboard — Follow our React dashboard tutorial to visualize your bot's findings
  • Go real-time — Replace polling with WebSocket connections for instant updates
  • Add backtesting — Use historical data to validate your strategies before going live
  • Read the FAQ — Common questions answered in our prediction market API FAQ

For the fundamentals behind arbitrage detection, see our complete arbitrage guide. And if you haven't already, make sure to get your API key set up first.


Ready to automate your market monitoring? Get your API key and start building. The complete source code in this tutorial is ready to copy, customize, and deploy.
