【開発記1】24時間全自動のAI経済ニュース局を支える「司令塔(main_station.py)」の仕組み

はじめまして。M4 MacBook AirとローカルLLM(Qwen 2.5)を使い、人間の代わりに24時間相場を監視して動画を生成・配信し続けるAI経済ニュース局「NFC Market Live」を個人開発しています。 (※プロジェクトの全体像や、なぜ作ろうと思ったのかについては、以下のページをご覧ください)

今回は、無数にある番組制作スクリプトを24時間ノンストップで回し続け、OBS(配信ソフト)に動画を送り続けるシステムの心臓部、**「司令塔(main_station.py)」**のコードと設計思想を公開します。

目次

アーキテクチャの思想:なぜ「疎結合」にしたのか

このシステムを作るにあたり、最初は「データ取得」「原稿作成」「動画化」「YouTube配信」をすべて1つのPythonファイルに書こうとして、見事に地獄を見ました。エラーが起きるたびにシステム全体がストップし、どこで止まったのか分からなくなるからです。

そこで、システム全体を「完全に独立した部署(疎結合)」に分けることにしました。

  1. 編成局(Scheduler): 何時にどの番組をやるかだけを決める。
  2. 制作部(Producers): 各指標(CPIや日銀オペなど)のデータ取得と原稿作成だけを行う。
  3. 編集室(Composer): 渡された素材を動画(MP4)にするだけ。
  4. 司令塔(Main Station): 上記の部署に指示を出し、完成した動画をOBSに投げる。

今回は、この全体を統括する「4. 司令塔」のコードを見ていきます。

司令塔(main_station.py)のコアコード公開

このコードの最大のミッションは、「APIが落ちても、LLMが変な回答をしても、絶対に放送を止めないこと」です。そのため、マルチスレッド(並行処理)と厳重なエラーハンドリングを実装しています。

以下が、その心臓部となるPythonコードです。

import sys
import os
import time
import datetime
import traceback
import threading
import queue
import importlib
from collections import deque
from concurrent.futures import ThreadPoolExecutor

from obswebsocket import obsws, requests as obsreq 

# ========================================================
# ⚙️ OBS設定 (セキュリティのため一部伏せ字)
# ========================================================
OBS_HOST = "127.0.0.1"       
OBS_PORT = 4455              
OBS_PASSWORD = "[YOUR_OBS_PASSWORD]" 
OBS_SOURCE_NAME = "Main_News_Loop" 

# ========================================================
# 📱 SNS投稿設定 (ホワイトリスト)
# ========================================================
SNS_WHITELIST = [
    "boj_ops",            # 日銀オペ
    "us_jobs_bls_flash",  # 米雇用統計
    "us_cpi_flash",       # 米CPI
    # ...他多数...
]

# --- プロデューサーIDとファイル名の対応表 ---
PRODUCER_MAP = {
    "boj_ops": "producers.prod_boj_ops",
    "cpi": "producers.prod_estat_cpi",
    "us_jobs_flash": "producers.prod_us_jobs_flash",
    # ...他多数...
}

ready_queue = queue.Queue(maxsize=10) # 完成した動画を入れるキュー

# ==========================================
#  🏭 制作マネージャー (マルチスレッド管理)
# ==========================================
def production_manager():
    print("👷 [Manager] 制作マネージャー起動 (Multi-thread)")
    
    # 最大3つのプロデューサーを同時に走らせる
    executor = ThreadPoolExecutor(max_workers=3)
    submission_history = {} 

    while True:
        try:
            # 1. 編成局から指示を取得
            producer_id, label, is_priority = nfc_scheduler.get_program_instruction()
            
            # 2. 連続実行を防ぐ重複チェック
            now = time.time()
            last_time = submission_history.get(producer_id, 0)
            cooldown = 120 if is_priority else 600
            
            if now - last_time < cooldown:
                time.sleep(1)
                continue

            # 3. スレッドプールへタスクを投入(ここで待機せず次のループへ)
            print(f"📨 [Manager] タスク投入 -> Pool: {label} (Priority={is_priority})")
            submission_history[producer_id] = now
            
            executor.submit(execute_producer_task, producer_id, label, is_priority)
            time.sleep(2)

        except Exception as e:
            print(f"🔥 [Manager] 重大エラー: {e}")
            time.sleep(10)

# ==========================================
#  📡 放送スレッド(ループ&速報割り込み対応)
# ==========================================
def run_station_loop():
    print("📺 NFC MAIN STATION - 24H Auto Pilot")

    # 制作マネージャーを別スレッドで起動
    worker = threading.Thread(target=production_manager, daemon=True)
    worker.start()
    
    # 過去動画の履歴(最新4件を保持してループ再生させる)
    play_history = deque(maxlen=4)
    history_index = 0
    
    while True:
        try:
            target_video_item = None
            is_breaking_news = False

            try:
                # 新しい動画(速報など)が完成しているかチェック
                target_video_item = ready_queue.get_nowait()
                is_breaking_news = True
                play_history.append(target_video_item)
                print(f"\n🚀 [New] 速報を受信しました!: {target_video_item['label']}")
                
            except queue.Empty:
                # 新しい動画がない場合は、履歴から順番に再生(ラウンドロビン)
                if len(play_history) > 0:
                    target_video_item = play_history[history_index]
                    target_video_item['is_fresh'] = False
                    history_index = (history_index + 1) % len(play_history)
                else:
                    time.sleep(2)
                    continue

            # OBSへ動画パスを送信して再生開始
            video_path = target_video_item["path"]
            update_obs_source(video_path)
            
            # --- 再生時間の管理(速報の割り込み対応) ---
            duration = get_video_duration(video_path)
            elapsed = 0
            interrupted = False
            
            while elapsed < duration:
                # ループ再生中に「新しい動画」が完成したら、現在の再生を中断して速報へ切り替え
                if not is_breaking_news and not ready_queue.empty():
                    print("⚡ [Interrupt] 新しい動画を検知!現在のループを中断します。")
                    interrupted = True
                    break 
                time.sleep(1)
                elapsed += 1
            
            if interrupted:
                continue

        except Exception as e:
            print(f"\n🔥 [Station] システムエラー: {e}")
            time.sleep(5)

if __name__ == "__main__":
    run_station_loop()

コードの解説:こだわりのポイント

このコードには、24時間運用に耐えるための2つの大きな工夫を組み込んでいます。

1. 「ThreadPoolExecutor」による非同期制作

経済指標の発表が重なる時間帯(例えば、米雇用統計と日本の何かが被った時など)は、1つのAIに直列で作業させると速報性が失われます。 そこで ThreadPoolExecutor を使い、**「最大3つの番組を同時に裏側で作る(生成する)」**という処理を行っています。これにより、APIへの通信待ちやLLMの生成待ちの間もシステムがブロックされません。

2. 「速報割り込み」のためのスマート待機ループ

一番苦労したのがこの部分です。 通常、動画を再生する時は time.sleep(動画の長さ) としてシステムを待機させますが、これだと待機中に「超特大の速報ニュース」が完成しても、今の動画が終わるまで放送できません。

そこで、while elapsed < duration: という1秒ごとのループを作り、1秒ごとに「新しいニュースが届いていないか?」を監視(ready_queue.empty())するようにしました。新しい動画を検知した瞬間、break でループを抜け、即座にOBSを上書きして速報を流します。

まとめ

これが、私の代わりに24時間休まず働き続けるNFCの心臓部です。 現在、光回線の開通待ちにつきテザリング環境で動かしていますが、この司令塔の堅牢なエラーハンドリングのおかげで、通信が不安定な環境でもなんとか(力技で)放送を継続できています。

次回は、実際にニュース原稿を作る「制作部(プロデューサー)」の中から、日銀の動向を監視するコードと、AI(Qwen2.5)への指示書(プロンプト)を丸裸にして公開します!

お楽しみに。

コメント

コメントする

目次