add openai client backend #565

Open · wants to merge 10 commits into base: main
Conversation

@RyanMarten commented Mar 2, 2025

Test ping

curl https://api.deepseek.com/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer ${DEEPSEEK_API_KEY}" \
  -d '{
        "model": "deepseek-reasoner",
        "messages": [
          {"role": "system", "content": "You are a helpful assistant."},
          {"role": "user", "content": "Hello!"}
        ],
        "stream": false
      }'

@RyanMarten closed this Mar 2, 2025
@RyanMarten reopened this Mar 2, 2025
@RyanMarten commented Mar 2, 2025

Test calling OpenAI through curator using the new "openai_client" backend

from bespokelabs import curator

# Make sure the backend works
llm = curator.LLM(model_name="gpt-4o-mini", backend="openai_client")
response = llm("Hello, world!")
print(response["response"])

@RyanMarten commented Mar 2, 2025

Test calling DeepSeek through the OpenAI client

# Make sure that the client works.
# "Hello" answers very quickly; the reasoning question below takes ~2m14s.
import os
from openai import OpenAI

client = OpenAI(api_key=os.environ.get("DEEPSEEK_API_KEY"), base_url="https://api.deepseek.com")

response = client.chat.completions.create(
    model="deepseek-reasoner",
    messages=[
        {"role": "system", "content": "You are a helpful assistant"},
        {"role": "user", "content": "Given a rational number, write it as a fraction in lowest terms and calculate the product of the resulting numerator and denominator. For how many rational numbers between 0 and 1 will $20_{}^{}!$ be the resulting product?"}, # noqa
        # {"role": "user", "content": "Hello"},
    ],
    stream=False,
)
print(response.choices[0].message.content)
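
Note: for deepseek-reasoner, the completion also carries the model's chain of thought in a reasoning_content field alongside content (a DeepSeek extension to the OpenAI response schema); the curator examples below read the same field out of the raw completions object.

reasoning = response.choices[0].message.reasoning_content  # DeepSeek-specific field
print(reasoning)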

@RyanMarten commented Mar 2, 2025

Test calling DeepSeek through curator using the openai_client backend (hello)

from bespokelabs import curator
import os

class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 5,
        "max_tokens_per_minute": 100_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
    },
)

ds = llm("Hello")
print("REASONING: ", ds[0]["deepseek_reasoning"])
print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])

Finishes in 9s with the following response:

REASONING: Okay, the user just said "Hello". I should respond in a friendly and welcoming manner. Let me make sure to keep it natural and not too formal. Maybe add a smiley to keep the tone light. Something like, "Hello! How can I assist you today? 😊" That should work. Wait, should I use an emoji here? The previous example did, so maybe it's okay. Alright, that's a good response.
SOLUTION: Hello! How can I assist you today? 😊

@RyanMarten commented Mar 2, 2025

Test calling DeepSeek through curator using the openai_client backend (reasoning problem)

from bespokelabs import curator
import os
from datasets import load_dataset

class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["question"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 5,
        "max_tokens_per_minute": 100_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
    },
)

ds = load_dataset("simplescaling/s1K", split="train")
ds = ds.remove_columns(["thinking_trajectories", "cot", "attempt"])
ds = llm(ds.take(1))
print("REASONING: ", ds[0]["deepseek_reasoning"])
print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])

[Screenshot: 2025-03-02 at 10:59:51 AM]

~10 minutes for 15k tokens.

@RyanMarten commented Mar 2, 2025

Testing hello with 50 RPM and 100 requests

from bespokelabs import curator
import os
from datasets import Dataset

class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["question"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 50,
        "max_tokens_per_minute": 100_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
    },
)

ds = Dataset.from_dict({"question": ["Hello"]*100})
ds = llm(ds) 
print("REASONING: ", ds[0]["deepseek_reasoning"])
print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])

Takes ~2 minutes, sustaining around 44 RPM.

[Screenshot: 2025-03-02 at 11:26:58 AM]

@RyanMarten commented Mar 2, 2025

Testing s1 with 50 RPM and 100 requests

from bespokelabs import curator
import os
from datasets import load_dataset

class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["question"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 50,
        "max_tokens_per_minute": 100_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
    },
)

ds = load_dataset("simplescaling/s1K", split="train")
ds = ds.remove_columns(["thinking_trajectories", "cot", "attempt"])
ds = llm(ds.take(100))
print("REASONING: ", ds[0]["deepseek_reasoning"])
print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])

[Screenshot: 2025-03-02 at 11:28:15 AM]

~17 min at ~6 RPM.

@RyanMarten requested a review from @kartik4949 on March 2, 2025, 19:22
@RyanMarten commented Mar 2, 2025

Testing s1 with 100 RPM and 1,000 requests

from bespokelabs import curator
import os
from datasets import load_dataset

class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["question"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 100,
        "max_tokens_per_minute": 100_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
    },
)

ds = load_dataset("simplescaling/s1K", split="train")
ds = ds.remove_columns(["thinking_trajectories", "cot", "attempt"])
ds = llm(ds)
print("REASONING: ", ds[0]["deepseek_reasoning"])
print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])

~25 min at ~44 RPM.

@RyanMarten closed this Mar 2, 2025
@RyanMarten reopened this Mar 2, 2025
@RyanMarten commented Mar 2, 2025

Testing 5,000 requests with 500 RPM

from bespokelabs import curator
import os
from datasets import load_dataset

class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["problem"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 500,
        "max_tokens_per_minute": 100_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
    },
)

ds = load_dataset("mlfoundations-dev/herorun1_code", split="train")
ds = llm(ds.take(5_000))
print("REASONING: ", ds[0]["deepseek_reasoning"])
print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])
ds.push_to_hub("mlfoundations-dev/herorun1_code-test")

[Screenshot: 2025-03-02 at 12:10:15 PM]

Max in-progress requests: ~1,600.

Stragglers plus a max_retries of 10 dragged out the tail: with only 6 requests still in progress (a couple on their third try), I canceled after 47 minutes.

@RyanMarten commented Mar 2, 2025

Testing 25,000 requests with 2,500 RPM

import os

from datasets import load_dataset

from bespokelabs import curator


class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["problem"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 2_500,
        "max_tokens_per_minute": 1_000_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
        "require_all_responses": False,
        "max_retries": 2,
    },
)

ds = load_dataset("mlfoundations-dev/herorun1_code", split="train")
ds = llm(ds.take(25_000))
# print("REASONING: ", ds[0]["deepseek_reasoning"])
# print("\n\nSOLUTION: ", ds[0]["deepseek_solution"])
ds.push_to_hub("mlfoundations-dev/herorun1_code-test")

I think here we are bumping up against the openai client's ability to send parallel requests: in-progress requests are increasing at the same rate as in the max-RPM-500 run. Also set

    "require_all_responses": False,
    "max_retries": 2,

which will reduce the impact of stragglers that end with a "length" finish reason.

Same max concurrency of ~1,600; 20 minutes in, seeing ~255 RPM.
[Screenshot: 2025-03-02 at 12:52:40 PM]

[Screenshot: 2025-03-02 at 1:43:37 PM]
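
If the ceiling really is the client's connection pool, one thing to try (a sketch, assuming the openai SDK's default httpx pool limits are what cap us at ~1,600 in-progress requests) is passing a custom http_client with larger limits:

import os

import httpx
from openai import AsyncOpenAI

# Illustrative limit values, not tuned numbers.
client = AsyncOpenAI(
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com",
    http_client=httpx.AsyncClient(
        limits=httpx.Limits(max_connections=5_000, max_keepalive_connections=1_000)
    ),
)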

@RyanMarten commented Mar 2, 2025

The bottleneck here is the OpenAI client itself: I started another run in parallel on a different machine and got good throughput there (~250 RPM).

Use multiple clients in the backend? I will try implementing this as a new backend param, num_clients.

The script below just uses multiple curator instances (the shards then need to be merged afterward; see the merge sketch after the script).

import os
import argparse
from datasets import load_dataset

from bespokelabs import curator


class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["problem"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Process a dataset with DeepSeek and shard it")
    parser.add_argument("--worker", type=int, required=True, help="Worker ID (0-indexed)")
    parser.add_argument("--global", type=int, dest="global_workers", required=True, 
                        help="Total number of workers")
    parser.add_argument("--dataset", type=str, required=True, 
                        help="Input dataset name (e.g., 'mlfoundations-dev/herorun1_code')")
    parser.add_argument("--output", type=str, required=False,
                        help="Output dataset name base (worker ID will be appended). Defaults to input dataset name with 'annotated' suffix.")
    args = parser.parse_args()
    
    # Validate arguments
    if args.worker < 0 or args.worker >= args.global_workers:
        raise ValueError(f"Worker ID must be between 0 and {args.global_workers-1}")
    
    if args.global_workers <= 0:
        raise ValueError("Total number of workers must be positive")
    
    # Initialize the LLM
    llm = Reasoner(
        model_name="deepseek-reasoner",
        backend="openai_client",
        generation_params={"temperature": 0.0},
        backend_params={
            "max_requests_per_minute": 500,
            "max_tokens_per_minute": 1_000_000_000,
            "base_url": "https://api.deepseek.com/",
            "api_key": os.environ.get("DEEPSEEK_API_KEY"),
            "require_all_responses": False,
            "max_retries": 2,
        },
    )
    
    # Load the dataset
    print(f"Loading dataset: {args.dataset}")
    ds = load_dataset(args.dataset, split="train")
    
    # Calculate shard size and indices
    total_examples = len(ds)
    shard_size = total_examples // args.global_workers
    start_idx = args.worker * shard_size
    end_idx = start_idx + shard_size if args.worker < args.global_workers - 1 else total_examples
    
    print(f"Worker {args.worker}/{args.global_workers}: Processing examples {start_idx} to {end_idx-1} (total: {end_idx-start_idx})")
    
    # Extract the shard
    ds_shard = ds.select(range(start_idx, end_idx))
    
    # Process the shard with the LLM
    processed_ds = llm(ds_shard)
    
    # Create output dataset name with worker ID
    output_name = f"{args.output}-{args.worker}" if args.output else f"{args.dataset}-annotated-{args.worker}"
    
    # Push to hub
    print(f"Pushing processed dataset to {output_name}")
    processed_ds.push_to_hub(output_name)
    print(f"Worker {args.worker} completed successfully")


if __name__ == "__main__":
    main()
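
The later merge step then looks something like this (a sketch; the repo names assume the script's default "-annotated-{worker}" suffix and two workers):

from datasets import concatenate_datasets, load_dataset

shards = [
    load_dataset(f"mlfoundations-dev/herorun1_code-annotated-{i}", split="train")
    for i in range(2)  # matches --global 2
]
merged = concatenate_datasets(shards)
merged.push_to_hub("mlfoundations-dev/herorun1_code-annotated")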

Implementation commit:

- Add num_clients parameter to OnlineBackendParams and OnlineRequestProcessorConfig
- Modify OpenAIClientOnlineRequestProcessor to initialize multiple clients
- Implement round-robin client selection in call_single_request method
- Add example file demonstrating multi-client usage with DeepSeek
- Add unit tests for multi-client functionality

This enhancement addresses bottlenecks in parallel request handling by
allowing users to create multiple AsyncOpenAI clients that distribute
requests in a round-robin fashion.

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
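
In sketch form, the round-robin selection looks like this (illustrative class and method names, not the PR's literal implementation):

import itertools

from openai import AsyncOpenAI

class ClientPool:
    """Rotate through several AsyncOpenAI clients to spread connection load."""

    def __init__(self, num_clients: int, base_url: str, api_key: str):
        clients = [AsyncOpenAI(base_url=base_url, api_key=api_key) for _ in range(num_clients)]
        self._cycle = itertools.cycle(clients)

    def next_client(self) -> AsyncOpenAI:
        # Each request grabs the next client in the rotation.
        return next(self._cycle)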

@RyanMarten commented Mar 2, 2025

Multiple clients, one curator instance

import os

from datasets import load_dataset

from bespokelabs import curator

# This example demonstrates using multiple OpenAI clients in parallel
# to overcome the bottleneck of parallel requests that a single client can send.
# The `num_clients` parameter creates multiple AsyncOpenAI clients that are used
# in a round-robin fashion to distribute requests.


class Reasoner(curator.LLM):
    """Curator class for reasoning."""

    return_completions_object = True

    def prompt(self, input):
        """Create a prompt for the LLM to reason about the problem."""
        return [{"role": "user", "content": input["problem"]}]

    def parse(self, input, response):
        """Parse the LLM response to extract reasoning and solution."""
        input["deepseek_reasoning"] = response["choices"][0]["message"]["reasoning_content"]
        input["deepseek_solution"] = response["choices"][0]["message"]["content"]
        return input


# Initialize the LLM with multiple clients for parallel request handling
llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",
    generation_params={"temperature": 0.0},
    backend_params={
        "max_requests_per_minute": 2_500,
        "max_tokens_per_minute": 1_000_000_000,
        "base_url": "https://api.deepseek.com/",
        "api_key": os.environ.get("DEEPSEEK_API_KEY"),
        "require_all_responses": False,
        "max_retries": 2,
        "num_clients": 2,  # Create 2 OpenAI clients for parallel requests
    },
)


ds = load_dataset("mlfoundations-dev/herorun1_code", split="train")
ds = llm(ds.select(range(25_000, 50_000)))
ds.push_to_hub("mlfoundations-dev/herorun1_code-second-25k")

[Screenshot: 2025-03-02 at 2:37:36 PM]

[Screenshot: 2025-03-02 at 3:45:51 PM]

@kartik4949 (Contributor) left a comment

Thanks! Please override the existing backend.


llm = Reasoner(
    model_name="deepseek-reasoner",
    backend="openai_client",

@kartik4949 (Contributor) commented:

openai vs openai_client: isn't it confusing?

_OPENAI_ALLOWED_IMAGE_SIZE_MB = 20


class OpenAIClientOnlineRequestProcessor(BaseOnlineRequestProcessor, OpenAIRequestMixin):
@kartik4949 (Contributor) commented:

Please inherit from OpenAIOnlineProcessor, since there is too much redundancy with the existing one. Let's only override the changed functions.
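
i.e. something like this sketch (base-class and method names are taken from this thread and may not match the repo's exact signatures):

class OpenAIClientOnlineRequestProcessor(OpenAIOnlineRequestProcessor):
    """Reuse the existing OpenAI processor; override only what differs."""

    async def call_single_request(self, request):
        # Swap in the next client round-robin, then defer to the parent's logic
        # (assumes the parent issues requests through self.client).
        self.client = self._next_client()
        return await super().call_single_request(request)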

@@ -0,0 +1,76 @@
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
@kartik4949 (Contributor) commented:

Let's integration-test it, since the other backends are also tested that way; having this as a unit test would be a bit odd.
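
For example, a hedged sketch of such an integration test, mirroring the hello-world snippet at the top of this PR (the pytest marker is an assumption about the repo's test setup):

import pytest

from bespokelabs import curator

@pytest.mark.integration  # hypothetical marker; depends on the repo's test config
def test_openai_client_backend_hello():
    llm = curator.LLM(model_name="gpt-4o-mini", backend="openai_client")
    response = llm("Hello, world!")
    assert len(response["response"]) == 1  # one prompt in, one response out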

@RyanMarten commented

[Screenshot: 2025-03-06 at 11:06:50 AM]

800+ RPM with two clients

2 participants