core

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 1, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Overview

Package core implements Genkit actions, flows and other essential machinery. This package is primarily intended for Genkit internals and for plugins. Genkit applications should use the genkit package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InternalInit

func InternalInit(ctx context.Context, opts *Options) error

InternalInit is for use by the genkit package only. It is not subject to compatibility guarantees.

func InternalRun

func InternalRun[Out any](ctx context.Context, name string, f func() (Out, error)) (Out, error)

InternalRun is for use by genkit.Run exclusively. It is not subject to any backwards compatibility guarantees.

func NewFlowServeMux

func NewFlowServeMux(flows []string) *http.ServeMux

NewFlowServeMux constructs a net/http.ServeMux. If flows is non-empty, the each of the named flows is registered as a route. Otherwise, all defined flows are registered.

All routes take a single query parameter, "stream", which if true will stream the flow's results back to the client. (Not all flows support streaming, however.)

To use the returned ServeMux as part of a server with other routes, either add routes to it, or install it as part of another ServeMux, like so:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", NewFlowServeMux()))

func RegisterSpanProcessor

func RegisterSpanProcessor(sp sdktrace.SpanProcessor)

RegisterSpanProcessor registers an OpenTelemetry SpanProcessor for tracing.

func RegisterTraceStore

func RegisterTraceStore(ts tracing.Store) (shutdown func(context.Context) error)

RegisterTraceStore uses the given trace.Store to record traces in the prod environment. (A trace.Store that writes to the local filesystem is always installed in the dev environment.) The returned function should be called before the program ends to ensure that all pending data is stored. RegisterTraceStore panics if called more than once.

func ValidateRaw

func ValidateRaw(dataBytes json.RawMessage, schemaBytes json.RawMessage) error

ValidateRaw will validate JSON data against the JSON schema. It will return an error if it doesn't match the schema, otherwise it will return nil.

Types

type Action

type Action[In, Out, Stream any] struct {
	// contains filtered or unexported fields
}

An Action is a named, observable operation. It consists of a function that takes an input of type I and returns an output of type O, optionally streaming values of type S incrementally by invoking a callback. It optionally has other metadata, like a description and JSON Schemas for its input and output.

Each time an Action is run, it results in a new trace span.

func DefineAction

func DefineAction[In, Out any](provider, name string, atype atype.ActionType, metadata map[string]any, fn func(context.Context, In) (Out, error)) *Action[In, Out, struct{}]

DefineAction creates a new Action and registers it.

func DefineActionWithInputSchema

func DefineActionWithInputSchema[Out any](
	provider, name string,
	atype atype.ActionType,
	metadata map[string]any,
	inputSchema *jsonschema.Schema,
	fn func(context.Context, any) (Out, error),
) *Action[any, Out, struct{}]

DefineActionWithInputSchema creates a new Action and registers it. This differs from DefineAction in that the input schema is defined dynamically; the static input type is "any". This is used for prompts.

func DefineCustomAction

func DefineCustomAction[In, Out, Stream any](provider, name string, metadata map[string]any, fn Func[In, Out, Stream]) *Action[In, Out, Stream]

func DefineStreamingAction

func DefineStreamingAction[In, Out, Stream any](provider, name string, atype atype.ActionType, metadata map[string]any, fn Func[In, Out, Stream]) *Action[In, Out, Stream]

func LookupActionFor

func LookupActionFor[In, Out, Stream any](typ atype.ActionType, provider, name string) *Action[In, Out, Stream]

LookupActionFor returns the action for the given key in the global registry, or nil if there is none. It panics if the action is of the wrong type.

func (*Action[In, Out, Stream]) Name

func (a *Action[In, Out, Stream]) Name() string

Name returns the Action's Name.

func (*Action[In, Out, Stream]) Run

func (a *Action[In, Out, Stream]) Run(ctx context.Context, input In, cb func(context.Context, Stream) error) (output Out, err error)

Run executes the Action's function in a new trace span.

type Environment

type Environment string

An Environment is the execution context in which the program is running.

const (
	EnvironmentDev  Environment = "dev"  // development: testing, debugging, etc.
	EnvironmentProd Environment = "prod" // production: user data, SLOs, etc.
)

type FileFlowStateStore

type FileFlowStateStore struct {
	// contains filtered or unexported fields
}

A FileFlowStateStore is a FlowStateStore that writes flowStates to files.

func NewFileFlowStateStore

func NewFileFlowStateStore(dir string) (*FileFlowStateStore, error)

NewFileFlowStateStore creates a FileFlowStateStore that writes traces to the given directory. The directory is created if it does not exist.

func (*FileFlowStateStore) Load

func (s *FileFlowStateStore) Load(ctx context.Context, id string, pfs any) error

func (*FileFlowStateStore) Save

func (s *FileFlowStateStore) Save(ctx context.Context, id string, fs flowStater) error

type Flow

type Flow[In, Out, Stream any] struct {
	// contains filtered or unexported fields
}

A Flow is an Action with additional support for observability and introspection. A Flow[In, Out, Stream] represents a function from In to Out. The Stream parameter is for flows that support streaming: providing their results incrementally.

func InternalDefineFlow

func InternalDefineFlow[In, Out, Stream any](name string, fn Func[In, Out, Stream]) *Flow[In, Out, Stream]

InternalDefineFlow is for use by genkit.DefineFlow exclusively. It is not subject to any backwards compatibility guarantees.

func (*Flow[In, Out, Stream]) Name

func (f *Flow[In, Out, Stream]) Name() string

func (*Flow[In, Out, Stream]) Run

func (f *Flow[In, Out, Stream]) Run(ctx context.Context, input In) (Out, error)

Run runs the flow in the context of another flow. The flow must run to completion when started (that is, it must not have interrupts).

func (*Flow[In, Out, Stream]) Stream

func (f *Flow[In, Out, Stream]) Stream(ctx context.Context, input In) func(func(*StreamFlowValue[Out, Stream], error) bool)

Stream runs the flow on input and delivers both the streamed values and the final output. It returns a function whose argument function (the "yield function") will be repeatedly called with the results.

If the yield function is passed a non-nil error, the flow has failed with that error; the yield function will not be called again. An error is also passed if the flow fails to complete (that is, it has an interrupt). Genkit Go does not yet support interrupts.

If the yield function's StreamFlowValue argument has Done == true, the value's Output field contains the final output; the yield function will not be called again.

Otherwise the Stream field of the passed StreamFlowValue holds a streamed result.

type FlowResult

type FlowResult[Out any] struct {
	Response Out    `json:"response,omitempty"`
	Error    string `json:"error,omitempty"`

	StackTrace string `json:"stacktrace,omitempty"`
	// contains filtered or unexported fields
}

A FlowResult is the result of a flow: either success, in which case Response is the return value of the flow's function; or failure, in which case Error is the non-empty error string.

type FlowStateStore

type FlowStateStore interface {
	// Save saves the FlowState to the store, overwriting an existing one.
	Save(ctx context.Context, id string, fs flowStater) error
	// Load reads the FlowState with the given ID from the store.
	// It returns an error that is fs.ErrNotExist if there isn't one.
	// pfs must be a pointer to a flowState[I, O] of the correct type.
	Load(ctx context.Context, id string, pfs any) error
}

A FlowStateStore stores flow states. Every flow state has a unique string identifier. A durable FlowStateStore is necessary for durable flows.

type Func

type Func[In, Out, Stream any] func(context.Context, In, func(context.Context, Stream) error) (Out, error)

Func is the type of function that Actions and Flows execute. It takes an input of type Int and returns an output of type Out, optionally streaming values of type Stream incrementally by invoking a callback. If the StreamingCallback is non-nil and the function supports streaming, it should stream the results by invoking the callback periodically, ultimately returning with a final return value. Otherwise, it should ignore the StreamingCallback and just return a result.

type NoStream

type NoStream = func(context.Context, struct{}) error

NoStream indicates that the action or flow does not support streaming. A Func[I, O, NoStream] will ignore its streaming callback. Such a function corresponds to a Flow[I, O, struct{}].

type Options

type Options struct {
	// If "-", do not start a FlowServer.
	// Otherwise, start a FlowServer on the given address, or the
	// default if empty.
	FlowAddr string
	// The names of flows to serve.
	// If empty, all registered flows are served.
	Flows []string
}

Options are options to InternalInit.

type StreamFlowValue

type StreamFlowValue[Out, Stream any] struct {
	Done   bool
	Output Out    // valid if Done is true
	Stream Stream // valid if Done is false
}

StreamFlowValue is either a streamed value or a final output of a flow.

Directories

Path Synopsis
Package logger provides a context-scoped slog.Logger.
Package logger provides a context-scoped slog.Logger.
Package gtime provides time functionality for Go Genkit.
Package gtime provides time functionality for Go Genkit.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL