1. File config appsettings.json chứa cấu hình JSON kết nối Redis

- Kết nối với Redis ở máy cá nhân (localhost)

    "RedisConfig_local": {
        "ServerWrite": "127.0.0.1:6379,AllowAdmin = true,abortConnect=false,SyncTimeout = 5000",
        "ServerRead": "127.0.0.1:6379,AllowAdmin = true,abortConnect=false,SyncTimeout = 5000",
        "DatabaseNumber": 1
    },


- Kết nối với Redis trên server (redis.haipv.com)

    "RedisConfig": {
        "ServerWrite": "redis.haipv.com:6379,password=mB7@!Ye6Mxh*dS%S,AllowAdmin = true,abortConnect=false,SyncTimeout = 5000",
        "ServerRead": "redis.haipv.com:6379,password=mB7@!Ye6Mxh*dS%S,AllowAdmin = true,abortConnect=false,SyncTimeout = 5000",
        "DatabaseNumber": 1
    },





2. Code C# file RedisProvider kết nối redis

using Services.Common.Helpers;
using Services.Common.Models;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;

namespace Services.Common.Caching
{
    public class RedisProvider : ICacheProvider
    {
        private static readonly RedisConfig _redisConfig = Setting.Get<RedisConfig>("RedisConfig");

        /// <summary>
        /// Lưu trữ key-value
        /// </summary>
        /// <param name="Key"></param>
        /// <param name="Value"></param>
        public RedisReturnModel<string> SetString(string key, string value)
        {
            RedisReturnModel<string> result = new RedisReturnModel<string>();
            try
            {
                using (var redis = ConnectionMultiplexer.Connect(_redisConfig.ServerWrite))
                {
                    IDatabase database = redis.GetDatabase(_redisConfig.DatabaseNumber);
                    bool rs = database.StringSet(key, value, when: When.NotExists);
                    if (rs == false)
                    {
                        result.Code = RedisResultCode.ErrorInputExists;
                        result.Message = "Key đã tồn tại.";
                    }
                }
            }
            catch (Exception ex)
            {
                result.Code = RedisResultCode.ErrorConnect;
                result.Message = "Lỗi không kết nối được redis.";
                result.ExMessage = ex.ToString();
            }
            return result;
        }

        /// <summary>
        /// Get value
        /// </summary>
        /// <param name="Key"></param>
        /// <returns></returns>
        public RedisReturnModel<string> GetString(string key)
        {
            RedisReturnModel<string> result = new RedisReturnModel<string>();
            try
            {
                using (var redis = ConnectionMultiplexer.Connect(_redisConfig.ServerRead))
                {
                    IDatabase database = redis.GetDatabase(_redisConfig.DatabaseNumber);
                    if (database.KeyExists(key))
                    {
                        result.Message = "Thành công.";
                        result.Data = database.StringGet(key);
                    }    
                    else
                    {
                        result.Code = RedisResultCode.ErrorNotFound;
                        result.Message = "Không tìm thấy key.";
                        result.Data = "";
                    }    
                }
            }
            catch (Exception ex)
            {
                result.Code = RedisResultCode.ErrorConnect;
                result.Message = "Lỗi không kết nối được redis.";
                result.ExMessage = ex.ToString();
            }
            return result;
        }

        /// <summary>
        /// Lưu trữ hash table 
        /// </summary>
        ///<param name="hashKey"></param>
        /// <param name="Key"></param>
        /// <param name="Value"></param>
        public RedisReturnModel<string> HashSet(string hashKey, string key, string value)
        {
            RedisReturnModel<string> result = new RedisReturnModel<string>();
            try
            {
                using (var redis = ConnectionMultiplexer.Connect(_redisConfig.ServerWrite))
                {
                    IDatabase database = redis.GetDatabase(_redisConfig.DatabaseNumber);
                    bool rs = database.HashSet(hashKey, key, value, when: When.NotExists);
                    if (rs == false)
                    {
                        result.Code = RedisResultCode.ErrorInputExists;
                        result.Message = "Key đã tồn tại.";
                    }
                }
            }
            catch (Exception ex)
            {
                result.Code = RedisResultCode.ErrorConnect;
                result.Message = "Lỗi không kết nối được redis.";
                result.ExMessage = ex.ToString();
            }
            return result;
        }




        /// <summary>
        /// Lấy hết key-value trong hash table
        /// </summary>
        /// <param name="hashKey"></param>
        public RedisReturnModel<List<QueueInfoModel>> HashGetAll_Delete(string hashKey)
        {
            RedisReturnModel<List<QueueInfoModel>> result = new RedisReturnModel<List<QueueInfoModel>>();
            List<QueueInfoModel> listData = new List<QueueInfoModel>();
            try
            {
                using (var redis = ConnectionMultiplexer.Connect(_redisConfig.ServerRead))
                {
                    IDatabase database = redis.GetDatabase(_redisConfig.DatabaseNumber);
                    var listkey = database.HashGetAll(hashKey);

                    if (listkey != null)
                    {
                        Int32 unixTimestamp = (Int32)(DateTime.UtcNow.Subtract(new DateTime(197011))).TotalSeconds;
                        listData = listkey.Where(k => Int32.Parse(k.Value) <= unixTimestamp).Select(i => new QueueInfoModel { IdQueue = (int) i.Name, TimestampDelay=i.Value } ).OrderBy(o => o.TimestampDelay).ToList();

                        //Sau khi lấy xong thì xóa luôn trong redis không quan tâm đã bắn cho Kafka chưa
                        if (listData?.Count > 0)
                            HashDelete(hashKey, listData);
                    }

                    result.Message = "Thành công.";
                    result.Data = listData;
                }
            }
            catch (Exception ex)
            {
                result.Code = RedisResultCode.ErrorConnect;
                result.Message = "Lỗi không kết nối được redis.";
                result.ExMessage = ex.ToString();
            }
            return result;
        }


        /// <summary>
        /// Lấy hết key-value trong hash table
        /// </summary>
        /// <param name="hashKey"></param>
        public RedisReturnModel<List<QueueInfoModel>> HashGetAll(string hashKey)
        {
            RedisReturnModel<List<QueueInfoModel>> result = new RedisReturnModel<List<QueueInfoModel>>();
            List<QueueInfoModel> listData = new List<QueueInfoModel>();
            try
            {
                using (var redis = ConnectionMultiplexer.Connect(_redisConfig.ServerRead))
                {
                    IDatabase database = redis.GetDatabase(_redisConfig.DatabaseNumber);
                    var listkey = database.HashGetAll(hashKey);

                    if (listkey != null)
                    {
                        Int32 unixTimestamp = (Int32)(DateTime.UtcNow.Subtract(new DateTime(197011))).TotalSeconds;
                        listData = listkey.Where(k => Int32.Parse(k.Value) <= unixTimestamp).Select(i => new QueueInfoModel { IdQueue = (int)i.Name, TimestampDelay = i.Value }).OrderBy(o => o.TimestampDelay).ToList();
                       
                    }

                    result.Message = "Thành công.";
                    result.Data = listData;
                }
            }
            catch (Exception ex)
            {
                result.Code = RedisResultCode.ErrorConnect;
                result.Message = "Lỗi không kết nối được redis.";
                result.ExMessage = ex.ToString();
            }
            return result;
        }
        /// <summary>
        /// Delete list key 
        /// </summary>
        ///<param name="hashKey"></param>
        /// <param name="listKey"></param>
        public RedisReturnModel<string> HashDelete(string hashKey, List<QueueInfoModel> listKey)
        {
            RedisReturnModel<string> result = new RedisReturnModel<string>();
            try
            {
                using (var redis = ConnectionMultiplexer.Connect(_redisConfig.ServerWrite))
                {
                    IDatabase database = redis.GetDatabase(_redisConfig.DatabaseNumber);

                    foreach(QueueInfoModel q in listKey)
                    {
                        _ = database.HashDelete(hashKey, q.IdQueue);
                    }    
                }
            }
            catch (Exception ex)
            {
                result.Code = RedisResultCode.ErrorConnect;
                result.Message = "Lỗi không kết nối được redis.";
                result.ExMessage = ex.ToString();
            }
            return result;
        }

    }
}




3. File C# ICacheProvider là interface của RedisProvider 

using Services.Common.Models;
using System.Collections.Generic;

namespace Services.Common.Caching
{
    public interface ICacheProvider
    {
        /// <summary>
        ///  Set Key-Value, Expire default is minutes in setting
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
         RedisReturnModel<string> SetString(string key, string value);

        /// <summary>
        /// 
        /// </summary>
        /// <param name="Key"></param>
        /// <param name="Value"></param>
         RedisReturnModel<string> HashSet(string hashKey, string key, string value);


        /// <summary>
        /// 
        /// </summary>
        /// <param name="hashKey"></param>
        /// <param name="listKey"></param>
        /// <returns></returns>
        RedisReturnModel<string> HashDelete(string hashKey, List<QueueInfoModel> listKey);

        /// <summary>
        /// Get Key-Value
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        RedisReturnModel<string> GetString(string key);

        /// <summary>
        /// Lấy hết key-value trong hash table
        /// </summary>
        /// <param name="hashKey"></param>
        RedisReturnModel<List<QueueInfoModel>> HashGetAll_Delete(string hashKey);

        RedisReturnModel<List<QueueInfoModel>> HashGetAll(string hashKey);
    }
}




4. File C# CacheManager sẽ gọi tới RedisProvider 


using Services.Common.Models;
using System.Collections.Generic;

namespace Services.Common.Caching
{
    public class CacheManager
    {
        private static ICacheProvider _cacheProvider;

        /// <summary>
        /// Constructor
        /// </summary>
        static CacheManager()
        {
            _cacheProvider = new RedisProvider();
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="Key"></param>
        /// <returns></returns>
        public static RedisReturnModel<string> Get(string Key)
        {
            try
            {
                if (_cacheProvider != null)
                    return _cacheProvider.GetString(Key);
                else
                    return null;
            }
            catch
            {
                throw;
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="Key"></param>
        /// <param name="Value"></param>
        /// <returns></returns>
        public static RedisReturnModel<string> Set(string Key, string Value)
        {
            try
            {
                if (_cacheProvider != null)
                {
                    return _cacheProvider.SetString(Key, Value);
                }
                else
                {
                    return null;
                }
            }
            catch
            {
                throw;
            }
        }

        /// <summary>
        /// Lưu trữ hash table 
        /// </summary>
        /// <param name="Key"></param>
        /// <param name="Value"></param>
        /// <returns></returns>
        public static RedisReturnModel<string> HashSet(string hashKey, string Key, string Value)
        {
            try
            {
                if (_cacheProvider != null)
                {
                    return _cacheProvider.HashSet(hashKey, Key, Value);
                }
                else
                {
                    return null;
                }
            }
            catch
            {
                throw;
            }
        }


        /// <summary>
        /// Xóa list key trong redis
        /// </summary>
        /// <param name="hashKey"></param>
        /// <param name="listKey"></param>
        /// <returns></returns>
        public static RedisReturnModel<string> HashDelete(string hashKey, List<QueueInfoModel> listKey)
        {
            try
            {
                if (_cacheProvider != null)
                {
                    return _cacheProvider.HashDelete(hashKey, listKey);
                }
                else
                {
                    return null;
                }
            }
            catch
            {
                throw;
            }
        }



        /// <summary>
        /// Lấy hết key-value trong hash table và xóa
        /// </summary>
        /// <param name="hashKey"></param>
        public static RedisReturnModel<List<QueueInfoModel>> HashGetAll_Delete(string hashKey)
        {
            try
            {
                if (_cacheProvider != null)
                {
                    return _cacheProvider.HashGetAll_Delete(hashKey);
                }
                else
                {
                    return null;
                }
            }
            catch
            {
                throw;
            }
        }


        /// <summary>
        /// Chỉ lấy danh sách QueueId - ko xóa
        /// </summary>
        /// <param name="hashKey"></param>
        /// <returns></returns>
        public static RedisReturnModel<List<QueueInfoModel>> HashGetAll(string hashKey)
        {
            try
            {
                if (_cacheProvider != null)
                {
                    return _cacheProvider.HashGetAll(hashKey);
                }
                else
                {
                    return null;
                }
            }
            catch
            {
                throw;
            }
        }

    }
}



5. File Service gọi CacheManager để thao tác dữ liệu với Redis

/// <summary>
/// Registers a queue item in the Redis hash "queuedelay" through CacheManager.HashSet, using the
/// queue id (as text) for the hash field and <paramref name="utcTimeDelay"/> for its value.
/// </summary>
/// <param name="idQueue">Queue identifier; its string form becomes the hash field name.</param>
/// <param name="utcTimeDelay">
/// Value stored for the field. NOTE(review): presumably a Unix timestamp in seconds (UTC), since
/// RedisProvider parses the hash values as integers and compares them with the current time — confirm.
/// </param>
public ReturnModel InsertQueue(int idQueue, string utcTimeDelay)
        {
           var redisResult = CacheManager.HashSet("queuedelay",idQueue.ToString(), utcTimeDelay);

            // CacheManager.HashSet returns null only when no cache provider is available.
            if (redisResult != null)
            {
                ...
            }
        }