AutoScaler

class lightning.app.components.serve.auto_scaler.AutoScaler(work_cls, min_replicas=1, max_replicas=4, scale_out_interval=10, scale_in_interval=10, max_batch_size=8, timeout_batching=1, endpoint='api/predict', input_type=typing.Dict, output_type=typing.Dict, cold_start_proxy=None, *work_args, **work_kwargs)

Bases: LightningFlow

The AutoScaler can be used to automatically change the number of replicas of the given server in response to changes in the number of incoming requests. Incoming requests will be batched and balanced across the replicas.

Parameters
  • min_replicas (int) – The number of works to start when the app initializes.

  • max_replicas (int) – The max number of works to spawn to handle the incoming requests.

  • scale_out_interval (SupportsFloat) – The number of seconds to wait before checking whether to increase the number of servers.

  • scale_in_interval (SupportsFloat) – The number of seconds to wait before checking whether to decrease the number of servers.

  • endpoint (str) – Provide the REST API path.

  • max_batch_size (int) – (auto-batching) The number of requests to process at once.

  • timeout_batching (float) – (auto-batching) The number of seconds to wait before sending the requests to process.

  • input_type (Type[BaseModel]) – Input type.

  • output_type (Type[BaseModel]) – Output type.

  • cold_start_proxy (Union[ColdStartProxy, str, None]) – If provided, the proxy will be used while the worker machines are warming up.

from lightning.app import LightningApp
from lightning.app.components import AutoScaler

# Example 1: Auto-scaling serve component out-of-the-box
app = LightningApp(
    AutoScaler(
        MyPythonServer,
        min_replicas=1,
        max_replicas=8,
        scale_out_interval=10,
        scale_in_interval=10,
    )
)


# Example 2: Customizing the scaling logic
class MyAutoScaler(AutoScaler):
    def scale(self, replicas: int, metrics: dict) -> int:
        pending_requests_per_running_or_pending_work = metrics["pending_requests"] / (
            replicas + metrics["pending_works"]
        )

        # upscale
        max_requests_per_work = self.max_batch_size
        if pending_requests_per_running_or_pending_work >= max_requests_per_work:
            return replicas + 1

        # downscale
        min_requests_per_work = max_requests_per_work * 0.25
        if pending_requests_per_running_or_pending_work < min_requests_per_work:
            return replicas - 1

        return replicas


app = LightningApp(
    MyAutoScaler(
        MyPythonServer,
        min_replicas=1,
        max_replicas=8,
        scale_out_interval=10,
        scale_in_interval=10,
        max_batch_size=8,  # for auto batching
        timeout_batching=1,  # for auto batching
    )
)
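
The cold_start_proxy argument can also be set so that requests are handled by a fallback endpoint while replica machines are still warming up. A minimal sketch under the signature above (the URL is hypothetical; per the type annotation, a plain string is accepted in place of a ColdStartProxy instance):

# Example 3: Serving through a cold-start proxy while replicas warm up
app = LightningApp(
    AutoScaler(
        MyPythonServer,
        min_replicas=1,
        max_replicas=8,
        # hypothetical fallback endpoint; replace with your own
        cold_start_proxy="https://my-fallback-service.example.com/predict",
    )
)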

The LightningFlow is used by the LightningApp to coordinate and manage the long-running jobs it contains, the LightningWork objects.

A LightningFlow is characterized by:

  • A set of state variables.

  • Long-running jobs (LightningWork).

  • Its children LightningFlow or LightningWork with their state variables.

State variables

The LightningFlow is a special class whose attributes are required to be JSON-serializable (e.g., int, float, bool, list, dict, …).

They also may not reach into global variables unless they are constant.

All attributes need to be defined in the __init__ method and may be reassigned to different values throughout the lifetime of the object. However, defining new attributes outside of __init__ is not allowed.

Attributes taken together represent the state of the component. Components are capable of retrieving their state and that of their children recursively at any time. They are also capable of setting an externally provided state recursively to their children.
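
As an illustrative sketch (component names are hypothetical), the snippet below defines a parent flow whose JSON-serializable attributes, together with those of its child, make up the component state; the exact nesting of children inside the state dictionary is left as an assumption here, while the "vars" entry is shown in the doctest further below.

from lightning.app import LightningFlow


class ChildFlow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.progress = 0.0  # JSON-serializable, defined in __init__

    def run(self):
        pass


class ParentFlow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.counter = 0          # int: allowed
        self.labels = ["a", "b"]  # list of JSON-serializable values: allowed
        self.child = ChildFlow()  # child component with its own state variables

    def run(self):
        self.counter += 1  # reassigning an existing attribute is allowed


flow = ParentFlow()
flow.run()
# The flow's own variables are exposed under state["vars"]; children are
# assumed to appear under a nested entry of the same state dictionary.
print(flow.state["vars"]["counter"])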

Execution model and work

The entry point for execution is the run method at the root component. The run method of the root component may call the run method of its children, and the children may call the run methods of their children and so on.

The run method of the root component is called repeatedly in a while loop until the app gets terminated. In this programming model (reminiscent of React or Vue from the JavaScript world, or Streamlit in Python), the values of the state variables, or their changes, are translated into actions throughout the component hierarchy. This means the flow of execution will only be affected by state changes in a component or one of its children, and otherwise remain idempotent.

The actions themselves are self-contained within LightningWork. The LightningWork are typically used for long-running jobs, like downloading a dataset, performing a query, or running a computationally heavy script. While one may access any state variable in a LightningWork from a LightningFlow, one may not directly call methods of other components from within a LightningWork, as LightningWork cannot have any children. This limitation allows applications to be distributed at scale.
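
A minimal sketch of this model, with hypothetical component names: the flow's run is invoked over and over, and the long-running action is delegated to a LightningWork whose run receives plain arguments.

from lightning.app import LightningFlow, LightningWork


class DownloadWork(LightningWork):
    def run(self, url):
        # long-running job, e.g. downloading a dataset
        print(f"downloading {url}")


class PipelineFlow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.started = False
        self.downloader = DownloadWork()

    def run(self):
        # called repeatedly; a state change decides whether the work runs
        if not self.started:
            self.downloader.run("https://example.com/dataset.zip")
            self.started = True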

Component hierarchy and App

Given the above characteristics, a root LightningFlow, potentially containing children components, can be passed to an App object, and its execution can be distributed (each LightningWork will be run within its own process or in a different arrangement).

Example

>>> from lightning.app import LightningFlow
>>> class RootFlow(LightningFlow):
...     def __init__(self):
...         super().__init__()
...         self.counter = 0
...     def run(self):
...         self.counter += 1
...
>>> flow = RootFlow()
>>> flow.run()
>>> assert flow.counter == 1
>>> assert flow.state["vars"]["counter"] == 1
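
To execute it, the root flow above is handed to a LightningApp, which then calls its run method in a loop (compare the AutoScaler examples earlier):

from lightning.app import LightningApp

app = LightningApp(RootFlow())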
add_work(work)

Adds a new LightningWork instance.

Return type

str

Returns

The name of the new work attribute.

autoscale()

Adjust the number of works based on the target number returned by self.scale.

Return type

None

configure_layout()

Configure the UI layout of this LightningFlow.

You can either

  1. Return a single Frontend object to serve a user interface for this Flow.

  2. Return a single dictionary to expose the UI of a child flow.

  3. Return a list of dictionaries to arrange the children of this flow in one or multiple tabs.

Example: Serve a static directory (with at least a file index.html inside).

from lightning.app.frontend import StaticWebFrontend


class Flow(LightningFlow):
    ...

    def configure_layout(self):
        return StaticWebFrontend("path/to/folder/to/serve")

Example: Serve a Streamlit UI (requires the streamlit package to be installed).

from lightning.app.frontend import StreamlitFrontend


class Flow(LightningFlow):
    ...

    def configure_layout(self):
        return StreamlitFrontend(render_fn=my_streamlit_ui)


def my_streamlit_ui(state):
    # add your streamlit code here!
    import streamlit as st

    st.write("Hello from Streamlit!")

Example: Arrange the UI of my children in tabs (default UI by Lightning).

class Flow(LightningFlow):
    def configure_layout(self):
        return [
            dict(name="First Tab", content=self.child0),
            dict(name="Second Tab", content=self.child1),
            dict(name="Lightning", content="https://lightning.ai"),
        ]

If you don’t implement configure_layout, Lightning will collect all children and display their UI in a tab (if they have their own configure_layout implemented).

Note

This hook gets called at the time of app creation and then again as part of the loop. If desired, the returned layout configuration can depend on the state. The only exceptions are flows that return a Frontend; these need to be provided at the time of app creation in order for the runtime to start the server.

Learn more about adding UI
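
As a sketch of a state-dependent layout (the show_details attribute and child flow are hypothetical), a flow could add a tab only once one of its state variables flips:

class DetailsFlow(LightningFlow):
    def run(self):
        pass


class Flow(LightningFlow):
    def __init__(self):
        super().__init__()
        self.show_details = False     # state variable toggled elsewhere in the app
        self.details = DetailsFlow()  # child whose UI is shown conditionally

    def configure_layout(self):
        tabs = [dict(name="Lightning", content="https://lightning.ai")]
        if self.show_details:
            # this tab only appears once show_details becomes True
            tabs.append(dict(name="Details", content=self.details))
        return tabs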


create_work()

Replicates a LightningWork instance with args and kwargs provided via __init__.

Return type

LightningWork

get_work(index)

Returns the LightningWork instance with the given index.

Return type

LightningWork

remove_work(index)

Removes the index-th LightningWork instance.

Return type

str

run()

Override with your own logic.

scale(replicas, metrics)

The default scaling logic that users can override.

Parameters
  • replicas (int) – The number of running works.

  • metrics (dict) – metrics['pending_requests'] is the total number of requests that are currently pending. metrics['pending_works'] is the number of pending works.

Return type

int

Returns

The target number of running works. The value will be adjusted after this method runs so that it satisfies min_replicas <= replicas <= max_replicas.

property num_pending_requests: int

Fetches the number of pending requests via the load balancer.

property num_pending_works: int

The number of pending works.

property ready: bool

Override to customize when your App should be ready.
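
A sketch of such an override (the readiness condition is hypothetical and only uses the properties documented above):

class ReadyAwareAutoScaler(AutoScaler):
    @property
    def ready(self) -> bool:
        # hypothetical condition: report ready only once no works are pending
        return self.num_pending_works == 0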