您的位置:首页 > 编程语言

Cassandra源代码分析:数据写入流程

2015-08-08 01:24 274 查看
org.apache.cassandra.thrift.CassandraServer类的add方法将接受客户端的请求,该函数定义如下:

 public void add(ByteBuffer
key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)   
           throws InvalidRequestException, UnavailableException, TimedOutException, TException   
   {   
    // 数据验证   
       logger.debug("add");   
   
       state().hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);   
       String keyspace = state().getKeyspace();   
   
       CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);   
       ThriftValidation.validateKey(metadata, key);   
       ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);   
       ThriftValidation.validateColumnParent(metadata, column_parent);   
       // SuperColumn field is usually optional, but not when we're adding   
       if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)   
       {   
           throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);   
       }   
       ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));   
          
       // 创建一个 RowMutation 对象,封装用户插入数据信息   
       RowMutation rm = new RowMutation(keyspace,
key);   
       try   
       {   
           rm.addCounter(new QueryPath(column_parent.column_family,
column_parent.super_column, column.name), column.value);   
       }   
       catch (MarshalException e)   
       {   
           throw new InvalidRequestException(e.getMessage());   
       }   
       // 插入数据   
       doInsert(consistency_level,
Arrays.asList(new CounterMutation(rm, consistency_level)));   
   }   

函数内部实现上首先将kv信息封装成RowMutation对象,之后创建QueryPath对象(主要是对数据进行封转),

最后调用doInsert方法执行插入动作,doInsert函数定义如下: 

 // 执行数据插入操作    
private void doInsert(ConsistencyLevel
consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException   
{   
    // 数据验证   
    ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level);   
    if (mutations.isEmpty())   
        return;   
    try   
    {   
        schedule(DatabaseDescriptor.getRpcTimeout());   
        try   
        {   
            StorageProxy.mutate(mutations,
consistency_level);   
        }   
        finally   
        {   
            release();   
        }   
    }   
    catch (TimeoutException e)   
    {   
        throw new TimedOutException();   
    }   
}   

函数内部首先进行数据检查,调用StorageProxy.mutate(mutations, consistency_level);执行数据的插入操作。

mute方法定义如下: 

public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException   
{   
    logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);   
    // 本地数据中心   
    final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());   
   
    long startTime = System.nanoTime();   
    // 封装条件变量   
    List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();   
   
    IMutation mostRecentMutation = null;   
    try   
    {   
        for (IMutation mutation : mutations)    // 对于每个Mutation   
        {   
            mostRecentMutation = mutation;   
            // CounterMutation:首先需要被写入到replicas leader中,之后在向其他的replicas中去分发   
            if (mutation instanceof CounterMutation)   
            {   
                responseHandlers.add(mutateCounter((CounterMutation)mutation,
localDataCenter));   
            }   
            else   
            {   
                // WritePerformer:普通类型的数据分发
  
                responseHandlers.add(performWrite(mutation,
consistency_level, localDataCenter, standardWritePerformer));   
            }   
        }   
   
        // wait for writes.  throws TimeoutException if necessary   
        for (IWriteResponseHandler responseHandler : responseHandlers)   
        {   
            // 等待任务结束或者是抛出异常   
            responseHandler.get();   
        }   
   
    }   
    catch (TimeoutException ex)     // 捕获异常   
    {   
        if (logger.isDebugEnabled())   
        {   
            List<String> mstrings = new ArrayList<String>();   
            for (IMutation mutation : mutations)   
                mstrings.add(mutation.toString(true));   
            logger.debug("Write timeout {} for one (or more) of: ", ex.toString(), mstrings);   
        }   
        throw ex;   
    }   
    catch (IOException e)   
    {   
        assert mostRecentMutation != null;   
        throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e);   
    }   
    finally   
    {   
        writeStats.addNano(System.nanoTime() - startTime);   
    }   
}   

对于每个Mutation对象,如果是CounterMutation类型的Mutation的话,首先要确保一个replica的写入成功,之后在向另外的N-1个replicas写入;其他类型的Mutation的话,没有这个要求,做法是首先得到N个replicas节点,向这个N个节点发送命令。

这两种类型的Mutation是通过两个函数mutateCounter和performWrite分别生成的,这里我们仅仅来看一下performWrite的实现:首先得到复制策略,通过复制策略得到所有replica的endpoints,将任务交给代理WritePerformer.apply执行。代码如下:

  
   public static IWriteResponseHandler performWrite(IMutation
mutation,   
                                                    ConsistencyLevel consistency_level,   
                                                    String localDataCenter,   
                                                    WritePerformer performer)   
   throws UnavailableException, TimeoutException, IOException   
   {   
    // 得到复制策略   
       String table = mutation.getTable();   
       AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();   
          
       // 得到所有replica的endpoints   
       Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());   
          
       // 满足一致性的条件变量   
       IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);   
   
       // exit early if we can't fulfill the CL at this time   
       // 如果已经能够确定不能满足一致性的条件,例如live的节点数量小于W,直接返回   
       responseHandler.assureSufficientLiveNodes();   
          
       // 代理给WritePerformer执行   
       performer.apply(mutation,
writeEndpoints, responseHandler, localDataCenter, consistency_level);   
          
          
       return responseHandler;   
   }   

同时需要注意的是在文件org.apache.cassandra.service.StorageProxy.java中有三个实现而来WritePerformer接口的类,WritePerformer接口定义如下:

private interface WritePerformer   
{   
    public void apply(IMutation
mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException;   

}   

也就是说最终完成数据写入任务的是WritePerformer的apply方法。StorageProxy的三个实现该接口的类型如下: 

// 最终的数据使用实现了WritePerformer接口的standardWritePerformer,counterWritePerformer   

      // 和counterWriteOnCoordinatorPerformer   

      standardWritePerformer = new WritePerformer()  

      {  

          public void apply(IMutation mutation,  

                            Collection<InetAddress> targets,  

                            IWriteResponseHandler responseHandler,  

                            String localDataCenter,  

                            ConsistencyLevel consistency_level)  

          throws IOException, TimeoutException  

          {  

              assert mutation instanceof RowMutation;  

              sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);  

          }  

      };  

  

      /* 

       * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or 

       * in CounterMutationVerbHandler on a replica othewise. The write must be executed on the MUTATION stage 

       * but on the latter case, the verb handler already run on the MUTATION stage, so we must not execute the 

       * underlying on the stage otherwise we risk a deadlock. Hence two different performer. 

       * 执行CounterMutation 

       */  
      counterWritePerformer = new WritePerformer()  

      {  

          public void apply(IMutation mutation,  

                            Collection<InetAddress> targets,  

                            IWriteResponseHandler responseHandler,  

                            String localDataCenter,  

                            ConsistencyLevel consistency_level)   

          throws IOException  

          {  

              if (logger.isDebugEnabled())  

                  logger.debug("insert writing local & replicate " + mutation.toString(true));  

  

              Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);  

              runnable.run();  

          }  

      };  

        

      // 执行CounterMutation   

      counterWriteOnCoordinatorPerformer = new WritePerformer()  

      {  

          public void apply(IMutation mutation,  

                            Collection<InetAddress> targets,  

                            IWriteResponseHandler responseHandler,  

                            String localDataCenter,  

                            ConsistencyLevel consistency_level)  

          throws IOException  

          {  

              if (logger.isDebugEnabled())  

                  logger.debug("insert writing local & replicate " + mutation.toString(true));  

  

              Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);  

              StageManager.getStage(Stage.MUTATION).execute(runnable);  

          }  

      };  

我们分别来看上面的几个实现,standardWritePerformer的实现方式比较简单,对于endpoints的集合,如果该节点还live,那么其发送写命令,如果该节点dead,那么这时执行hinted-handoff策略: 

/** 

    * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node 

    * is not available. 

    * 

    * Note about hints: 

    * 

    * | Hinted Handoff | Consist. Level | 

    * | on             |       >=1      | --> wait for hints. We DO NOT notify the handler with handler.response() for hints;  

    * | on             |       ANY      | --> wait for hints. Responses count towards consistency. 

    * | off            |       >=1      | --> DO NOT fire hints. And DO NOT wait for them to complete. 

    * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete. 

    * 

    * @throws TimeoutException if the hints cannot be written/enqueued  

    */  

   private static void sendToHintedEndpoints(final RowMutation rm,   

                                             Collection<InetAddress> targets,  

                                             IWriteResponseHandler responseHandler,  

                                             String localDataCenter,  

                                             ConsistencyLevel consistency_level)  

   throws IOException, TimeoutException  

   {  

       // Multimap that holds onto all the messages and addresses meant for a specific datacenter   

       Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(targets.size());  

       MessageProducer producer = new CachingMessageProducer(rm);  

  

       for (InetAddress destination : targets)      // 对于每个endpoint   

       {  

           if (FailureDetector.instance.isAlive(destination))       // 如果endpoint还live   

           {  

               String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);  

  

               if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)  

               {  

                // 如果当前机器就是replicas中的一个,直接写入到本地   

                   insertLocal(rm, responseHandler);  

               }  

               else  

               {  

                // 否则需要向远程服务器发送命令    

                   // belongs on a different server   

                   if (logger.isDebugEnabled())  

                       logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);  

  

                   Multimap<Message, InetAddress> messages = dcMessages.get(dc);  

                   if (messages == null)  

                   {  

                      messages = HashMultimap.create();  

                      dcMessages.put(dc, messages);  

                   }  

  

                   messages.put(producer.getMessage(Gossiper.instance.getVersion(destination)), destination);  

               }  

           }  

           else     // 否则,这里的话,可能是需要使用hinted-handoff机制   

           {  

               if (!shouldHint(destination))  

                   continue;  

  

               // Avoid OOMing from hints waiting to be written.  (Unlike ordinary mutations, hint   

               // not eligible to drop if we fall behind.)   

               if (hintsInProgress.get() > maxHintsInProgress)  

                   throw new TimeoutException();  

  

               // Schedule a local hint and let the handler know it needs to wait for the hint to complete too   

               Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);  

               responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));  

           }  

       }  

         

       // 向replicas发送message   

       sendMessages(localDataCenter, dcMessages, responseHandler);  

   }  

到此我们已经完成了数据从StorageProxy到各个replicas的转发工作,当然这里还存在一些问题,会在下面的继续:

1. 首先replicas收到命令之后的处理动作

2. cassandra中如何生成replicas,如何发现endpoints的拓扑结构,这就涉及到cassandra中snitch的实现

3. cassandra中如何实现DHT?
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: