bencher.job

Attributes

scoop_future_executor

_MISSING

Classes

Job

Represents a benchmarking job to be executed or retrieved from cache.

JobFuture

A wrapper for a job result or future that handles caching.

Executors

Enumeration of available execution strategies for benchmark jobs.

FutureCache

A cache system for benchmark job results with executor support.

JobFunctionCache

A specialized cache for a specific function with various input parameters.

Functions

run_job(→ dict)

Execute a job by calling its function with the provided arguments.

Module Contents

bencher.job.scoop_future_executor = None
bencher.job._MISSING
class bencher.job.Job(job_id: str, function: Callable, job_args: dict, job_key: str | None = None, tag: str = '')

Represents a benchmarking job to be executed or retrieved from cache.

A Job encapsulates a function, its arguments, and metadata for caching and tracking purposes.

job_id

A unique identifier for the job, used for logging

Type:

str

function

The function to be executed

Type:

Callable

job_args

Arguments to pass to the function

Type:

dict

job_key

A hash key for caching, derived from job_args if not provided

Type:

str

tag

Optional tag for grouping related jobs

Type:

str

job_id
function
job_args
tag = ''
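
The job_key fallback described above can be illustrated with a minimal sketch. It assumes the key is a digest of the JSON-serialised arguments; the actual hashing scheme used by bencher is not stated here, and the names SketchJob and hash_args are hypothetical.

```python
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


def hash_args(job_args: dict[str, Any]) -> str:
    """Hypothetical helper: a stable digest of the job arguments."""
    # Sorting the keys makes the digest independent of insertion order.
    payload = json.dumps(job_args, sort_keys=True, default=repr)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


@dataclass
class SketchJob:
    """Illustrative stand-in for a job record, not the bencher class."""

    job_id: str
    function: Callable[..., dict]
    job_args: dict[str, Any]
    job_key: str | None = None
    tag: str = ""

    def __post_init__(self) -> None:
        # Fall back to a key derived from the arguments when none is given.
        if self.job_key is None:
            self.job_key = hash_args(self.job_args)


a = SketchJob("j1", dict, {"x": 1, "y": 2})
b = SketchJob("j2", dict, {"y": 2, "x": 1})
custom = SketchJob("j3", dict, {"x": 1}, job_key="custom")
```

In this sketch, jobs a and b share a key because their arguments are equal, while an explicitly supplied job_key is kept unchanged.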
class bencher.job.JobFuture(job: Job, res: dict | None = None, future: concurrent.futures.Future | None = None, cache: diskcache.Cache | None = None)

A wrapper for a job result or future that handles caching.

This class provides a unified interface for handling both immediate results and futures (for asynchronous execution). It also handles caching results when they become available.

job

The job this future corresponds to

Type:

Job

res

The result, if available immediately

Type:

dict

future

The future representing the pending job, if executed asynchronously

Type:

Future

cache

The cache to store results in when they become available

job
res = None
future = None
cache = None
result() → dict

Get the job result, waiting for completion if necessary.

If the result is not immediately available (i.e., it’s a future), this method will wait for the future to complete. Once the result is available, it will be cached if a cache is provided.

Returns:

The job result

Return type:

dict
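
The behaviour of result() can be sketched as follows. This is an illustration under assumptions, not the bencher implementation: SketchJobFuture is a hypothetical class, a plain dict stands in for the disk cache, and a ThreadPoolExecutor stands in for whichever executor produced the future.

```python
from __future__ import annotations

from concurrent.futures import Future, ThreadPoolExecutor


class SketchJobFuture:
    """Hypothetical stand-in: holds either a ready result or a pending future."""

    def __init__(self, key: str, res: dict | None = None,
                 future: Future | None = None, cache: dict | None = None) -> None:
        # Exactly one of res / future is expected to be set.
        assert (res is None) != (future is None)
        self.key, self.res, self.future, self.cache = key, res, future, cache

    def result(self) -> dict:
        if self.future is not None:
            # Blocks until the asynchronous work has finished.
            self.res = self.future.result()
        if self.cache is not None:
            # Store the result so later lookups can skip the computation.
            self.cache[self.key] = self.res
        return self.res


cache: dict = {}
with ThreadPoolExecutor(max_workers=1) as pool:
    pending = SketchJobFuture("k1", future=pool.submit(lambda: {"y": 4}), cache=cache)
    value = pending.result()

# A result that is already available needs neither a future nor a cache.
ready = SketchJobFuture("k2", res={"y": 9})
```

Callers use the same result() call in both cases, which is the point of wrapping immediate results and futures behind one interface.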

bencher.job.run_job(job: Job) → dict

Execute a job by calling its function with the provided arguments.

Sets the _current_job_key context variable so that gen_path() places media files into a per-job-key directory for clean lifecycle management.

Parameters:

job (Job) – The job to execute

Returns:

The result of the job execution

Return type:

dict
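
The context-variable pattern described for run_job can be sketched with the standard contextvars module. The names current_job_key, media_dir and run_job_sketch are hypothetical, and resetting the variable after the call is an assumption of this sketch rather than documented behaviour.

```python
from __future__ import annotations

import contextvars
from typing import Any, Callable

# Hypothetical context variable; the name only mirrors the description above.
current_job_key = contextvars.ContextVar("current_job_key", default=None)


def media_dir() -> str:
    """Hypothetical path helper that reads the active job key."""
    key = current_job_key.get()
    return f"media/{key}" if key else "media"


def run_job_sketch(function: Callable[..., dict], job_args: dict[str, Any],
                   job_key: str) -> dict:
    # Make the key visible to any code called while the job runs.
    token = current_job_key.set(job_key)
    try:
        return function(**job_args)
    finally:
        # Assumption: restore the previous value once the job is done.
        current_job_key.reset(token)


def render(x: int) -> dict:
    return {"x": x, "dir": media_dir()}


result = run_job_sketch(render, {"x": 3}, "abc123")
```

Inside the job, media_dir() resolves to a per-key directory; outside it, the helper falls back to the shared default.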

class bencher.job.Executors

Bases: strenum.StrEnum

Enumeration of available execution strategies for benchmark jobs.

This enum defines the execution modes for running benchmark jobs and provides a factory method to create appropriate executors.

SERIAL
MULTIPROCESSING
SCOOP
static factory(provider: Executors) → concurrent.futures.Future | None

Create an executor instance based on the specified execution strategy.

Parameters:

provider (Executors) – The type of executor to create

Returns:

The executor instance, or None for serial execution

Return type:

Future | None
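
A string-valued enum with a static factory might look like the sketch below. The class name Strategy and the member values are assumptions, SCOOP is left out because it needs a third-party package, and the return annotation uses Executor | None to match the prose description rather than the documented Future | None.

```python
from __future__ import annotations

from concurrent.futures import Executor, ProcessPoolExecutor
from enum import Enum


class Strategy(str, Enum):
    """Hypothetical stand-in for an execution-strategy enum."""

    SERIAL = "serial"
    MULTIPROCESSING = "multiprocessing"

    @staticmethod
    def factory(provider: Strategy) -> Executor | None:
        # Only parallel strategies need an executor object.
        if provider is Strategy.MULTIPROCESSING:
            return ProcessPoolExecutor()
        # Serial execution runs the function inline, so there is nothing to create.
        return None


serial_executor = Strategy.factory(Strategy.SERIAL)
```

Returning None for the serial case lets the caller branch on "is there an executor?" instead of comparing enum members at every call site.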

class bencher.job.FutureCache(executor=Executors.SERIAL, overwrite: bool = True, cache_name: str = 'fcache', tag_index: bool = True, size_limit: int = int(20000000000.0), cache_samples: bool = True)

A cache system for benchmark job results with executor support.

This class provides a unified interface for running benchmark jobs either serially or in parallel, with optional caching of results. It manages the execution strategy, caching policy, and tracks statistics about job execution.

executor_type

The execution strategy to use

Type:

Executors

executor

The executor instance, created on demand

cache

Cache for storing job results

Type:

Cache

overwrite

Whether to overwrite existing cached results

Type:

bool

call_count

Counter for job calls

Type:

int

size_limit

Maximum size of the cache in bytes

Type:

int

worker_wrapper_call_count

Number of job submissions

Type:

int

worker_fn_call_count

Number of actual function executions

Type:

int

worker_cache_call_count

Number of cache hits

Type:

int

executor_type
executor = None
overwrite = True
call_count = 0
size_limit = 0
worker_wrapper_call_count = 0
worker_fn_call_count = 0
worker_cache_call_count = 0
prefetch(keys: list[str]) → dict

Pre-load cached values for a batch of keys in one pass.

Returns a dict mapping key -> cached value for all cache hits. This avoids per-job cache round-trips in the submit loop.

submit(job: Job, prefetched: dict | None = None) → JobFuture

Submit a job for execution, with caching if enabled.

This method first checks the prefetched dict (if provided), then falls back to a single cache.get() query. If not found, it executes the job either serially or using the configured executor.

Parameters:
  • job (Job) – The job to submit

  • prefetched (dict, optional) – Pre-fetched cache results from prefetch(). Defaults to None.

Returns:

A future representing the job execution

Return type:

JobFuture
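
The lookup order described for submit() (prefetched dict first, then a single cache query, then execution) can be sketched with an in-memory store. SketchCache and its counters are hypothetical, the sketch returns the result directly instead of a JobFuture, and treating overwrite=True as "skip the lookup" is an assumption.

```python
from __future__ import annotations

from typing import Any, Callable

_ABSENT = object()  # sentinel: distinguishes "no entry" from a cached falsy value


class SketchCache:
    """Hypothetical in-memory stand-in for a result cache."""

    def __init__(self, overwrite: bool = False) -> None:
        self.store: dict[str, dict] = {}
        self.overwrite = overwrite
        self.fn_calls = 0
        self.cache_hits = 0

    def prefetch(self, keys: list[str]) -> dict[str, dict]:
        # One pass over the batch; only hits appear in the returned mapping.
        return {k: self.store[k] for k in keys if k in self.store}

    def submit(self, key: str, function: Callable[..., dict], args: dict[str, Any],
               prefetched: dict[str, dict] | None = None) -> dict:
        if not self.overwrite:
            hit = _ABSENT
            if prefetched is not None:
                hit = prefetched.get(key, _ABSENT)
            if hit is _ABSENT:
                # Fall back to a single direct lookup.
                hit = self.store.get(key, _ABSENT)
            if hit is not _ABSENT:
                self.cache_hits += 1
                return hit
        self.fn_calls += 1
        result = function(**args)
        self.store[key] = result
        return result


def square(x: int) -> dict:
    return {"y": x * x}


cache = SketchCache()
keys = ["x=2", "x=3"]
first = [cache.submit(k, square, {"x": int(k[2:])}) for k in keys]
warm = cache.prefetch(keys)
second = [cache.submit(k, square, {"x": int(k[2:])}, prefetched=warm) for k in keys]
```

The second pass is served entirely from the prefetched mapping, so the function-call counter stays at two while the hit counter rises to two.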

overwrite_msg(job: Job, suffix: str) → None

Log a message about overwriting or using cache.

Parameters:
  • job (Job) – The job being executed

  • suffix (str) – Additional text to add to the log message

clear_call_counts() → None

Reset the worker and cache call counters. Useful when debugging or when asserting that caching behaves as expected.

clear_cache() → None

Clear all entries from the cache.

clear_tag(tag: str) → None

Remove all cache entries with the specified tag.

Note: diskcache's Cache.evict() returns only the number of removed entries, not the evicted values, so media files referenced by evicted entries may become orphans. Use clean_orphaned_media() periodically to reclaim them.

Parameters:

tag (str) – The tag identifying entries to remove from the cache
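
Why tag eviction can leave orphaned media is easier to see with a small in-memory sketch. TaggedStore is hypothetical and does not use diskcache; file paths are plain strings and the on_disk set stands in for a directory listing.

```python
from __future__ import annotations


class TaggedStore:
    """Hypothetical in-memory store where every entry carries a tag."""

    def __init__(self) -> None:
        # key -> (value, tag)
        self.entries: dict[str, tuple[dict, str]] = {}

    def set(self, key: str, value: dict, tag: str = "") -> None:
        self.entries[key] = (value, tag)

    def evict(self, tag: str) -> int:
        # Only the count is reported, so the evicted values are lost to the caller.
        doomed = [k for k, (_, t) in self.entries.items() if t == tag]
        for k in doomed:
            del self.entries[k]
        return len(doomed)


store = TaggedStore()
store.set("a", {"media": "media/a/plot.png"}, tag="run1")
store.set("b", {"media": "media/b/plot.png"}, tag="run2")
on_disk = {"media/a/plot.png", "media/b/plot.png"}  # pretend directory listing

removed = store.evict("run1")

# Files still on disk that no remaining entry refers to are orphans.
referenced = {value["media"] for value, _ in store.entries.values()}
orphans = on_disk - referenced
```

Because eviction reports only a count, orphans have to be found afterwards by comparing what is on disk with what the remaining entries reference.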

close() → None

Close the cache and shut down the executor, if they exist.

stats() → str

Get statistics about cache usage.

Returns:

A string with cache size information

Return type:

str

class bencher.job.JobFunctionCache(function: Callable, overwrite: bool = False, executor: Executors = Executors.SERIAL, cache_name: str = 'fcache', tag_index: bool = True, size_limit: int = int(10000000000.0))

Bases: FutureCache

A specialized cache for a specific function with various input parameters.

This class simplifies caching results for a specific function called with different sets of parameters. It wraps the general FutureCache with a focus on a single function.

function

The function to cache results for

Type:

Callable

function
call(**kwargs) → JobFuture

Call the wrapped function with the provided arguments.

This method creates a Job for the function call and submits it through the cache.

Parameters:

**kwargs – Arguments to pass to the function

Returns:

A future representing the function call

Return type:

JobFuture
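
Binding a cache to one function and keying entries by keyword arguments can be sketched as below. SketchFunctionCache is hypothetical: it keeps results in a dict, derives the key from the JSON-serialised kwargs (an assumption), and returns the result directly instead of a JobFuture.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any, Callable


class SketchFunctionCache:
    """Hypothetical per-function cache keyed by keyword arguments."""

    def __init__(self, function: Callable[..., dict]) -> None:
        self.function = function
        self.store: dict[str, dict] = {}
        self.fn_calls = 0

    def call(self, **kwargs: Any) -> dict:
        # Sorted keys give the same digest regardless of argument order.
        payload = json.dumps(kwargs, sort_keys=True, default=repr)
        key = hashlib.sha1(payload.encode("utf-8")).hexdigest()
        if key not in self.store:
            self.fn_calls += 1
            self.store[key] = self.function(**kwargs)
        return self.store[key]


def area(width: int, height: int) -> dict:
    return {"area": width * height}


fcache = SketchFunctionCache(area)
r1 = fcache.call(width=2, height=5)
r2 = fcache.call(height=5, width=2)  # same arguments, different order
r3 = fcache.call(width=3, height=5)
```

The first two calls share one cache entry, so the wrapped function runs only twice across the three calls.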