一尘不染

错误未知类型Go

go

我对Go真的很陌生,su只在这里忍受我。我正在尝试使用以下代码编写将mysql数据加载到Redis集群的代码:redis-go-
cluster

load2redis

这是代码。它有点长,请在这里忍受。

package main

import (
    "bytes"
    "database/sql"
    "flag"
    // "github.com/garyburd/redigo/redis"
    _ "github.com/go-sql-driver/mysql"
    //"gopkg.in/redis.v4"
    "github.com/chasex/redis-go-cluster"
    "log"
    "runtime"
    // "strings"
    "sync"
    "time"

)

var client *redis.Cluster

type Task interface {
    Execute()
}

type Pool struct {
    mu sync.Mutex

    size  int
    tasks chan Task
    kill  chan struct{}
    wg    sync.WaitGroup
}

func NewPool(size int) *Pool {
    pool := &Pool{
        tasks: make(chan Task, 128),
        kill:  make(chan struct{}),
    }
    pool.Resize(size)
    return pool
}

func (p *Pool) worker() {
    defer p.wg.Done()
    for {
        select {
        case task, ok := <-p.tasks:
            if !ok {
                return
            }
            task.Execute()
        case <-p.kill:
            return
        }
    }
}

func (p *Pool) Resize(n int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for p.size < n {
        p.size++
        p.wg.Add(1)
        go p.worker()
    }
    for p.size > n {
        p.size--
        p.kill <- struct{}{}
    }
}

func (p *Pool) Close() {
    close(p.tasks)
}

func (p *Pool) Wait() {
    p.wg.Wait()
}

func (p *Pool) Exec(task Task) {
    p.tasks <- task
}

type RedisTask struct {
    Index   int
    Command string
    Key     string
    Value   string
    MapData map[string]string
}

func (e RedisTask) Execute() {
    log.Println("executing:", e.Key, ",", e.Index)

    if e.Command == "SET" {
        _,err := redis.String(client.Do("SET", e.Key, e.Value))
        checkErr(err, "set error:")
    } else if e.Command == "SADD" {
        _,err := redis.Strings(client.Do("SADD", e.Key, e.Value))
        checkErr(err, "sadd error:") 
    } else if e.Command == "HMSET" {
        _,err := redis.StringMap(client.Do("HMSET", e.Key, e.MapData))
        checkErr(err, "hmset error:")
    }
    // TODO: clean data
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    startTime := time.Now().UnixNano() / int64(time.Millisecond)
    host := flag.String("s", "localhost:3306", "mysql server host and port ,eg localhost:3306")
    username := flag.String("u", "test", "username to login mysql")
    password := flag.String("p", "test", "password for mysql")
    database := flag.String("d", "test", "database you want to execute query")
    query := flag.String("q", "select 1;", "your query sql")
    ds := flag.String("ds", "key", "redis structure")
    PK := flag.String("pk", "Rkey", "the redis Key in the fields of mysql query result")

    //redisHost := flag.String("rs", "localhost:6379", "redis host and port ,eg localhost:6379")
    //redisPassword := flag.String("rp", "test", "redis password")

    poolSize := flag.Int("size", 10000, "redis pool size")

    flag.Parse()
    var buf bytes.Buffer = bytes.Buffer{}
    buf.WriteString(*username)
    buf.WriteString(":")
    buf.WriteString(*password)
    buf.WriteString("@tcp(")
    buf.WriteString(*host)
    buf.WriteString(")/")
    buf.WriteString(*database)

    db, err := sql.Open("mysql", buf.String())
    checkErr(err, "connect to mysql error !")
    defer db.Close()

    poolWorker := NewPool(*poolSize)

    // Execute the query
    rows, err := db.Query(*query)
    checkErr(err, "execute sql error!")

    // pool = newPool(*redisHost, *redisPassword, *poolSize)

    //client = redis.NewClient(&redis.Options{
    //  Addr:     *redisHost,
    //  Password: *redisPassword, // no password set
    //  DB:       0,              // use default DB
    //})

    client,_ = redis.NewCluster(&redis.Options{
            StartNodes: []string{"10.x.x.x:6000", "10.x.x.x:6001", "10.x.x.x:6002"},
            ConnTimeout: 50 * time.Millisecond,
            ReadTimeout: 50 * time.Millisecond,
            WriteTimeout: 50 * time.Millisecond,
            KeepAlive: 16,
            AliveTime: 60 * time.Second,
    })
    //checkErr(err, "client error:")

    //pong, err := client.Ping().Result()
    //checkErr(err, "redis client error:")
    //log.Println(pong)

    columns, err := rows.Columns()
    checkErr(err, "get columns error!")

    length := len(columns)
    values := make([]sql.RawBytes, length)

    scanArgs := make([]interface{}, len(values))
    for i := range values {
        scanArgs[i] = &values[i]
    }

    count := 0
    for rows.Next() {
        count += 1
        err = rows.Scan(scanArgs...)
        checkErr(err, "scan error")

        var value string
        var key string

        var task RedisTask

        if *ds == "key" {
            key = getStringData(values[0])
            value = getStringData(values[1])
            if value != "" {
                task = RedisTask{
                    Index:   count,
                    Command: "SET",
                    Key:     key,
                    Value:   value,
                }
            }
        } else if *ds == "set" {
            key = getStringData(values[0])
            value = getStringData(values[1])
            if value != "" {
                task = RedisTask{
                    Index:   count,
                    Command: "SADD",
                    Key:     key,
                    Value:   value,
                }
            }
        } else if *ds == "hash" {
            key = getStringData(values[0])
            // args := redis.Args{}.Add(key)

            m := make(map[string]string)

            for i, col := range values {
                if col != nil && columns[i] != *PK {
                    value = getStringData(col)
                    m[columns[i]] = value
                }
            }
            task = RedisTask{
                Index:   count,
                Command: "HMSET",
                Key:     key,
                MapData: m,
            }
        }
        poolWorker.Exec(task)
    }
    if err = rows.Err(); err != nil {
        panic(err.Error()) // proper error handling instead of panic in your app
    }

    poolWorker.Close()

    poolWorker.Wait()

    EndTime := time.Now().UnixNano() / int64(time.Millisecond)
    log.Println("======================================== executing time:", EndTime-startTime, " ms, total:", count)
}

func getStringData(data sql.RawBytes) string {
    if data == nil {
        return ""
    }
    value := string(data)
    return clearBad(value)
}

func clearBad(str string) string {
    // str = strings.Trim(str, "`")
    // str = strings.Trim(str, "ï½€")
    // str = strings.Trim(str, "-")
    // str = strings.Trim(str, ".")
    // str = strings.Trim(str, " ")
    // str = strings.Trim(str, ";")
    // str = strings.Trim(str, ",")
    // str = strings.Trim(str, ":")
    // str = strings.Trim(str, ";")
    // str = strings.Trim(str, "'")
    // str = strings.Trim(str, "!")
    return str
}

func checkErr(err error, msg string) {
    if err != nil {
        log.Fatalln(msg, err)
    }
}

当我执行它时,出现以下异常:

./rak -u user -p user -s 10.X.X.X:8080 -d test -q "SELECT CONCAT( 'student:', c.sid ) Rkey, c.sname SNAME, c.saddress SADDRESS, c.sage SAGE FROM STUDENT c WHERE c.sid  > 0;" -ds hash -size 1200
2017/07/21 10:29:09 rak.go:93: executing: student:2 , 2
2017/07/21 10:29:09 rak.go:93: executing: student:1 , 1
2017/07/21 10:29:09 rak.go:93: executing: student:3 , 3
2017/07/21 10:29:09 rak.go:268: hmset error: Do: unknown type map[string]string
$

有人可以向我解释我在做什么错吗?我将不胜感激。


阅读 192

收藏
2020-07-02

共1个答案

一尘不染

如前所述,Do不适用于地图。这是您可以解决它的一种方法。

} else if e.Command == "HMSET" {
    // Build up a string slice to hold the key value pairs
    args := make([]string, 0, len(e.MapData) * 2)
    for k, v := range e.MapData {
        args = append(args, k, v)
    }
    _,err := redis.StringMap(client.Do("HMSET", e.Key, args...))
    checkErr(err, "hmset error:")
}

Do方法映射到Redis命令集,并且期望参数的方式相同。例如。

127.0.0.1:6379> HMSET myKey foo bar baz boff
OK
127.0.0.1:6379> HGETALL myKey
1) "foo"
2) "bar"
3) "baz"
4) "boff"
127.0.0.1:6379>

在代码中使用redis客户端进行相同的地图设置操作将是

client.Do("HMSET", "myKey", "foo", "bar", "baz", "boff")

当映射的键和值的参数是动态的时,最直接的方法是

client.Do("HMSET", "myKey", []string{"foo", "bar", "baz", "boff"}...)

这正是上面的第一个代码块所做的。

2020-07-02