package etcd

import (
	"crypto/tls"
	"encoding/json"
	"path"
	"sync"

	"git.nobla.cn/golang/aeus/pkg/errors"
	"git.nobla.cn/golang/aeus/registry"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// etcdRegistry is a registry.Registry implementation that stores service
// records as JSON values in etcd.
type etcdRegistry struct {
	prefix  string                    // key prefix under which service records are stored
	client  *clientv3.Client          // etcd client, created by Init
	options *registry.RegistryOptions // options accumulated by Init
	leaseID clientv3.LeaseID          // lease used by the last successful TTL registration; zero when none
	sync.RWMutex
}

// prefixKey returns the key prefix that covers every instance of the service
// called name. The trailing slash keeps a prefix query for "foo" from also
// matching keys of a service called "foobar".
func (e *etcdRegistry) prefixKey(name string) string {
	return path.Join(e.prefix, name) + "/"
}

// buildKey returns the key under which the given service instance is stored:
// the registry prefix joined with the service name and instance ID.
func (e *etcdRegistry) buildKey(s *registry.Service) string {
	return path.Join(e.prefix, s.Name, s.ID)
}

// marshalService encodes s as JSON. It returns the empty string when encoding
// fails.
func (e *etcdRegistry) marshalService(s *registry.Service) string {
	buf, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(buf)
}

// unmarshalService decodes a JSON-encoded service record.
func (e *etcdRegistry) unmarshalService(b []byte) (*registry.Service, error) {
	var service registry.Service
	if err := json.Unmarshal(b, &service); err != nil {
		return nil, err
	}
	return &service, nil
}

// Name returns the package-level registry name.
func (e *etcdRegistry) Name() string {
	return Name
}

// Init applies opts to the registry options, builds the etcd client
// configuration from them and connects the client.
//
// It returns the error reported by clientv3.New; on failure the previously
// stored client (if any) is left untouched.
func (e *etcdRegistry) Init(opts ...registry.RegistryOption) (err error) {
	for _, cb := range opts {
		cb(e.options)
	}

	cfg := clientv3.Config{}
	if e.options.Context != nil {
		// etcd-specific settings travel inside the options context.
		if clientOpts := FromContext(e.options.Context); clientOpts != nil {
			cfg.Username = clientOpts.Username
			cfg.Password = clientOpts.Password
			cfg.DialTimeout = clientOpts.DialTimeout
			if clientOpts.Prefix != "" {
				e.prefix = clientOpts.Prefix
			}
		}
	}

	switch {
	case e.options.TLSConfig != nil:
		cfg.TLS = e.options.TLSConfig
	case e.options.Secure:
		// NOTE(review): Secure without an explicit TLSConfig enables TLS but
		// disables certificate verification. Supply a TLSConfig when the
		// server identity must be checked.
		cfg.TLS = &tls.Config{
			InsecureSkipVerify: true,
		}
	}

	if e.prefix == "" {
		e.prefix = "/micro/service/"
	}
	cfg.Endpoints = append(cfg.Endpoints, e.options.Addrs...)

	client, err := clientv3.New(cfg)
	if err != nil {
		return err
	}
	e.client = client
	return nil
}

// Register stores s in etcd.
//
// When the register options carry a positive TTL, the record is bound to a
// lease. If a lease from an earlier registration is still known, it is
// refreshed with a single keep-alive and Register returns without rewriting
// the record. If etcd reports that lease as not found, a new lease is granted
// and the record is written again. Without a TTL the record is written with
// no lease attached.
func (e *etcdRegistry) Register(s *registry.Service, cbs ...registry.RegisterOption) (err error) {
	var lgr *clientv3.LeaseGrantResponse
	opts := registry.NewRegisterOption(cbs...)

	if opts.TTL.Seconds() > 0 {
		if e.leaseID > 0 {
			if _, err = e.client.KeepAliveOnce(opts.Context, e.leaseID); err == nil {
				return nil
			}
			// Only an expired or unknown lease is recoverable; any other
			// failure is reported to the caller.
			if err != rpctypes.ErrLeaseNotFound {
				return err
			}
		}
		if lgr, err = e.client.Grant(opts.Context, int64(opts.TTL.Seconds())); err != nil {
			return err
		}
	}

	if lgr == nil {
		_, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s))
		return err
	}

	if _, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s), clientv3.WithLease(lgr.ID)); err != nil {
		return err
	}
	// Remember the lease only after the record was actually written with it.
	e.leaseID = lgr.ID
	return nil
}

// Deregister removes the record of s from etcd and returns the error reported
// by the delete request, if any.
func (e *etcdRegistry) Deregister(s *registry.Service, cbs ...registry.DeregisterOption) error {
	opts := registry.NewDeregisterOption(cbs...)
	_, err := e.client.Delete(opts.Context, e.buildKey(s))
	return err
}

// GetService returns every stored instance of the service called name.
//
// It returns errors.ErrNotFound when no key exists under the service prefix.
// Records whose value cannot be decoded are skipped, so the returned slice may
// hold fewer entries than there are keys.
func (e *etcdRegistry) GetService(name string, cbs ...registry.GetOption) ([]*registry.Service, error) {
	opts := registry.NewGetOption(cbs...)
	rsp, err := e.client.Get(opts.Context, e.prefixKey(name), clientv3.WithPrefix(), clientv3.WithSerializable())
	if err != nil {
		return nil, err
	}
	if len(rsp.Kvs) == 0 {
		return nil, errors.ErrNotFound
	}

	ss := make([]*registry.Service, 0, len(rsp.Kvs))
	for _, kv := range rsp.Kvs {
		// Best effort: a single corrupt record must not hide the healthy ones.
		if s, err := e.unmarshalService(kv.Value); err == nil {
			ss = append(ss, s)
		}
	}
	return ss, nil
}

// Watch returns a watcher created by newWatcher for this registry.
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
	return newWatcher(e, opts...)
}

// NewEtcdRegistry returns an etcd-backed registry with empty options. Init
// must be called before the registry is used, because it creates the client.
func NewEtcdRegistry() registry.Registry {
	return &etcdRegistry{
		options: &registry.RegistryOptions{},
	}
}