Documentation
¶
Overview ¶
Package core implements Genkit actions, flows and other essential machinery. This package is primarily intended for Genkit internals and for plugins. Genkit applications should use the genkit package.
Index ¶
- func InternalInit(ctx context.Context, opts *Options) error
- func InternalRun[Out any](ctx context.Context, name string, f func() (Out, error)) (Out, error)
- func NewFlowServeMux(flows []string) *http.ServeMux
- func RegisterSpanProcessor(sp sdktrace.SpanProcessor)
- func RegisterTraceStore(ts tracing.Store) (shutdown func(context.Context) error)
- func ValidateRaw(dataBytes json.RawMessage, schemaBytes json.RawMessage) error
- type Action
- func DefineAction[In, Out any](provider, name string, atype atype.ActionType, metadata map[string]any, ...) *Action[In, Out, struct{}]
- func DefineActionWithInputSchema[Out any](provider, name string, atype atype.ActionType, metadata map[string]any, ...) *Action[any, Out, struct{}]
- func DefineCustomAction[In, Out, Stream any](provider, name string, metadata map[string]any, fn Func[In, Out, Stream]) *Action[In, Out, Stream]
- func DefineStreamingAction[In, Out, Stream any](provider, name string, atype atype.ActionType, metadata map[string]any, ...) *Action[In, Out, Stream]
- func LookupActionFor[In, Out, Stream any](typ atype.ActionType, provider, name string) *Action[In, Out, Stream]
- type Environment
- type FileFlowStateStore
- type Flow
- type FlowResult
- type FlowStateStore
- type Func
- type NoStream
- type Options
- type StreamFlowValue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InternalInit ¶
InternalInit is for use by the genkit package only. It is not subject to compatibility guarantees.
func InternalRun ¶
InternalRun is for use by genkit.Run exclusively. It is not subject to any backwards compatibility guarantees.
func NewFlowServeMux ¶
NewFlowServeMux constructs a net/http.ServeMux. If flows is non-empty, the each of the named flows is registered as a route. Otherwise, all defined flows are registered.
All routes take a single query parameter, "stream", which if true will stream the flow's results back to the client. (Not all flows support streaming, however.)
To use the returned ServeMux as part of a server with other routes, either add routes to it, or install it as part of another ServeMux, like so:
mainMux := http.NewServeMux() mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", NewFlowServeMux()))
func RegisterSpanProcessor ¶
func RegisterSpanProcessor(sp sdktrace.SpanProcessor)
RegisterSpanProcessor registers an OpenTelemetry SpanProcessor for tracing.
func RegisterTraceStore ¶
RegisterTraceStore uses the given trace.Store to record traces in the prod environment. (A trace.Store that writes to the local filesystem is always installed in the dev environment.) The returned function should be called before the program ends to ensure that all pending data is stored. RegisterTraceStore panics if called more than once.
func ValidateRaw ¶
func ValidateRaw(dataBytes json.RawMessage, schemaBytes json.RawMessage) error
ValidateRaw will validate JSON data against the JSON schema. It will return an error if it doesn't match the schema, otherwise it will return nil.
Types ¶
type Action ¶
type Action[In, Out, Stream any] struct { // contains filtered or unexported fields }
An Action is a named, observable operation. It consists of a function that takes an input of type I and returns an output of type O, optionally streaming values of type S incrementally by invoking a callback. It optionally has other metadata, like a description and JSON Schemas for its input and output.
Each time an Action is run, it results in a new trace span.
func DefineAction ¶
func DefineAction[In, Out any](provider, name string, atype atype.ActionType, metadata map[string]any, fn func(context.Context, In) (Out, error)) *Action[In, Out, struct{}]
DefineAction creates a new Action and registers it.
func DefineActionWithInputSchema ¶
func DefineActionWithInputSchema[Out any]( provider, name string, atype atype.ActionType, metadata map[string]any, inputSchema *jsonschema.Schema, fn func(context.Context, any) (Out, error), ) *Action[any, Out, struct{}]
DefineActionWithInputSchema creates a new Action and registers it. This differs from DefineAction in that the input schema is defined dynamically; the static input type is "any". This is used for prompts.
func DefineCustomAction ¶
func DefineStreamingAction ¶
func LookupActionFor ¶
func LookupActionFor[In, Out, Stream any](typ atype.ActionType, provider, name string) *Action[In, Out, Stream]
LookupActionFor returns the action for the given key in the global registry, or nil if there is none. It panics if the action is of the wrong type.
type Environment ¶
type Environment string
An Environment is the execution context in which the program is running.
const ( EnvironmentDev Environment = "dev" // development: testing, debugging, etc. EnvironmentProd Environment = "prod" // production: user data, SLOs, etc. )
type FileFlowStateStore ¶
type FileFlowStateStore struct {
// contains filtered or unexported fields
}
A FileFlowStateStore is a FlowStateStore that writes flowStates to files.
func NewFileFlowStateStore ¶
func NewFileFlowStateStore(dir string) (*FileFlowStateStore, error)
NewFileFlowStateStore creates a FileFlowStateStore that writes traces to the given directory. The directory is created if it does not exist.
type Flow ¶
type Flow[In, Out, Stream any] struct { // contains filtered or unexported fields }
A Flow is an Action with additional support for observability and introspection. A Flow[In, Out, Stream] represents a function from In to Out. The Stream parameter is for flows that support streaming: providing their results incrementally.
func InternalDefineFlow ¶
func InternalDefineFlow[In, Out, Stream any](name string, fn Func[In, Out, Stream]) *Flow[In, Out, Stream]
InternalDefineFlow is for use by genkit.DefineFlow exclusively. It is not subject to any backwards compatibility guarantees.
func (*Flow[In, Out, Stream]) Run ¶
Run runs the flow in the context of another flow. The flow must run to completion when started (that is, it must not have interrupts).
func (*Flow[In, Out, Stream]) Stream ¶
func (f *Flow[In, Out, Stream]) Stream(ctx context.Context, input In) func(func(*StreamFlowValue[Out, Stream], error) bool)
Stream runs the flow on input and delivers both the streamed values and the final output. It returns a function whose argument function (the "yield function") will be repeatedly called with the results.
If the yield function is passed a non-nil error, the flow has failed with that error; the yield function will not be called again. An error is also passed if the flow fails to complete (that is, it has an interrupt). Genkit Go does not yet support interrupts.
If the yield function's StreamFlowValue argument has Done == true, the value's Output field contains the final output; the yield function will not be called again.
Otherwise the Stream field of the passed StreamFlowValue holds a streamed result.
type FlowResult ¶
type FlowResult[Out any] struct { Response Out `json:"response,omitempty"` Error string `json:"error,omitempty"` StackTrace string `json:"stacktrace,omitempty"` // contains filtered or unexported fields }
A FlowResult is the result of a flow: either success, in which case Response is the return value of the flow's function; or failure, in which case Error is the non-empty error string.
type FlowStateStore ¶
type FlowStateStore interface { // Save saves the FlowState to the store, overwriting an existing one. Save(ctx context.Context, id string, fs flowStater) error // Load reads the FlowState with the given ID from the store. // It returns an error that is fs.ErrNotExist if there isn't one. // pfs must be a pointer to a flowState[I, O] of the correct type. Load(ctx context.Context, id string, pfs any) error }
A FlowStateStore stores flow states. Every flow state has a unique string identifier. A durable FlowStateStore is necessary for durable flows.
type Func ¶
type Func[In, Out, Stream any] func(context.Context, In, func(context.Context, Stream) error) (Out, error)
Func is the type of function that Actions and Flows execute. It takes an input of type Int and returns an output of type Out, optionally streaming values of type Stream incrementally by invoking a callback. If the StreamingCallback is non-nil and the function supports streaming, it should stream the results by invoking the callback periodically, ultimately returning with a final return value. Otherwise, it should ignore the StreamingCallback and just return a result.
type NoStream ¶
NoStream indicates that the action or flow does not support streaming. A Func[I, O, NoStream] will ignore its streaming callback. Such a function corresponds to a Flow[I, O, struct{}].
type Options ¶
type Options struct { // If "-", do not start a FlowServer. // Otherwise, start a FlowServer on the given address, or the // default if empty. FlowAddr string // The names of flows to serve. // If empty, all registered flows are served. Flows []string }
Options are options to InternalInit.
type StreamFlowValue ¶
type StreamFlowValue[Out, Stream any] struct { Done bool Output Out // valid if Done is true Stream Stream // valid if Done is false }
StreamFlowValue is either a streamed value or a final output of a flow.